My goal is to do a basic data transformation with Python in an Airflow DAG.

The script below is divided into 3 tasks:

  • 1) connect to a Postgres database and extract the data into a dataframe
  • 2) transform the dataframe
  • 3) write log information into a CSV file, and the dataframe results into a JSON file

Data (the dataframe) is shared between tasks by writing it locally to a Parquet file. This relies on all tasks running on the same machine, so they all see the same local filesystem.
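Stripped of the actual transformation, the hand-off pattern looks like this (a minimal sketch; the parquet_handoff DAG id, the /tmp path and the sample data are placeholders):

import pendulum
import pandas as pd
from airflow.decorators import dag, task

@dag(start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), schedule=None, catchup=False)
def parquet_handoff():

    @task
    def producer():
        # write the dataframe to a local Parquet file so the next task can pick it up
        df = pd.DataFrame({"id": [1, 2, 3]})
        df.to_parquet("/tmp/handoff.parquet", engine="fastparquet")

    @task
    def consumer():
        # read back whatever the producer task wrote
        df = pd.read_parquet("/tmp/handoff.parquet", engine="fastparquet")
        print(df.shape)

    producer() >> consumer()

parquet_handoff()

The full DAG: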

#~/airflow/dags/employee_transformation.py
import datetime
import pendulum
from airflow.decorators import dag, task
import pandas as pd
import numpy as np
import csv
from airflow.providers.postgres.hooks.postgres import PostgresHook

@dag(
    dag_id="employee_transformation",
    schedule=None,  # no fixed schedule; trigger the DAG manually
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    dagrun_timeout=datetime.timedelta(minutes=60),
)
def transform_data():

    """
    ### Documentation
    To share data between tasks, I'm writing the file locally to a Parquet file.
    See => [link](https://php.awilewski.pl/home/basic-airflow-transformation)
    """

    @task
    def get_data():

        # connect to the database and extract the data into a dataframe
        # (assumes a Postgres connection named "tutorial_pg_conn" is configured in Airflow)
        postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
        df = postgres_hook.get_pandas_df(sql="SELECT * FROM public.employeehistory")
        # cast the object columns to more appropriate types: datetime or string
        df['birthdate'] = pd.to_datetime(df['birthdate'])
        df['hiredate'] = pd.to_datetime(df['hiredate'])
        df['rowguid'] = df['rowguid'].astype("string")
        # write the dataframe to a local Parquet file so the next task can read it
        df.to_parquet('/home/wilek666/Desktop/employeehistory.parquet', engine='fastparquet')

    @task
    def manipulate_data():

        df = pd.read_parquet('/home/wilek666/Desktop/employeehistory.parquet', engine='fastparquet')
        # log the row count to the CSV file
        data_print = df.shape[0]
        # build a one-element row: timestamp plus the row count
        data_print = [str(datetime.datetime.now()) + ' Rows count: ' + str(data_print)]
        with open('/home/wilek666/Desktop/employeehistory.csv', 'a', encoding='UTF8', newline='') as f:
            writer = csv.writer(f, delimiter=',')
            # write the data
            writer.writerow(data_print)

        # filter the data frame on birthdate
        # (the second condition is stricter, so only rows with birthdate >= 1970-01-01 remain)
        df = df[(df['birthdate'] >= np.datetime64('1969-01-01 00:00:00'))]
        df = df[(df['birthdate'] >= np.datetime64('1970-01-01 00:00:00'))]

        # log the row count after filtering to the CSV file
        data_print = df.shape[0]
        # build a one-element row: timestamp plus the filtered row count
        data_print = [str(datetime.datetime.now()) + ' Rows count after filter: ' + str(data_print)]
        with open('/home/wilek666/Desktop/employeehistory.csv', 'a', encoding='UTF8', newline='') as f:
            writer = csv.writer(f, delimiter=',')
            # write the data
            writer.writerow(data_print)

        # add an if_boss column via a left outer join on the job title
        if_boss_dict = {
            'if_boss': ['yes'],
            'value': ['Chief Executive Officer']
        }
        if_boss = pd.DataFrame(if_boss_dict)
        df = pd.merge(df, if_boss, how='left', left_on='jobtitle', right_on='value')
        # add a boolean version of the if_boss flag
        df = df.assign(if_boss_boolean=np.where(df['if_boss'] == 'yes', True, False))

        # select columns
        df = df[['birthdate', 'gender', 'if_boss', 'if_boss_boolean']]

        df.to_parquet('/home/wilek666/Desktop/employeehistory.parquet', engine='fastparquet')

    @task
    def write_data():

        df = pd.read_parquet('/home/wilek666/Desktop/employeehistory.parquet', engine='fastparquet')

        # append a sample of the output (first 5 rows) to the CSV file
        data_print = df.head(5)
        data_print.to_csv('/home/wilek666/Desktop/employeehistory.csv', mode='a', sep='\t', encoding='utf-8', header=True)

        # write the dataframe results to a JSON file, one record per line
        df.to_json('/home/wilek666/Desktop/employeehistory.json', orient='records', lines=True)

    get_data() >> manipulate_data() >> write_data()

dag = transform_data()
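To try the DAG outside the scheduler, Airflow 2.5+ can run it directly from the file; a small sketch, appended to the bottom of the script and started with plain python:

if __name__ == "__main__":
    # runs all three tasks in-process, which is handy for debugging
    dag.test()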

Tasks status =>

Files are ready =>
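To double-check the JSON output, it can be read back with pandas (a quick sketch, using the same path and options as above):

import pandas as pd

# the file was written with orient='records' and lines=True, so read it back the same way
result = pd.read_json('/home/wilek666/Desktop/employeehistory.json', orient='records', lines=True)
print(result.head())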
