Apache Airflow Wait Between Tasks

Apache Airflow lets you define custom workflows and run them on a recurring schedule, at whatever interval you need.

Some concepts to learn first:

DAGs - a DAG (directed acyclic graph) defines the entire workflow.

Tasks - the individual units of work that make up the workflow.

Operators - an Airflow-specific concept that is worth understanding. An operator determines what kind of work a task performs, and there are different operators for different kinds of work. To run a Bash script there is the BashOperator; to run Python code there is the PythonOperator.


For more details, refer to https://airflow.apache.org/docs/apache-airflow/stable/concepts/index.html

The three concepts above are enough to get started. Of course, there are others, such as interprocess communication, variables, and process dependency management.

Let's define the simple workflow we want to achieve

DAG = Task1 >> Wait for a Day >> Task2

Above is the DAG. It runs two tasks: the second task depends on the first and is executed after a delay of one day.
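To make the `>>` notation concrete, here is a small pure-Python sketch of how chaining can record "runs after" relationships. The `Step` class and `execution_order` helper are hypothetical names invented for this illustration; this is a toy model, not Airflow's actual implementation.

```python
class Step:
    """Toy stand-in for a task; NOT an Airflow class."""

    def __init__(self, name):
        self.name = name
        self.downstream = []  # steps that must run after this one

    def __rshift__(self, other):
        # "self >> other" records that other depends on self.
        self.downstream.append(other)
        # Returning the right-hand side lets a >> b >> c chain left to right.
        return other


def execution_order(first):
    """Walk a simple linear chain and return the step names in run order."""
    order = []
    current = first
    while current is not None:
        order.append(current.name)
        current = current.downstream[0] if current.downstream else None
    return order


task1 = Step("task1")
wait = Step("wait_one_day")
task2 = Step("task2")

task1 >> wait >> task2
print(execution_order(task1))  # ['task1', 'wait_one_day', 'task2']
```

The point of the sketch is only that each `>>` adds one dependency edge, so the wait step sits between the two tasks and task2 cannot start until it has finished.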

Let's explore how we can achieve this.

We will create two operators, one each for task1 and task2, plus a third task for the delay. The delay task sleeps for the designated period, after which task2 is executed.

Like so:

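import time
from datetime import timedelta

from airflow import DAG
from airflow.operators.dummy import DummyOperator  # newer Airflow 2 releases also provide EmptyOperator
from airflow.operators.python import PythonOperator

# task1Date (a datetime), executeTask1 and executeTask2 are assumed to be defined elsewhere.
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": task1Date.strftime("%Y-%m-%d"),
    "email": ["airflow@airflow.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=1),
}

# Create the DAG
dag = DAG(
    dag_id="12345",  # dag_id must be a string
    schedule_interval="@monthly",  # preset names are passed as strings
    default_args=default_args,
    catchup=False,
    max_active_runs=1,
    tags=["dtc-de"],
)

with dag:
    # Marker tasks for the beginning and end of the workflow
    start = DummyOperator(task_id="start")
    end = DummyOperator(task_id="end")

    task1Def = PythonOperator(
        task_id="123444",
        python_callable=executeTask1,
        op_kwargs={"arg1": "sample arg"},
    )
    task2Def = PythonOperator(
        task_id="123457",
        python_callable=executeTask2,
        op_kwargs={"arg": "sample_arg"},
    )

    # Delay task: blocks for the given number of seconds before task2 may start.
    # This sleeps for 300 seconds (five minutes); a one-day wait would be 24 * 60 * 60.
    # No dag= argument is needed because the task is created inside "with dag:".
    delay_def = PythonOperator(
        task_id="delay_python_task",
        python_callable=lambda: time.sleep(300),
    )

    # task2 runs only after task1 and the delay have both finished
    start >> task1Def >> delay_def >> task2Def >> end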
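The delay callable can also be built from a `timedelta` instead of a hard-coded number of seconds, which makes a one-day wait easier to read. The sketch below is one possible way to do that; `make_delay` and its `sleep` parameter are hypothetical names introduced here, not part of Airflow.

```python
import time
from datetime import timedelta
from typing import Callable


def make_delay(
    delta: timedelta,
    sleep: Callable[[float], None] = time.sleep,
) -> Callable[[], None]:
    """Return a zero-argument callable that blocks for the length of `delta`.

    `sleep` can be swapped out so the callable can be tried without waiting.
    """
    seconds = delta.total_seconds()
    if seconds < 0:
        raise ValueError("delay must not be negative")

    def _delay() -> None:
        sleep(seconds)

    return _delay


# Dry run: replace the real sleep with a print so nothing actually blocks.
one_day = make_delay(
    timedelta(days=1),
    sleep=lambda s: print(f"would sleep {s:.0f} seconds"),
)
one_day()  # prints "would sleep 86400 seconds"
```

In the DAG, the delay task could then be given `python_callable=make_delay(timedelta(days=1))` in place of the lambda. Keep in mind that a sleeping task occupies a worker slot for the entire wait, so for long delays it may be worth looking at Airflow's sensors as an alternative.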

