본문 바로가기

Minding's Programming/Airflow

[Airflow] Airflow의 기본 센서 (BashSensor, FileSensor, PythonSensor)

728x90
반응형

Airflow에서의 센서란?

(공식문서: https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/sensors.html)

 

센서는 오퍼레이터의 일종으로 특정 조건을 파악하는 것에 특화된 오퍼레이터라고 할 수 있다. 미리 설정한 조건이 만족되기를 기다리고, 만족된다면 True를 반환하는 task를 가지고 있다.

 

모든 센서는 BaseSensorOperator를 상속해 구현되며, 상속 시에는 __init()__ 함수와 poke(context) 함수의 재정의가 필요하다. 이 중 센싱(Sensing)하는 로직은 poke 함수에 정의되어야 한다.

 

BaseSensorOperator 기준 파라미터는 아래와 같다.

soft_fail (bool) Timeout으로 인해 failure 처리 대신 skipped 처리 여부
poke_interval (float) sensor task를 작동시키는 주기(특정 조건 달성 확인 주기) (초 단위)
timeout (float) 해당 sensor task가 fail로 처리될 최대 시간 (초 단위)
DAG이 하루 단위일 경우 해당 값도 하루 단위로 넣는 경우가 다수
mode (str) 'poke' 또는 'reschedule' 중에 하나 선택 가능
poke: DAG 수행 내내 Running Slot을 차지함 (짧은 센싱 간격에 유리 / 초 단위)
reschedule: 센서가 동작할 때에만 Slot을 차지 (긴 센싱 간격에 유리 / 분 단위)
exponential_backoff (bool) True로 전달 시 task 주기가 2의 지수승 만큼 늘어남 (초반에는 자주, 후반에는 가끔)
max_wait (timedelta | float | None) exponential_backoff 활성화에만 사용 가능
체크하는 주기의 상한선을 설정할 수 있음 (timedelta or 초 단위 입력)

 

 

 

BashSensor

 

BashSensor docs를 확인해보면 파라미터가 BashOperator와 비슷한 것을 알 수 있다. Sensor 또한 BaseOperator를 상속받아 만든 것이기 때문이다.

 

또한 Return값을 0으로 했을 때 True로 받아들인다고 한다. 다른 값이 출력되면 False 처리가 될 것이다.

파이썬에서 True 값을 줄 때 return True로 코드를 작성하듯이, 쉘 스크립트에서는 exit 0(0~255까지의 STATUS 값이 있지만 0만 정상의 의미)이 이와 같은 의미이다. 쉘 스크립트에서 마지막 명령 수행의 STATUS를 확인하려면 'echo $?'로 확인 가능하다.

 

 

FileSensor

 

File sensor는 파일 시스템 내에 파일 또는 폴더가 들어왔는지 확인하는 역할을 한다.

파라미터는 fs_conn_id, filepath, recursive, deferrable 총 4가지가 있다. fs_conn_id는 해당 파일에 대한 connection id를 말하는 것으로, FileSensor를 사용하기 전에 Connections에 등록이 필요한 것을 알 수 있다.

 

filepath는 센싱(체크)할 파일(폴더)의 경로(Connection에 입력한 Base 경로 뒤의 상대경로로 입력), recursive는 glob을 이용한 하위 파일 경로들의 list를 확인해 해당 파일이 해당 경로에 제대로 들어갔는지를 확인해줄 지의 여부를 뜻한다.

 

* File Connections 등록

등록하는 방법은 매우 간단하다. connection id를 임의로 설정하고, type을 file(path)로 지정해준 다음, path에 BASE 디렉토리 경로를 그대로 적어준다.

 

실습

from airflow import DAG
from airflow.sensors.filesystem import FileSensor
import pendulum

with DAG(
    dag_id='dags_file_sensor',
    start_date=pendulum.datetime(2023,4,1, tz='Asia/Seoul'),
    schedule='0 7 * * *',
    catchup=False
) as dag:
    tvCorona19VaccinestatNew_sensor = FileSensor(
        task_id='tvCorona19VaccinestatNew_sensor',
        fs_conn_id='conn_file_opt_airflow_files',
        filepath='tvCorona19VaccinestatNew/{{data_interval_end.in_timezone("Asia/Seoul") | ds_nodash }}/tvCorona19VaccinestatNew.csv',
        recursive=False,
        poke_interval=60,
        timeout=60*60*24, # 1일
        mode='reschedule'
    )

지난 번 작성해둔 코로나 현황 DAG을 통해 만들어진 csv파일이 해당 경로에 있는지 확인하는 FileSensor DAG을 만들고 실행해보았다.

 

파일을 확인한 뒤 정상적으로 처리되는 것을 확인할 수 있다.

728x90