Airflow can be used both for data transformation in Python and as a scheduler for executing SQL statements.
My goal is to set up Airflow to maintain a table with a historical employee list through a PL/pgSQL procedure, refreshed every 15 min.
Imagine we have a table humanresources.employee with 290 rows =>
1) First I'm creating a simple procedure in Postgres to take a data snapshot =>
create or replace procedure p_employeehistory()
language plpgsql
as $$
begin
    -- create the history table on first run, copying the employee structure
    CREATE TABLE IF NOT EXISTS public.employeehistory (LIKE humanresources.employee);
    ALTER TABLE public.employeehistory ADD COLUMN IF NOT EXISTS report_date TIMESTAMP;
    -- append the current employee rows, stamped with the snapshot time
    INSERT INTO public.employeehistory
    SELECT e.*,
           (now() - interval '2 hour') as report_date -- snapshot time, shifted back 2 hours
    FROM humanresources.employee e;
end;$$;
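Before wiring this into Airflow, you can sanity-check the procedure by hand. A minimal sketch with psycopg2, where the host, database name, and credentials are placeholders for your environment =>
import psycopg2

# Placeholder connection parameters - adjust to your setup.
conn = psycopg2.connect(
    host="localhost", dbname="adventureworks",
    user="postgres", password="secret",
)
with conn:
    with conn.cursor() as cur:
        cur.execute("CALL public.p_employeehistory();")  # take one snapshot
        cur.execute("SELECT count(*) FROM public.employeehistory;")
        print(cur.fetchone()[0])  # grows by ~290 rows per call
conn.close()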
2) Next I'm defining a very simple Airflow DAG file, scheduled every 15 min, that calls the procedure with SQLExecuteQueryOperator =>
#~/airflow/dags/employee_history.py
import datetime

import pendulum

from airflow.decorators import dag
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator


@dag(
    dag_id="employee_history",
    schedule="*/15 * * * *",  # cron expression: every 15 min
    start_date=pendulum.datetime(2024, 8, 19, tz="UTC"),
    catchup=False,
    dagrun_timeout=datetime.timedelta(minutes=60),
)
def EmployeeHistory():
    create_employees_table = SQLExecuteQueryOperator(
        task_id="employee_history",
        conn_id="tutorial_pg_conn",  # Postgres connection defined in Airflow
        sql="""
            CALL public.p_employeehistory(); -- run the snapshot procedure
        """,
    )


dag = EmployeeHistory()
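The DAG references a connection named tutorial_pg_conn, which must exist in Airflow before the task can run. You can create it in the UI (Admin -> Connections) or programmatically; below is a minimal sketch, where the host, database name, and credentials are placeholders =>
from airflow import settings
from airflow.models import Connection

# Placeholder connection details - adjust to your Postgres instance.
conn = Connection(
    conn_id="tutorial_pg_conn",
    conn_type="postgres",
    host="localhost",
    schema="adventureworks",  # database name
    login="postgres",
    password="secret",
    port=5432,
)
session = settings.Session()
session.add(conn)
session.commit()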
3) After some time, you can see that our DAG schedule is working, and multiple snapshots have been taken every 15 min =>
--Database side
SELECT report_date, count(*)
FROM public.employeehistory
GROUP BY report_date
ORDER BY 1 DESC;
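Each report_date should show 290 rows, one full copy of the employee table per run. You can also run the same check from Python, reusing the Airflow connection defined above; a sketch assuming the postgres provider is installed =>
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Reuse the Airflow connection from the DAG.
hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
rows = hook.get_records(
    """
    SELECT report_date, count(*)
    FROM public.employeehistory
    GROUP BY report_date
    ORDER BY 1 DESC
    """
)
for report_date, cnt in rows:
    print(report_date, cnt)  # expect 290 per snapshot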