To iterate API calls over a list of IDs pulled from a Snowflake table, query the Snowflake table to retrieve the IDs, configure a REST API source with a dynamic URL that substitutes each ID into the endpoint path, and let the ETL pipeline make one API call per record and collect the responses into a unified output dataset. This guide is for data engineers who need to enrich or update records in bulk by calling a per-record API endpoint once for each ID stored in a database table. After reading, you will be able to build a configurable pipeline that reads IDs from Snowflake, loops API calls per ID with rate limit handling, and loads all responses to a destination table without custom Python scripts.
The pattern is called per-record API enrichment. Query a Snowflake table for a list of IDs, construct one API request per ID using a dynamic URL (for example, https://api.example.com/portfolios/{id}), collect all responses, and write the aggregated results to a destination table. Integrate.io's REST API source component handles this loop natively using the #{fieldName} URL syntax, substituting each ID from the Snowflake query result into the URL for each request.
The Problem
Many APIs are designed for record-level access. You cannot fetch all client portfolios in one call; you must query /portfolios/{client_id} for each client ID individually. The same pattern applies to CRM contact records, fund performance metrics, and account details: one endpoint, one ID per request.
Teams that hit this requirement typically write a Python script: pull IDs from Snowflake, loop over them, call the API per ID, write results back to a file or a staging table. The script works until it doesn't. IDs change in the source table. The API rate-limits mid-loop and the script exits without completing. A developer edits the loop logic and silently drops a filter condition, causing records to go missing without any error message.
What is needed is a configurable pipeline that reads the ID list from Snowflake, iterates API calls per ID with rate limit handling, and loads all results in a single auditable run.
To learn how Integrate.io can help automate API data integrations, reach out to our team to discuss your use case with a sales engineer.
What You'll Need
- A Snowflake table containing the IDs you want to iterate over (account IDs, client IDs, portfolio IDs, or any entity-level identifier)
- API documentation showing the per-record endpoint URL format (for example, /api/v1/portfolios/{id})
- API credentials: an API key, OAuth token, or basic auth credentials
- The API's rate limit specification (requests per second or per minute), if documented
- An ETL tool that supports dynamic URL construction for per-record API calls; Integrate.io provides this natively through its REST API source component and #{fieldName} URL syntax
How to Iterate API Calls Over a List of IDs Pulled From a Snowflake Table: Step-by-Step
Step 1: Query Snowflake to Retrieve the ID List
This step produces the set of IDs the pipeline will loop over. The quality of this query determines which records get fetched and which get skipped, so filtering decisions made here affect every downstream step.
What to do:
- Write a SELECT query against the Snowflake table that returns the ID column and any other fields you will need to join back to the API response later (for example, a name or type column that does not appear in the API response body)
- Filter out inactive records, test records, and deactivated entities using a WHERE clause on a status column if one exists
- For incremental runs, add a WHERE clause filtering on a last_modified or created_at column so the query returns only IDs added or changed since the last successful pipeline run
- Sort the results by ID if consistent ordering matters for debugging rate limit behavior
Output of this step: A Snowflake query result set containing the filtered list of IDs to be passed to the API, with inactive or already-processed records excluded.
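As a rough illustration of the query this step describes, the sketch below assembles the SELECT in Python. The table name portfolios and the columns portfolio_id, portfolio_name, status, and last_modified are hypothetical; substitute your own schema. The %(since)s placeholder assumes a database driver that uses the pyformat parameter style, so adjust it if your driver binds parameters differently.

```python
def build_id_query(incremental: bool = False) -> str:
    """Return a SELECT for the ID list; all table and column names are hypothetical."""
    where = ["status = 'ACTIVE'"]  # exclude inactive, test, and deactivated records
    if incremental:
        # Only IDs added or changed since the last successful run.
        where.append("last_modified > %(since)s")
    return (
        "SELECT portfolio_id, portfolio_name\n"
        "FROM portfolios\n"
        "WHERE " + " AND ".join(where) + "\n"
        "ORDER BY portfolio_id"  # stable ordering helps when debugging rate limits
    )


print(build_id_query(incremental=True))
```

Keeping the timestamp as a bound parameter rather than formatting it into the string means the same query text can be reused on every run.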
Step 2: Configure the REST API Source with a Dynamic URL
This step wires the Snowflake ID list to the API endpoint by constructing a URL template that substitutes each ID at runtime. One row from the Snowflake result becomes one API request.
What to do:
- In your ETL tool's REST API source component, set the base URL using a placeholder for the ID field; in Integrate.io, the syntax is #{fieldName} where fieldName matches the column name from your Snowflake query result
- For example: if the Snowflake query returns a column named portfolio_id and the API endpoint is /api/v1/portfolios/{id}, configure the URL as https://api.example.com/api/v1/portfolios/#{portfolio_id}
- Configure authentication headers (Authorization: Bearer YOUR_TOKEN, or an X-API-Key header) to apply to every request in the loop
- Set the HTTP method to GET for read operations; use POST or PATCH if the API requires a request body per record
Output of this step: A configured REST API source that makes one API call per ID, with the correct ID substituted into the URL for each record.
Where Integrate.io helps: Integrate.io's REST API source component supports the #{fieldName} URL syntax natively. When a Snowflake source feeds the REST API source, Integrate.io automatically enables single-record mode and makes one API call per row without any custom looping code.
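To make the substitution concrete, here is a minimal Python sketch of what a #{fieldName} template does conceptually: each row from the query result yields one URL. This is an illustration only, not Integrate.io's implementation; the function name render_url and the sample rows are assumptions.

```python
import re
from urllib.parse import quote

# Matches placeholders of the form #{fieldName}.
PLACEHOLDER = re.compile(r"#\{(\w+)\}")


def render_url(template: str, row: dict) -> str:
    """Replace each #{fieldName} with the URL-encoded value of that field in row."""
    def substitute(match):
        field = match.group(1)
        if field not in row:
            raise KeyError(f"row has no column named {field!r}")
        # safe="" also encodes "/" so an ID cannot change the path structure.
        return quote(str(row[field]), safe="")
    return PLACEHOLDER.sub(substitute, template)


template = "https://api.example.com/api/v1/portfolios/#{portfolio_id}"
rows = [{"portfolio_id": 101}, {"portfolio_id": "A/7"}]  # sample rows, one request each
urls = [render_url(template, row) for row in rows]
print(urls)
```

Failing loudly on a missing column is a deliberate choice here: a placeholder that does not match a query column would otherwise produce a request to the wrong URL.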
Step 3: Handle API Rate Limits to Avoid Dropped Records
Rate limits are the most common point of failure in per-record API enrichment pipelines. A 429 (Too Many Requests) response means the API rejected that specific request; if the pipeline does not retry, that record is missing from the output with no visible error.
What to do:
- Check the API documentation for rate limit specifications; common limits are 10 requests per second, 100 requests per minute, or 1,000 requests per hour
- Configure a delay between requests in the ETL tool equal to the rate limit window divided by the allowed request count: a 100ms delay stays within 10 requests per second; a 600ms delay stays within 100 requests per minute
- Configure retry logic for 429 responses: wait for the duration given in the Retry-After response header, or default to 60 seconds if the header is absent, then retry the same record
- Do not treat a 429 as a non-fatal skip; an ID that triggered a 429 was not fetched and will be missing from the output if the pipeline does not retry
Output of this step: A configured rate limit delay and retry rule that prevents 429 errors from silently dropping records from the output dataset.
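The delay arithmetic and the retry rule above can be sketched in Python as follows. This is a simplified model, not a description of any particular tool's behavior: send is a hypothetical callable that performs one request and returns a (status, headers, body) tuple, and the sketch only understands Retry-After values expressed as whole seconds.

```python
import time
from typing import Callable, Mapping, Tuple

Response = Tuple[int, Mapping[str, str], str]  # (status code, headers, body)


def min_delay_seconds(max_requests: int, window_seconds: float) -> float:
    """Smallest gap between requests that stays within the limit."""
    return window_seconds / max_requests


def retry_wait_seconds(headers: Mapping[str, str], default: float = 60.0) -> float:
    """Use Retry-After when it is a whole number of seconds; otherwise use default."""
    value = headers.get("Retry-After", "").strip()
    # Retry-After may also be an HTTP date; this sketch does not parse that form.
    return float(value) if value.isdigit() else default


def fetch_with_retry(send: Callable[[], Response], max_attempts: int = 5,
                     sleep: Callable[[float], None] = time.sleep) -> Response:
    """Call send() until it returns something other than 429, or attempts run out."""
    for attempt in range(1, max_attempts + 1):
        response = send()
        status, headers, _body = response
        if status != 429 or attempt == max_attempts:
            return response  # a final 429 is returned so the caller can record it
        sleep(retry_wait_seconds(headers))
    raise ValueError("max_attempts must be at least 1")


# Demo with a fake sender: the first call is rate-limited, the second succeeds.
replies = iter([(429, {"Retry-After": "2"}, ""), (200, {}, '{"ok": true}')])
waits = []
result = fetch_with_retry(lambda: next(replies), sleep=waits.append)
print(result[0], waits)
```

Returning the last 429 instead of discarding it lets the caller write that ID to an error table rather than losing it silently.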
Step 4: Parse the API Response and Extract the Fields You Need
Each API call returns a raw JSON object (or array) per record. This step maps the fields you need from the response into flat columns the destination table can receive.
What to do:
- Run a test with 10 to 20 IDs before expanding to the full list; inspect the raw response body for one record
- Identify the specific fields needed in the destination table; if the response contains nested objects (for example, a portfolio object with a nested performance sub-object), extract specific subfields using JSON path expressions rather than loading the entire nested structure
- Add the source ID as an explicit column in the output; API response bodies often do not include the ID used to call the endpoint, since it is implicit in the URL; without it, you cannot join the API results back to other Snowflake tables
- Add an ingested_at timestamp column to every record so you can identify when the data was fetched
Output of this step: A flat, destination-ready dataset with API response fields extracted, the source ID included as a column, and an ingested_at timestamp on every record.
Where Integrate.io helps: Integrate.io's SELECT transformation component lets you map API response fields by JSON path expression, add computed columns like ingested_at using the NOW() function, and include the Snowflake source ID alongside the API response fields in the same output row.
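For readers who want to see the mapping spelled out, the following Python sketch flattens one nested response into a destination-ready row. It uses a simple dotted-path lookup as a stand-in for JSON path expressions, and the field names (name, performance.ytd_return) and the output column portfolio_id are hypothetical.

```python
from datetime import datetime, timezone
from typing import Any, Mapping, Optional


def get_path(doc: Any, path: str) -> Any:
    """Follow a dotted path such as 'performance.ytd_return'; None if a key is missing."""
    current = doc
    for key in path.split("."):
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


def flatten_response(source_id: Any, body: Mapping, field_paths: Mapping[str, str],
                     now: Optional[datetime] = None) -> dict:
    """Build one flat row: source ID, the selected fields, and an ingested_at timestamp."""
    row = {"portfolio_id": source_id}  # the ID from the URL, often absent from the body
    for column, path in field_paths.items():
        row[column] = get_path(body, path)
    row["ingested_at"] = (now or datetime.now(timezone.utc)).isoformat()
    return row


# Sample response body with a nested performance object (made-up values).
body = {"name": "Growth", "performance": {"ytd_return": 0.042, "benchmark": {"name": "SPX"}}}
paths = {"portfolio_name": "name", "ytd_return": "performance.ytd_return"}
print(flatten_response(101, body, paths))
```

Only the listed paths reach the output, so unrelated nested structures such as benchmark are left behind rather than loaded wholesale.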
Step 5: Handle Errors for Individual Records Without Stopping the Loop
When iterating over hundreds or thousands of IDs, some will return errors: 404 for a deactivated entity, 403 for a record the API token cannot access, 500 for a transient server error. The pipeline should continue past individual failures rather than stopping the entire run.
What to do:
- Configure the pipeline to write failed records to a separate error table rather than halting on the first error; the error table should record the source ID, the HTTP status code returned, the error response body, and the timestamp of the failure
- Successful records should continue loading to the destination regardless of individual record failures
- After each run, review the error table to identify patterns: all 404s from deactivated accounts can be filtered out of the Snowflake source query; repeated 500s from the same ID may indicate a data quality issue in the source record
Output of this step: A completed pipeline run that loads successful records to the destination and writes failed records to an error table, without stopping the loop on individual failures.
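The continue-on-error behavior can be modeled with a short Python sketch. Here fetch is a hypothetical callable returning a (status, body) pair, and the error row carries the four fields listed above; the field names themselves are assumptions.

```python
from datetime import datetime, timezone
from typing import Callable, Iterable, List, Tuple


def run_loop(ids: Iterable, fetch: Callable[[object], Tuple[int, str]]):
    """Call fetch(id) for every ID; never stop on a single failure."""
    successes: List[dict] = []
    errors: List[dict] = []
    for source_id in ids:
        try:
            status, body = fetch(source_id)
        except Exception as exc:  # e.g. network failures are recorded, not raised
            status, body = None, repr(exc)
        if status is not None and 200 <= status < 300:
            successes.append({"source_id": source_id, "body": body})
        else:
            errors.append({
                "source_id": source_id,
                "status_code": status,
                "error_body": body,
                "failed_at": datetime.now(timezone.utc).isoformat(),
            })
    return successes, errors


def fake_fetch(source_id):
    """Stand-in for a real API call, returning canned responses."""
    canned = {1: (200, '{"name": "A"}'), 2: (404, "not found"), 3: (200, '{"name": "C"}')}
    return canned[source_id]  # an unknown ID raises KeyError, simulating a crash


ok, failed = run_loop([1, 2, 3, 4], fake_fetch)
print(len(ok), [e["status_code"] for e in failed])
```

Every input ID ends up in exactly one of the two lists, which is what makes the row-count reconciliation in the load step possible.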
Step 6: Load the Aggregated Results to a Destination Table in Snowflake
With all responses collected, this step writes the output to a Snowflake table in a mode that avoids duplicates when the same IDs are refreshed on subsequent runs.
What to do:
- Configure a Snowflake destination table to receive the collected API responses
- Choose upsert (merge) as the load mode if the same IDs will be refreshed on each run; set the source ID as the primary key so existing rows are updated rather than duplicated
- Choose append if you are building a point-in-time history and each run should add a new snapshot row per ID
- After the first full run, verify that the row count in the destination matches the number of IDs in the source query minus the count of records in the error table
Output of this step: A Snowflake destination table loaded with one row per successfully fetched ID, with existing records updated if upsert mode is configured.
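If you were writing the upsert by hand instead of configuring it in a tool, it would take the shape of a MERGE statement. The Python sketch below generates one, along with the row-count check described above. The table names portfolio_api_results and portfolio_api_staging and the column list are hypothetical, and the identifiers are interpolated directly, so they must come from trusted configuration and not from user input.

```python
from typing import Sequence


def build_merge_sql(target: str, staging: str, key: str, columns: Sequence[str]) -> str:
    """Build a MERGE that updates rows matching on key and inserts the rest."""
    non_key = [c for c in columns if c != key]
    set_clause = ", ".join(f"{c} = s.{c}" for c in non_key)
    col_list = ", ".join(columns)
    val_list = ", ".join(f"s.{c}" for c in columns)
    return (
        f"MERGE INTO {target} t\n"
        f"USING {staging} s\n"
        f"ON t.{key} = s.{key}\n"
        f"WHEN MATCHED THEN UPDATE SET {set_clause}\n"
        f"WHEN NOT MATCHED THEN INSERT ({col_list}) VALUES ({val_list})"
    )


def expected_row_count(source_ids: int, error_rows: int) -> int:
    """Rows expected in the destination after the first full run."""
    return source_ids - error_rows


sql = build_merge_sql(
    "portfolio_api_results",   # hypothetical destination table
    "portfolio_api_staging",   # hypothetical staging table holding this run's rows
    "portfolio_id",
    ["portfolio_id", "ytd_return", "ingested_at"],
)
print(sql)
print(expected_row_count(1000, 12))
```

For append mode, a plain INSERT from the staging table would replace the MERGE, since each run is meant to add a new snapshot row per ID.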
Step 7: Schedule the Pipeline and Configure Incremental ID Selection
A one-time run validates the pattern. Scheduling it with incremental ID filtering makes it operational: each subsequent run fetches only the IDs that changed, keeping API usage proportional to actual data change rather than re-fetching the entire ID population every day.
What to do:
- Set the pipeline to run on a schedule that matches how frequently the source ID data changes; daily is standard for most entity-level data
- Modify the Snowflake source query to filter on a last_modified or created_at column, returning only IDs where a record was added or updated since the last successful run
- Store the last successful run timestamp as a pipeline variable and pass it to the WHERE clause; this lets the ID list update automatically each run without manual date changes
- After each scheduled run, check the error table for new failures and review whether any recurring error patterns should be addressed in the Snowflake source filter
Output of this step: A scheduled pipeline that fetches only new or changed IDs from Snowflake on each run, iterates API calls for those IDs, and loads the results to the destination without re-processing the full population.
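The stored-timestamp logic is small enough to sketch in a few lines of Python. This is one possible design under stated assumptions, not a description of how any scheduler stores its variables: the function name next_watermark is hypothetical, and the watermark advances to the run's start time so that rows modified while the run was in progress are picked up by the next run.

```python
from datetime import datetime, timezone


def next_watermark(previous: datetime, run_started_at: datetime,
                   run_succeeded: bool) -> datetime:
    """Advance the watermark only after a successful run."""
    # A failed run keeps the old value, so the same IDs are selected again next time.
    return run_started_at if run_succeeded else previous


previous = datetime(2024, 1, 1, tzinfo=timezone.utc)
started = datetime(2024, 1, 2, tzinfo=timezone.utc)

# The current watermark is what gets bound to the WHERE clause for this run.
query_params = {"since": previous}

after_success = next_watermark(previous, started, run_succeeded=True)
after_failure = next_watermark(previous, started, run_succeeded=False)
print(after_success.isoformat(), after_failure.isoformat())
```

Combined with upsert loading, re-selecting a few IDs near the boundary is harmless, whereas advancing the watermark after a failed run would skip them permanently.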
Common Mistakes to Avoid
- Not including the source ID in the API response output: The API response body often does not include the ID used to call it, since the ID is implicit in the URL. If you do not explicitly add the source ID as a column in the output dataset, you cannot join the API results back to other tables in Snowflake. Always add the source ID as a mapped column in the transformation step.
- Ignoring rate limit responses: A 429 error means the record was not fetched. If the pipeline continues without retrying that record, the output is missing data with no visible indication. Always configure retry logic for 429 responses with a wait period before the retry attempt.
- Running the full ID list on every pipeline execution: Re-fetching all IDs daily when only a subset changed is wasteful and consumes API rate limit quota unnecessarily. Use incremental ID selection from Snowflake based on a modified timestamp to keep each run proportional to actual data change.
- Not writing individual record failures to an error table: If the pipeline stops on the first failed record, no subsequent IDs are fetched. If it silently skips failures, data goes missing with no audit trail. Write failures to a dedicated error table and continue the loop; review the error table after each run.
- Loading results without a primary key for upsert: Without a primary key configured in the destination, re-running the pipeline appends duplicate rows rather than updating existing ones. Always configure a primary key on the destination table when using upsert mode.
- Not testing with a small ID sample first: Running the full ID list in the first test consumes rate limit quota and loads unvalidated data to the destination. Test with 10 to 20 IDs, inspect the output manually to confirm field mapping and ID inclusion, then expand to the full list.
Conclusion
Iterating API calls over a list of IDs pulled from a Snowflake table is a standard per-record enrichment pattern that replaces fragile Python looping scripts with a configurable, auditable pipeline. The process follows a consistent sequence: query the filtered ID list from Snowflake, configure a dynamic URL with the ID placeholder, set rate limit delays and retry rules, parse the response and add the source ID to the output, write errors to a separate table, load successes to the destination with upsert mode, and schedule with incremental ID filtering.
Integrate.io's REST API source handles the #{fieldName} URL substitution and per-record call loop natively, with the Snowflake source feeding IDs directly into the loop without custom code. The transformation layer maps nested response fields, adds timestamps, and joins the source ID back to the output in the same component.
Once this pattern works for one API endpoint and ID type, the same pipeline structure applies to any other endpoint that follows a per-record call pattern. A single reusable template replaces an entire category of maintenance-heavy Python scripts, such as separate scripts for Addepar portfolio IDs, Salesforce contact IDs, and fund performance IDs. The pipeline becomes the script, and the script never needs to be rewritten again.