DAG 내에서 task의 순서를 지정하고, 특정 조건 별로 실행시키는 법을 알아보았다. 하지만 task보다 더 큰 범주인 DAG을 순서에 따라 실행시키려면 어떻게 해야할까?
DAG 간 의존관계를 설정하는 방법에는 크게 두 가지가 있다고 한다.
1) TriggerDagRun Operator 사용
2) ExternalTask Sensor 사용
이 두가지 방법의 차이점은 아래 표와 같이 정리할 수 있다.
TriggerDagRun Operator | External Task Sensor | |
방식 | 실행할 다른 DAG의 ID를 지정해 수행 | 본 Task가 수행되기 전 다른 DAG의 완료를 기다린 후 수행 |
사용시점(권고) | Trigger되는 DAG의 선행 DAG이 하나일 경우 | Trigger되는 DAG의 선행 DAG이 2개 이상일 경우 |
위와 같이 두 가지 방법은 수행방식과 적절한 사용 시점에도 차이가 있다. 다른 DAG을 실행하기 전에 체크해야 할 DAG의 개수에 따라 차이가 난다고 보면 된다.
TriggerDagRun Operator 사용하는 방법
TriggerDagRun Operator는 DAG 내의 Task로 설정할 수 있는데, 특정 함수를 실행시키는 것이 아닌 DAG을 실행시키는 오퍼레이터이다.
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
as dag:
trigger_dag_task = TriggerDagRunOperator(
task_id='trigger_dag_task', # 필수값
trigger_dag_id='dags_python_operator', # 다음에 실행시킬 dag의 ID, 필수값
trigger_run_id=None,
execution_date='{{ data_interval_end }}',
reset_dag_run=True,
wait_for_completion=False,
poke_interval=60,
allowed_states=['success'],
failed_states=None
)
TriggerDagRunOperator에는 꽤 많은 파라미터가 존재하는데, 이 중 필수로 들어가야하는 값은 task_id와 실행시킬 Dag의 ID를 입력하는 trigger_dag_id이다.
그리고 바로 하단에 trigger_run_id 라는 파라미터가 보이는데, 여기서 run_id는 DAG의 수행 방식과 시간을 유일하게 식별해주는 Key라고 할 수 있다. 같은 시간에 수행됐더라도 수행 방식이 다를 수 있는데, 그것에 따라 키가 달라진다. 예를 들어 스케줄에 의해 수행된 경우 run_id는 "scheduled_{{data_interval_start}}" 의 형식으로 표기된다.
- Schedule: 스케줄에 의해 DAG이 수행됐을 때
- Manual: 직접 수행버튼을 눌러 DAG이 수행됐을 때
- Backfill: 과거 날짜에 대해 DAG을 수행했을 때
다시 돌아와 trigger_run_id 파라미터에 대해 설명하자면, 위와 같이 자동으로 지정되는 run_id를 직접 지정해줄 수 있는 파라미터이다.
다른 파라미터를 설명하면 아래 표와 같다.
execution_date | 해당 DAG이 실행될 경우 manual_{{execution_date}}의 id값으로 수행되는데, 여기에 들어갈 날짜 정보 |
reset_dag_run | 동일한 run_id가 있어도 수행할 것인지에 대해 묻는 파라미터, True일 경우 동일한 id가 있어도 dag을 수행한다. |
wait_for_completion | 현재 task에서 수행되는 DAG이 완료(성공) 후 다음 task를 실행시킬지를 묻는 파라미터, True일 경우 해당 DAG이 모두 완료되어야 다음 task를 실행한다. |
poke_interval | DAG이 수행 완료되었는지 확인하는 주기 (초) |
allowed_states | 수행시킬 DAG의 어떤 수행 결과에 따라 해당 task을 성공으로 처리할 지에 대한 파라미터 (ex. DAG의 결과가 성공/실패 모두 task도 성공 = ['success', 'failed'] |
failed_states | 수행시킬 DAG의 어떤 수행 결과에 따라 해당 task을 실패로 처리할 지에 대한 파라미터 |
이제 TriggerDagRunOperator가 포함된 DAG을 서버에 올려 실행시켜보았다.
앞서 그래프를 살펴보면 위와 같이 BashOperator가 있는 start_task 뒤에 tirgger_dag_task라는 이름으로 'dags_python_operator'라는 이름의 DAG을 실행시키는 것으로 연결된 것을 알 수 있다.
실행 이후 dags_python_oprator로 이동한 뒤 task의 detail 탭을 살펴보면,
Run ID가 manual로 표시되며 이후 시간이 UTC 기준 00시 30분으로 되어있는 것을 볼 수 있다. execution_date를 {{data_interval_start}} 값으로 주었기 때문에, DAG의 수행 시간인 (한국 기준) 09시 30분이 노출되는 것을 볼 수 있다.
ExternalTask Sensor 사용하는 방법
위에서 본 방법처럼 Operator를 사용하는 것이 아닌 Sensor를 이용해 다른 DAG과의 의존관계를 설정시킬 수도 있다. ( Airflow의 센서 (BashSensor, FileSensor, PythonSensor, ExternalTaskSensor, CustomSensor)
ExternalTask Sensor의 파라미터는 굉장히 많은 편이다. 아래 표와 같이 정리해보았다.
파라미터 | 필수여부 | 설명 |
external_dag_id | O | 센싱할 DAG 명 |
external_task_id | X (셋 중 하나만 입력 가능, 없으면 dag만 센싱) |
센싱할 task_id 명 |
external_task_ids | 센싱할 1개 이상의 task_id명 (list) | |
external_task_group_id | 센싱할 task_group_id 명 | |
allowed_states | X (같은 상태가 입력되면 안됨) + from airflow.utils.state import State 필요 |
센서가 Success 되기 위한 센싱 대상의 상태 |
skipped_states | 센서가 Skipped 되기 위한 센싱 대상의 상태 | |
failed_states | 센서가 Failed 되기 위한 센싱 대상의 상태 | |
execution_delta | X (둘 중 하나만 입력 가능) |
현재 DAG과 센싱할 DAG의 data_interval_start의 차이를 입력 (본 DAG보다 센싱할 DAG이 얼마나 과거에 있는지 양수로 입력 / ex) 센싱할 DAG이 6시간 앞설 경우 timedelta(hours=6) ) |
execution_date_fn | 센싱할 DAG의 data_interval_start를 구하기 위한 함수 | |
check_existence | X | 해당 dag_id 또는 task_id가 있는지 확인 |
위 파라미터 중 가장 유의해야 할 파라미터는 external_dag_id와 execution_delta 두 가지라고 볼 수 있다. 일단 exteranl_dag_id는 우선 필수적으로 들어가야 할 파라미터이기 때문이다.
또한 execution_delta는 현재 DAG과 센싱할 DAG의 시간 차이를 입력하는 파라미터인데, 상수로 입력해야 한다는 점에 유의해야한다.(6시간 전에 실행하는 DAG일 경우 hours=-6이 아닌 hours=6)
만약 센싱할 DAG이 한 달마다 실행하는 DAG이고, 현재 DAG이 하루마다 실행하는 조건이라면 execution_date_fn을 사용하면 된다. 센싱할 DAG과 현재 DAG의 실행 시간(data_interval_start) 차이를 구하는 함수를 이 곳에 넣은 뒤 활용할 수 있다.
from airflow import DAG
from airflow.sensors.external_task import ExternalTaskSensor
import pendulum
from datetime import timedelta
from airflow.utils.state import State
with DAG(
dag_id='dags_external_task_sensor',
start_date=pendulum.datetime(2023,4,1, tz='Asia/Seoul'),
schedule='0 7 * * *',
catchup=False
) as dag:
external_task_sensor_a = ExternalTaskSensor(
task_id='external_task_sensor_a',
external_dag_id='dags_branch_python_operator',
external_task_id='task_a',
allowed_states=[State.SKIPPED], # State 라이브러리 활용해야 함
execution_delta=timedelta(hours=6),
poke_interval=10 #10초
)
external_task_sensor_b = ExternalTaskSensor(
task_id='external_task_sensor_b',
external_dag_id='dags_branch_python_operator',
external_task_id='task_b',
failed_states=[State.SKIPPED],
execution_delta=timedelta(hours=6),
poke_interval=10 #10초
)
external_task_sensor_c = ExternalTaskSensor(
task_id='external_task_sensor_c',
external_dag_id='dags_branch_python_operator',
external_task_id='task_c',
allowed_states=[State.SUCCESS],
execution_delta=timedelta(hours=6),
poke_interval=10 #10초
)
3개의 external_task_sensor를 만들었고, 모두 같은 DAG을 센싱하지만 센싱할 task는 각각 다르도록 설정했다. task_a의 경우 skipped 상태일 때 해당 sensor가 success 처리되도록 하는 등 각각 다르게 처리했다. 센싱할 DAG인 dags_branch_python_operator는 random값으로 task_a, task_b, task_c를 리턴해 분기 처리해주는 DAG이다.
결과는 모두 Success 처리가 되었다. 센싱할 DAG은 어떻게 처리가 되었을까?
센싱 대상인 dags_branch_python_operator는 task_b와 task_c가 success 처리되었고, task_a는 skipped 처리되었다. 우리가 위 코드에서 의도한 바에 따라 task_a는 skipped 처리되었을 경우 sensor에서는 success 처리가 된 것을 알 수 있고, 다른 task도 마찬가지로 의도한 바에 따라 처리된 것을 알 수 있다.
'Minding's Programming > Airflow' 카테고리의 다른 글
[Airflow] Custom Operator 개발해보기 (BaseOperator) (0) | 2024.07.26 |
---|---|
[Airflow] SimpleHttpOperator로 서울시 공공데이터 API 이용해보기 (0) | 2024.07.25 |
[Airflow] Task Group (1) | 2024.07.24 |
[Airflow] Trigger Rule (0) | 2024.07.24 |
[Airflow] Task 분기 처리 (BranchPythonOperator, @task.branch, BaseBranchOperator) (2) | 2024.07.24 |