My goal is to do a basic data transformation using Python in an Airflow DAG.
The script below is divided into 3 tasks:
Data (a DataFrame) is shared between tasks by writing it locally as a file in parquet format.
#~/airflow/dags/employee_transformation.py
import datetime
import pendulum
from airflow.decorators import dag, task
import pandas as pd
import numpy as np
import csv
import os
from airflow.providers.postgres.hooks.postgres import PostgresHook
# Local file paths used to pass the DataFrame between tasks and to log output.
PARQUET_PATH = '/home/wilek666/Desktop/employeehistory.parquet'
CSV_PATH = '/home/wilek666/Desktop/employeehistory.csv'
JSON_PATH = '/home/wilek666/Desktop/employeehistory.json'


def _log_row_count(label, count):
    """Append a timestamped '<label><count>' line to the shared CSV log file.

    Extracted because the original duplicated this open/write sequence twice.
    """
    with open(CSV_PATH, 'a', encoding='UTF8', newline='') as f:
        writer = csv.writer(f, delimiter=',')
        writer.writerow([str(datetime.datetime.now()) + ' ' + label + str(count)])


@dag(
    dag_id="employee_transformation",
    # A start_date is required for the scheduler to pick the DAG up; the
    # original omitted it (pendulum was imported but never used — presumably
    # intended for this). A timezone-aware stdlib datetime works equally well.
    start_date=datetime.datetime(2023, 1, 1, tzinfo=datetime.timezone.utc),
    catchup=False,
    dagrun_timeout=datetime.timedelta(minutes=60),
)
def transform_data():
    """
    ### Documentation
    Basic ETL-style transformation of the ``public.employeehistory`` table.
    Data (a pandas DataFrame) is shared between tasks by writing it
    locally in parquet format.
    See => [link](https://php.awilewski.pl/home/basic-airflow-transformation)
    """

    @task
    def get_data():
        """Extract ``public.employeehistory`` from Postgres into a parquet file."""
        postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
        # get_pandas_df manages its own connection; the original additionally
        # opened an unused connection + cursor that were never closed (leak).
        df = postgres_hook.get_pandas_df(sql="SELECT * FROM public.employeehistory")
        # Cast object columns to their proper dtypes: datetime or string.
        df['birthdate'] = pd.to_datetime(df['birthdate'])
        df['hiredate'] = pd.to_datetime(df['hiredate'])
        df['rowguid'] = df['rowguid'].astype("string")
        # Hand the frame to the next task via a local parquet file.
        df.to_parquet(PARQUET_PATH, engine='fastparquet')

    @task
    def manipulate_data():
        """Filter the frame, derive the if_boss columns, and log row counts."""
        df = pd.read_parquet(PARQUET_PATH, engine='fastparquet')
        _log_row_count('Rows count: ', df.shape[0])
        # Keep only employees born on/after 1970-01-01.  (The original first
        # filtered on >= 1969-01-01 and then on >= 1970-01-01, which makes the
        # 1969 filter dead code — perhaps a between-range was intended;
        # TODO confirm with the author. Result is identical to this one filter.)
        df = df[df['birthdate'] >= np.datetime64('1970-01-01 00:00:00')]
        _log_row_count('Rows count after filter: ', df.shape[0])
        # Add if_boss ('yes' for the CEO, NaN for everyone else) via left join.
        if_boss = pd.DataFrame({
            'if_boss': ['yes'],
            'value': ['Chief Executive Officer'],
        })
        df = pd.merge(df, if_boss, how='left', left_on='jobtitle', right_on='value')
        # Boolean companion column for if_boss (NaN becomes False).
        df = df.assign(if_boss_boolean=np.where(df['if_boss'] == 'yes', True, False))
        # Keep only the columns the downstream task needs.
        df = df[['birthdate', 'gender', 'if_boss', 'if_boss_boolean']]
        df.to_parquet(PARQUET_PATH, engine='fastparquet')

    @task
    def write_data():
        """Append a 5-row sample to the CSV log and dump the full frame to JSON."""
        df = pd.read_parquet(PARQUET_PATH, engine='fastparquet')
        # Sample output appended to the same CSV log file, tab-separated.
        df.head(5).to_csv(CSV_PATH, mode='a', sep='\t', encoding='utf-8', header=True)
        # Final output: one JSON record per line.
        df.to_json(JSON_PATH, orient='records', lines=True)

    # Linear pipeline: extract -> transform -> load.
    get_data() >> manipulate_data() >> write_data()


dag = transform_data()
Task statuses =>
The output files are ready =>