본문 바로가기

Minding's Programming/Airflow

[Airflow] TriggerDagRun Operator / ExternalTask Sensor (다른 DAG 실행하기)

728x90
반응형

DAG 내에서 task의 순서를 지정하고, 특정 조건 별로 실행시키는 법을 알아보았다. 하지만 task보다 더 큰 범주인 DAG을 순서에 따라 실행시키려면 어떻게 해야할까?

 

DAG 간 의존관계를 설정하는 방법에는 크게 두 가지가 있다고 한다.

 

1) TriggerDagRun Operator 사용

2) ExternalTask Sensor 사용

 

이 두가지 방법의 차이점은 아래 표와 같이 정리할 수 있다.

  TriggerDagRun Operator External Task Sensor
방식 실행할 다른 DAG의 ID를 지정해 수행 본 Task가 수행되기 전 다른 DAG의 완료를 기다린 후 수행
사용시점(권고) Trigger되는 DAG의 선행 DAG이 하나일 경우 Trigger되는 DAG의 선행 DAG이 2개 이상일 경우

위와 같이 두 가지 방법은 수행방식과 적절한 사용 시점에도 차이가 있다. 다른 DAG을 실행하기 전에 체크해야 할 DAG의 개수에 따라 차이가 난다고 보면 된다.

 

 

TriggerDagRun Operator 사용하는 방법

 

TriggerDagRun Operator는 DAG 내의 Task로 설정할 수 있는데, 특정 함수를 실행시키는 것이 아닌 DAG을 실행시키는 오퍼레이터이다.

from airflow.operators.trigger_dagrun import TriggerDagRunOperator

as dag:
    trigger_dag_task = TriggerDagRunOperator(
    	task_id='trigger_dag_task', # 필수값
        trigger_dag_id='dags_python_operator', # 다음에 실행시킬 dag의 ID, 필수값
        trigger_run_id=None,
        execution_date='{{ data_interval_end }}',
        reset_dag_run=True,
        wait_for_completion=False,
        poke_interval=60,
        allowed_states=['success'],
        failed_states=None
    )

TriggerDagRunOperator에는 꽤 많은 파라미터가 존재하는데, 이 중 필수로 들어가야하는 값은 task_id와 실행시킬 Dag의 ID를 입력하는 trigger_dag_id이다.

 

그리고 바로 하단에 trigger_run_id 라는 파라미터가 보이는데, 여기서 run_id는 DAG의 수행 방식과 시간을 유일하게 식별해주는 Key라고 할 수 있다. 같은 시간에 수행됐더라도 수행 방식이 다를 수 있는데, 그것에 따라 키가 달라진다. 예를 들어 스케줄에 의해 수행된 경우 run_id는 "scheduled_{{data_interval_start}}" 의 형식으로 표기된다.

  • Schedule: 스케줄에 의해 DAG이 수행됐을 때
  • Manual: 직접 수행버튼을 눌러 DAG이 수행됐을 때
  • Backfill: 과거 날짜에 대해 DAG을 수행했을 때

다시 돌아와 trigger_run_id 파라미터에 대해 설명하자면, 위와 같이 자동으로 지정되는 run_id를 직접 지정해줄 수 있는 파라미터이다.

 

다른 파라미터를 설명하면 아래 표와 같다.

execution_date 해당 DAG이 실행될 경우 manual_{{execution_date}}의 id값으로 수행되는데, 여기에 들어갈 날짜 정보
reset_dag_run 동일한 run_id가 있어도 수행할 것인지에 대해 묻는 파라미터, True일 경우 동일한 id가 있어도 dag을 수행한다.
wait_for_completion 현재 task에서 수행되는 DAG이 완료(성공) 후 다음 task를 실행시킬지를 묻는 파라미터, True일 경우 해당 DAG이 모두 완료되어야 다음 task를 실행한다.
poke_interval DAG이 수행 완료되었는지 확인하는 주기 (초)
allowed_states 수행시킬 DAG의 어떤 수행 결과에 따라 해당 task을 성공으로 처리할 지에 대한 파라미터 (ex. DAG의 결과가 성공/실패 모두 task도 성공 = ['success', 'failed']
failed_states 수행시킬 DAG의 어떤 수행 결과에 따라 해당 task을 실패로 처리할 지에 대한 파라미터

 

이제 TriggerDagRunOperator가 포함된 DAG을 서버에 올려 실행시켜보았다.

앞서 그래프를 살펴보면 위와 같이 BashOperator가 있는 start_task 뒤에 tirgger_dag_task라는 이름으로 'dags_python_operator'라는 이름의 DAG을 실행시키는 것으로 연결된 것을 알 수 있다.

 

실행 이후 dags_python_oprator로 이동한 뒤 task의 detail 탭을 살펴보면,

Run ID가 manual로 표시되며 이후 시간이 UTC 기준 00시 30분으로 되어있는 것을 볼 수 있다. execution_date를 {{data_interval_start}} 값으로 주었기 때문에, DAG의 수행 시간인 (한국 기준) 09시 30분이 노출되는 것을 볼 수 있다.

 

 

ExternalTask Sensor 사용하는 방법

 

위에서 본 방법처럼 Operator를 사용하는 것이 아닌 Sensor를 이용해 다른 DAG과의 의존관계를 설정시킬 수도 있다. ( Airflow의 센서 (BashSensor, FileSensor, PythonSensor, ExternalTaskSensor, CustomSensor)

 

ExternalTask Sensor의 파라미터는 굉장히 많은 편이다. 아래 표와 같이 정리해보았다.

파라미터 필수여부 설명
external_dag_id O 센싱할 DAG 명
external_task_id X
(셋 중 하나만 입력 가능,
없으면 dag만 센싱)
센싱할 task_id 명
external_task_ids 센싱할 1개 이상의 task_id명 (list)
external_task_group_id 센싱할 task_group_id 명
allowed_states X
(같은 상태가 입력되면 안됨)
+ from airflow.utils.state import State 필요
센서가 Success 되기 위한 센싱 대상의 상태
skipped_states 센서가 Skipped 되기 위한 센싱 대상의 상태
failed_states 센서가 Failed 되기 위한 센싱 대상의 상태
execution_delta X
(둘 중 하나만 입력 가능)
현재 DAG과 센싱할 DAG의 data_interval_start의 차이를 입력
(본 DAG보다 센싱할 DAG이 얼마나 과거에 있는지 양수로 입력 / ex) 센싱할 DAG이 6시간 앞설 경우 timedelta(hours=6) )
execution_date_fn 센싱할 DAG의 data_interval_start를 구하기 위한 함수
check_existence X 해당 dag_id 또는 task_id가 있는지 확인

위 파라미터 중 가장 유의해야 할 파라미터는 external_dag_id와 execution_delta 두 가지라고 볼 수 있다. 일단 exteranl_dag_id는 우선 필수적으로 들어가야 할 파라미터이기 때문이다.

 

또한 execution_delta는 현재 DAG과 센싱할 DAG의 시간 차이를 입력하는 파라미터인데, 상수로 입력해야 한다는 점에 유의해야한다.(6시간 전에 실행하는 DAG일 경우 hours=-6이 아닌 hours=6)

 

만약 센싱할 DAG이 한 달마다 실행하는 DAG이고, 현재 DAG이 하루마다 실행하는 조건이라면 execution_date_fn을 사용하면 된다. 센싱할 DAG과 현재 DAG의 실행 시간(data_interval_start) 차이를 구하는 함수를 이 곳에 넣은 뒤 활용할 수 있다.

 

from airflow import DAG
from airflow.sensors.external_task import ExternalTaskSensor
import pendulum
from datetime import timedelta
from airflow.utils.state import State 

with DAG(
    dag_id='dags_external_task_sensor',
    start_date=pendulum.datetime(2023,4,1, tz='Asia/Seoul'),
    schedule='0 7 * * *',
    catchup=False
) as dag:
    external_task_sensor_a = ExternalTaskSensor(
        task_id='external_task_sensor_a',
        external_dag_id='dags_branch_python_operator',
        external_task_id='task_a',
        allowed_states=[State.SKIPPED], # State 라이브러리 활용해야 함
        execution_delta=timedelta(hours=6),
        poke_interval=10        #10초
    )

    external_task_sensor_b = ExternalTaskSensor(
        task_id='external_task_sensor_b',
        external_dag_id='dags_branch_python_operator',
        external_task_id='task_b',
        failed_states=[State.SKIPPED],
        execution_delta=timedelta(hours=6),
        poke_interval=10        #10초
    )

    external_task_sensor_c = ExternalTaskSensor(
        task_id='external_task_sensor_c',
        external_dag_id='dags_branch_python_operator',
        external_task_id='task_c',
        allowed_states=[State.SUCCESS],
        execution_delta=timedelta(hours=6),
        poke_interval=10        #10초
    )

3개의 external_task_sensor를 만들었고, 모두 같은 DAG을 센싱하지만 센싱할 task는 각각 다르도록 설정했다. task_a의 경우 skipped 상태일 때 해당 sensor가 success 처리되도록 하는 등 각각 다르게 처리했다. 센싱할 DAG인 dags_branch_python_operator는 random값으로 task_a, task_b, task_c를 리턴해 분기 처리해주는 DAG이다.

 

결과는 모두 Success 처리가 되었다. 센싱할 DAG은 어떻게 처리가 되었을까?

 

센싱 대상인 dags_branch_python_operator는 task_b와 task_c가 success 처리되었고, task_a는 skipped 처리되었다. 우리가 위 코드에서 의도한 바에 따라 task_a는 skipped 처리되었을 경우 sensor에서는 success 처리가 된 것을 알 수 있고, 다른 task도 마찬가지로 의도한 바에 따라 처리된 것을 알 수 있다.

728x90