Macro 변수는?
Jinja 템플릿 내에서 날짜 계산을 가능하게 해주는 기능이다. 파이썬 라이브러리의 datetime과 dateutil을 이용해 날짜 계산을 지원한다.
Macro 변수는 주기적으로 실행되는 스케줄에 날짜 계산이 필요할 때 필요하다.
ex) 매월 말일마다 DB에서 SQL을 통해 전월 마지막일부터 어제 날짜까지의 데이터를 불러와야 할 때
sql = f'''
SELECT NAME, ADDRESS
FROM TBL_REG
WHERE REG_DATE BETWEEN ?? AND ??
'''
- 배치일이 2월 28일이면 1월 31일부터 2월 27일까지 BETWEEN이 설정 되어야함
- 전 월 마지막 일은 {{ data_interval_start }}로 불러올 수 있지만, '어제 날짜'가 문제
- 매 달마다 '어제 날짜'가 다르기 때문에, 배치 일에서 '하루 뺀' 날짜 값이 필요함
이럴 때 유용하게 사용 가능한 macros 변수는 Airflow 공식 사이트에 위와 같이 사용할 수 있다고 설명되어 있다. 이 중 날짜 연산에 유용한 파이썬 라이브러리인 datetime과 dateutil이 보인다. 즉, macro를 잘 쓰기 위해서는 해당 라이브러리에 대한 이해가 필요하다.
파이썬의 datetime, dateutil 라이브러리
from datetime import datetime
from dateutil import relativedelta
now = datetime(year=2024,month=7,days=23)
now + relativedelta.relativedelta(month=1) # 1월로 변경
now + relativedelta.relativedelta(month=-1) # 1개월 빼기
now + relativedelta.relativedelta(day=1) # 1일로 변경
now + relativedelta.relativedelta(days=-1) # 1일 빼기
now + relativedelta.relativedelta(month=-1) + relativedelta.relativedelta(days=-1) # 1개월 1일 빼기
위와 같이 datetime으로 날짜를 지정한 뒤 dateutil의 relativedelta라는 함수를 이용하면 날짜 연산을 자유롭게 할 수 있다.
Bash Operator로 macro 실습해보기
예시 1)
매월 말일 수행되는 Dag에서 변수 START_DATE는 전월 말일, 변수 END_DATE는 수행 당일 기준 어제로 env 세팅하기
예시 2)
매울 둘째 주 수행되는 Dag에서 변수 START_DATE는 2주전 월요일, 변수 END_DATE는 2주 전 토요일로 env 세팅하기
위 두 가지 예시를 Bash 오퍼레이터와 Macros를 통해 구현해보도록 할 것이다. (변수는 YYYY-MM-DD 형식으로 출력예정)
# 예시1
from airflow import DAG
import pendulum
from airflow.operators.bash import BashOperator
with DAG(
dag_id="dags_bash_with_macro_1",
schedule="10 0 L * *", # 매 월 말일 0시 10분
start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
catchup=False,
tags=["practice"]
) as dag:
# START_DATE: 전월 말일, END_DATE: 1일 전
bash_task_1 = BashOperator(
task_id = 'bash_task_1',
env = {'START_DATE': '{{ data_interval_start.in_timezone("Asia/Seoul") | ds }}', # .in_timezone() : ()안의 해당하는 시간대로 출력해줌 (기본 설정 = UTC)
'END_DATE' : '{{ (data_interval_end.in_timezone("Asia/Seoul") - macros.dateutil.relativedelat.relativedelat(days=1)) | ds }}'} # 수행일로부터 하루 전(1일 빼기)
bash_command = 'echo "START_DATE: $START_DATE" && echo "END_DATE: $END_DATE"'
)
# 예시 2
from airflow import DAG
import pendulum
from airflow.operators.bash import BashOperator
with DAG(
dag_id="dags_bash_with_macro_2",
schedule="10 0 * * 6#2", # 두 번째 토요일 0시 10분마다
start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
catchup=False,
tags=["practice"]
) as dag:
# START_DATE: 2주 전 월요일, END_DATE: 2주 전 토요일
bash_task_2 = BashOperator(
task_id = 'bash_task_2',
env = {'START_DATE': '{{ (data_interval_end.in_timezone("Asia/Seoul") - macros.dateutil.relativedelat.relativedelat(days=19)) | ds }}', # 배치일로부터 19일 전 = 2주 전 월요일
'END_DATE' : '{{ (data_interval_end.in_timezone("Asia/Seoul") - macros.dateutil.relativedelat.relativedelat(days=14)) | ds }}'} # 배치일로부터 14일 전 = 2주 전 토요일
bash_command = 'echo "START_DATE: $START_DATE" && echo "END_DATE: $END_DATE"'
)
Python Operator로 macro 실습해보기
from airflow import DAG
import pendulum
from airflow.decorators import task
with DAG(
dag_id="dags_python_with_macro",
schedule="10 0 * * *",
start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
catchup=False,
tags = ['practice']
) as dag:
@task(task_id='task_using_macros',
# templates_dicr가 params으로 미리 선언됨
templates_dict={'start_date':'{{ (data_interval_end.in_timezone("Asia/Seoul") + macros.dateutil.relativedelta.relativedelta(months=-1, day=1)) | ds }}', # 1개월을 빼고 1일로 설정 = 전월 1일을 의미
'end_date': '{{ (data_interval_end.in_timezone("Asia/Seoul").replace(day=1) + macros.dateutil.relativedelta.relativedelta(days=-1)) | ds }}' # 1일로 설정 후 하루를 뺌 = 전월 마지막 날을 의미
}
)
def get_datetime_macro(**kwargs):
templates_dict = kwargs.get('templates_dict') or {}
if templates_dict:
start_date = templates_dict.get('start_date') or 'start_date없음'
end_date = templates_dict.get('end_date') or 'end_date없음'
print(start_date)
print(end_date)
@task(task_id='task_direct_calc')
def get_datetime_calc(**kwargs):
from dateutil.relativedelta import relativedelta # macro를 사용하지 않고 직접 연산
data_interval_end = kwargs['data_interval_end']
prev_month_day_first = data_interval_end.in_timezone('Asia/Seoul') + relativedelta(months=-1, day=1)
prev_month_day_last = data_interval_end.in_timezone('Asia/Seoul').replace(day=1) + relativedelta(days=-1)
print(prev_month_day_first.strftime('%Y-%m-%d'))
print(prev_month_day_last.strftime('%Y-%m-%d'))
get_datetime_macro() >> get_datetime_calc()
크게 두 가지 방법을 사용해보았다.
macro 템플릿을 사용해 날짜를 연산하는 과정 (get_datetime_macro 함수)과 라이브러리를 호출해 python 내에서 직접 연산하는 과정 (get_datetime_calc 함수)을 사용해보았다. 두 과정의 계산 결과는 같지만, 각 task의 방법이 다를 뿐이다.
두 방법은 편의성이나 시간에서 큰 차이는 없기 때문에, 편한 방법을 골라서 사용하면 된다.
'Minding's Programming > Airflow' 카테고리의 다른 글
[Airflow] 전역변수 Variable 이용하기 (1) | 2024.07.23 |
---|---|
[Airflow/Xcom] Airflow에서 Xcom 사용해보기 (0) | 2024.07.23 |
[Airflow] Airflow의 날짜 개념 (0) | 2024.07.23 |
[Python/Jinja/Airflow] Jinja 템플릿과 Airflow에서의 사용방법 (2) | 2024.07.22 |
[Airflow] Python Operator의 op_args, op_kwargs (1) | 2024.07.22 |