[Airflow] Trigger Rule

2024. 7. 24. 16:52·Minding's Programming/Airflow
728x90
반응형

이전에 branch, 즉 분기처리 개념을 통해 상위 task에서 상황에 따라 분기 처리하여 여러 개의 하위 task 중 선택 실행하는 방법을 배워봤다.(링크)

 

[Airflow] Task 분기 처리 (BranchPythonOperator, @task.branch, BaseBranchOperator)

Task 분기 처리는 왜 필요한가? Task 1 다음에 이어지는 3가지의 Task 2-1, 2-2, 2-3이 있다고 가정해보자. task 1이 먼저 수행된 다음, 3가지의 task가 동시에 돌아가는 것이 지금까지 배워온 결과이다. (tas

minding-deep-learning.tistory.com

 

이번에는 상위 task가 여러 개이고, 특정 조건에서만 하위 task를 실행하는 조건의 개념인 Trigger Rule에 대해 공부해보고자 한다.

 

Trigger Rule

 

위에서 설명했듯이, Trigger Rule은 상위 task의 상태에 따라 하위 task의 수행여부를 결정하는 '조건'을 뜻한다. 기본값은 상위 task 3개가 모두 성공했을 때에만 하위 task를 수행하는 것이다. 하지만 이 조건을 수정하고 싶을 때에는 Trigger Rule을 이용해 조건을 바꿔줄 수 있다.(ex. 3개 중 하나라도 성공할 시에 하위 task 수행)

 

 

Trigger Rule의 종류

all_success ( 기본값) 상위 task가 모두 성공하면 실행
all_failed 상위 task가 모두 실패하면 실행
all_done 상위 task가 모두 수행되면 실행 (성공/실패여부 상관없이)
all_skipped 상위 task가 모두 skipped 상태면 실행
one_failed 상위 task 중 하나 이상 실패하면 실행 (모든 상위 task 완료를 기다리지 않음)
one_success 상위 task 중 하나 이상 성공하면 실행 (모든 상위 task 완료를 기다리지 않음)
one_done 상위 task 중 하나 이상 성공 또는 실패하면 실행
none_failed 상위 task 중 실패가 없는 경우 실행 (성공 or skipped)
none_failed_min_one_success 상위 task 중 실패가 없고 성공한 task가 적어도 1개 이상이면 실행
none_skipped skipped된 사위 task가 없으면 실행 (성공/실패여부 상관없이)
always 언제나 실행

기본적으로 Trigger Rule은 하위 task에 거는 조건이며, 파라미터에 넣을 수 있는 종류는 위 표와 같이 11가지이다.

 

이 중 all_done과 none_skipped에 대해 코드를 작성해보려고 한다.

# all_done

bash_upstream_1 = BashOperator(
    task_id = 'bash_upstream_1',
    bash_command='echo upstream1'
)

@task(task_id='python_upstream_1')
def python_upstream_1():
    raise AirflowException('downstream_1 Exception!') # 실패 처리를 위해 강제로 Exception
    
@task(task_id='python_upstream_2')
def python_upstream_2():
    print('정상 처리')
    
@task(task_id='python_downstream_1', trigger_rule='all_done') # Trigger Rule 설정해주기
def python_downstream_1():
    print('정상 처리')
    
[bash_upstream_1, python_upstream_1(), python_upstream_2()] >> python_downstream_1()

all_done 코드의 경우에는 python_upsteream_1() 함수에 AirflowException을 내려주어 실패 처리하게 만들었다. 하지만 Trigger Rule을 'all_done'으로 지정해주었으니, python_downstream_1() 함수는 상위 task가 수행이 끝난 뒤에 바로 실행이 되어야 할 것이다.

 

그래프를 살펴봤을 때, 의도한 대로 python_upstream_1 task는 failed 처리가 되었지만 후행 task인 python_downstream_1은 Trigger Rule에 의해 성공적으로 실행된 것을 확인해볼 수 있다.

 

# none_skipped

    @task.branch(task_id='branching')
    def random_branch():
        import random
        item_lst = ['A', 'B', 'C']
        selected_item = random.choice(item_lst)
        if selected_item == 'A':
            return 'task_a'
        elif selected_item == 'B':
            return 'task_b'
        elif selected_item == 'C':
            return 'task_c'

    task_a = BashOperator(
        task_id='task_a',
        bash_command='echo upstream1'
    )

    @task(task_id='task_b')
    def task_b():
        print('정상 처리')


    @task(task_id='task_c')
    def task_c():
        print('정상 처리')

    @task(task_id='task_d', trigger_rule='none_skipped')
    def task_d():
        print('정상 처리')

    random_branch() >> [task_a, task_b(), task_c()] >> task_d()

none_skipped 코드의 경우에는 제일 먼저 실행되는 random_branch를 통해 2번째로 실행될 task_a,b,c 중 한 개만 수행되도록 해 나머지 2개는 skipped 처리가 될 것이다. task_d의 경우 Trigger Rule을 'none_skipped'로 설정해 주었기 때문에, 상위 task 2개가 무조건 skipped 되는 상황이므로 실행되지 않아야 할 것이다.

 

해당 DAG을 실행시킨 뒤 Graph를 보면, task_c가 선택되어 task_b와 task_a가 skipped되었다. 그리고 Trigger Rule에 따라 task_d도 따라 skipped 처리된 것을 확인해볼 수 있다.

728x90

'Minding's Programming > Airflow' 카테고리의 다른 글

[Airflow] TriggerDagRun Operator / ExternalTask Sensor (다른 DAG 실행하기)  (0) 2024.07.25
[Airflow] Task Group  (1) 2024.07.24
[Airflow] Task 분기 처리 (BranchPythonOperator, @task.branch, BaseBranchOperator)  (2) 2024.07.24
[Airflow] 전역변수 Variable 이용하기  (1) 2024.07.23
[Airflow/Xcom] Airflow에서 Xcom 사용해보기  (0) 2024.07.23
'Minding's Programming/Airflow' 카테고리의 다른 글
  • [Airflow] TriggerDagRun Operator / ExternalTask Sensor (다른 DAG 실행하기)
  • [Airflow] Task Group
  • [Airflow] Task 분기 처리 (BranchPythonOperator, @task.branch, BaseBranchOperator)
  • [Airflow] 전역변수 Variable 이용하기
Minding
Minding
  • Minding
    Today's Minding
    Minding
  • 전체
    오늘
    어제
    • 울고넘는 딥러닝 (278)
      • Minding's Baseball (57)
        • MLB Statcast (29)
        • 머신러닝으로 홈런왕 예측하기 (3)
        • 야구칼럼 (12)
        • 야구 규칙, 용어 (1)
        • 2022-23 질롱 코리아 (8)
        • 류현진 등판경기 (4)
      • Minding's Programming (185)
        • 프로그래머스 코딩테스트 (21)
        • Knowledge (44)
        • Numpy & Pandas (6)
        • Excel (3)
        • Git (1)
        • Pygame (11)
        • CV (3)
        • Tensorflow tutorial (4)
        • Kaggle and Dacon (4)
        • 에러 코드 (8)
        • FastAPI (8)
        • Airflow (29)
        • Crawling (6)
        • Django (14)
        • AWS (18)
        • Spark (5)
      • Minding's Reading (30)
        • 머신러닝 딥러닝에 필요한 기초 수학 with 파이.. (2)
        • 칼만필터는 어렵지 않아 (11)
        • 밑바닥부터 시작하는 딥러닝 (6)
        • 메이저리그 야구 통계학 2e (8)
        • 논문읽기 (2)
        • 빅데이터를 지탱하는 기술 (1)
      • Minding's Life (5)
        • 주식 (4)
        • 각종 소식 (1)
  • 블로그 메뉴

    • 홈
    • Baseball
    • Programming
    • Reading
  • 링크

  • 공지사항

  • 인기 글

  • 태그

    게임개발
    mlb stats api
    AWS
    Airflow
    칼만필터는어렵지않아
    파이썬
    Python
    프로그래머스
    FastAPI
    질롱코리아
    KalmanFilter
    데이터분석
    에어플로우
    넘파이
    KBO
    파이게임
    pygame
    django python
    딥러닝
    메이저리그
    파이썬게임개발
    MLB
    django
    머신러닝
    코딩테스트
    데이터 엔지니어
    칼만필터
    칼만필터는어렵지않아파이썬
    야구
    칼만필터는어렵지않아python
  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.3
Minding
[Airflow] Trigger Rule
상단으로

티스토리툴바