본문 바로가기

Minding's Programming/Airflow

[Airflow/Macros] Airflow에서 macros(매크로) 사용해보기

728x90
반응형

Macro 변수는?

 

Jinja 템플릿 내에서 날짜 계산을 가능하게 해주는 기능이다. 파이썬 라이브러리의 datetime과 dateutil을 이용해 날짜 계산을 지원한다.

 

Macro 변수는 주기적으로 실행되는 스케줄에 날짜 계산이 필요할 때 필요하다.

ex) 매월 말일마다 DB에서 SQL을 통해 전월 마지막일부터 어제 날짜까지의 데이터를 불러와야 할 때

sql = f'''
SELECT NAME, ADDRESS
FROM TBL_REG
WHERE REG_DATE BETWEEN ?? AND ??
'''
  • 배치일이 2월 28일이면 1월 31일부터 2월 27일까지 BETWEEN이 설정 되어야함
  • 전 월 마지막 일은 {{ data_interval_start }}로 불러올 수 있지만, '어제 날짜'가 문제
  • 매 달마다 '어제 날짜'가 다르기 때문에, 배치 일에서 '하루 뺀' 날짜 값이 필요함

이럴 때 유용하게 사용 가능한 macros 변수는 Airflow 공식 사이트에 위와 같이 사용할 수 있다고 설명되어 있다. 이 중 날짜 연산에 유용한 파이썬 라이브러리인 datetime과 dateutil이 보인다. 즉, macro를 잘 쓰기 위해서는 해당 라이브러리에 대한 이해가 필요하다.

 

 

파이썬의 datetime, dateutil 라이브러리

from datetime import datetime
from dateutil import relativedelta

now = datetime(year=2024,month=7,days=23)

now + relativedelta.relativedelta(month=1) # 1월로 변경
now + relativedelta.relativedelta(month=-1) # 1개월 빼기
now + relativedelta.relativedelta(day=1) # 1일로 변경
now + relativedelta.relativedelta(days=-1) # 1일 빼기

now + relativedelta.relativedelta(month=-1) + relativedelta.relativedelta(days=-1) # 1개월 1일 빼기

위와 같이 datetime으로 날짜를 지정한 뒤 dateutil의 relativedelta라는 함수를 이용하면 날짜 연산을 자유롭게 할 수 있다.

 

 

Bash Operator로 macro 실습해보기

 

예시 1)

매월 말일 수행되는 Dag에서 변수 START_DATE는 전월 말일, 변수 END_DATE는 수행 당일 기준 어제로 env 세팅하기

 

예시 2)

매울 둘째 주 수행되는 Dag에서 변수 START_DATE는 2주전 월요일, 변수 END_DATE는 2주 전 토요일로 env 세팅하기

 

위 두 가지 예시를 Bash 오퍼레이터와 Macros를 통해 구현해보도록 할 것이다. (변수는 YYYY-MM-DD 형식으로 출력예정)

 

# 예시1

from airflow import DAG
import pendulum
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="dags_bash_with_macro_1",
    schedule="10 0 L * *", # 매 월 말일 0시 10분
    start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
    catchup=False,
    tags=["practice"]
) as dag:
    # START_DATE: 전월 말일, END_DATE: 1일 전
    bash_task_1 = BashOperator(
        task_id = 'bash_task_1',
        env = {'START_DATE': '{{ data_interval_start.in_timezone("Asia/Seoul") | ds }}', # .in_timezone() : ()안의 해당하는 시간대로 출력해줌 (기본 설정 = UTC)
               'END_DATE' : '{{ (data_interval_end.in_timezone("Asia/Seoul") - macros.dateutil.relativedelat.relativedelat(days=1)) | ds }}'} # 수행일로부터 하루 전(1일 빼기)
        bash_command = 'echo "START_DATE: $START_DATE" && echo "END_DATE: $END_DATE"'
    )
# 예시 2

from airflow import DAG
import pendulum
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="dags_bash_with_macro_2",
    schedule="10 0 * * 6#2", # 두 번째 토요일 0시 10분마다
    start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
    catchup=False,
    tags=["practice"]
) as dag:
    # START_DATE: 2주 전 월요일, END_DATE: 2주 전 토요일
    bash_task_2 = BashOperator(
        task_id = 'bash_task_2',
        env = {'START_DATE': '{{ (data_interval_end.in_timezone("Asia/Seoul") - macros.dateutil.relativedelat.relativedelat(days=19)) | ds }}', # 배치일로부터 19일 전 = 2주 전 월요일
               'END_DATE' : '{{ (data_interval_end.in_timezone("Asia/Seoul") - macros.dateutil.relativedelat.relativedelat(days=14)) | ds }}'} # 배치일로부터 14일 전 = 2주 전 토요일
        bash_command = 'echo "START_DATE: $START_DATE" && echo "END_DATE: $END_DATE"'
    )

 

 

Python Operator로 macro 실습해보기

from airflow import DAG
import pendulum
from airflow.decorators import task



with DAG(
    dag_id="dags_python_with_macro",
    schedule="10 0 * * *",
    start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
    catchup=False,
    tags = ['practice']
) as dag:
    
    @task(task_id='task_using_macros',
      # templates_dicr가 params으로 미리 선언됨
      templates_dict={'start_date':'{{ (data_interval_end.in_timezone("Asia/Seoul") + macros.dateutil.relativedelta.relativedelta(months=-1, day=1)) | ds }}', # 1개월을 빼고 1일로 설정 = 전월 1일을 의미
                      'end_date': '{{ (data_interval_end.in_timezone("Asia/Seoul").replace(day=1) + macros.dateutil.relativedelta.relativedelta(days=-1)) | ds }}' # 1일로 설정 후 하루를 뺌 = 전월 마지막 날을 의미
     }
    )
    def get_datetime_macro(**kwargs):
        templates_dict = kwargs.get('templates_dict') or {}
        if templates_dict:
            start_date = templates_dict.get('start_date') or 'start_date없음'
            end_date = templates_dict.get('end_date') or 'end_date없음'
            print(start_date)
            print(end_date)


    @task(task_id='task_direct_calc')
    def get_datetime_calc(**kwargs):
        from dateutil.relativedelta import relativedelta # macro를 사용하지 않고 직접 연산

        data_interval_end = kwargs['data_interval_end']
        prev_month_day_first = data_interval_end.in_timezone('Asia/Seoul') + relativedelta(months=-1, day=1)
        prev_month_day_last = data_interval_end.in_timezone('Asia/Seoul').replace(day=1) +  relativedelta(days=-1)
        print(prev_month_day_first.strftime('%Y-%m-%d'))
        print(prev_month_day_last.strftime('%Y-%m-%d'))

    get_datetime_macro() >> get_datetime_calc()

크게 두 가지 방법을 사용해보았다.

 

macro 템플릿을 사용해 날짜를 연산하는 과정 (get_datetime_macro 함수)과 라이브러리를 호출해 python 내에서 직접 연산하는 과정 (get_datetime_calc 함수)을 사용해보았다. 두 과정의 계산 결과는 같지만, 각 task의 방법이 다를 뿐이다.

 

두 방법은 편의성이나 시간에서 큰 차이는 없기 때문에, 편한 방법을 골라서 사용하면 된다.

728x90