[Airflow] TriggerDagRun Operator / ExternalTask Sensor (다른 DAG 실행하기)

2024. 7. 25. 15:31·Minding's Programming/Airflow
728x90
반응형

DAG 내에서 task의 순서를 지정하고, 특정 조건 별로 실행시키는 법을 알아보았다. 하지만 task보다 더 큰 범주인 DAG을 순서에 따라 실행시키려면 어떻게 해야할까?

 

DAG 간 의존관계를 설정하는 방법에는 크게 두 가지가 있다고 한다.

 

1) TriggerDagRun Operator 사용

2) ExternalTask Sensor 사용

 

이 두가지 방법의 차이점은 아래 표와 같이 정리할 수 있다.

  TriggerDagRun Operator External Task Sensor
방식 실행할 다른 DAG의 ID를 지정해 수행 본 Task가 수행되기 전 다른 DAG의 완료를 기다린 후 수행
사용시점(권고) Trigger되는 DAG의 선행 DAG이 하나일 경우 Trigger되는 DAG의 선행 DAG이 2개 이상일 경우

위와 같이 두 가지 방법은 수행방식과 적절한 사용 시점에도 차이가 있다. 다른 DAG을 실행하기 전에 체크해야 할 DAG의 개수에 따라 차이가 난다고 보면 된다.

 

 

TriggerDagRun Operator 사용하는 방법

 

TriggerDagRun Operator는 DAG 내의 Task로 설정할 수 있는데, 특정 함수를 실행시키는 것이 아닌 DAG을 실행시키는 오퍼레이터이다.

from airflow.operators.trigger_dagrun import TriggerDagRunOperator

as dag:
    trigger_dag_task = TriggerDagRunOperator(
    	task_id='trigger_dag_task', # 필수값
        trigger_dag_id='dags_python_operator', # 다음에 실행시킬 dag의 ID, 필수값
        trigger_run_id=None,
        execution_date='{{ data_interval_end }}',
        reset_dag_run=True,
        wait_for_completion=False,
        poke_interval=60,
        allowed_states=['success'],
        failed_states=None
    )

TriggerDagRunOperator에는 꽤 많은 파라미터가 존재하는데, 이 중 필수로 들어가야하는 값은 task_id와 실행시킬 Dag의 ID를 입력하는 trigger_dag_id이다.

 

그리고 바로 하단에 trigger_run_id 라는 파라미터가 보이는데, 여기서 run_id는 DAG의 수행 방식과 시간을 유일하게 식별해주는 Key라고 할 수 있다. 같은 시간에 수행됐더라도 수행 방식이 다를 수 있는데, 그것에 따라 키가 달라진다. 예를 들어 스케줄에 의해 수행된 경우 run_id는 "scheduled_{{data_interval_start}}" 의 형식으로 표기된다.

  • Schedule: 스케줄에 의해 DAG이 수행됐을 때
  • Manual: 직접 수행버튼을 눌러 DAG이 수행됐을 때
  • Backfill: 과거 날짜에 대해 DAG을 수행했을 때

다시 돌아와 trigger_run_id 파라미터에 대해 설명하자면, 위와 같이 자동으로 지정되는 run_id를 직접 지정해줄 수 있는 파라미터이다.

 

다른 파라미터를 설명하면 아래 표와 같다.

execution_date 해당 DAG이 실행될 경우 manual_{{execution_date}}의 id값으로 수행되는데, 여기에 들어갈 날짜 정보
reset_dag_run 동일한 run_id가 있어도 수행할 것인지에 대해 묻는 파라미터, True일 경우 동일한 id가 있어도 dag을 수행한다.
wait_for_completion 현재 task에서 수행되는 DAG이 완료(성공) 후 다음 task를 실행시킬지를 묻는 파라미터, True일 경우 해당 DAG이 모두 완료되어야 다음 task를 실행한다.
poke_interval DAG이 수행 완료되었는지 확인하는 주기 (초)
allowed_states 수행시킬 DAG의 어떤 수행 결과에 따라 해당 task을 성공으로 처리할 지에 대한 파라미터 (ex. DAG의 결과가 성공/실패 모두 task도 성공 = ['success', 'failed']
failed_states 수행시킬 DAG의 어떤 수행 결과에 따라 해당 task을 실패로 처리할 지에 대한 파라미터

 

이제 TriggerDagRunOperator가 포함된 DAG을 서버에 올려 실행시켜보았다.

앞서 그래프를 살펴보면 위와 같이 BashOperator가 있는 start_task 뒤에 tirgger_dag_task라는 이름으로 'dags_python_operator'라는 이름의 DAG을 실행시키는 것으로 연결된 것을 알 수 있다.

 

실행 이후 dags_python_oprator로 이동한 뒤 task의 detail 탭을 살펴보면,

Run ID가 manual로 표시되며 이후 시간이 UTC 기준 00시 30분으로 되어있는 것을 볼 수 있다. execution_date를 {{data_interval_start}} 값으로 주었기 때문에, DAG의 수행 시간인 (한국 기준) 09시 30분이 노출되는 것을 볼 수 있다.

 

 

ExternalTask Sensor 사용하는 방법

 

위에서 본 방법처럼 Operator를 사용하는 것이 아닌 Sensor를 이용해 다른 DAG과의 의존관계를 설정시킬 수도 있다. ( Airflow의 센서 (BashSensor, FileSensor, PythonSensor, ExternalTaskSensor, CustomSensor)

 

ExternalTask Sensor의 파라미터는 굉장히 많은 편이다. 아래 표와 같이 정리해보았다.

파라미터 필수여부 설명
external_dag_id O 센싱할 DAG 명
external_task_id X
(셋 중 하나만 입력 가능,
없으면 dag만 센싱)
센싱할 task_id 명
external_task_ids 센싱할 1개 이상의 task_id명 (list)
external_task_group_id 센싱할 task_group_id 명
allowed_states X
(같은 상태가 입력되면 안됨)
+ from airflow.utils.state import State 필요
센서가 Success 되기 위한 센싱 대상의 상태
skipped_states 센서가 Skipped 되기 위한 센싱 대상의 상태
failed_states 센서가 Failed 되기 위한 센싱 대상의 상태
execution_delta X
(둘 중 하나만 입력 가능)
현재 DAG과 센싱할 DAG의 data_interval_start의 차이를 입력
(본 DAG보다 센싱할 DAG이 얼마나 과거에 있는지 양수로 입력 / ex) 센싱할 DAG이 6시간 앞설 경우 timedelta(hours=6) )
execution_date_fn 센싱할 DAG의 data_interval_start를 구하기 위한 함수
check_existence X 해당 dag_id 또는 task_id가 있는지 확인

위 파라미터 중 가장 유의해야 할 파라미터는 external_dag_id와 execution_delta 두 가지라고 볼 수 있다. 일단 exteranl_dag_id는 우선 필수적으로 들어가야 할 파라미터이기 때문이다.

 

또한 execution_delta는 현재 DAG과 센싱할 DAG의 시간 차이를 입력하는 파라미터인데, 상수로 입력해야 한다는 점에 유의해야한다.(6시간 전에 실행하는 DAG일 경우 hours=-6이 아닌 hours=6)

 

만약 센싱할 DAG이 한 달마다 실행하는 DAG이고, 현재 DAG이 하루마다 실행하는 조건이라면 execution_date_fn을 사용하면 된다. 센싱할 DAG과 현재 DAG의 실행 시간(data_interval_start) 차이를 구하는 함수를 이 곳에 넣은 뒤 활용할 수 있다.

 

from airflow import DAG
from airflow.sensors.external_task import ExternalTaskSensor
import pendulum
from datetime import timedelta
from airflow.utils.state import State 

with DAG(
    dag_id='dags_external_task_sensor',
    start_date=pendulum.datetime(2023,4,1, tz='Asia/Seoul'),
    schedule='0 7 * * *',
    catchup=False
) as dag:
    external_task_sensor_a = ExternalTaskSensor(
        task_id='external_task_sensor_a',
        external_dag_id='dags_branch_python_operator',
        external_task_id='task_a',
        allowed_states=[State.SKIPPED], # State 라이브러리 활용해야 함
        execution_delta=timedelta(hours=6),
        poke_interval=10        #10초
    )

    external_task_sensor_b = ExternalTaskSensor(
        task_id='external_task_sensor_b',
        external_dag_id='dags_branch_python_operator',
        external_task_id='task_b',
        failed_states=[State.SKIPPED],
        execution_delta=timedelta(hours=6),
        poke_interval=10        #10초
    )

    external_task_sensor_c = ExternalTaskSensor(
        task_id='external_task_sensor_c',
        external_dag_id='dags_branch_python_operator',
        external_task_id='task_c',
        allowed_states=[State.SUCCESS],
        execution_delta=timedelta(hours=6),
        poke_interval=10        #10초
    )

3개의 external_task_sensor를 만들었고, 모두 같은 DAG을 센싱하지만 센싱할 task는 각각 다르도록 설정했다. task_a의 경우 skipped 상태일 때 해당 sensor가 success 처리되도록 하는 등 각각 다르게 처리했다. 센싱할 DAG인 dags_branch_python_operator는 random값으로 task_a, task_b, task_c를 리턴해 분기 처리해주는 DAG이다.

 

결과는 모두 Success 처리가 되었다. 센싱할 DAG은 어떻게 처리가 되었을까?

 

센싱 대상인 dags_branch_python_operator는 task_b와 task_c가 success 처리되었고, task_a는 skipped 처리되었다. 우리가 위 코드에서 의도한 바에 따라 task_a는 skipped 처리되었을 경우 sensor에서는 success 처리가 된 것을 알 수 있고, 다른 task도 마찬가지로 의도한 바에 따라 처리된 것을 알 수 있다.

728x90

'Minding's Programming > Airflow' 카테고리의 다른 글

[Airflow] Custom Operator 개발해보기 (BaseOperator)  (0) 2024.07.26
[Airflow] SimpleHttpOperator로 서울시 공공데이터 API 이용해보기  (0) 2024.07.25
[Airflow] Task Group  (1) 2024.07.24
[Airflow] Trigger Rule  (0) 2024.07.24
[Airflow] Task 분기 처리 (BranchPythonOperator, @task.branch, BaseBranchOperator)  (2) 2024.07.24
'Minding's Programming/Airflow' 카테고리의 다른 글
  • [Airflow] Custom Operator 개발해보기 (BaseOperator)
  • [Airflow] SimpleHttpOperator로 서울시 공공데이터 API 이용해보기
  • [Airflow] Task Group
  • [Airflow] Trigger Rule
Minding
Minding
  • Minding
    Today's Minding
    Minding
  • 전체
    오늘
    어제
    • 울고넘는 딥러닝 (278)
      • Minding's Baseball (57)
        • MLB Statcast (29)
        • 머신러닝으로 홈런왕 예측하기 (3)
        • 야구칼럼 (12)
        • 야구 규칙, 용어 (1)
        • 2022-23 질롱 코리아 (8)
        • 류현진 등판경기 (4)
      • Minding's Programming (185)
        • 프로그래머스 코딩테스트 (21)
        • Knowledge (44)
        • Numpy & Pandas (6)
        • Excel (3)
        • Git (1)
        • Pygame (11)
        • CV (3)
        • Tensorflow tutorial (4)
        • Kaggle and Dacon (4)
        • 에러 코드 (8)
        • FastAPI (8)
        • Airflow (29)
        • Crawling (6)
        • Django (14)
        • AWS (18)
        • Spark (5)
      • Minding's Reading (30)
        • 머신러닝 딥러닝에 필요한 기초 수학 with 파이.. (2)
        • 칼만필터는 어렵지 않아 (11)
        • 밑바닥부터 시작하는 딥러닝 (6)
        • 메이저리그 야구 통계학 2e (8)
        • 논문읽기 (2)
        • 빅데이터를 지탱하는 기술 (1)
      • Minding's Life (5)
        • 주식 (4)
        • 각종 소식 (1)
  • 블로그 메뉴

    • 홈
    • Baseball
    • Programming
    • Reading
  • 링크

  • 공지사항

  • 인기 글

  • 태그

    칼만필터
    칼만필터는어렵지않아파이썬
    데이터 엔지니어
    파이썬게임개발
    데이터분석
    django
    넘파이
    머신러닝
    프로그래머스
    MLB
    Airflow
    pygame
    야구
    코딩테스트
    게임개발
    파이게임
    칼만필터는어렵지않아python
    FastAPI
    django python
    mlb stats api
    에어플로우
    질롱코리아
    KBO
    메이저리그
    Python
    칼만필터는어렵지않아
    파이썬
    AWS
    딥러닝
    KalmanFilter
  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.3
Minding
[Airflow] TriggerDagRun Operator / ExternalTask Sensor (다른 DAG 실행하기)
상단으로

티스토리툴바