ETL Process Using Airflow and Docker
Introduction
In this article, I will create an ETL process for extracting Forex data and wrap the whole thing as a data pipeline using Airflow and Docker.
The ETL process will extract data from the fixer.io API, transform it, and load it into a PostgreSQL database. The goal is an automated process that continuously feeds the PostgreSQL database: every 2 minutes, the ETL process will load an updated batch of Forex data.
Note that this article assumes some knowledge of Airflow, Docker, Python, and SQL. I won't go into too many details to keep this article short.
Project Steps
1. Setting up Airflow Architecture
2. 1st Dag - Check if API is available
3. 2nd Dag - Create a table
4. 3rd Dag - Extract
5. 4th Dag - Transform
6. 5th Dag - Load
7. Query data in the pgAdmin UI
Step #1 - Setting up Airflow Architecture
The first thing we should do is set up the basic Airflow architecture. I will use a docker-compose file to deploy all the necessary components for setting up the Airflow environment. With Docker, setting up an architecture with various components is as easy as running a single command: docker-compose up.
The components of the architecture are:
1. Airflow Webserver
2. Airflow Scheduler
3. Airflow Worker
4. Airflow Triggerer
5. Flower
6. PostgreSQL database
7. pgAdmin UI
8. Redis Database
I'll create a new directory for the project called "Airflow_Forex_ETL":
In the directory, create two files: .env and docker-compose.yaml.
The .env file will contain these two environment variables:
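Based on the official Airflow docker-compose setup, these are typically the host user and group ids, so that files Airflow creates in the mounted volumes are owned by you. The values below are the common defaults; adjust AIRFLOW_UID to your own `id -u` if needed:

```
AIRFLOW_UID=50000
AIRFLOW_GID=0
```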
Then, in docker-compose.yaml, I'll copy-paste the following code:
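The full file is long, so here is a condensed sketch based on the official Apache Airflow docker-compose file, with a pgAdmin service added. In practice you would start from the complete file published in the Airflow documentation; image tags and environment values here are illustrative:

```yaml
version: '3'

# Settings shared by every Airflow container.
x-airflow-common: &airflow-common
  image: apache/airflow:2.3.0
  environment:
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-50000}:0"
  depends_on:
    - postgres
    - redis

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow

  redis:
    image: redis:latest

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - "8080:8080"

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler

  airflow-worker:
    <<: *airflow-common
    command: celery worker

  airflow-triggerer:
    <<: *airflow-common
    command: triggerer

  flower:
    <<: *airflow-common
    command: celery flower
    ports:
      - "5555:5555"

  pgadmin:
    image: dpage/pgadmin4
    environment:
      PGADMIN_DEFAULT_EMAIL: admin@admin.com
      PGADMIN_DEFAULT_PASSWORD: root
    ports:
      - "5050:80"
```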
Now, to deploy Airflow, we just need to run `docker-compose up -d` from the project directory.
Now Airflow is up and running. We can check the UI at localhost:8080.
Username: airflow
Password: airflow
As we can see, Airflow is running, but there are no tasks as we haven't written anything yet. Now that Airflow is running, we can start with the ETL process.
Note: After deploying Airflow for the first time, we should see the following directories added to the project: dags, logs, and plugins.
Step #2 - 1st Dag - Check if API is available
Let's write the first dag in the data pipeline. The first task will simply check whether the API we will use to pull data is reachable.
The API we will use to pull data belongs to Fixer.io. They provide a free and lightweight API that can be used to extract Forex data. Before we begin, sign up on the site to get an API key (it's free).
Next, we need to create a new connection in Airflow. Go to Admin -> Connections -> New Connection.
This will allow you to fill out a form to create the new connection to fixer.io. Just fill the form with the following details:
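A sketch of the connection details (the host is an assumption based on fixer.io's standard data endpoint; the connection id matches the name used later in this article):

```
Connection Id:   is_api_available
Connection Type: HTTP
Host:            https://data.fixer.io/
```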
Now we can create the dag. First, let's import some basic packages and set default settings that will be applied for all of the dags in the pipeline.
I'll use Airflow's HttpSensor, which will use the new connection that we created to check if the API of the host (fixer.io) is available for GET requests.
I will also set the schedule_interval argument to timedelta(minutes=2), meaning Airflow will run the ETL process every two minutes and grab updated Forex rates.
This dag will send a GET request to fixer.io using our new connection. If the response contains "EUR", the task is marked as a success.
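Putting the pieces together, the dag might look like this. It is a sketch: the dag id, endpoint, and default_args values are assumptions, not taken from the original project.

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.http.sensors.http import HttpSensor

# Default settings applied to every task in the pipeline.
default_args = {
    "owner": "airflow",
    "retries": 1,
    "retry_delay": timedelta(minutes=1),
}

with DAG(
    dag_id="forex_data_pipeline",            # assumption: illustrative dag id
    default_args=default_args,
    start_date=datetime(2022, 1, 1),
    schedule_interval=timedelta(minutes=2),  # run the ETL every two minutes
    catchup=False,
) as dag:

    # Succeeds once a GET request through the "is_api_available" connection
    # returns a response containing the string "EUR".
    is_api_available = HttpSensor(
        task_id="is_api_available",
        http_conn_id="is_api_available",
        endpoint="api/latest",               # assumption: fixer.io's "latest rates" endpoint
        response_check=lambda response: "EUR" in response.text,
        poke_interval=5,
        timeout=20,
    )
```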
Tip
Before going to the UI to check the new dag, we should test it first. Let's check that the Airflow Scheduler can run this dag. Run `docker ps` and grab the container id of the airflow-scheduler:
In my case, the container id is: fdebb803a94d. Open a shell inside the airflow-scheduler container with `docker exec -it fdebb803a94d /bin/bash`, then test the dag with `airflow tasks test <dag_id> <task_id> <execution_date>`:
Look at the last row; if it looks like this, then the test was successful.
As we can see, the criteria for success are met. That means the response contained the string 'EUR', and the dag should run without errors.
Let's check the UI for the new dag:
As we can see, the new dag is ready to be deployed. Let's check it in the graph view and run it:
Success. The dag is working. That means that the API is available.
Step #3 - 2nd Dag - Create a table
The next step is to create a table in a PostgreSQL database. First, we need to create another connection to the database. It will have the following details:
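A sketch of the connection details (the connection id is an assumption; the host is the service name of the PostgreSQL container from the docker-compose file):

```
Connection Id:   postgres_conn
Connection Type: Postgres
Host:            postgres
Schema:          airflow
Login:           airflow
Password:        airflow
Port:            5432
```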
To create a table, we need to use the PostgresOperator, provide it the conn_id of the new connection that we just made, and provide an SQL query for creating the table:
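A sketch of the task, defined inside the same `with DAG(...) as dag:` block as the rest of the pipeline. The connection id and the column layout are assumptions based on the fields we extract from fixer.io:

```python
from airflow.providers.postgres.operators.postgres import PostgresOperator

# Column names/types are an assumption, not the original project's schema.
create_table = PostgresOperator(
    task_id="create_table",
    postgres_conn_id="postgres_conn",   # assumption: the Postgres connection created above
    sql="""
        CREATE TABLE IF NOT EXISTS rates (
            base     TEXT,
            date     DATE,
            currency TEXT,
            rate     DOUBLE PRECISION
        );
    """,
)
```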
As before, let's go inside the container and verify this. This time, let's connect to the airflow-postgres container and check that the "rates" table has been created.
Run `docker ps` and grab the container id of the airflow-postgres container.
My container id is: 230f5c259ec6. Connect with `docker exec -it 230f5c259ec6 psql -U airflow`, then list the tables with `\dt`:
Output
Great. As we can see, the table has been created inside the airflow-postgres container, but it is not yet populated with data.
Let's check the UI and move on to actually extracting the data.
Step #4 - 3rd Dag - Extract
In this part, I'll use the SimpleHttpOperator to extract the data as a JSON object, which will be stored in Airflow's metadata database via a mechanism called XCom.
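A sketch of the extract task (the task id and endpoint are assumptions; the JSON returned by response_filter is pushed to XCom automatically, since do_xcom_push defaults to True):

```python
import json

from airflow.providers.http.operators.http import SimpleHttpOperator

extract_rates = SimpleHttpOperator(
    task_id="extract_rates",             # assumption: illustrative task id
    http_conn_id="is_api_available",     # the fixer.io connection from step #2
    endpoint="api/latest",               # assumption: fixer.io's "latest rates" endpoint
    method="GET",
    # Parse the response body into a dict; the return value lands in XCom.
    response_filter=lambda response: json.loads(response.text),
    log_response=True,
)
```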
This dag will use the first connection we made, called "is_api_available", and will extract the current day's rates. Then it will store them in XCom. Let's go to the UI and run the dags:
Seems like everything was successful. Let's check the extracted data in XCom: click on the extract dag and go to its XCom tab.
As we can see, the data has been extracted and is stored in XCom.
Step #5 - 4th Dag - Transform
In this step, I will transform the data into a Pandas DataFrame and export it as a CSV file. I will use a PythonOperator to pull the extracted data from XCom, store it as a DataFrame, and export it to CSV.
The CSV files will be temporarily stored in the airflow-worker container until the next step.
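The heart of the transform step is plain Pandas. Here is a sketch of the callable (the function name, column layout, and sample payload are assumptions); in the dag, this would be wrapped in a PythonOperator whose callable pulls the extracted JSON from XCom via `ti.xcom_pull(...)`:

```python
import pandas as pd

def transform_rates(api_response: dict, output_path: str = "/tmp/processed_data.csv") -> pd.DataFrame:
    """Flatten a fixer.io-style JSON payload into one row per currency."""
    df = pd.DataFrame(
        [
            {
                "base": api_response["base"],
                "date": api_response["date"],
                "currency": currency,
                "rate": rate,
            }
            for currency, rate in api_response["rates"].items()
        ]
    )
    # No header/index: the load step will COPY these raw rows straight into the table.
    df.to_csv(output_path, index=False, header=False)
    return df

# Example with a payload shaped like fixer.io's "latest rates" response:
sample = {"base": "EUR", "date": "2022-01-01", "rates": {"USD": 1.13, "GBP": 0.84}}
transform_rates(sample)
```

Writing without a header keeps the CSV directly compatible with a plain COPY into the rates table in the next step.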
Let's check the UI and run the pipeline.
Looks like everything is working. To verify this, let's check the CSV file in the airflow-worker container. As usual, grab the container id with `docker ps`, go inside the container with `docker exec -it <container_id> /bin/bash`, and inspect the file at /tmp/processed_data.csv (for example, with `cat /tmp/processed_data.csv`).
My container id is: 01e04c68bf5d.
Output
Great. Now that we have the data in a CSV file, let's load it into the airflow-postgres container.
Step #6 - 5th Dag - Load
I will use a PostgresHook to load the data from /tmp/processed_data.csv into the rates table.
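A sketch of the load callable, which in the dag would be wrapped in a PythonOperator (`python_callable=load_rates`). The function name and connection id are assumptions; `copy_expert` streams the file through PostgreSQL's COPY command:

```python
from airflow.providers.postgres.hooks.postgres import PostgresHook

def load_rates():
    """Bulk-load the CSV produced by the transform step into the rates table."""
    hook = PostgresHook(postgres_conn_id="postgres_conn")  # assumption: connection name
    hook.copy_expert(
        sql="COPY rates FROM STDIN WITH (FORMAT csv)",
        filename="/tmp/processed_data.csv",
    )
```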
Let's check the UI and run the pipeline:
Looks like it's working. Let's check the airflow-postgres container and see if the data has landed in the rates table (for example, with `SELECT * FROM rates;`):
Output
So now we have the data stored in the PostgreSQL database. Let's launch a pgAdmin UI to query this data.
Step #7 - Query data in the pgAdmin UI
To launch pgAdmin, simply go to localhost:5050. The UI should already be running because of the docker-compose file.
To log in, enter the Username and Password from the docker-compose file.
Username: admin@admin.com
Password: root
To query the data, we need to create a new server, which will be connected to the PostgreSQL container.
Click on "Add New Server"
The name of the server will be my_db.
Next, we need to connect this server to PostgreSQL: the port will be 5432, and the username and password are both "airflow".
We need to put the IP address of the airflow-postgres container in the Host Name field. Check the container's IP with `docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' <container_id>`:
So in my case, the hostname will be 172.21.0.3:
Click save, and let's try to query the data from PostgreSQL:
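For example, a simple query to inspect the latest rows (the column name used for ordering is an assumption about the table's schema):

```sql
SELECT *
FROM rates
ORDER BY date DESC
LIMIT 10;
```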
And that's it. Now we have an automated pipeline that will run every 2 minutes and will populate the database with updated data. Thanks for reading!