Sensor는 지난 포스팅에서 알 수 있듯 특정 DAG 또는 task의 실행 여부를 판단하는 역할을 한다. (참고: Airflow의 기본 센서 (BashSensor, FileSensor, PythonSensor)
이번에는 Custom Sensor를 만들어 활용하는 방법에 대해 공부해보려고 한다. 지난 번 Python Sensor의 내용을 Custom화 하여 만들 것이다. (특정 날짜가 데이터셋 내 존재하는 지 확인하는 sensor)
Custom Sensor는 특정 업무의 재활용성이 높아 다른 DAG에서도 활용될 가능성이 높다면 가급적 만드는 것이 좋다고 한다. 협업 환경에서 중복된 코드를 방지할 수 있고, 로직의 일원화도 할 수 있기 때문이다.
Custom sensor 또한 custom operator, custom hook을 만들 때와 동일하게 plugins 폴더 아래에 sensors라는 폴더를 만들고, 그 폴더 내에 custom화한 sensor 코드 파일을 만들어주어야 한다.
Custom Sensor 코드
from airflow.sensors.base import BaseSensorOperator
from airflow.hooks.base import BaseHook
'''
서울시 공공데이터 API 추출시 특정 날짜 컬럼을 조사하여
배치 날짜 기준 전날 데이터가 존재하는지 체크하는 센서
1. 데이터셋에 날짜 컬럼이 존재하고
2. API 사용시 그 날짜 컬럼으로 ORDER BY DESC 되어 가져온다는 가정하에 사용 가능
'''
class SeoulApiDateSensor(BaseSensorOperator): # BaseSensorOperator를 상속 받음
template_fields = ('endpoint',)
def __init__(self, dataset_nm, base_dt_col, day_off=0, **kwargs): # 생성자 재정의
'''
dataset_nm: 서울시 공공데이터 포털에서 센싱하고자 하는 데이터셋 명
base_dt_col: 센싱 기준 컬럼 (yyyy.mm.dd... or yyyy/mm/dd... 형태만 가능)
day_off: 배치일 기준 생성여부를 확인하고자 하는 날짜 차이를 입력 (기본값: 0)
'''
super().__init__(**kwargs)
self.http_conn_id = 'openapi.seoul.go.kr'
self.endpoint = '{{var.value.apikey_openapi_seoul_go_kr}}/json/' + dataset_nm + '/1/100' # 100건만 추출
self.base_dt_col = base_dt_col
self.day_off = day_off
def poke(self, context): # poke 재정의
import requests
import json
from dateutil.relativedelta import relativedelta
connection = BaseHook.get_connection(self.http_conn_id)
url = f'http://{connection.host}:{connection.port}/{self.endpoint}'
self.log.info(f'request url:{url}')
response = requests.get(url)
contents = json.loads(response.text)
key_nm = list(contents.keys())[0]
row_data = contents.get(key_nm).get('row')
last_dt = row_data[0].get(self.base_dt_col)
last_date = last_dt[:10]
last_date = last_date.replace('.', '-').replace('/', '-')
search_ymd = (context.get('data_interval_end').in_timezone('Asia/Seoul') + relativedelta(days=self.day_off)).strftime('%Y-%m-%d')
try:
import pendulum
pendulum.from_format(last_date, 'YYYY-MM-DD')
except:
from airflow.exceptions import AirflowException
AirflowException(f'{self.base_dt_col} 컬럼은 YYYY.MM.DD 또는 YYYY/MM/DD 형태가 아닙니다.')
if last_date >= search_ymd:
self.log.info(f'생성 확인(기준 날짜: {search_ymd} / API Last 날짜: {last_date})')
return True
else:
self.log.info(f'Update 미완료 (기준 날짜: {search_ymd} / API Last 날짜:{last_date})')
return False
지난 번 Python Sensor 코드와 큰 차이는 없으나 dayoff라는 파라미터가 추가되어 배치일 기준 이전 n일이 데이터셋 날짜 컬럼에 포함되어있는지 확인할 수 있게 했다.
Custom sensor를 DAG에 적용
from sensors.seoul_api_date_sensor import SeoulApiDateSensor
from airflow import DAG
import pendulum
with DAG(
dag_id='dags_custom_sensor',
start_date=pendulum.datetime(2023,4,1, tz='Asia/Seoul'),
schedule=None,
catchup=False
) as dag:
tb_corona_19_count_status_sensor = SeoulApiDateSensor(
task_id='tb_corona_19_count_status_sensor',
dataset_nm='TbCorona19CountStatus',
base_dt_col='S_DT',
day_off=0,
poke_interval=600,
mode='reschedule'
)
tv_corona19_vaccine_stat_new_sensor = SeoulApiDateSensor(
task_id='tv_corona19_vaccine_stat_new_sensor',
dataset_nm='tvCorona19VaccinestatNew',
base_dt_col='S_VC_DT',
day_off=-1,
poke_interval=600,
mode='reschedule'
)
이후 웹서버에서 DAG 실행을 확인해보았다.
두 가지 모두 up_for_reschedule 상태로 DAG이 실행을 마쳤다. 어떤 이유때문일까?
로그를 살펴보니, 서울시 공공데이터 api에서 제공하는 해당 데이터는 2023년 5월 31일 이후로 데이터를 제공하지 않고 있었다. 따라서, 기준 날짜(오늘 날짜)와 API의 최신 날짜가 동일하지 않아 up_for_reschedule 상태로 넘어간 것이다.
'Minding's Programming > Airflow' 카테고리의 다른 글
[Airflow] Airflow API를 통해 모니터링하기 (0) | 2024.11.19 |
---|---|
[Airflow/Slack] Airflow DAG 실패 시 Slack으로 알림 보내기 (0) | 2024.11.19 |
[Airflow] Airflow의 기본 센서 (BashSensor, FileSensor, PythonSensor) (0) | 2024.07.31 |
[Airflow] Connection Type 신규 추가하는 방법 (0) | 2024.07.31 |
[Airflow] Postgres Hook bulk_load 문제점, Custom Hook으로 bulk_load() 해보기 (0) | 2024.07.29 |