Enable Logical Replication
- Go to AWS RDS dashboard
- Create and configure parameter group - select your PostgreSQL version (e.g. shown below is postgres12)
NOTE: You can skip this step if you have already created a parameter group.
Next, go to the parameter group page, search for rds.logical_replication
. Select this row and click on the Edit parameters
button. Set this value to 1
Apply the parameter group to the database
Modify the DB instance and wait until status changes to pending-reboot
. Reboot the database to apply the new parameter group.
Create role for sync
Create a sync user for Integrate.io by executing,
CREATE ROLE flydata WITH PASSWORD '<password_here>' LOGIN;
Assign the replication role to the user,
GRANT rds_replication TO flydata
GRANT rds_replication TO flydata
Grant necessary privileges
GRANT CREATE ON SCHEMA <enter schema> TO flydata;
GRANT USAGE ON SCHEMA <enter schema> TO flydata;
GRANT CREATE ON DATABASE <enter database> TO flydata;
GRANT SELECT ON ALL TABLES IN SCHEMA <enter schema> TO flydata;
GRANT REFERENCES ON ALL TABLES IN SCHEMA <enter schema> TO flydata;
Assign table ownership
Create a replication group role which will allow for shared ownership of the tables by the original owner as well as the flydata
user.
CREATE ROLE replication_group;
Add flydata
user to replication_group
.
GRANT replication_group to flydata;
Then, create the following function which adds the existing table owners in a given schema to the replication_group
role. This lets the original users retain ownership of the tables.
CREATE OR REPLACE FUNCTION public.add_existing_owners_to_replication_group(
newowner text,
pschem text)
RETURNS TABLE
(
added_users text
)
AS
$BODY$
DECLARE
tblnames CURSOR FOR
SELECT DISTINCT tableowner
FROM pg_tables
WHERE schemaname = pschem
AND tableowner <> newowner;
is_already_member boolean;
added_users text[] := array []::text[];
BEGIN
FOR stmt IN tblnames
LOOP
EXECUTE $$SELECT * from pg_has_role('$$ || stmt.tableowner || $$', '$$ || newowner ||
$$', 'member');$$ INTO is_already_member;
IF NOT is_already_member THEN
EXECUTE 'GRANT ' || newowner || ' TO ' || stmt.tableowner || ';';
EXECUTE 'ALTER DEFAULT PRIVILEGES FOR ROLE ' || stmt.tableowner || ' IN SCHEMA ' || pschem ||
' GRANT SELECT ON TABLES TO ' || newowner || ';';
EXECUTE 'ALTER DEFAULT PRIVILEGES FOR ROLE ' || stmt.tableowner || ' IN SCHEMA ' || pschem ||
' GRANT REFERENCES ON TABLES TO ' || newowner || ';';
added_users = array_append(added_users, stmt.tableowner::text);
END IF;
END LOOP;
RETURN QUERY SELECT unnest(added_users::text[]);
END
$BODY$
LANGUAGE plpgsql VOLATILE
COST 100;
Execute the function,
SELECT public.add_existing_owners_to_replication_group('replication_group', '<schema_name>');
Now create the function which will change the owners of all tables in a given schema to replication_group
,
CREATE OR REPLACE FUNCTION public.change_schema_tables_owner_to_replication_group(
newowner text,
pschem text, numtables int default 999999)
RETURNS TABLE
(
changed_tables text
)
AS
$BODY$
DECLARE
changed_tables text[] := array []::text[];
tblnames CURSOR FOR
SELECT tablename
FROM pg_tables
WHERE schemaname = pschem
and tableowner <> newowner
limit numtables;
BEGIN
FOR stmt IN tblnames
LOOP
EXECUTE 'alter table ' || pschem || '.' || stmt.tablename || ' owner to ' || newowner || ';';
changed_tables = array_append(changed_tables, stmt.tablename::text);
END LOOP;
RETURN QUERY SELECT unnest(changed_tables::text[]);
END
$BODY$
LANGUAGE plpgsql VOLATILE
COST 100;
The function accepts an optional third parameter which specifies the number of tables to change ownership of. You can use this parameter to change ownership in batches if the number of tables is very high.
Execute the function,
SELECT public.change_schema_tables_owner_to_replication_group('replication_group', '<schema_name>');
Or batch it (10 tables in the example below),
SELECT public.change_schema_tables_owner_to_replication_group('replication_group', '<schema_name>', 10);