Playing with some Pandas functions and Airflow operators

Recently, I had to import raw information into a staging PostgreSQL database as one task of an Airflow DAG pipeline. In this post I share the approach I used and show how the valuable Pandas json_normalize function helps to prepare the data before importing it.

The official Pandas documentation describes this function with a simple, self-descriptive statement: “Normalize semi-structured JSON data into a flat table”. Its key parameter is the data itself, which can be a dict or a list of dicts; the documentation covers the remaining parameters.

from pandas import json_normalize

# Two flat user records; each dictionary becomes one row of the DataFrame.
my_user = [
    {
        'firstname': 'Geo',
        'lastname': 'Hernandez',
        'country': 'Spain',
        'username': 'gh',
        'password': 'nothing',
        'email': 'test@mycol.com',
    },
    {
        'firstname': 'Octi',
        'lastname': 'Hernandez',
        'country': 'Spain',
        'username': 'oh',
        'password': 'mypass',
        'email': 'testoct@mycol.com',
    },
]
my_user_normalized = json_normalize(my_user)

Once you execute it, you should see a result like this:

   firstname   lastname  country  username  password              email
0        Geo  Hernandez    Spain        gh   nothing     test@mycol.com
1       Octi  Hernandez    Spain        oh    mypass  testoct@mycol.com
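
The records above are already flat, so the flattening itself is easy to miss. As a minimal sketch, assuming a hypothetical nested 'contact' field that is not part of the original data, json_normalize turns each nested key into a dotted column name:

```python
from pandas import json_normalize

# Hypothetical nested record: the 'contact' sub-dictionary is an assumption
# made for illustration and does not appear in the original example.
nested_user = [
    {
        'firstname': 'Geo',
        'contact': {'email': 'test@mycol.com', 'country': 'Spain'},
    }
]

# Nested keys are joined to their parent key with '.' by default.
flat = json_normalize(nested_user)
print(sorted(flat.columns))

# The sep parameter changes the separator used in the column names.
flat_underscore = json_normalize(nested_user, sep='_')
print(sorted(flat_underscore.columns))
```

Column names without dots, such as the ones produced with sep='_', are usually easier to map onto SQL column names later on.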

The output of json_normalize prepares the data for the next step: saving the normalised output as a CSV file. Because json_normalize already returns a DataFrame, no extra step is needed beyond calling its to_csv method. The only additional import is Path, which comes from Python's standard pathlib module (not from Pandas) and is used to build the destination path.

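from pathlib import Path

# With default arguments, to_csv also writes the header row and the index column.
my_path_csv = Path('YourPathHere/user_normalized.csv')
my_user_normalized.to_csv(my_path_csv)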
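
The final DAG in this post calls to_csv with the index and the header switched off. The following sketch, which uses a reduced two-column record chosen only for brevity, compares both variants to show what each one writes:

```python
from pandas import DataFrame

# Reduced record with two columns, used only to keep the output short.
df = DataFrame([{'firstname': 'Geo', 'username': 'gh'}])

# Without a path argument, to_csv returns the CSV text instead of writing a file.
with_defaults = df.to_csv()
data_only = df.to_csv(index=False, header=False)

print(with_defaults.splitlines())
print(data_only.splitlines())
```

The default output carries an extra leading index column and a header line, neither of which matches the columns of the target table, so the data-only form is the one that suits a bulk import.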

At this point, we have simple but valid code to build on, and we can now integrate Airflow. The flow that we are going to use in Airflow consists of the following steps:

create_db_table –> store_user_in_csv –> import_user_into_db

We aim to create a new table in PostgreSQL, and the Postgres operator is a perfect fit for that. It is also important to set up the connection info. You could add it by code, but here we take the simpler approach: go to the Airflow UI and click on Admin –> Connections, then submit the connection id (postgres in the code that follows), host, port, user and password. In our case, we will use the same PostgreSQL instance that is part of the Airflow configuration (the one that stores Airflow's own config data).
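
For the "by code" route, one option is an environment variable named AIRFLOW_CONN_ followed by the connection id in upper case, holding the connection as a URI. The sketch below only builds that URI with the standard library; the helper name build_conn_uri and every credential value are placeholders invented for illustration, not values from this post:

```python
import os
from urllib.parse import quote


def build_conn_uri(user, password, host, port, database):
    """Build a postgres:// URI, percent-encoding the credentials (hypothetical helper)."""
    return (
        f"postgres://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{database}"
    )


# Placeholder credentials; replace them with the ones of your own instance.
uri = build_conn_uri('airflow', 'p@ss/word', 'localhost', 5432, 'airflow')

# Setting the variable here only affects the current process. In a real
# deployment it would be exported in the environment of the Airflow services.
os.environ['AIRFLOW_CONN_POSTGRES'] = uri
print(uri)
```

Percent-encoding matters because characters such as @ or / inside a password would otherwise be read as URI separators.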

The first step uses a dedicated operator for interacting with a PostgreSQL database. Airflow provides a wide list of operators; each one is an abstraction, so we do not need to worry about how the operations are carried out.

from airflow.providers.postgres.operators.postgres import PostgresOperator

# Creates the target table; IF NOT EXISTS keeps the statement safe to re-run.
create_db_table = PostgresOperator(
    task_id='create_db_table',
    postgres_conn_id='postgres',
    sql='''
        CREATE TABLE IF NOT EXISTS users (
            firstname TEXT NOT NULL,
            lastname TEXT NOT NULL,
            country TEXT NOT NULL,
            username TEXT NOT NULL,
            password TEXT NOT NULL,
            email TEXT NOT NULL
        );
    '''
)

As you can see, this operator takes three main parameters. The task_id identifies the task, and it is recommended to give it the same name as the variable. The postgres_conn_id should correspond to the connection created in the previous step. Finally, sql holds the SQL code to execute, which should be idempotent; here, IF NOT EXISTS makes the statement safe to run more than once.
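
To illustrate what idempotent means for this statement, here is a minimal sketch that uses Python's built-in sqlite3 module as a stand-in for PostgreSQL (an assumption made only so the example runs without a server) and a shortened two-column table:

```python
import sqlite3

# Shortened version of the users table, kept to two columns for brevity.
CREATE_IF_MISSING = '''
    CREATE TABLE IF NOT EXISTS users (
        firstname TEXT NOT NULL,
        username TEXT NOT NULL
    );
'''
CREATE_ALWAYS = 'CREATE TABLE users (firstname TEXT NOT NULL);'

conn = sqlite3.connect(':memory:')

# Running the guarded statement twice simulates two runs of the DAG.
conn.execute(CREATE_IF_MISSING)
conn.execute(CREATE_IF_MISSING)

# The unguarded statement fails as soon as the table already exists.
try:
    conn.execute(CREATE_ALWAYS)
    second_run_failed = False
except sqlite3.OperationalError:
    second_run_failed = True

tables = conn.execute(
    "SELECT name FROM sqlite_master WHERE type = 'table'"
).fetchall()
print(tables, second_run_failed)
```

With a daily schedule the task runs repeatedly, so a statement that fails on its second execution would break every run after the first one.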

The next key component is the import process. Here we can use a special type of component called a Hook, which can be defined as an abstraction of a specific API that allows us to interact with an external system, for instance PostgreSQL or BigQuery. The following example uses the PostgreSQL hook and its copy_expert method to copy the content of the CSV file (written by a Python task) into the table defined previously.

from airflow.providers.postgres.hooks.postgres import PostgresHook

def _import_user_into_db():
    hook = PostgresHook(postgres_conn_id='postgres')
    hook.copy_expert(
        sql="COPY users FROM stdin WITH DELIMITER as ','",
        filename="/tmp/user_normalized.csv")
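
The COPY statement above only sets a delimiter, so PostgreSQL reads the file in its text format, where double quotes have no special meaning. That works for the sample data, but to_csv quotes any value that contains a comma. The sketch below uses the standard csv module, which applies the same minimal quoting, and a hypothetical last name containing a comma to show the difference; COPY_AS_CSV is a suggested alternative statement, not the one used in this post:

```python
import csv
import io

# Hypothetical row: the comma inside the second value is an assumption.
row = ['Geo', 'Hernandez, Jr.', 'Spain']

buffer = io.StringIO()
csv.writer(buffer).writerow(row)
line = buffer.getvalue().strip()
print(line)

# Splitting on ',' mimics a delimiter-only parse and yields too many fields.
naive_fields = line.split(',')
# A CSV-aware parse honours the quotes and recovers the original three fields.
csv_fields = next(csv.reader([line]))
print(len(naive_fields), len(csv_fields))

# Asking COPY for the CSV format makes PostgreSQL interpret the quotes as well.
COPY_AS_CSV = "COPY users FROM STDIN WITH (FORMAT csv)"
```

If your data may contain commas or quotes, passing a statement like COPY_AS_CSV to copy_expert is the safer choice.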

At this point, we are ready to define the complete DAG. Here is the final version of the code:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook

from pandas import json_normalize
from datetime import datetime
from pathlib import Path

def _store_user_in_csv():
    my_path_csv = Path('/tmp/user_normalized.csv')
    my_user = [
        {
            'firstname': 'Geo',
            'lastname': 'Hernandez',
            'country': 'Spain',
            'username': 'g',
            'password': 'nothing',
            'email': 'test@mycol.com',
        },
        {
            'firstname': 'Octi',
            'lastname': 'Hernandez',
            'country': 'Spain',
            'username': 'o',
            'password': 'mypass',
            'email': 'testoct@mycol.com',
        },
    ]
    my_user_normalized = json_normalize(my_user)
    # No index column and no header row: the file holds only the six table columns.
    my_user_normalized.to_csv(my_path_csv, index=False, header=False)

def _import_user_into_db():
    hook = PostgresHook(postgres_conn_id='postgres')
    hook.copy_expert(
        sql="COPY users FROM stdin WITH DELIMITER as ','",
        filename="/tmp/user_normalized.csv")


with DAG(
    dag_id='storing_user_postgres',
    start_date = datetime(2023,6,2),
    schedule_interval = '@daily',
    catchup = False
) as dag:
    create_db_table = PostgresOperator(
        task_id='create_db_table',
        postgres_conn_id = 'postgres',
        sql = '''
            CREATE TABLE IF NOT EXISTS users (
            firstname TEXT NOT NULL,
            lastname TEXT NOT NULL,
            country TEXT NOT NULL,
            username TEXT NOT NULL,
            password TEXT NOT NULL,
            email TEXT NOT NULL
            );
        '''
    )
    store_user_in_csv = PythonOperator(
        task_id='store_user_in_csv',
        python_callable = _store_user_in_csv
    )

    import_user_into_db = PythonOperator(
        task_id='import_user_into_db',
        python_callable = _import_user_into_db
    )

    create_db_table >> store_user_in_csv >> import_user_into_db

Run the DAG and, in the Airflow UI, you should see its diagram with the three tasks completed successfully.

In case you need to confirm that everything was stored as expected, you can check the log of the import_user_into_db task, or connect to the PostgreSQL instance and query the users table. I hope this post is useful. Happy coding!

geohernandez
