
Airflow: Master DAG with ExternalTaskSensor gets stuck forever

The requirement is to run several DAGs one after the other, starting each DAG only after the previous one has succeeded.

I have a master DAG that triggers all the other DAGs so that they execute one after the other, in sequence.

Also, in each of dag_A, dag_B and dag_C I have set schedule_interval = None and turned the DAG ON manually in the GUI.

I am using ExternalTaskSensor because, without it, the second DAG (dag_B) is kicked off before all the tasks in the first DAG (dag_A) have completed. If there is a better implementation, please let me know.

I don't know what I am missing here.
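The behaviour the master DAG is meant to produce can be modelled in plain Python: start a DAG, block until it reaches a final state, and move on only if it succeeded. This is a sketch of the intended control flow, not Airflow code; run_dag and the "success"/"failed" strings are hypothetical stand-ins. On Airflow 2.x, TriggerDagRunOperator has a wait_for_completion parameter that combines the trigger and the wait in a single task, which may be a simpler alternative to pairing each trigger with an ExternalTaskSensor.

```python
from typing import Callable, Dict, List


def run_in_sequence(dag_ids: List[str], run_dag: Callable[[str], str]) -> Dict[str, str]:
    """Run each DAG only after the previous one has succeeded.

    run_dag is a hypothetical blocking call (not an Airflow API): it starts
    the DAG, waits for it to finish and returns its final state.
    """
    results: Dict[str, str] = {}
    for dag_id in dag_ids:
        state = run_dag(dag_id)
        results[dag_id] = state
        if state != "success":
            # Like the master DAG: if a child run does not succeed,
            # the later triggers never fire.
            break
    return results


# Example: dag_B fails, so dag_C is never started.
outcomes = {"dag_A": "success", "dag_B": "failed", "dag_C": "success"}
print(run_in_sequence(["dag_A", "dag_B", "dag_C"], outcomes.__getitem__))
# {'dag_A': 'success', 'dag_B': 'failed'}
```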

Code: master_dag.py

import datetime
import os
from datetime import timedelta

from airflow.models import DAG, Variable

from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.sensors import ExternalTaskSensor

default_args = {
    'owner': 'airflow',
    'start_date': datetime.datetime(2020, 1, 7),
    'provide_context': True,
    'execution_timeout': None,
    'retries': 0,
    'retry_delay': timedelta(minutes=3),
    'retry_exponential_backoff': True,
    'email_on_retry': False,
}


# Master DAG: runs daily at 03:07 and chains dag_A -> dag_B -> dag_C
dag = DAG(
    dag_id='master_dag',
    schedule_interval='7 3 * * *',
    default_args=default_args,
    max_active_runs=1,
    catchup=False,
)

# Start a run of dag_A, then wait for its last task (proc_success) to succeed
trigger_dag_A = TriggerDagRunOperator(
    task_id='trigger_dag_A',
    trigger_dag_id='dag_A',
    dag=dag,
)

wait_for_dag_A = ExternalTaskSensor(
    task_id='wait_for_dag_A',
    external_dag_id='dag_A',
    external_task_id='proc_success',
    poke_interval=60,
    allowed_states=['success'],
    dag=dag,
)

# Same pattern for dag_B
trigger_dag_B = TriggerDagRunOperator(
    task_id='trigger_dag_B',
    trigger_dag_id='dag_B',
    dag=dag,
)

wait_for_dag_B = ExternalTaskSensor(
    task_id='wait_for_dag_B',
    external_dag_id='dag_B',
    external_task_id='proc_success',
    poke_interval=60,
    allowed_states=['success'],
    dag=dag,
)

# dag_C is the last DAG in the chain, so nothing waits for it
trigger_dag_C = TriggerDagRunOperator(
    task_id='trigger_dag_C',
    trigger_dag_id='dag_C',
    dag=dag,
)

trigger_dag_A >> wait_for_dag_A >> trigger_dag_B >> wait_for_dag_B >> trigger_dag_C

Each of the DAGs has multiple tasks, the last of which is proc_success.
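A likely cause of the hang, offered as a hypothesis: ExternalTaskSensor does not look for the most recent run of dag_A. It checks for an instance of external_task_id whose execution_date equals the sensor's own execution_date, optionally shifted by execution_delta or execution_date_fn. In Airflow 1.10, a TriggerDagRunOperator without an execution_date argument creates the new run with the current UTC time as its execution_date, while a scheduled master_dag run carries the start of its schedule interval. The two dates never match, so the sensor keeps poking until it times out (seven days by default). The pure-Python model below reproduces that lookup; sensor_poke and the dict layout are hypothetical and do not import Airflow.

```python
from datetime import datetime, timedelta


def sensor_poke(task_instances, external_dag_id, external_task_id,
                execution_date, execution_delta=timedelta(0),
                allowed_states=("success",)):
    """Hypothetical model of ExternalTaskSensor's check (not Airflow code).

    task_instances maps (dag_id, task_id, execution_date) -> state. Only the
    run whose execution_date equals execution_date - execution_delta is
    consulted; runs at any other date are ignored.
    """
    target = execution_date - execution_delta
    state = task_instances.get((external_dag_id, external_task_id, target))
    return state in allowed_states


# master_dag is scheduled with '7 3 * * *': the run that starts at
# 2020-01-08 03:07 carries the interval start as its execution_date.
master_execution_date = datetime(2020, 1, 7, 3, 7)
run_started = datetime(2020, 1, 8, 3, 7)

# Assumption: with no execution_date given, the triggered dag_A run is stamped
# with the wall-clock time of the trigger, a few seconds after the master starts.
triggered_at = run_started + timedelta(seconds=5)

# dag_A completes and its last task, proc_success, succeeds at that date.
tis = {("dag_A", "proc_success", triggered_at): "success"}
print(sensor_poke(tis, "dag_A", "proc_success", master_execution_date))  # False

# If the trigger forwards the master's execution_date, the dates line up.
tis_aligned = {("dag_A", "proc_success", master_execution_date): "success"}
print(sensor_poke(tis_aligned, "dag_A", "proc_success", master_execution_date))  # True
```

Because the mismatch is in the dates rather than in the task states, a longer poke_interval or more retries would not help. In Airflow 1.10 versions where TriggerDagRunOperator accepts a templated execution_date, passing execution_date='{{ execution_date }}' to each trigger is one way to give the child runs the master run's date, which is the aligned case modelled above.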
