Minding's Programming/Airflow

[Airflow/Macros] Airflow에서 macros(매크로) 사용해보기

Minding 2024. 7. 23. 14:56
728x90
반응형

Macro 변수는?

 

Jinja 템플릿 내에서 날짜 계산을 가능하게 해주는 기능이다. 파이썬 라이브러리의 datetime과 dateutil을 이용해 날짜 계산을 지원한다.

 

Macro 변수는 주기적으로 실행되는 스케줄에 날짜 계산이 필요할 때 필요하다.

ex) 매월 말일마다 DB에서 SQL을 통해 전월 마지막일부터 어제 날짜까지의 데이터를 불러와야 할 때

sql = f'''
SELECT NAME, ADDRESS
FROM TBL_REG
WHERE REG_DATE BETWEEN ?? AND ??
'''
  • 배치일이 2월 28일이면 1월 31일부터 2월 27일까지 BETWEEN이 설정 되어야함
  • 전 월 마지막 일은 {{ data_interval_start }}로 불러올 수 있지만, '어제 날짜'가 문제
  • 매 달마다 '어제 날짜'가 다르기 때문에, 배치 일에서 '하루 뺀' 날짜 값이 필요함

이럴 때 유용하게 사용 가능한 macros 변수는 Airflow 공식 사이트에 위와 같이 사용할 수 있다고 설명되어 있다. 이 중 날짜 연산에 유용한 파이썬 라이브러리인 datetime과 dateutil이 보인다. 즉, macro를 잘 쓰기 위해서는 해당 라이브러리에 대한 이해가 필요하다.

 

 

파이썬의 datetime, dateutil 라이브러리

from datetime import datetime
from dateutil import relativedelta

now = datetime(year=2024,month=7,days=23)

now + relativedelta.relativedelta(month=1) # 1월로 변경
now + relativedelta.relativedelta(month=-1) # 1개월 빼기
now + relativedelta.relativedelta(day=1) # 1일로 변경
now + relativedelta.relativedelta(days=-1) # 1일 빼기

now + relativedelta.relativedelta(month=-1) + relativedelta.relativedelta(days=-1) # 1개월 1일 빼기

위와 같이 datetime으로 날짜를 지정한 뒤 dateutil의 relativedelta라는 함수를 이용하면 날짜 연산을 자유롭게 할 수 있다.

 

 

Bash Operator로 macro 실습해보기

 

예시 1)

매월 말일 수행되는 Dag에서 변수 START_DATE는 전월 말일, 변수 END_DATE는 수행 당일 기준 어제로 env 세팅하기

 

예시 2)

매울 둘째 주 수행되는 Dag에서 변수 START_DATE는 2주전 월요일, 변수 END_DATE는 2주 전 토요일로 env 세팅하기

 

위 두 가지 예시를 Bash 오퍼레이터와 Macros를 통해 구현해보도록 할 것이다. (변수는 YYYY-MM-DD 형식으로 출력예정)

 

# 예시1

from airflow import DAG
import pendulum
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="dags_bash_with_macro_1",
    schedule="10 0 L * *", # 매 월 말일 0시 10분
    start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
    catchup=False,
    tags=["practice"]
) as dag:
    # START_DATE: 전월 말일, END_DATE: 1일 전
    bash_task_1 = BashOperator(
        task_id = 'bash_task_1',
        env = {'START_DATE': '{{ data_interval_start.in_timezone("Asia/Seoul") | ds }}', # .in_timezone() : ()안의 해당하는 시간대로 출력해줌 (기본 설정 = UTC)
               'END_DATE' : '{{ (data_interval_end.in_timezone("Asia/Seoul") - macros.dateutil.relativedelat.relativedelat(days=1)) | ds }}'} # 수행일로부터 하루 전(1일 빼기)
        bash_command = 'echo "START_DATE: $START_DATE" && echo "END_DATE: $END_DATE"'
    )
# 예시 2

from airflow import DAG
import pendulum
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="dags_bash_with_macro_2",
    schedule="10 0 * * 6#2", # 두 번째 토요일 0시 10분마다
    start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
    catchup=False,
    tags=["practice"]
) as dag:
    # START_DATE: 2주 전 월요일, END_DATE: 2주 전 토요일
    bash_task_2 = BashOperator(
        task_id = 'bash_task_2',
        env = {'START_DATE': '{{ (data_interval_end.in_timezone("Asia/Seoul") - macros.dateutil.relativedelat.relativedelat(days=19)) | ds }}', # 배치일로부터 19일 전 = 2주 전 월요일
               'END_DATE' : '{{ (data_interval_end.in_timezone("Asia/Seoul") - macros.dateutil.relativedelat.relativedelat(days=14)) | ds }}'} # 배치일로부터 14일 전 = 2주 전 토요일
        bash_command = 'echo "START_DATE: $START_DATE" && echo "END_DATE: $END_DATE"'
    )

 

 

Python Operator로 macro 실습해보기

from airflow import DAG
import pendulum
from airflow.decorators import task



with DAG(
    dag_id="dags_python_with_macro",
    schedule="10 0 * * *",
    start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
    catchup=False,
    tags = ['practice']
) as dag:
    
    @task(task_id='task_using_macros',
      # templates_dicr가 params으로 미리 선언됨
      templates_dict={'start_date':'{{ (data_interval_end.in_timezone("Asia/Seoul") + macros.dateutil.relativedelta.relativedelta(months=-1, day=1)) | ds }}', # 1개월을 빼고 1일로 설정 = 전월 1일을 의미
                      'end_date': '{{ (data_interval_end.in_timezone("Asia/Seoul").replace(day=1) + macros.dateutil.relativedelta.relativedelta(days=-1)) | ds }}' # 1일로 설정 후 하루를 뺌 = 전월 마지막 날을 의미
     }
    )
    def get_datetime_macro(**kwargs):
        templates_dict = kwargs.get('templates_dict') or {}
        if templates_dict:
            start_date = templates_dict.get('start_date') or 'start_date없음'
            end_date = templates_dict.get('end_date') or 'end_date없음'
            print(start_date)
            print(end_date)


    @task(task_id='task_direct_calc')
    def get_datetime_calc(**kwargs):
        from dateutil.relativedelta import relativedelta # macro를 사용하지 않고 직접 연산

        data_interval_end = kwargs['data_interval_end']
        prev_month_day_first = data_interval_end.in_timezone('Asia/Seoul') + relativedelta(months=-1, day=1)
        prev_month_day_last = data_interval_end.in_timezone('Asia/Seoul').replace(day=1) +  relativedelta(days=-1)
        print(prev_month_day_first.strftime('%Y-%m-%d'))
        print(prev_month_day_last.strftime('%Y-%m-%d'))

    get_datetime_macro() >> get_datetime_calc()

크게 두 가지 방법을 사용해보았다.

 

macro 템플릿을 사용해 날짜를 연산하는 과정 (get_datetime_macro 함수)과 라이브러리를 호출해 python 내에서 직접 연산하는 과정 (get_datetime_calc 함수)을 사용해보았다. 두 과정의 계산 결과는 같지만, 각 task의 방법이 다를 뿐이다.

 

두 방법은 편의성이나 시간에서 큰 차이는 없기 때문에, 편한 방법을 골라서 사용하면 된다.

728x90