# Aurora PostgreSQL source for ELT & CDC

> Configure Aurora PostgreSQL as a source in Integrate.io ELT & CDC. Set up logical replication from your Aurora PostgreSQL database instances.

## Requirements

* An Aurora PostgreSQL cluster. Integrate.io must connect to the cluster's writer endpoint.

## Enable Logical Replication

<Steps>
  <Step>
    Go to the AWS RDS dashboard.
  </Step>

  <Step>
    Create a parameter group and select the family that matches your Aurora PostgreSQL version (the screenshot below shows version 12). Choose **aurora-postgresql10 or above**.
  </Step>

  <Step>
    Choose the type **DB Cluster Parameter Group**.

    <Note>
      You can skip this step if you have already created a parameter group.
    </Note>

    <Frame>
      <img src="https://mintcdn.com/integrateio/SIDFEDRgXpxG0yrn/images/cdc/sources/image-79.png?fit=max&auto=format&n=SIDFEDRgXpxG0yrn&q=85&s=86cf91005438a0788a7ad19646a69ac9" alt="Creating a DB Cluster Parameter Group for Aurora PostgreSQL in AWS RDS" width="2234" height="1196" data-path="images/cdc/sources/image-79.png" />
    </Frame>

    Next, open the parameter group page and search for `rds.logical_replication`. Select this row, click the `Edit parameters` button, and set the value to `1`.

    <Frame>
      <img src="https://mintcdn.com/integrateio/SIDFEDRgXpxG0yrn/images/cdc/sources/image-80.png?fit=max&auto=format&n=SIDFEDRgXpxG0yrn&q=85&s=1113525cc45622671a348e66fe8c12ca" alt="Setting rds.logical_replication parameter to 1 in the parameter group" width="1326" height="582" data-path="images/cdc/sources/image-80.png" />
    </Frame>

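    Apply the parameter group to the database cluster. Because `rds.logical_replication` is a static parameter, reboot the writer instance for the change to take effect.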

    <Frame>
      <img src="https://mintcdn.com/integrateio/SIDFEDRgXpxG0yrn/images/cdc/sources/image-81.png?fit=max&auto=format&n=SIDFEDRgXpxG0yrn&q=85&s=1d9eb78a27356e5e339c596ce60b7ae2" alt="Applying the parameter group to the Aurora PostgreSQL database instance" width="1530" height="604" data-path="images/cdc/sources/image-81.png" />
    </Frame>
  </Step>
</Steps>
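
Once the parameter group is applied and the instance has been rebooted, you can confirm the change from any SQL client. The query below is a minimal check, assuming you are connected to the writer endpoint. With `rds.logical_replication` set to `1`, `wal_level` should report `logical`.

```sql
-- Inspect the settings that logical replication depends on.
-- wal_level should be "logical" after the parameter change has taken effect.
SELECT name, setting
FROM pg_settings
WHERE name IN ('rds.logical_replication',
               'wal_level',
               'max_replication_slots',
               'max_wal_senders')
ORDER BY name;
```

If `wal_level` still reports `replica`, the cluster is most likely still using its previous parameter group or has not been rebooted yet.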

## Create role for sync

Create a sync user for Integrate.io by executing:

```sql theme={null}
CREATE ROLE flydata WITH PASSWORD '<your_password>' LOGIN;
```

Assign the replication role to the user:

```sql theme={null}
GRANT rds_replication TO flydata;
```
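
To confirm that the role exists, can log in, and has the replication role, a quick check along these lines may help. It is a sketch that assumes the role name `flydata` used above.

```sql
-- Expect one row with rolcanlogin = true and has_rds_replication = true.
SELECT rolname,
       rolcanlogin,
       pg_has_role('flydata', 'rds_replication', 'member') AS has_rds_replication
FROM pg_roles
WHERE rolname = 'flydata';
```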

## Grant necessary privileges

Grant privileges on the database and schema by running the following queries:

```sql theme={null}
GRANT CREATE ON SCHEMA <schema_name> TO flydata;
GRANT USAGE ON SCHEMA <schema_name> TO flydata;
GRANT CREATE ON DATABASE <database_name> TO flydata;
GRANT SELECT ON ALL TABLES IN SCHEMA <schema_name> TO flydata;
GRANT REFERENCES ON ALL TABLES IN SCHEMA <schema_name> TO flydata;
```
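
`GRANT ... ON ALL TABLES IN SCHEMA` only covers tables that exist when the statement runs, so it can be useful to check which tables the sync user still cannot read. The query below is a minimal sketch; replace `<schema_name>` with your schema before running it.

```sql
-- List tables in the schema on which flydata lacks SELECT or REFERENCES.
-- An empty result means both privileges are present on every table.
SELECT schemaname, tablename
FROM pg_tables
WHERE schemaname = '<schema_name>'
  AND NOT (
        has_table_privilege('flydata', format('%I.%I', schemaname, tablename), 'SELECT')
    AND has_table_privilege('flydata', format('%I.%I', schemaname, tablename), 'REFERENCES')
  );
```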

## Assign table ownership

Create a replication group role that lets the original owner and the `flydata` user share ownership of the tables.

```sql theme={null}
CREATE ROLE replication_group;
```

Add the `flydata` user to `replication_group`:

```sql theme={null}
GRANT replication_group TO flydata;
```

Then create the following function, which adds the existing table owners in a given schema to the `replication_group` role. This lets the original users **retain ownership of the tables**.

```sql expandable theme={null}
CREATE OR REPLACE FUNCTION public.add_existing_owners_to_replication_group(
    newowner text,
    pschem text)
    RETURNS TABLE
            (
                added_users text
            )
AS
$BODY$
DECLARE
    tblnames CURSOR FOR
        SELECT DISTINCT tableowner
        FROM pg_tables
        WHERE schemaname = pschem
          AND tableowner <> newowner;
    is_already_member boolean;
    added_users       text[] := array []::text[];
BEGIN
    FOR stmt IN tblnames
        LOOP
            -- Skip owners that are already members of the group role.
            is_already_member := pg_has_role(stmt.tableowner, newowner::name, 'member');
            IF NOT is_already_member THEN
                -- format() with %I quotes identifiers, so mixed-case or
                -- special-character role and schema names are handled safely.
                EXECUTE format('GRANT %I TO %I', newowner, stmt.tableowner);
                EXECUTE format('ALTER DEFAULT PRIVILEGES FOR ROLE %I IN SCHEMA %I GRANT SELECT ON TABLES TO %I',
                               stmt.tableowner, pschem, newowner);
                EXECUTE format('ALTER DEFAULT PRIVILEGES FOR ROLE %I IN SCHEMA %I GRANT REFERENCES ON TABLES TO %I',
                               stmt.tableowner, pschem, newowner);
                added_users = array_append(added_users, stmt.tableowner::text);
            END IF;
        END LOOP;

    RETURN QUERY SELECT unnest(added_users::text[]);
END
$BODY$
    LANGUAGE plpgsql VOLATILE
                     COST 100;
```
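
Before running the function, it can help to preview which owners it would touch. The query below mirrors the function's cursor and is only a sketch; `<schema_name>` is a placeholder for your schema.

```sql
-- Current table owners in the schema, excluding the group role itself.
-- Each owner listed here is a candidate for membership in replication_group.
SELECT tableowner, count(*) AS table_count
FROM pg_tables
WHERE schemaname = '<schema_name>'
  AND tableowner <> 'replication_group'
GROUP BY tableowner
ORDER BY tableowner;
```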

Execute the function:

```sql expandable theme={null}
SELECT public.add_existing_owners_to_replication_group('replication_group', '<schema_name>');
```

Now create the function that changes the owner of all tables in a given schema to `replication_group`:

```sql theme={null}
CREATE OR REPLACE FUNCTION public.change_schema_tables_owner_to_replication_group(
    newowner text,
    pschem text, numtables int default 999999)
    RETURNS TABLE
            (
                changed_tables text
            )
AS
$BODY$
DECLARE
    changed_tables text[] := array []::text[];
    tblnames CURSOR FOR
        SELECT tablename
        FROM pg_tables
        WHERE schemaname = pschem
          and tableowner <> newowner
        limit numtables;
BEGIN
    FOR stmt IN tblnames
        LOOP
            -- %I quotes the schema, table, and role names so mixed-case identifiers work.
            EXECUTE format('ALTER TABLE %I.%I OWNER TO %I', pschem, stmt.tablename, newowner);
            changed_tables = array_append(changed_tables, stmt.tablename::text);
        END LOOP;
    RETURN QUERY SELECT unnest(changed_tables::text[]);
END
$BODY$
    LANGUAGE plpgsql VOLATILE
                     COST 100;
```

The function accepts an optional third parameter that limits how many tables change ownership in one call. Use it to change ownership in batches when the schema contains a very large number of tables.

Execute the function:

```sql theme={null}
SELECT public.change_schema_tables_owner_to_replication_group('replication_group', '<schema_name>');
```

Or run it in batches (10 tables per call in the example below), repeating the call until it returns no rows:

```sql theme={null}
SELECT public.change_schema_tables_owner_to_replication_group('replication_group', '<schema_name>', 10);
```
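
To see how many tables are still waiting for the ownership change, for example between batches, a count along these lines can be used. This is a sketch; `<schema_name>` is a placeholder for your schema.

```sql
-- Tables in the schema that are not yet owned by replication_group.
-- A result of 0 means the ownership change is complete.
SELECT count(*) AS tables_remaining
FROM pg_tables
WHERE schemaname = '<schema_name>'
  AND tableowner <> 'replication_group';
```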
