Sources - PostgreSQL - Self-hosted PostgreSQL

Enable Logical Replication

Set wal_level to logical by running

ALTER SYSTEM SET wal_level TO 'logical';

Restart the DB instance.

Create role for sync

Create a sync user for Integrate.io by executing

CREATE ROLE flydata WITH PASSWORD '<your_password>' LOGIN;

Assign the replication role to the user

ALTER ROLE flydata WITH REPLICATION;

Grant necessary privileges

Grant the privileges for the database and schema by running the following queries
GRANT CREATE ON SCHEMA <enter schema> TO flydata;
GRANT USAGE ON SCHEMA <enter schema> TO flydata;
GRANT CREATE ON DATABASE <enter database> TO flydata;
GRANT SELECT ON ALL TABLES IN SCHEMA <enter schema> TO flydata;
GRANT REFERENCES ON ALL TABLES IN SCHEMA <enter schema> TO flydata;

Assign table ownership

Create a replication group role which will allow for shared ownership of the tables by the original owner as well as the flydata user.

CREATE ROLE replication_group;

Add flydata user to replication_group.

GRANT replication_group to flydata;

Then, create the following function which adds the existing table owners in a given schema to the replication_group role. This lets the original users retain ownership of the tables.

CREATE OR REPLACE FUNCTION public.add_existing_owners_to_replication_group(
    newowner text,
    pschem text)
    RETURNS TABLE
            (
                added_users text
            )
AS
$BODY$
DECLARE
    tblnames CURSOR FOR
        SELECT DISTINCT tableowner
        FROM pg_tables
        WHERE schemaname = pschem
          AND tableowner <> newowner;
    is_already_member boolean;
    added_users       text[] := array []::text[];
BEGIN
    FOR stmt IN tblnames
        LOOP
            EXECUTE $$SELECT * from pg_has_role('$$ || stmt.tableowner || $$', '$$ || newowner ||
                    $$', 'member');$$ INTO is_already_member;
            IF NOT is_already_member THEN
                EXECUTE 'GRANT ' || newowner || ' TO ' || stmt.tableowner || ';';
                EXECUTE 'ALTER DEFAULT PRIVILEGES FOR ROLE ' || stmt.tableowner || ' IN SCHEMA ' || pschem ||
                        ' GRANT SELECT ON TABLES TO ' || newowner || ';';
                EXECUTE 'ALTER DEFAULT PRIVILEGES FOR ROLE ' || stmt.tableowner || ' IN SCHEMA ' || pschem ||
                        ' GRANT REFERENCES ON TABLES TO ' || newowner || ';';
                added_users = array_append(added_users, stmt.tableowner::text);
            END IF;
        END LOOP;

    RETURN QUERY SELECT unnest(added_users::text[]);
END
$BODY$
    LANGUAGE plpgsql VOLATILE
                     COST 100;

Execute the function

SELECT public.add_existing_owners_to_replication_group('replication_group', '<schema_name>');

Now create the function which will change the owners of all tables in a given schema to replication_group

CREATE OR REPLACE FUNCTION public.change_schema_tables_owner_to_replication_group(
    newowner text,
    pschem text, numtables int default 999999)
    RETURNS TABLE
            (
                changed_tables text
            )
AS
$BODY$
DECLARE
    changed_tables text[] := array []::text[];
    tblnames CURSOR FOR
        SELECT tablename
        FROM pg_tables
        WHERE schemaname = pschem
          and tableowner <> newowner
        limit numtables;
BEGIN
    FOR stmt IN tblnames
        LOOP
            EXECUTE 'alter table ' || pschem || '.' || stmt.tablename || ' owner to ' || newowner || ';';
            changed_tables = array_append(changed_tables, stmt.tablename::text);
        END LOOP;
    RETURN QUERY SELECT unnest(changed_tables::text[]);
END
$BODY$
    LANGUAGE plpgsql VOLATILE
                     COST 100;

The function accepts an optional third parameter which specifies the number of tables to change ownership of. You can use this parameter to change ownership in batches if the number of tables is very high.

Execute the function

SELECT public.change_schema_tables_owner_to_replication_group('replication_group', '<schema_name>');

Or batch it (10 tables in the example below)

SELECT public.change_schema_tables_owner_to_replication_group('replication_group', '<schema_name>', 10);