본문 바로가기

Minding's Programming/Airflow

[Airflow] CustomSensor 만들어 활용하기

728x90
반응형

Sensor는 지난 포스팅에서 알 수 있듯 특정 DAG 또는 task의 실행 여부를 판단하는 역할을 한다. (참고: Airflow의 기본 센서 (BashSensor, FileSensor, PythonSensor)

 

[Airflow] Airflow의 기본 센서 (BashSensor, FileSensor, PythonSensor)

Airflow에서의 센서란?(공식문서: https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/sensors.html) 센서는 오퍼레이터의 일종으로 특정 조건을 파악하는 것에 특화된 오퍼레이터라고 할 수 있다. 미

minding-deep-learning.tistory.com

 

이번에는 Custom Sensor를 만들어 활용하는 방법에 대해 공부해보려고 한다. 지난 번 Python Sensor의 내용을 Custom화 하여 만들 것이다. (특정 날짜가 데이터셋 내 존재하는 지 확인하는 sensor)

 

Custom Sensor는 특정 업무의 재활용성이 높아 다른 DAG에서도 활용될 가능성이 높다면 가급적 만드는 것이 좋다고 한다. 협업 환경에서 중복된 코드를 방지할 수 있고, 로직의 일원화도 할 수 있기 때문이다.

 

Custom sensor 또한 custom operator, custom hook을 만들 때와 동일하게 plugins 폴더 아래에 sensors라는 폴더를 만들고, 그 폴더 내에 custom화한 sensor 코드 파일을 만들어주어야 한다.

 

 

 

Custom Sensor 코드

 

from airflow.sensors.base import BaseSensorOperator
from airflow.hooks.base import BaseHook

'''
서울시 공공데이터 API 추출시 특정 날짜 컬럼을 조사하여 
배치 날짜 기준 전날 데이터가 존재하는지 체크하는 센서 
1. 데이터셋에 날짜 컬럼이 존재하고 
2. API 사용시 그 날짜 컬럼으로 ORDER BY DESC 되어 가져온다는 가정하에 사용 가능
'''

class SeoulApiDateSensor(BaseSensorOperator): # BaseSensorOperator를 상속 받음
    template_fields = ('endpoint',)
    def __init__(self, dataset_nm, base_dt_col, day_off=0, **kwargs): # 생성자 재정의
        '''
        dataset_nm: 서울시 공공데이터 포털에서 센싱하고자 하는 데이터셋 명
        base_dt_col: 센싱 기준 컬럼 (yyyy.mm.dd... or yyyy/mm/dd... 형태만 가능)
        day_off: 배치일 기준 생성여부를 확인하고자 하는 날짜 차이를 입력 (기본값: 0)
        '''
        super().__init__(**kwargs)
        self.http_conn_id = 'openapi.seoul.go.kr'
        self.endpoint = '{{var.value.apikey_openapi_seoul_go_kr}}/json/' + dataset_nm + '/1/100'   # 100건만 추출
        self.base_dt_col = base_dt_col
        self.day_off = day_off

        
    def poke(self, context): # poke 재정의
        import requests
        import json
        from dateutil.relativedelta import relativedelta
        connection = BaseHook.get_connection(self.http_conn_id)
        url = f'http://{connection.host}:{connection.port}/{self.endpoint}'
        self.log.info(f'request url:{url}')
        response = requests.get(url)

        contents = json.loads(response.text)
        key_nm = list(contents.keys())[0]
        row_data = contents.get(key_nm).get('row')
        last_dt = row_data[0].get(self.base_dt_col)
        last_date = last_dt[:10]
        last_date = last_date.replace('.', '-').replace('/', '-')
        search_ymd = (context.get('data_interval_end').in_timezone('Asia/Seoul') + relativedelta(days=self.day_off)).strftime('%Y-%m-%d')
        try:
            import pendulum
            pendulum.from_format(last_date, 'YYYY-MM-DD')
        except:
            from airflow.exceptions import AirflowException
            AirflowException(f'{self.base_dt_col} 컬럼은 YYYY.MM.DD 또는 YYYY/MM/DD 형태가 아닙니다.')

        
        if last_date >= search_ymd:
            self.log.info(f'생성 확인(기준 날짜: {search_ymd} / API Last 날짜: {last_date})')
            return True
        else:
            self.log.info(f'Update 미완료 (기준 날짜: {search_ymd} / API Last 날짜:{last_date})')
            return False

지난 번 Python Sensor 코드와 큰 차이는 없으나 dayoff라는 파라미터가 추가되어 배치일 기준 이전 n일이 데이터셋 날짜 컬럼에 포함되어있는지 확인할 수 있게 했다.

 

 

Custom sensor를 DAG에 적용

 

from sensors.seoul_api_date_sensor import SeoulApiDateSensor
from airflow import DAG
import pendulum

with DAG(
    dag_id='dags_custom_sensor',
    start_date=pendulum.datetime(2023,4,1, tz='Asia/Seoul'),
    schedule=None,
    catchup=False
) as dag:
    tb_corona_19_count_status_sensor = SeoulApiDateSensor(
        task_id='tb_corona_19_count_status_sensor',
        dataset_nm='TbCorona19CountStatus',
        base_dt_col='S_DT',
        day_off=0,
        poke_interval=600,
        mode='reschedule'
    )
    
    tv_corona19_vaccine_stat_new_sensor = SeoulApiDateSensor(
        task_id='tv_corona19_vaccine_stat_new_sensor',
        dataset_nm='tvCorona19VaccinestatNew',
        base_dt_col='S_VC_DT',
        day_off=-1,
        poke_interval=600,
        mode='reschedule'
    )

 

이후 웹서버에서 DAG 실행을 확인해보았다.

 

두 가지 모두 up_for_reschedule 상태로 DAG이 실행을 마쳤다. 어떤 이유때문일까?

 

로그를 살펴보니, 서울시 공공데이터 api에서 제공하는 해당 데이터는 2023년 5월 31일 이후로 데이터를 제공하지 않고 있었다. 따라서, 기준 날짜(오늘 날짜)와 API의 최신 날짜가 동일하지 않아 up_for_reschedule 상태로 넘어간 것이다.

 

 

728x90