Sources - PostgreSQL - Aurora PostgreSQL

Requirement

  • The connection must use the Aurora cluster's WRITER endpoint.

Enable Logical Replication

  1. Go to the AWS RDS dashboard.
  2. Create and configure a parameter group: select the parameter group family that matches your Aurora PostgreSQL version (for example, aurora-postgresql12). Use aurora-postgresql10 or above.
  3. Choose DB Cluster Parameter Group as the type.

    NOTE: You can skip creating the parameter group if you already have one.

Next, open the parameter group page and search for rds.logical_replication. Select that row, click the Edit parameters button, and set the value to 1.

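Apply the parameter group to the database cluster. Because rds.logical_replication is a static parameter, reboot the writer instance for the change to take effect.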
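
To confirm that the setting is active, a quick check such as the one below can be run from any SQL client connected to the writer endpoint. This is a minimal sketch: it only reads the standard pg_settings view, and it assumes the parameter group has been applied and the instance restarted. Once logical replication is enabled, wal_level should read logical and rds.logical_replication should report as enabled.

```sql
-- Read-only check of the two settings involved in logical replication.
SELECT name, setting
FROM pg_settings
WHERE name IN ('rds.logical_replication', 'wal_level');
```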

Create role for sync

Create a sync user for Integrate.io by executing:

CREATE ROLE flydata WITH PASSWORD '<your_password>' LOGIN;

Assign the replication role to the user:

GRANT rds_replication TO flydata;
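
As an optional sanity check, the queries below sketch one way to confirm that the role exists, can log in, and is a member of rds_replication. They use only the standard pg_roles view and the built-in pg_has_role function; the column alias is illustrative.

```sql
-- The role should appear here with rolcanlogin = true.
SELECT rolname, rolcanlogin
FROM pg_roles
WHERE rolname = 'flydata';

-- Returns true when flydata is a member of rds_replication.
SELECT pg_has_role('flydata', 'rds_replication', 'member') AS has_replication_role;
```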

Grant necessary privileges

Grant the privileges on the database and schema by running the following statements:

GRANT CREATE ON SCHEMA <enter schema> TO flydata;
GRANT USAGE ON SCHEMA <enter schema> TO flydata;
GRANT CREATE ON DATABASE <enter database> TO flydata;
GRANT SELECT ON ALL TABLES IN SCHEMA <enter schema> TO flydata;
GRANT REFERENCES ON ALL TABLES IN SCHEMA <enter schema> TO flydata;
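
The grants above can be spot-checked with PostgreSQL's built-in privilege inquiry functions. The sketch below assumes the same placeholders as the GRANT statements; substitute your own schema and database names. The first query should return true in every column, and the second should return no rows once SELECT and REFERENCES have been granted on every table.

```sql
-- Schema- and database-level privileges held by flydata.
SELECT has_schema_privilege('flydata', '<enter schema>', 'USAGE')      AS schema_usage,
       has_schema_privilege('flydata', '<enter schema>', 'CREATE')     AS schema_create,
       has_database_privilege('flydata', '<enter database>', 'CREATE') AS database_create;

-- Tables in the schema on which flydata still lacks SELECT or REFERENCES.
SELECT schemaname, tablename
FROM pg_tables
WHERE schemaname = '<enter schema>'
  AND NOT (has_table_privilege('flydata', format('%I.%I', schemaname, tablename), 'SELECT')
       AND has_table_privilege('flydata', format('%I.%I', schemaname, tablename), 'REFERENCES'));
```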

Assign table ownership

Create a replication_group role so that the tables can be owned jointly by their original owners and the flydata user.

CREATE ROLE replication_group;

Add the flydata user to replication_group:

GRANT replication_group TO flydata;

Then, create the following function which adds the existing table owners in a given schema to the replication_group role. This lets the original users retain ownership of the tables.

CREATE OR REPLACE FUNCTION public.add_existing_owners_to_replication_group(
    newowner text,
    pschem text)
    RETURNS TABLE
            (
                added_users text
            )
AS
$BODY$
DECLARE
    tblnames CURSOR FOR
        SELECT DISTINCT tableowner
        FROM pg_tables
        WHERE schemaname = pschem
          AND tableowner <> newowner;
    is_already_member boolean;
    added_users       text[] := array []::text[];
BEGIN
    FOR stmt IN tblnames
        LOOP
            -- Check whether this owner is already a member of the group role.
            is_already_member := pg_has_role(stmt.tableowner, newowner::name, 'member');
            IF NOT is_already_member THEN
                -- format() with %I quotes identifiers, so mixed-case or unusual names are handled.
                EXECUTE format('GRANT %I TO %I', newowner, stmt.tableowner);
                EXECUTE format('ALTER DEFAULT PRIVILEGES FOR ROLE %I IN SCHEMA %I GRANT SELECT ON TABLES TO %I',
                               stmt.tableowner, pschem, newowner);
                EXECUTE format('ALTER DEFAULT PRIVILEGES FOR ROLE %I IN SCHEMA %I GRANT REFERENCES ON TABLES TO %I',
                               stmt.tableowner, pschem, newowner);
                added_users = array_append(added_users, stmt.tableowner::text);
            END IF;
        END LOOP;

    RETURN QUERY SELECT unnest(added_users::text[]);
END
$BODY$
    LANGUAGE plpgsql VOLATILE
                     COST 100;

Execute the function:

SELECT public.add_existing_owners_to_replication_group('replication_group', '<schema_name>');
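
To see which roles ended up as direct members of replication_group, a query along these lines can help. It is a sketch that joins the standard pg_auth_members and pg_roles catalogs; the aliases are illustrative. The result should include flydata and each original table owner returned by the function.

```sql
-- Direct members of the replication_group role.
SELECT m.rolname AS member_role
FROM pg_auth_members am
JOIN pg_roles g ON g.oid = am.roleid
JOIN pg_roles m ON m.oid = am.member
WHERE g.rolname = 'replication_group'
ORDER BY m.rolname;
```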

Now create the function that changes the owner of all tables in a given schema to replication_group:

CREATE OR REPLACE FUNCTION public.change_schema_tables_owner_to_replication_group(
    newowner text,
    pschem text, numtables int default 999999)
    RETURNS TABLE
            (
                changed_tables text
            )
AS
$BODY$
DECLARE
    changed_tables text[] := array []::text[];
    tblnames CURSOR FOR
        SELECT tablename
        FROM pg_tables
        WHERE schemaname = pschem
          and tableowner <> newowner
        limit numtables;
BEGIN
    FOR stmt IN tblnames
        LOOP
            -- %I quotes the schema, table, and role names so unusual identifiers are handled.
            EXECUTE format('ALTER TABLE %I.%I OWNER TO %I', pschem, stmt.tablename, newowner);
            changed_tables = array_append(changed_tables, stmt.tablename::text);
        END LOOP;
    RETURN QUERY SELECT unnest(changed_tables::text[]);
END
$BODY$
    LANGUAGE plpgsql VOLATILE
                     COST 100;

The function accepts an optional third parameter that limits how many tables have their ownership changed in a single call. Use it to change ownership in batches when the schema contains a very large number of tables.

Execute the function:

SELECT public.change_schema_tables_owner_to_replication_group('replication_group', '<schema_name>');

Or run it in batches (10 tables in the example below):

SELECT public.change_schema_tables_owner_to_replication_group('replication_group', '<schema_name>', 10);
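
When running in batches, it can be useful to track how many tables still belong to other owners. The query below is one possible way to do that, using only the pg_tables view the function itself reads. Repeat the batched call until replication_group is the only owner listed.

```sql
-- Table counts per owner in the schema; the work is done when only replication_group remains.
SELECT tableowner, count(*) AS table_count
FROM pg_tables
WHERE schemaname = '<schema_name>'
GROUP BY tableowner
ORDER BY table_count DESC;
```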