ETL Pipelines With Airflow

Introduction

In this blog post, I want to go over the core data engineering operations of Extract, Transform, Load (ETL) and show how they can be automated and scheduled using Apache Airflow.

Extracting data can be done in a multitude of ways, but one of the most common is to query a web API. If the query is successful, we receive data back from the API's server, often in the form of JSON. JSON can be thought of as semi-structured data, essentially a dictionary whose keys and values are strings. Since the data comes back as a dictionary of strings, we must transform it before storing or loading it into a database. Airflow is a platform to schedule and monitor workflows, and in this post I will show you how to use it to extract the daily weather in Ha Noi from the OpenWeatherMap API, convert the temperature to Celsius, and load the data into a simple PostgreSQL database.

Let's first get started with how to query an API.

Calling An API In Python

To use a Web API to get data, you make a request to a remote web server and retrieve the data you need. In Python, this is done using the requests module. Below I wrote a module, getWeather.py, that uses a GET request to obtain the weather for Ha Noi, VN. To get a better feel for how the request below works, check out the OpenWeatherMap API documentation page here.

Notice that I keep my API key in a separate file called config.py. In order to use this code yourself, you would have to obtain your own API key and either substitute it into the code directly or set a variable API_KEY = your-API-key in a config.py file. After the request has been made, I check whether it was successful by checking the status_code; otherwise, I print an error. Proper exception handling here is definitely something I will add in the future. If the request is successful, the weather data is returned and then dumped into a JSON file, named with the current date, using the json package.
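The original module isn't reproduced here, so below is a minimal sketch of what getWeather.py might look like based on the description above; the request parameters, file paths, and function name are my assumptions rather than the post's original code:

    # getWeather.py -- a minimal sketch based on the description in this post
    import json
    from datetime import datetime

    import requests

    from config import API_KEY  # your own OpenWeatherMap API key

    def get_weather():
        """Query OpenWeatherMap for the current weather in Ha Noi, VN and
        dump the JSON response to a file named after today's date."""
        parameters = {'q': 'Hanoi,VN', 'appid': API_KEY}
        result = requests.get('http://api.openweathermap.org/data/2.5/weather',
                              params=parameters)

        # Only write the data to disk if the request succeeded.
        if result.status_code == 200:
            json_data = result.json()
            file_name = str(datetime.now().date()) + '.json'
            with open(file_name, 'w') as f:
                json.dump(json_data, f)
        else:
            print('Error in API call.')

    if __name__ == '__main__':
        get_weather()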

The above code is stored in a file titled getWeather.py and can be run from the command line by typing the following from the appropriate directory:
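    python getWeather.py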

Note that this is the exact Bash command that I'll use to have Airflow collect daily weather data. A great introduction to using APIs with Python can be found here. Now, let's go over how to set up a PostgreSQL database.

Setting Up PostgreSQL Database

I went over the basics of how to use PostgreSQL in a previous blog post, so here I'll just present the code I used to create the database. The code below creates a table called weather_table in a local PostgreSQL database named WeatherDB.
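The original script isn't reproduced here, so below is a minimal sketch of what makeTable.py might look like; the column names, types, and connection settings are my assumptions for a local PostgreSQL instance:

    # makeTable.py -- a minimal sketch; columns and credentials are assumptions
    import psycopg2

    def make_table():
        """Create weather_table in the local WeatherDB database."""
        conn = psycopg2.connect(host='localhost', dbname='WeatherDB',
                                user='postgres', password='postgres')
        cur = conn.cursor()
        cur.execute("""
            CREATE TABLE IF NOT EXISTS weather_table (
                city        VARCHAR(50),
                day         DATE,
                temperature REAL,     -- degrees Celsius
                pressure    REAL,     -- hPa
                humidity    REAL,     -- percent
                valid       BOOLEAN   -- False if any numeric field was NaN
            );
        """)
        conn.commit()
        cur.close()
        conn.close()

    if __name__ == '__main__':
        make_table()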

I only take a subset of the data that is returned from OpenWeatherMap; specifically, I transform and load just those few fields into the database.

This script is stored in a file named makeTable.py and can be run using the command,
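    python makeTable.py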

from the appropriate directory, before we set up our Airflow job. Since this is just one single table, I'm not going to worry about such things as primary and foreign keys. Now we can dive into Airflow!

Introduction To Airflow

As mentioned in the introduction Airflow is a platform to schedule and monitor workflows as well as a method to set up data pipelines. Data pipelines in Airflow are made up of DAGs (Directed Acyclic Graphs) that are scheduled to be completed at specific times. Each node in the DAG is a task that needs to be completed. Tasks that are dependent upon the completion of other tasks are run sequentially and tasks that are not dependent upon one another can be run in parallel.

The main components of Airflow are:

  • Metadata DB (database): Keeps track of tasks, how long each run took, etc.
  • Webserver (Flask-based UI): The web server talks to the metadata database to get the information it presents.
  • Scheduler: This scans the DAGs folder and puts tasks that are due to run into the queue.
  • Workers: These are the machines that actually do the tasks; they can be separate machines from the scheduler or the same.

Airflow dumps information about all of your DAG runs into logs, which can be written to local files or shipped to remote storage. Just for simplicity, I made a local directory to hold them.

Note that the directory the logs are dumped into is determined by the base_log_folder setting in the airflow.cfg file. You can store the logs remotely instead by setting the remote_base_log_folder variable in the same file.
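For reference, the relevant settings in airflow.cfg look something like this (the paths shown are illustrative, not the ones from the original post):

    [core]
    base_log_folder = /home/your-user/airflow/logs
    # to ship logs to remote storage instead, e.g. S3:
    # remote_base_log_folder = s3://your-bucket/airflow/logs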

Installing Airflow

To install Airflow, first set your Airflow home directory by typing the following into your terminal,
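    export AIRFLOW_HOME=~/airflow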

I chose to set AIRFLOW_HOME=~/airflow, which is the default setting. We can now install Airflow with PostgreSQL support using pip:
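Depending on your Airflow version the package name may differ slightly, but with the postgres extra the install looks something like,

    pip install "apache-airflow[postgres]"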

Metadata DB

We then initialize the metadata database by typing,
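For Airflow 1.x this is,

    airflow initdb

(in newer Airflow versions the equivalent command is airflow db init).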

Out of the box, Airflow uses an SQLite database, which you should outgrow fairly quickly since no parallelization is possible with this database backend. The default SQLite database file will be located at AIRFLOW_HOME/airflow.db. You can change the database backend using the sql_alchemy_conn variable in the airflow.cfg file.
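For example, to point the metadata database at Postgres instead, the setting in airflow.cfg would look something like this (the connection string here is an assumption for a local setup):

    sql_alchemy_conn = postgresql+psycopg2://airflow_user:airflow_pass@localhost:5432/airflow_metadata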

Webserver

We can start the webserver locally using the command,
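For example, to serve the UI on the default port 8080,

    airflow webserver -p 8080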

Then plug http://0.0.0.0:8080/ into your browser and you will get the Airflow UI. The web server is extremely helpful for understanding which DAGs are running, how long they ran, when they ran, etc. It is also helpful for setting up connections to databases, as I will show later.

Scheduler

The Airflow scheduler monitors all tasks and all DAGs and triggers the tasks to run. Behind the scenes, it monitors and stays in sync with the folder containing all of your DAG objects. The Airflow scheduler is designed to run as a service in an Airflow production environment. To kick it off, all you need to do is type,
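    airflow scheduler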


Workers

In this example, I won't be using any separate workers since I'm running this on my personal computer. Let's now get into how to use Airflow to set up an ETL pipeline!

An Example ETL Pipeline With Airflow

Let's go over an example of an Airflow DAG that calls the OpenWeatherMap API daily to get the weather in Ha Noi, VN and stores the data in the Postgres database that we created. The first thing we need to do is create a connection to the database (postgres_conn_id). We do this by going to the Airflow webserver in our web browser, clicking on the Admin tab, and then choosing Connections.


Next, click on the Create link and enter the information needed to create our Postgres connection. You can see what I did below,
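The exact values depend on your local setup, but the connection form would look something like this (the host, schema, login, and port shown are assumptions for a default local PostgreSQL install; only the Conn Id, weather_id, is fixed by the rest of the post):

    Conn Id:   weather_id
    Conn Type: Postgres
    Host:      localhost
    Schema:    WeatherDB
    Login:     postgres
    Password:  <your password>
    Port:      5432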

We then click save and can now use weather_id as our postgres_conn_id connection id.

Now, let's jump into creating our DAG; this will be stored in a Python file that, by convention, is called the DAG definition file. In this file, the first thing we do is import all the necessary libraries:
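A sketch of the imports, using Airflow 1.x module paths (newer Airflow versions import the operators from different locations):

    import json
    from datetime import datetime, timedelta

    import numpy as np

    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from airflow.operators.python_operator import PythonOperator
    from airflow.hooks.postgres_hook import PostgresHook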

You can see that we import both BashOperator and PythonOperator. While a DAG describes how to run a workflow and which tasks need to be completed before others, operators determine what actually gets done in each of those tasks. Operators generally run independently of one another unless there are dependencies between them. Once an operator is instantiated, it is referred to as a "task".

Now we define the Python function that will transform and load our JSON data object into our database. This can be seen below in the function, load_data:
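The original function isn't reproduced here, so below is a minimal sketch consistent with the description that follows; the column names, file paths, and JSON fields (OpenWeatherMap's name and main block) are my assumptions:

    def load_data():
        """Transform today's JSON weather data and load it into Postgres."""
        pg_hook = PostgresHook(postgres_conn_id='weather_id')

        # Load the JSON file that getWeather.py wrote for today's date.
        today = str(datetime.now().date())
        with open(today + '.json', 'r') as f:
            data = json.load(f)

        # Pull out the subset of fields we store (assumed subset).
        city = data['name']
        temperature = float(data['main']['temp']) - 273.15   # Kelvin -> Celsius
        pressure = float(data['main']['pressure'])
        humidity = float(data['main']['humidity'])

        # Flag the row as invalid if any numeric value is NaN.
        valid = not np.isnan([temperature, pressure, humidity]).any()

        row = (city, today, temperature, pressure, humidity, valid)

        insert_cmd = """INSERT INTO weather_table
                        (city, day, temperature, pressure, humidity, valid)
                        VALUES (%s, %s, %s, %s, %s, %s);"""
        pg_hook.run(insert_cmd, parameters=row)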


You can see that we instantiate a PostgresHook object and pass our Postgres connection id, weather_id, to the constructor. We then get the current day's date so that we can load the JSON data from that day's API request. Once we load the data, we can see that it is a dictionary of string key-value pairs. We then transform the values in this dictionary and check that the numerical values are not NaNs using NumPy's isnan function. If there are any NaNs in the numerical data, we flag the row as invalid.

Next, we cast the individual data field values into a tuple, which we pass as a parameter along with the SQL insertion command, insert_cmd, to the PostgresHook object's run method. The run method then inserts the data into the database. More checks on the validity and quality of our data would be better, but for the purposes of this blog post what we have done is sufficient. Also, note that as things stand we insert one row into the database at a time; to be more efficient, we would ideally load multiple rows at once.

Now let's dive into a DAG definition below,
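A sketch of what the default parameters might look like (the specific values are illustrative):

    default_parameters = {
        'owner': 'airflow',
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    }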

You can see that we first define our default parameters, default_parameters. This is a dictionary that includes such information as the owner of the DAG, how many times to retry running the DAG if it fails, and how long to wait between retries. Next, we instantiate our DAG,
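A sketch of the instantiation; the dag_id, start_date, and interval shown are illustrative:

    dag = DAG(
        dag_id='weather_dag',
        default_args=default_parameters,
        start_date=datetime(2021, 1, 1),
        schedule_interval=timedelta(minutes=1440))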


The dag_id needs to be unique. The DAG object will be passed to each of its tasks during their instantiation. Every DAG has a datetime object called start_date, which tells Airflow when to begin scheduling the DAG, as well as a timedelta object, schedule_interval, that dictates how often to run it. I set my DAG to run every 1440 minutes, i.e. every day.

Next, we instantiate a BashOperator which becomes a task that executes the API call as discussed before:
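A sketch of this task; the task_id and the path to getWeather.py are placeholders for whatever you use locally:

    task1 = BashOperator(
        task_id='get_weather',
        bash_command='cd ~/airflow/dags/scripts && python getWeather.py',
        dag=dag)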

Notice that we pass the DAG object in through the operator's constructor. Note that the task_id has to be unique within the DAG pipeline. We next instantiate a PythonOperator, task2, that transforms the JSON data pulled from the API and loads it into the database:
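A sketch of this task, with an illustrative task_id:

    task2 = PythonOperator(
        task_id='transform_load',
        python_callable=load_data,
        dag=dag)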


Notice that we pass the function load_data through the constructor as the keyword parameter python_callable.

Finally, we set up the DAG pipeline by saying one task depends on the other being completed. You can declare that task1 needs to be completed before task2 can be started by using the following notation at the bottom of your .py script (DAG definition file):
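For example, using the bitshift notation (with the illustrative task names from the sketches above):

    task1 >> task2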

or,
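    # equivalent to task1 >> task2
    task2.set_upstream(task1)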

Tasks that don't have dependencies with each other can be run concurrently.

That's it! Pretty cool right? The last thing I'll go over is some tricks I used to debug code while working with Airflow.

Debugging Airflow Code

Debugging code is always difficult, and I found some useful tricks to help you figure out why your DAGs aren't working properly. First, you can see if there is a Python syntax error by "compiling" the DAG definition file,
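Assuming the DAG definition file from the sketches above is saved as weather_dag.py in your DAGs folder, this is something like:

    python ~/airflow/dags/weather_dag.py

If the file runs without raising an exception, the DAG can be parsed.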

You can then test an individual task within a DAG by using the command,
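For example, using the illustrative DAG and task ids from above (Airflow 1.x syntax):

    airflow test weather_dag get_weather 2021-01-01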

You can also test the whole DAG by doing a backfill,
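Again with the illustrative dag_id (Airflow 1.x syntax):

    airflow backfill weather_dag -s 2021-01-01 -e 2021-01-07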

Sometimes, in order to notify Airflow of an update, you may need to delete the .pyc files or even the DAGs themselves. If you need to delete a DAG, first delete the DAG's data from the metadata database:
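For example, for the illustrative dag_id used above, something along these lines (the table names come from the Airflow metadata schema; adjust the dag_id to your own):

    DELETE FROM xcom WHERE dag_id = 'weather_dag';
    DELETE FROM task_instance WHERE dag_id = 'weather_dag';
    DELETE FROM dag_run WHERE dag_id = 'weather_dag';
    DELETE FROM dag WHERE dag_id = 'weather_dag';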

Then you can delete DAGs by clearing the task instance states:
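For instance, with the Airflow 1.x CLI:

    airflow clear weather_dag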

Some Results

The tasks run successfully in the Airflow web server,


and you can see the log output for the details.


Finally, the data is updated in the database automatically every day.
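A quick way to verify this yourself (assuming the WeatherDB database and the weather_table columns sketched earlier) is to query the table directly:

    psql WeatherDB -c "SELECT * FROM weather_table ORDER BY day DESC LIMIT 5;"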


Conclusions

Airflow is an extremely useful tool for building data pipelines and scheduling jobs in Python. It is simple to use and in this post, I went over an example of how to perform ETL using Airflow. There are definitely more things Airflow can do for you and I encourage you to learn more about it. For a great overview video of Airflow check out this talk and for a more detailed introduction to Airflow check out the documentation page.
