Integrate.io provides features to efficiently extract, transform, and store data from various sources. Chartio provides Visual SQL features that let us explore and analyze data. Furthermore, it includes functionality to arrange charts and metrics in dashboards that can be shared. Both these tools can be used synergically.
In this post, we will cover how you to configured Integrate.io to use Chartio data. In a subsequent post, we will explain how to visualize the data provided by Integrate.io in Chartio.
Table of Contents:
- Why Use Integrate.io with Chartio?
- Using Integrate.io and Chartio Together
- Implementing the Solution
- Testing the Pipeline
Why Use Integrate.io with Chartio?
Integrate.io lets you create (even complex) ETL pipelines through an intuitive graphic interface. Integrate.io also provides functionality to pull data from Rest APIs and various integrations. For example, you can use Integrate.io to fetch data from third-party platforms like Facebook, Salesforce, Google Analytics, Linkedin, etc., transform it based on your use-case, and then save it to a data source supported by Chartio. The good thing is that Integrate.io supports many of these destinations, which means it is quite easy to connect Chartio to any data pipeline built with Integrate.io. This opens a wide range of possibilities for data that can be visualized through Chartio.
Using Integrate.io and Chartio Together
To get an idea of the steps involved, let's set up a real-world task and try to solve it through these two platforms.
Consider a hypothetical marketplace platform that merchants use to sell their products. Like most platforms, this platform provides the merchant with relevant data (about their customers, traffic, and orders) through a RESTful API. We want to visualize this data in Chartio from an analytical standpoint.
The API uses the Basic Authentication scheme for authorization, and its resource endpoints support these two query parameters,
- the "since" parameter; whose value is an ISO-formatted date timestamp. The purpose of this parameter is to retrieve the data created after the specified time.
- the "limit" parameter; to specify the number of data objects to receive in every paginated API call.
For pagination, the API follows the Link header standard. The API also has throttling mechanisms in place and only allows ten calls per minute, and has a maximum of 1000 requests in a day. The API requires the Content-Type header to be passed, and the JSON response returned by the API for the "Orders" endpoint looks something like this,
// These objects themselves are nested and so on.
You can see a complete sample of the API response here.
Implementing the Solution
The High-Level Plan
We will fetch data from the marketplace platform using its API (as described above) and store it in a data warehouse supported by Chartio. There are various options for the data destination. We'll go with SingleStore (previously known as MemSQL), which is known to provide fast performance for analytical and Business Intelligence systems. SingleStore is compatible with MySQL client software. It uses the same wire protocol and supports similar SQL syntax as MySQL. We'll be using a SingleStore managed cluster to seamlessly serve as the Integrate.io pipeline destination and the source for Chartio.
Since the API response contains many fields, we will have to filter the relevant fields and discard the rest. Since the data consists of fields containing array as values (for example, the tax_lines field), we will have to apply a flatten operation on them and store them into a separate SQL table in a normalized fashion. To be completely fair, Chartio also has some JSON extracting features, but it's much more efficient to parse the JSON within the data pipeline once before storing the data so that the data can be used elsewhere without any further preprocessing. For the visualizations discussed in this post, we'll need to extract the top-level fields in the "Orders" resource response and the tax line information, which is nested in an array. We can extract other fields as well, following a similar approach.
So the first milestone for us is implementing a pipeline in Integrate.io that can fetch data from the marketplace API, apply relevant transformations, and save it to a SingleStore cluster.
Implementing the Integrate.io Pipeline
Pulling data from the above-described REST API requires us to implement authentication, pagination, rate limiting, and handling nested JSON data. Fortunately, the REST API source component of Integrate.io provides all of these features out of the box, and we can configure it through the user-friendly interface in a few minutes. Apart from the REST API Source component, we'll be needing three more kinds of components in our pipeline,
- Select transformation; To filter relevant fields and extract nested information from the JSON.
- Clone transformation; To apply different select transformations on the same data object.
- Database destination; To finally write our data to a SingleStore database, which will be later used as the source for Chartio.
Combining these components appropriately, the final pipeline should look something like this:
This pipeline writes the data into two tables: orders and order_tax_lines. Following are the configurations of each of the components used in the pipeline:
1) The Rest API source component
Username and password details are provided for API authentication.
We've added the URL for the API and the content-type header value. Since the API uses link headers for pagination, the pagination scheme is set to "Link Headers". For rate-limiting, the sleep interval and maximum limits for the paginated requests are specified. We've added a base level JSON path expression to capture only the items in the "orders" array from the Base response. Next, the fields in the "Response Schema" section are automatically detected by Integrate.io along with their data types.
If you observe carefully, the URL makes use of three package variables; api_url which is the base URL of my API, limit_size which is the number of records a single page will return, and finally, the last_updated variable which we'll use for incremental loading of data when running the package frequently. The following screenshot contains the values of these variables:
The value of the last_updated variable is set to the following:
ToString(ToDate(CASE WHEN (COALESCE($_PACKAGE_LAST_SUCCESSFUL_JOB_SUBMISSION_TIMESTAMP,'')=='' OR $full_load==1) THEN '1900-01-01T00:00:00Z' ELSE $_PACKAGE_LAST_SUCCESSFUL_JOB_SUBMISSION_TIMESTAMP END),'yyyy-MM-dd\\'T\\'HH:mm:ss')
If it seems too complicated at first, you can find a detailed explanation about the method in the docs. This variable stores the date of the last successful load which is then used in the incremental fetching of the data from API.
2) The Select Transformations
The two components named parse_orders and parse_tax in our pipeline are used to select the desired fields from the data object and specify their data type.
The component flatten_tax and map_order_tax, as the name suggests, are used to extract the items from the array in the tax_lines field and associate order_id with them so that order and tax records can later be mapped to each other in the database.
The flatten function takes in an array of values and removes a level of nesting so that each item in the array will appear in a record of its own.
The JsonStringToMap function takes in a string object (tax_line in our case) and converts it into a map of key-value pairs from which we can extract the fields using the map#'fieldname' syntax.
3) Database Destinations
For the database destination, we need to configure the SingleStore database connection in Integrate.io. We can set up a SingleStore database either by installing the self-managed version on a machine or by launching a fully managed instance in the cloud. We're going with the latter approach for this solution as it's quicker. For managed SingleStore clusters, the connection details (hostname, username, etc) can be found on the SingleStore portal:
As mentioned earlier, SingleStore uses the same protocol as MySQL, so we can add our cluster details in the MySQL connection wizard.
Once the connection is set up, we need to configure the database destination components.
I've added the table names and specified the operation type as "Merge with existing data using the delete and insert" since we want the pipeline to incrementally update the data into our database. The columns to write to the database can be populated using the Auto-fill option. We just need to select the columns comprising the Primary Key or Composite Key that can uniquely identify an entry in the database table.
Testing the Pipeline
Once implemented, we can validate and then execute the package by creating a job. The job status can be monitored from the Integrate.io dashboard, and once the job is finished, we can verify if the data is saved properly by running SQL queries on the console of SingleStore studio.
You have now completed your Integrate.io configuration for Chartio. For the next step, refer to Part 2: Visualizing the Data with Chartio.