본문 바로가기

Minding's Programming/Airflow

[Airflow] Task Group

728x90
반응형

Task Group이란

 

Task Group은 task들을 모아 그룹 형태로 관리할 수 있는 기능이다. UI Graph 탭에서 Task들을 Group화하여 보여준다.

DAG 내 task가 많아졌을 때, 관리하기 쉽도록 해주는 편의 기능이며 task group 안에 task group을 중첩해 관리할 수도 있다.

 

출처: Airflow 공식 홈페이지

위 영상을 보면 여러 개의 task들을 하나의 section으로 만들어 관리할 수 있는 것을 볼 수 있다. 자세히 보면 section 내 inner_section과 같이 group 안에 또 다른 group을 만들어 설정할 수 있는 것을 확인할 수 있다.

 

Task Group을 꼭 사용할 필요는 없지만, 관리의 용이성이 올라가기 때문에 설정해놓는다면 편하게 관리할 수 있을 것이다.

 

 

Task Group 설정하는 방법

 

1) @task_group 데코레이터 이용

from airflow.decorators import task_group

) as dag:
    def inner_func(**kwargs):
        msg = kwargs.get('msg') or '' 
        print(msg)

    @task_group(group_id='first_group')
    def group_1():
        ''' task_group 데커레이터를 이용한 첫 번째 그룹입니다. ''' # docstring

        @task(task_id='inner_function1')
        def inner_func1(**kwargs):
            print('첫 번째 TaskGroup 내 첫 번째 task입니다.')

        inner_function2 = PythonOperator(
            task_id='inner_function2',
            python_callable=inner_func,
            op_kwargs={'msg':'첫 번째 TaskGroup내 두 번쨰 task입니다.'}
        )

        inner_func1() >> inner_function2

task group을 설정하는 첫 번째 방법은 @task_group 데코레이터를 이용해 task들을 그룹화시켜 주는 것이다. 데코레이터 아래에 함수를 설정하고 해당 함수 아래에 task들을 만들어주면 그룹화할 수 있다.

 

또한 task group을 설정하는 함수 아래에 있는 주석(''' ~~ ''')은 docstring이라고 불리며, 해당 그룹의 설명을 제공해주는 역할을 하며, Airflow Webserver UI에서 해당 내용을 확인할 수 있다.

 

2) TaskGroup 클래스를 이용해 직접 설정

from airflow.utils.task_group import TaskGroup

with TaskGroup(group_id='second_group', tooltip='두 번째 그룹입니다') as group_2:
        ''' 여기에 적은 docstring은 표시되지 않습니다'''
        @task(task_id='inner_function1')
        def inner_func1(**kwargs):
            print('두 번째 TaskGroup 내 첫 번째 task입니다.')

        inner_function2 = PythonOperator(
            task_id='inner_function2',
            python_callable=inner_func,
            op_kwargs={'msg': '두 번째 TaskGroup내 두 번째 task입니다.'}
        )
        inner_func1() >> inner_function2

두 번째는 TaskGroup 클래스를 이용해 직접 그룹을 설정해주는 방법이다. DAG을 만들 때 처럼 with을 활용해 group을 설정해줄 수 있다. 위 데코레이터를 이용한 방법과 달리 TaskGroup 클래스에 tooltip이라는 파라미터가 있어 해당 그룹에 대한 설명문은 여기에 작성하면 된다.

 

추가로, 그룹이 다르면 task의 이름이 같아도 에러가 나지 않는다. 위 코드에서 group1과 group2의 두 개의 함수가 이름이 같지만 정상적으로 실행된다.

 

또한 그룹의 단위로 task flow를 지정할 수 있다. 아래와 같이 설정하면 group1의 task가 모두 수행된 뒤 group2가 수행될 것이다.

group_1() >> group_2 # task Group 또한 flow 지정 가능

 

DAG을 실행시켜준 뒤 Graph를 확인하면 아래와 같이 노출되는 것을 확인할 수 있다.

728x90