Python celebrated its 30th birthday earlier this year, and the programming language has never been more popular. With the rise of data science and artificial intelligence, Python is still the go-to choice for data engineers everywhere, including those who build ETL pipelines. 

However, building an ETL pipeline in Python isn't for the faint of heart. You'll encounter challenges like parallelism, logging, job scheduling, and database connections. The good news is that there are various ETL tools and packages that make these processes easier.

Below, learn how to build an ETL pipeline in Python and transform your data integration projects. 

Read more: Top 6 Python ETL Tools for 2021

Table of Contents

Integrate your data now! Schedule a Integrate.io demo.

What You Should Know About Building an ETL Pipeline in Python

An ETL pipeline is the sequence of processes that move data from a source (or several sources) into a database, such as a data warehouse. There are multiple ways to perform ETL. However, Python dominates the ETL space. 

Python arrived on the scene in 1991. Created by Guido von Rossum, this programming language became an instant favorite among developers because of its easy-to-use syntax and readability. Both factors reduced the costs typically associated with program maintenance up to that point, making it even more popular in data science circles. 

That's not to say Python is a simple programming language. Far from it. 

Leveraging Python requires knowledge of relevant frameworks and libraries, so complete newbies might find it difficult to use. The language requires lots of practice before it can automate tasks, develop websites, or analyze data. 

Building an ETL pipeline in Python also requires specific skills. If you have a small company without a data engineering team, you might struggle to create complex pipelines from scratch unless you have a deep knowledge of this programming language. Thankfully, there is now a range of tools that make building Python ETL pipelines much easier. These include Apache Airflow and Luigi for workflow management, Pandas for moving and processing data, and self-contained toolkits like Pygrametl.

Below, learn how to put these resources into action.

Read more: Airflow vs. Luigi: Which ETL is the Best?

Schedule a demo with Integrate.io today and integrate data without breaking a sweat.

Pygrametl

Pygrametl is an open-source Python ETL framework with built-in functionality for common ETL processes. Pygrametl presents each dimension and fact table as a Python object, allowing users to perform many popular ETL operations. Pygrametl released the most recent version of its framework (Version 2.7) in May 2021.

Pygrametl runs on CPython with PostgreSQL by default but you can modify it to run on Python as well. Here's some example source code:

import psycopg2 import pygrametl from pygrametl.datasources import SQLSource, CSVSource from pygrametl.tables import Dimension, FactTable sales_string = "host='10.0.0.12' dbname='sale' user='user' password='pass'" sales_pgconn = psycopg2.connect(sales_string)

This Pygrametl beginner’s guide (updated for 2021) offers an introduction on how to extracting data and loading it into a data warehouse. The source code below demonstrates how to establish a connection to a database:

import psycopg2 import pygrametl from pygrametl.datasources import SQLSource, CSVSource from pygrametl.tables import Dimension, FactTable sales_string = "host='10.0.0.12' dbname='sale' user='user' password='pass'" sales_pgconn = psycopg2.connect(sales_string)

psycopg2 is a Python module that facilitates connections to PostgreSQL databases. Before connecting to the source, you must feed the psycopg2.connect() a string containing the database name, username, and password. You can also use this functionto connect to the target data warehouse:

dw_string = "host='10.0.0.13' dbname='dw' user='dwuser' password='dwpass'" dw_pgconn = psycopg2.connect(dw_string) dw_conn_wrapper = pygrametl.ConnectionWrapper(connection=dw_pgconn)

In the example above, the user connects to a database named “sales.” Below is the code for extracting specific attributes from the database:

name_mapping= 'book', 'genre', 'city', 'timestamp', 'sale' sales_source = SQLSource(connection=sales_pgconn, \ query="SELECT * FROM sales", names=name_mapping)

After extracting data from the source database, you can enter the transformation stage of ETL. In this example code, the user defines a function to perform a simple transformation. The function takes a row from the database as input and splits a timestamp string into its three constituent parts (year, month, and day):

def split_timestamp(row): timestamp = row['timestamp'] timestamp_split = timestamp.split('/') row['year'] = timestamp_split[0] row['month'] = timestamp_split[1] row['day'] = timestamp_split[2]

As mentioned above, Pygrametl treats every dimension and fact table as a separate Python object. Below, the user creates three dimension objects for the “book" and “time” dimensions, as well as a FactTable object to store these two dimensions:

book_dimension = Dimension(name='book', key='bookid', attributes= ['book', 'genre']) time_dimension = Dimension(name='time', key='timeid', attributes=['day', 'month', 'year']) fact_table = FactTable(name='facttable', keyrefs=['bookid', 'timeid'], measures=['sale'])

We now iterate through each row of the source sales database, storing the relevant information in each dimension object. The "ensure" function checks to see if the row already exists within the dimension, and if not, inserts it.

for row in sales_source: split_timestamp(row) row['bookid'] = book_dimension.ensure(row) row['timeid'] = time_dimension.ensure(row) fact_table.insert(row)

Finally, you can commit this data to the data warehouse and close the connection:

dw_conn_wrapper.commit() dw_conn_wrapper.close()

Pygrametl provides a powerful ETL toolkit with many pre-built functions, combined with the power and expressiveness of regular Python. 

Airflow

While Pygrametl is a full-fledged Python ETL framework, Airflow has one purpose: To execute data pipelines through workflow automation. First developed by Airbnb, Airflow is now an open-source project maintained by the Apache Software Foundation. The basic unit of Airflow is the directed acyclic graph (DAG), which defines the relationships and dependencies between the ETL tasks you want to run.

Airflow's developers have provided a simple tutorial to demonstrate the tool's functionality. (The tutorial covers all versions of Airflow up to the most recent iteration, 2.1.3, which came out in December 2020.) First, the user needs to import the necessary libraries and define the default arguments for each task in the DAG:

from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.utils.dates import days_ago default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': days_ago(2), 'email': ['airflow@example.com'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), }

The meaning of these arguments is:

  • owner: The owner of the task (often the owner's username in the operating system).
  • depends_on_past: If true, this argument stops a task from occurring if it has not succeeded in the previous attempt.
  • start_date: The date and time at which the task should begin executing.
  • email: The contact email for the task owner.
  • email_on_failure, email_on_retry: These arguments control whether the task owner receives an email notification when the task fails or retires.
  • retries: The number of times to retry a task after it fails.
  • retry_delay: The time in between retries.

Next, the user creates the DAG object that will store the various tasks in the ETL workflow:

dag = DAG( 'tutorial', default_args=default_args, description='A simple tutorial DAG', schedule_interval=timedelta(days=1), )

The schedule_interval parameter controls the time between executions of the DAG workflow. Here it is set to one day, which effectively means that Airflow loads data into the target data warehouse daily.

Finally, the user defines a few simple tasks and adds them to the DAG:

t1 = BashOperator( task_id='print_date', bash_command='date', dag=dag,) t2 = BashOperator( task_id='sleep', depends_on_past=False, bash_command='sleep 5', retries=3, dag=dag, )

Here, the task "t1" executes the Bash command "date" (which prints the current date and time to the command line), while t2 executes the Bash command "sleep 5" (which directs the current program to pause execution for 5 seconds).

Airflow makes it easy to schedule command-line ETL jobs, ensuring that your pipelines consistently and reliably extract, transform, and load the data you require. The good news is that it's easy to integrate Airflow with other ETL tools and platforms like Integrate.io, letting you create and schedule automated pipelines for cloud data integration.

Using Airflow makes the most sense when you perform long ETL jobs or when a project involves multiple steps. You can restart from any point within the ETL process. However, Airflow is not a library. Since you have to deploy it, Airflow is not an optimal choice for small ETL jobs. 

Read more: Apache Airflow: Explained 

Pandas

Pandas is a Python library for data analysis, making it an excellent addition to your ETL toolkit. The most recent version, 1.3.2, came out in August 2021. The Pandas library includes functionality for reading and writing many file formats, including:

  • Text files
  • CSV files
  • JSON files
  • XML/HTML files
  • Excel (.xlsb) files
  • HDF5 files
  • Parquet files
  • SQL queries
  • Google BigQuery

The code below shows just how easy it is to import data from a JSON file:

import pandas as pd pd.read_json('test.json')

The basic unit of Pandas is the DataFrame, a two-dimensional data structure that stores tabular data in rows and columns. Once you load data into the DataFrame, Pandas allows you to perform a variety of transformations. For example, the widely used "merge" function in pandas performs a join operation between two DataFrames:

pd.merge(left, right, how='inner', on=None, left_on=None, right_on=None, 

left_index=False, right_index=False, sort=True)

The meaning of these arguments is:

  • left, right: The two DataFrames you need to join.
  • how: The type of join operation ('inner', 'outer', 'left', or 'right').
  • on, left_on, right_on: The columns or index levels to use as join keys, possibly from the left or right DataFrames.
  • left_index, right_index: If True, these arguments use the index (row labels) from the left or right DataFrames as join keys.
  • sort: If True, sorts the resulting DataFrame by its join keys.

Use Pandas when extracting data, cleaning and transforming it, and writing it to a CSV file, Excel, or an SQL database.

Luigi

Luigi is an open-source tool that allows you to build complex pipelines. Although Luigi has many applications, it was tailor-made for Spotify, which means it may not be well-suited for your unique needs. However, some companies have adopted it over the years, including Deliveroo. 

Luigi handles:

  • Workflow management
  • Visualization
  • Dependency resolution 
  • Command-line integration

When using Luigi, there are “tasks” and “targets” — alternative names for "nodes" and "edges." Tasks consume targets, creating a chain reaction. 

To leverage Luigi, familiarize yourself with tasks as they are the basic building blocks. To create a task, create a class containing one or all of:

  • run()
  • requires()
  • output()
  • targets 

Unlike many pipeline systems, Luigi reverses the process of transferring information to the next node. The program begins with the last task and then checks if it's ready for execution. 

This option is best for simple ETL processes, such as logs. Since Luigi’s structure is rather strict, it limits more complex tasks. If you are building an enterprise solution, Luigi may be a good choice. 

How Integrate.io Helps With Building an ETL Pipeline in Python

Instead of devoting valuable time and effort to building ETL pipelines in Python, more organizations are opting for low-code ETL data integration platforms like Integrate.io. With an incredible range of pre-built integrations and a straightforward drag-and-drop visual interface, Integrate.io makes it easier than ever to build simple yet powerful ETL pipelines to your data warehouse.

The good news is that you don't have to choose between Integrate.io and Python. You can use them both with the Integrate.io Python wrapper, which lets you access the Integrate.io REST API within a Python program.

Getting started with the Integrate.io Python Wrapper is easy. Simply import the Integrate.io package and provide your account ID and API key:

from Integrate.io import Integrate.ioClient account_id ="MyAccountID" api_key = "V4eyfgNqYcSasXGhzNxS" client = Integrate.ioClient(account_id,api_key)

Next, you need to instantiate a cluster, a group of machines that you have allocated for ETL jobs:

cluster_type = "production" nodes = 2 name ="New Cluster #199999" description ="New Cluster's Description" terminate_on_idle = False time_to_idle = 3600 cluster = client.create_cluster(cluster_type, nodes, name, description, terminate_on_idle, time_to_idle)

The meaning of these arguments is:

  • cluster_type: Either "production" or "sandbox", depending on the use case.
  • nodes: The number of nodes in the cluster (between 2 and the maximum allowed for your account).
  • terminate_on_idle: If true, it terminates the cluster when it becomes idle.
  • time_to_idle: The amount of time (in seconds) after which the cluster becomes idle.

Clusters in Integrate.io contain jobs. The code below demonstrates how to create and run a new Integrate.io job:

cluster_id = 83 package_id = 782 variables = {} variables['OUTPUTPATH']="test/job_vars.csv" variables['Date']="09-10-2020"    job = client.add_job(cluster_id, package_id, variables)

There are other ways Integrate.io helps with building an ETL pipeline in Python. You can cleanse data to reduce the size of source data before you start your ETL project, schedule jobs for times that best suit your needs, and monitor jobs on a clear, user-friendly dashboard. Integrate.io also complies with data privacy regulations such as GDPR, CCPA, and HIPAA. 

Other benefits include an enormous range of pre-built data integrations, a simple pricing model that charges you for the number of connectors you use, and world-class customer service.

Are you building an ETL pipeline in Python in 2022? Integrate.io’s powerful data integration platform can help. Schedule a demo now!