Task Group이란
Task Group은 task들을 모아 그룹 형태로 관리할 수 있는 기능이다. UI Graph 탭에서 Task들을 Group화하여 보여준다.
DAG 내 task가 많아졌을 때, 관리하기 쉽도록 해주는 편의 기능이며 task group 안에 task group을 중첩해 관리할 수도 있다.
위 영상을 보면 여러 개의 task들을 하나의 section으로 만들어 관리할 수 있는 것을 볼 수 있다. 자세히 보면 section 내 inner_section과 같이 group 안에 또 다른 group을 만들어 설정할 수 있는 것을 확인할 수 있다.
Task Group을 꼭 사용할 필요는 없지만, 관리의 용이성이 올라가기 때문에 설정해놓는다면 편하게 관리할 수 있을 것이다.
Task Group 설정하는 방법
1) @task_group 데코레이터 이용
from airflow.decorators import task_group
) as dag:
def inner_func(**kwargs):
msg = kwargs.get('msg') or ''
print(msg)
@task_group(group_id='first_group')
def group_1():
''' task_group 데커레이터를 이용한 첫 번째 그룹입니다. ''' # docstring
@task(task_id='inner_function1')
def inner_func1(**kwargs):
print('첫 번째 TaskGroup 내 첫 번째 task입니다.')
inner_function2 = PythonOperator(
task_id='inner_function2',
python_callable=inner_func,
op_kwargs={'msg':'첫 번째 TaskGroup내 두 번쨰 task입니다.'}
)
inner_func1() >> inner_function2
task group을 설정하는 첫 번째 방법은 @task_group 데코레이터를 이용해 task들을 그룹화시켜 주는 것이다. 데코레이터 아래에 함수를 설정하고 해당 함수 아래에 task들을 만들어주면 그룹화할 수 있다.
또한 task group을 설정하는 함수 아래에 있는 주석(''' ~~ ''')은 docstring이라고 불리며, 해당 그룹의 설명을 제공해주는 역할을 하며, Airflow Webserver UI에서 해당 내용을 확인할 수 있다.
2) TaskGroup 클래스를 이용해 직접 설정
from airflow.utils.task_group import TaskGroup
with TaskGroup(group_id='second_group', tooltip='두 번째 그룹입니다') as group_2:
''' 여기에 적은 docstring은 표시되지 않습니다'''
@task(task_id='inner_function1')
def inner_func1(**kwargs):
print('두 번째 TaskGroup 내 첫 번째 task입니다.')
inner_function2 = PythonOperator(
task_id='inner_function2',
python_callable=inner_func,
op_kwargs={'msg': '두 번째 TaskGroup내 두 번째 task입니다.'}
)
inner_func1() >> inner_function2
두 번째는 TaskGroup 클래스를 이용해 직접 그룹을 설정해주는 방법이다. DAG을 만들 때 처럼 with을 활용해 group을 설정해줄 수 있다. 위 데코레이터를 이용한 방법과 달리 TaskGroup 클래스에 tooltip이라는 파라미터가 있어 해당 그룹에 대한 설명문은 여기에 작성하면 된다.
추가로, 그룹이 다르면 task의 이름이 같아도 에러가 나지 않는다. 위 코드에서 group1과 group2의 두 개의 함수가 이름이 같지만 정상적으로 실행된다.
또한 그룹의 단위로 task flow를 지정할 수 있다. 아래와 같이 설정하면 group1의 task가 모두 수행된 뒤 group2가 수행될 것이다.
group_1() >> group_2 # task Group 또한 flow 지정 가능
DAG을 실행시켜준 뒤 Graph를 확인하면 아래와 같이 노출되는 것을 확인할 수 있다.
'Minding's Programming > Airflow' 카테고리의 다른 글
[Airflow] SimpleHttpOperator로 서울시 공공데이터 API 이용해보기 (0) | 2024.07.25 |
---|---|
[Airflow] TriggerDagRun Operator / ExternalTask Sensor (다른 DAG 실행하기) (0) | 2024.07.25 |
[Airflow] Trigger Rule (0) | 2024.07.24 |
[Airflow] Task 분기 처리 (BranchPythonOperator, @task.branch, BaseBranchOperator) (2) | 2024.07.24 |
[Airflow] 전역변수 Variable 이용하기 (1) | 2024.07.23 |