본문 바로가기
Minding's Programming/Airflow

[Airflow] Task 분기 처리 (BranchPythonOperator, @task.branch, BaseBranchOperator)

by Minding 2024. 7. 24.
728x90
반응형

Task 분기 처리는 왜 필요한가?

 

Task 1 다음에 이어지는 3가지의 Task 2-1, 2-2, 2-3이 있다고 가정해보자. task 1이 먼저 수행된 다음, 3가지의 task가 동시에 돌아가는 것이 지금까지 배워온 결과이다. (task 1 >> [task2-1, task2-2, task2-3])

 

그러나, task1의 결과에 따라 task 2-1,2-2,2-3 중 한 가지만 수행하도록 설계해야 하는 경우에는 어떻게 해야할까? 그럴 때 필요한 것이 task 분기 처리이다.

 

 

Airflow에서 task를 분기처리 하는 방법

 

1) BranchPythonOperator 사용

2) @task.branch 데코레이터 이용

3) BaseBranchOperator 상속하여 직접 개발 (클래스 파일을 상속)

 

 

BranchPythonOperator 사용하는 방법

def select_random():
    import random
    
    item_list = ['a', 'b', 'c']
    selected_item = random.choice(item_list)
    if selected_item == 'a':
    	return 'task_a' # task_a 실행(str으로 반환)
    elif selected item in ['b','c']:
    	return ['task_b', 'task_c'] # task_b, task_c 실행 (list로 반환)
        
python_branch_task = BranchPythonOperator(
    task_id='python_branch_task',
    python_callable=selected_random # 해당 함수의 리턴값이 해당 task의 후행으로 올 task를 의미함
)

python_branch_task >> [task_a, task_b, task_c]

BranchPythonOperator를 사용하는 방법은 크게 어렵지 않다. BranchPythonOperator에서 호출하는 함수에 상황에 따라 실행할 task의 id를 스트링(1개일 때) 또는 리스트(2개 이상일 때)로 반환시켜주면 된다.

 

위 함수를 가진 DAG을 제작해 실행해 보았다. (task 1,2,3은 selected_item의 결과를 출력하는 함수를 가진 task로 임의 제작)

DAG을 실행시킨 뒤 Graph 탭을 확인해보니 task_a가 분홍색 테두리로 감싸져 있다. 이는 해당 task가 skipped 처리되었다는 의미이다. select_random 함수에서 'B' 또는 'C'가 선택된 듯하다. 따라서 task_b와 task_c는 초록색으로 감싸진 'success' 처리가 되었다는 것을 볼 수 있다.

 

그리고 BranchPythonOperator로 실행 시킨 task의 XCom 정보를 보면 skipmixin_key라는 새로운 항목을 발견할 수 있다. 이후에 XCom을 통해서 어떤 task가 선택되었는지 확인할 수 있다.

 

 

@task.branch 데코레이터로 분기 처리하는 방법

from airflow.decorators import task

@task.branch(task_id='python_branch_task')
def select_random():
    import random
    item_list = ['a', 'b', 'c']
    selected_item = random.choice(item_list)
    if selected_random == 'a':
    	return 'task_a'
    elif selected_item in ['b','c']:
    	return ['task_b', 'task_c']
        
select_random() >> ['task_a', 'task_b', 'task_c']

위 BranchPythonOperator를 쓸 때처럼 간단히 사용할 수 있다. 달라진 점은 임포트하는 라이브러리가 달라졌다는 점과 더불어 @task.branch라는 데코레이터를 함수 위에 붙여 사용했다는 것이다. 이후 데코레이터가 붙은 함수를 실행시켜 flow를 연결 시켜주기만 하면 BranchPythonOperator를 사용한 것과 동일한 결과가 나온다. (Python Decorator 개념 참고)

 

[Airflow] Python Operator 사용과 Python Decorator

Python Operator 파이썬 오퍼레이터: Airflow에서 Python 함수를 실행시키는 오퍼레이터로, 해당 클래스를 이용해 DAG을 만들면 파이썬 함수를 실행시키는 워크플로우를 생성할 수 있다. 1. 내부 함수 실

minding-deep-learning.tistory.com

 

 

BaseBranchOperator로 분기처리 하는 방법 (클래스 상속)

from airflow.operators.branch import BaseBranchOperator
with DAG(...
) as dag:
    class CustomBranchOperator(BaseBranchOperator): # 오버라이딩(클래스 상속, 클래스 이름은 상관없음)
    	def choose_branch(self, context): # BaseBranchOperator 상속시 choose_branch 함수를 구현해줘야 함.
        	import random
            
            item_list = ['a', 'b', 'c']
            selected_item = random.choice(item_list)
            if selected_item == 'A':
            	retrun 'task_a'
            elif selected_item in ['b', 'c']:
            	return ['task_b', 'task_c']
                
    custom_branch_operator = CustomBranchOperator(task_id='python_branch_task')
    custom_branch_operator >> [task_a, task_b, task_c]

Airflow에서 BaseBranchOperator를 통해 분기 처리를 하기 위해서는 BaseBranchOperator를 상속받는 클래스를 만들고, 그 아래에 choose_branch()라는 함수를 만들어 분기 처리할 task를 설정해주어야 한다.

 

Airflow 공식 홈페이지에 따르면, BaseBranchOperator에 대한 설명이 아래와 같이 적혀있다.

A base class for creating operators with branching functionality, like to BranchPythonOperator.
Users should create a subclass from this operator and implement the function choose_branch(self, context). This should run whatever business logic is needed to determine the branch, and return either the task_id for a single task (as a str) or a list of task_ids.
The operator will continue with the returned task_id(s), and all other tasks directly downstream of this operator will be skipped.

(번역) BranchPython Operator와 같이 분기 기능을 가진 연산자를 만들기 위한 기본 클래스입니다. 사용자는 이 연산자에서 하위 클래스를 생성하고 choose_branch(자체, 컨텍스트) 함수를 구현해야 합니다. 이 작업은 분기를 결정하는 데 필요한 모든 비즈니스 로직을 실행하고 단일 작업에 대한 task_id 또는 task_id 목록을 반환해야 합니다. 연산자는 반환된 task_id(s)를 계속하고 이 연산자의 바로 아래에 있는 다른 모든 작업은 건너뜁니다.

 

위에서 사용한 BranchPythonOperator, @task.branch와 같이 return값에 후행에 실행한 task를 반환해주면 된다.

 

실행한 결과 이번엔 task_a가 선택되어 task_b와 task_c가 skipped된 것을 볼 수 있다.

 

XCom을 확인했을 때에도 의도한 대로 skipmixin_key에 task_a가 선택되어 저장된 것을 알 수 있다.

728x90
반응형

댓글