이전에 branch, 즉 분기처리 개념을 통해 상위 task에서 상황에 따라 분기 처리하여 여러 개의 하위 task 중 선택 실행하는 방법을 배워봤다.(링크)
이번에는 상위 task가 여러 개이고, 특정 조건에서만 하위 task를 실행하는 조건의 개념인 Trigger Rule에 대해 공부해보고자 한다.
Trigger Rule
위에서 설명했듯이, Trigger Rule은 상위 task의 상태에 따라 하위 task의 수행여부를 결정하는 '조건'을 뜻한다. 기본값은 상위 task 3개가 모두 성공했을 때에만 하위 task를 수행하는 것이다. 하지만 이 조건을 수정하고 싶을 때에는 Trigger Rule을 이용해 조건을 바꿔줄 수 있다.(ex. 3개 중 하나라도 성공할 시에 하위 task 수행)
Trigger Rule의 종류
all_success ( 기본값) | 상위 task가 모두 성공하면 실행 |
all_failed | 상위 task가 모두 실패하면 실행 |
all_done | 상위 task가 모두 수행되면 실행 (성공/실패여부 상관없이) |
all_skipped | 상위 task가 모두 skipped 상태면 실행 |
one_failed | 상위 task 중 하나 이상 실패하면 실행 (모든 상위 task 완료를 기다리지 않음) |
one_success | 상위 task 중 하나 이상 성공하면 실행 (모든 상위 task 완료를 기다리지 않음) |
one_done | 상위 task 중 하나 이상 성공 또는 실패하면 실행 |
none_failed | 상위 task 중 실패가 없는 경우 실행 (성공 or skipped) |
none_failed_min_one_success | 상위 task 중 실패가 없고 성공한 task가 적어도 1개 이상이면 실행 |
none_skipped | skipped된 사위 task가 없으면 실행 (성공/실패여부 상관없이) |
always | 언제나 실행 |
기본적으로 Trigger Rule은 하위 task에 거는 조건이며, 파라미터에 넣을 수 있는 종류는 위 표와 같이 11가지이다.
이 중 all_done과 none_skipped에 대해 코드를 작성해보려고 한다.
# all_done
bash_upstream_1 = BashOperator(
task_id = 'bash_upstream_1',
bash_command='echo upstream1'
)
@task(task_id='python_upstream_1')
def python_upstream_1():
raise AirflowException('downstream_1 Exception!') # 실패 처리를 위해 강제로 Exception
@task(task_id='python_upstream_2')
def python_upstream_2():
print('정상 처리')
@task(task_id='python_downstream_1', trigger_rule='all_done') # Trigger Rule 설정해주기
def python_downstream_1():
print('정상 처리')
[bash_upstream_1, python_upstream_1(), python_upstream_2()] >> python_downstream_1()
all_done 코드의 경우에는 python_upsteream_1() 함수에 AirflowException을 내려주어 실패 처리하게 만들었다. 하지만 Trigger Rule을 'all_done'으로 지정해주었으니, python_downstream_1() 함수는 상위 task가 수행이 끝난 뒤에 바로 실행이 되어야 할 것이다.
그래프를 살펴봤을 때, 의도한 대로 python_upstream_1 task는 failed 처리가 되었지만 후행 task인 python_downstream_1은 Trigger Rule에 의해 성공적으로 실행된 것을 확인해볼 수 있다.
# none_skipped
@task.branch(task_id='branching')
def random_branch():
import random
item_lst = ['A', 'B', 'C']
selected_item = random.choice(item_lst)
if selected_item == 'A':
return 'task_a'
elif selected_item == 'B':
return 'task_b'
elif selected_item == 'C':
return 'task_c'
task_a = BashOperator(
task_id='task_a',
bash_command='echo upstream1'
)
@task(task_id='task_b')
def task_b():
print('정상 처리')
@task(task_id='task_c')
def task_c():
print('정상 처리')
@task(task_id='task_d', trigger_rule='none_skipped')
def task_d():
print('정상 처리')
random_branch() >> [task_a, task_b(), task_c()] >> task_d()
none_skipped 코드의 경우에는 제일 먼저 실행되는 random_branch를 통해 2번째로 실행될 task_a,b,c 중 한 개만 수행되도록 해 나머지 2개는 skipped 처리가 될 것이다. task_d의 경우 Trigger Rule을 'none_skipped'로 설정해 주었기 때문에, 상위 task 2개가 무조건 skipped 되는 상황이므로 실행되지 않아야 할 것이다.
해당 DAG을 실행시킨 뒤 Graph를 보면, task_c가 선택되어 task_b와 task_a가 skipped되었다. 그리고 Trigger Rule에 따라 task_d도 따라 skipped 처리된 것을 확인해볼 수 있다.
'Minding's Programming > Airflow' 카테고리의 다른 글
[Airflow] TriggerDagRun Operator / ExternalTask Sensor (다른 DAG 실행하기) (0) | 2024.07.25 |
---|---|
[Airflow] Task Group (1) | 2024.07.24 |
[Airflow] Task 분기 처리 (BranchPythonOperator, @task.branch, BaseBranchOperator) (2) | 2024.07.24 |
[Airflow] 전역변수 Variable 이용하기 (1) | 2024.07.23 |
[Airflow/Xcom] Airflow에서 Xcom 사용해보기 (0) | 2024.07.23 |