본문 바로가기

Minding's Programming/Airflow

[Airflow] Trigger Rule

728x90
반응형

이전에 branch, 즉 분기처리 개념을 통해 상위 task에서 상황에 따라 분기 처리하여 여러 개의 하위 task 중 선택 실행하는 방법을 배워봤다.(링크)

 

[Airflow] Task 분기 처리 (BranchPythonOperator, @task.branch, BaseBranchOperator)

Task 분기 처리는 왜 필요한가? Task 1 다음에 이어지는 3가지의 Task 2-1, 2-2, 2-3이 있다고 가정해보자. task 1이 먼저 수행된 다음, 3가지의 task가 동시에 돌아가는 것이 지금까지 배워온 결과이다. (tas

minding-deep-learning.tistory.com

 

이번에는 상위 task가 여러 개이고, 특정 조건에서만 하위 task를 실행하는 조건의 개념인 Trigger Rule에 대해 공부해보고자 한다.

 

Trigger Rule

 

위에서 설명했듯이, Trigger Rule은 상위 task의 상태에 따라 하위 task의 수행여부를 결정하는 '조건'을 뜻한다. 기본값은 상위 task 3개가 모두 성공했을 때에만 하위 task를 수행하는 것이다. 하지만 이 조건을 수정하고 싶을 때에는 Trigger Rule을 이용해 조건을 바꿔줄 수 있다.(ex. 3개 중 하나라도 성공할 시에 하위 task 수행)

 

 

Trigger Rule의 종류

all_success ( 기본값) 상위 task가 모두 성공하면 실행
all_failed 상위 task가 모두 실패하면 실행
all_done 상위 task가 모두 수행되면 실행 (성공/실패여부 상관없이)
all_skipped 상위 task가 모두 skipped 상태면 실행
one_failed 상위 task 중 하나 이상 실패하면 실행 (모든 상위 task 완료를 기다리지 않음)
one_success 상위 task 중 하나 이상 성공하면 실행 (모든 상위 task 완료를 기다리지 않음)
one_done 상위 task 중 하나 이상 성공 또는 실패하면 실행
none_failed 상위 task 중 실패가 없는 경우 실행 (성공 or skipped)
none_failed_min_one_success 상위 task 중 실패가 없고 성공한 task가 적어도 1개 이상이면 실행
none_skipped skipped된 사위 task가 없으면 실행 (성공/실패여부 상관없이)
always 언제나 실행

기본적으로 Trigger Rule은 하위 task에 거는 조건이며, 파라미터에 넣을 수 있는 종류는 위 표와 같이 11가지이다.

 

이 중 all_done과 none_skipped에 대해 코드를 작성해보려고 한다.

# all_done

bash_upstream_1 = BashOperator(
    task_id = 'bash_upstream_1',
    bash_command='echo upstream1'
)

@task(task_id='python_upstream_1')
def python_upstream_1():
    raise AirflowException('downstream_1 Exception!') # 실패 처리를 위해 강제로 Exception
    
@task(task_id='python_upstream_2')
def python_upstream_2():
    print('정상 처리')
    
@task(task_id='python_downstream_1', trigger_rule='all_done') # Trigger Rule 설정해주기
def python_downstream_1():
    print('정상 처리')
    
[bash_upstream_1, python_upstream_1(), python_upstream_2()] >> python_downstream_1()

all_done 코드의 경우에는 python_upsteream_1() 함수에 AirflowException을 내려주어 실패 처리하게 만들었다. 하지만 Trigger Rule을 'all_done'으로 지정해주었으니, python_downstream_1() 함수는 상위 task가 수행이 끝난 뒤에 바로 실행이 되어야 할 것이다.

 

그래프를 살펴봤을 때, 의도한 대로 python_upstream_1 task는 failed 처리가 되었지만 후행 task인 python_downstream_1은 Trigger Rule에 의해 성공적으로 실행된 것을 확인해볼 수 있다.

 

# none_skipped

    @task.branch(task_id='branching')
    def random_branch():
        import random
        item_lst = ['A', 'B', 'C']
        selected_item = random.choice(item_lst)
        if selected_item == 'A':
            return 'task_a'
        elif selected_item == 'B':
            return 'task_b'
        elif selected_item == 'C':
            return 'task_c'

    task_a = BashOperator(
        task_id='task_a',
        bash_command='echo upstream1'
    )

    @task(task_id='task_b')
    def task_b():
        print('정상 처리')


    @task(task_id='task_c')
    def task_c():
        print('정상 처리')

    @task(task_id='task_d', trigger_rule='none_skipped')
    def task_d():
        print('정상 처리')

    random_branch() >> [task_a, task_b(), task_c()] >> task_d()

none_skipped 코드의 경우에는 제일 먼저 실행되는 random_branch를 통해 2번째로 실행될 task_a,b,c 중 한 개만 수행되도록 해 나머지 2개는 skipped 처리가 될 것이다. task_d의 경우 Trigger Rule을 'none_skipped'로 설정해 주었기 때문에, 상위 task 2개가 무조건 skipped 되는 상황이므로 실행되지 않아야 할 것이다.

 

해당 DAG을 실행시킨 뒤 Graph를 보면, task_c가 선택되어 task_b와 task_a가 skipped되었다. 그리고 Trigger Rule에 따라 task_d도 따라 skipped 처리된 것을 확인해볼 수 있다.

728x90