Task 분기 처리는 왜 필요한가?
Task 1 다음에 이어지는 3가지의 Task 2-1, 2-2, 2-3이 있다고 가정해보자. task 1이 먼저 수행된 다음, 3가지의 task가 동시에 돌아가는 것이 지금까지 배워온 결과이다. (task 1 >> [task2-1, task2-2, task2-3])
그러나, task1의 결과에 따라 task 2-1,2-2,2-3 중 한 가지만 수행하도록 설계해야 하는 경우에는 어떻게 해야할까? 그럴 때 필요한 것이 task 분기 처리이다.
Airflow에서 task를 분기처리 하는 방법
1) BranchPythonOperator 사용
2) @task.branch 데코레이터 이용
3) BaseBranchOperator 상속하여 직접 개발 (클래스 파일을 상속)
BranchPythonOperator 사용하는 방법
def select_random():
import random
item_list = ['a', 'b', 'c']
selected_item = random.choice(item_list)
if selected_item == 'a':
return 'task_a' # task_a 실행(str으로 반환)
elif selected item in ['b','c']:
return ['task_b', 'task_c'] # task_b, task_c 실행 (list로 반환)
python_branch_task = BranchPythonOperator(
task_id='python_branch_task',
python_callable=selected_random # 해당 함수의 리턴값이 해당 task의 후행으로 올 task를 의미함
)
python_branch_task >> [task_a, task_b, task_c]
BranchPythonOperator를 사용하는 방법은 크게 어렵지 않다. BranchPythonOperator에서 호출하는 함수에 상황에 따라 실행할 task의 id를 스트링(1개일 때) 또는 리스트(2개 이상일 때)로 반환시켜주면 된다.
위 함수를 가진 DAG을 제작해 실행해 보았다. (task 1,2,3은 selected_item의 결과를 출력하는 함수를 가진 task로 임의 제작)
DAG을 실행시킨 뒤 Graph 탭을 확인해보니 task_a가 분홍색 테두리로 감싸져 있다. 이는 해당 task가 skipped 처리되었다는 의미이다. select_random 함수에서 'B' 또는 'C'가 선택된 듯하다. 따라서 task_b와 task_c는 초록색으로 감싸진 'success' 처리가 되었다는 것을 볼 수 있다.
그리고 BranchPythonOperator로 실행 시킨 task의 XCom 정보를 보면 skipmixin_key라는 새로운 항목을 발견할 수 있다. 이후에 XCom을 통해서 어떤 task가 선택되었는지 확인할 수 있다.
@task.branch 데코레이터로 분기 처리하는 방법
from airflow.decorators import task
@task.branch(task_id='python_branch_task')
def select_random():
import random
item_list = ['a', 'b', 'c']
selected_item = random.choice(item_list)
if selected_random == 'a':
return 'task_a'
elif selected_item in ['b','c']:
return ['task_b', 'task_c']
select_random() >> ['task_a', 'task_b', 'task_c']
위 BranchPythonOperator를 쓸 때처럼 간단히 사용할 수 있다. 달라진 점은 임포트하는 라이브러리가 달라졌다는 점과 더불어 @task.branch라는 데코레이터를 함수 위에 붙여 사용했다는 것이다. 이후 데코레이터가 붙은 함수를 실행시켜 flow를 연결 시켜주기만 하면 BranchPythonOperator를 사용한 것과 동일한 결과가 나온다. (Python Decorator 개념 참고)
[Airflow] Python Operator 사용과 Python Decorator
Python Operator 파이썬 오퍼레이터: Airflow에서 Python 함수를 실행시키는 오퍼레이터로, 해당 클래스를 이용해 DAG을 만들면 파이썬 함수를 실행시키는 워크플로우를 생성할 수 있다. 1. 내부 함수 실
minding-deep-learning.tistory.com
BaseBranchOperator로 분기처리 하는 방법 (클래스 상속)
from airflow.operators.branch import BaseBranchOperator
with DAG(...
) as dag:
class CustomBranchOperator(BaseBranchOperator): # 오버라이딩(클래스 상속, 클래스 이름은 상관없음)
def choose_branch(self, context): # BaseBranchOperator 상속시 choose_branch 함수를 구현해줘야 함.
import random
item_list = ['a', 'b', 'c']
selected_item = random.choice(item_list)
if selected_item == 'A':
retrun 'task_a'
elif selected_item in ['b', 'c']:
return ['task_b', 'task_c']
custom_branch_operator = CustomBranchOperator(task_id='python_branch_task')
custom_branch_operator >> [task_a, task_b, task_c]
Airflow에서 BaseBranchOperator를 통해 분기 처리를 하기 위해서는 BaseBranchOperator를 상속받는 클래스를 만들고, 그 아래에 choose_branch()라는 함수를 만들어 분기 처리할 task를 설정해주어야 한다.
Airflow 공식 홈페이지에 따르면, BaseBranchOperator에 대한 설명이 아래와 같이 적혀있다.
A base class for creating operators with branching functionality, like to BranchPythonOperator.
Users should create a subclass from this operator and implement the function choose_branch(self, context). This should run whatever business logic is needed to determine the branch, and return either the task_id for a single task (as a str) or a list of task_ids.
The operator will continue with the returned task_id(s), and all other tasks directly downstream of this operator will be skipped.
(번역) BranchPython Operator와 같이 분기 기능을 가진 연산자를 만들기 위한 기본 클래스입니다. 사용자는 이 연산자에서 하위 클래스를 생성하고 choose_branch(자체, 컨텍스트) 함수를 구현해야 합니다. 이 작업은 분기를 결정하는 데 필요한 모든 비즈니스 로직을 실행하고 단일 작업에 대한 task_id 또는 task_id 목록을 반환해야 합니다. 연산자는 반환된 task_id(s)를 계속하고 이 연산자의 바로 아래에 있는 다른 모든 작업은 건너뜁니다.
위에서 사용한 BranchPythonOperator, @task.branch와 같이 return값에 후행에 실행한 task를 반환해주면 된다.
실행한 결과 이번엔 task_a가 선택되어 task_b와 task_c가 skipped된 것을 볼 수 있다.
XCom을 확인했을 때에도 의도한 대로 skipmixin_key에 task_a가 선택되어 저장된 것을 알 수 있다.
'Minding's Programming > Airflow' 카테고리의 다른 글
[Airflow] Task Group (1) | 2024.07.24 |
---|---|
[Airflow] Trigger Rule (0) | 2024.07.24 |
[Airflow] 전역변수 Variable 이용하기 (1) | 2024.07.23 |
[Airflow/Xcom] Airflow에서 Xcom 사용해보기 (0) | 2024.07.23 |
[Airflow/Macros] Airflow에서 macros(매크로) 사용해보기 (3) | 2024.07.23 |