Task 분기 처리는 왜 필요한가?
Task 1 다음에 이어지는 3가지의 Task 2-1, 2-2, 2-3이 있다고 가정해보자. task 1이 먼저 수행된 다음, 3가지의 task가 동시에 돌아가는 것이 지금까지 배워온 결과이다. (task 1 >> [task2-1, task2-2, task2-3])
그러나, task1의 결과에 따라 task 2-1,2-2,2-3 중 한 가지만 수행하도록 설계해야 하는 경우에는 어떻게 해야할까? 그럴 때 필요한 것이 task 분기 처리이다.
Airflow에서 task를 분기처리 하는 방법
1) BranchPythonOperator 사용
2) @task.branch 데코레이터 이용
3) BaseBranchOperator 상속하여 직접 개발 (클래스 파일을 상속)
BranchPythonOperator 사용하는 방법
def select_random():
import random
item_list = ['a', 'b', 'c']
selected_item = random.choice(item_list)
if selected_item == 'a':
return 'task_a' # task_a 실행(str으로 반환)
elif selected item in ['b','c']:
return ['task_b', 'task_c'] # task_b, task_c 실행 (list로 반환)
python_branch_task = BranchPythonOperator(
task_id='python_branch_task',
python_callable=selected_random # 해당 함수의 리턴값이 해당 task의 후행으로 올 task를 의미함
)
python_branch_task >> [task_a, task_b, task_c]
BranchPythonOperator를 사용하는 방법은 크게 어렵지 않다. BranchPythonOperator에서 호출하는 함수에 상황에 따라 실행할 task의 id를 스트링(1개일 때) 또는 리스트(2개 이상일 때)로 반환시켜주면 된다.
위 함수를 가진 DAG을 제작해 실행해 보았다. (task 1,2,3은 selected_item의 결과를 출력하는 함수를 가진 task로 임의 제작)
DAG을 실행시킨 뒤 Graph 탭을 확인해보니 task_a가 분홍색 테두리로 감싸져 있다. 이는 해당 task가 skipped 처리되었다는 의미이다. select_random 함수에서 'B' 또는 'C'가 선택된 듯하다. 따라서 task_b와 task_c는 초록색으로 감싸진 'success' 처리가 되었다는 것을 볼 수 있다.
그리고 BranchPythonOperator로 실행 시킨 task의 XCom 정보를 보면 skipmixin_key라는 새로운 항목을 발견할 수 있다. 이후에 XCom을 통해서 어떤 task가 선택되었는지 확인할 수 있다.
@task.branch 데코레이터로 분기 처리하는 방법
from airflow.decorators import task
@task.branch(task_id='python_branch_task')
def select_random():
import random
item_list = ['a', 'b', 'c']
selected_item = random.choice(item_list)
if selected_random == 'a':
return 'task_a'
elif selected_item in ['b','c']:
return ['task_b', 'task_c']
select_random() >> ['task_a', 'task_b', 'task_c']
위 BranchPythonOperator를 쓸 때처럼 간단히 사용할 수 있다. 달라진 점은 임포트하는 라이브러리가 달라졌다는 점과 더불어 @task.branch라는 데코레이터를 함수 위에 붙여 사용했다는 것이다. 이후 데코레이터가 붙은 함수를 실행시켜 flow를 연결 시켜주기만 하면 BranchPythonOperator를 사용한 것과 동일한 결과가 나온다. (Python Decorator 개념 참고)
BaseBranchOperator로 분기처리 하는 방법 (클래스 상속)
from airflow.operators.branch import BaseBranchOperator
with DAG(...
) as dag:
class CustomBranchOperator(BaseBranchOperator): # 오버라이딩(클래스 상속, 클래스 이름은 상관없음)
def choose_branch(self, context): # BaseBranchOperator 상속시 choose_branch 함수를 구현해줘야 함.
import random
item_list = ['a', 'b', 'c']
selected_item = random.choice(item_list)
if selected_item == 'A':
retrun 'task_a'
elif selected_item in ['b', 'c']:
return ['task_b', 'task_c']
custom_branch_operator = CustomBranchOperator(task_id='python_branch_task')
custom_branch_operator >> [task_a, task_b, task_c]
Airflow에서 BaseBranchOperator를 통해 분기 처리를 하기 위해서는 BaseBranchOperator를 상속받는 클래스를 만들고, 그 아래에 choose_branch()라는 함수를 만들어 분기 처리할 task를 설정해주어야 한다.
Airflow 공식 홈페이지에 따르면, BaseBranchOperator에 대한 설명이 아래와 같이 적혀있다.
A base class for creating operators with branching functionality, like to BranchPythonOperator.
Users should create a subclass from this operator and implement the function choose_branch(self, context). This should run whatever business logic is needed to determine the branch, and return either the task_id for a single task (as a str) or a list of task_ids.
The operator will continue with the returned task_id(s), and all other tasks directly downstream of this operator will be skipped.
(번역) BranchPython Operator와 같이 분기 기능을 가진 연산자를 만들기 위한 기본 클래스입니다. 사용자는 이 연산자에서 하위 클래스를 생성하고 choose_branch(자체, 컨텍스트) 함수를 구현해야 합니다. 이 작업은 분기를 결정하는 데 필요한 모든 비즈니스 로직을 실행하고 단일 작업에 대한 task_id 또는 task_id 목록을 반환해야 합니다. 연산자는 반환된 task_id(s)를 계속하고 이 연산자의 바로 아래에 있는 다른 모든 작업은 건너뜁니다.
위에서 사용한 BranchPythonOperator, @task.branch와 같이 return값에 후행에 실행한 task를 반환해주면 된다.
실행한 결과 이번엔 task_a가 선택되어 task_b와 task_c가 skipped된 것을 볼 수 있다.
XCom을 확인했을 때에도 의도한 대로 skipmixin_key에 task_a가 선택되어 저장된 것을 알 수 있다.
'Minding's Programming > Airflow' 카테고리의 다른 글
[Airflow] Task Group (1) | 2024.07.24 |
---|---|
[Airflow] Trigger Rule (0) | 2024.07.24 |
[Airflow] 전역변수 Variable 이용하기 (1) | 2024.07.23 |
[Airflow/Xcom] Airflow에서 Xcom 사용해보기 (0) | 2024.07.23 |
[Airflow/Macros] Airflow에서 macros(매크로) 사용해보기 (3) | 2024.07.23 |