[Airflow] Airflow의 기본 센서 (BashSensor, FileSensor, PythonSensor)

2024. 7. 31. 17:38·Minding's Programming/Airflow
목차
  1. BashSensor
  2. FileSensor
728x90
반응형

Airflow에서의 센서란?

(공식문서: https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/sensors.html)

 

센서는 오퍼레이터의 일종으로 특정 조건을 파악하는 것에 특화된 오퍼레이터라고 할 수 있다. 미리 설정한 조건이 만족되기를 기다리고, 만족된다면 True를 반환하는 task를 가지고 있다.

 

모든 센서는 BaseSensorOperator를 상속해 구현되며, 상속 시에는 __init()__ 함수와 poke(context) 함수의 재정의가 필요하다. 이 중 센싱(Sensing)하는 로직은 poke 함수에 정의되어야 한다.

 

BaseSensorOperator 기준 파라미터는 아래와 같다.

soft_fail (bool) Timeout으로 인해 failure 처리 대신 skipped 처리 여부
poke_interval (float) sensor task를 작동시키는 주기(특정 조건 달성 확인 주기) (초 단위)
timeout (float) 해당 sensor task가 fail로 처리될 최대 시간 (초 단위)
DAG이 하루 단위일 경우 해당 값도 하루 단위로 넣는 경우가 다수
mode (str) 'poke' 또는 'reschedule' 중에 하나 선택 가능
poke: DAG 수행 내내 Running Slot을 차지함 (짧은 센싱 간격에 유리 / 초 단위)
reschedule: 센서가 동작할 때에만 Slot을 차지 (긴 센싱 간격에 유리 / 분 단위)
exponential_backoff (bool) True로 전달 시 task 주기가 2의 지수승 만큼 늘어남 (초반에는 자주, 후반에는 가끔)
max_wait (timedelta | float | None) exponential_backoff 활성화에만 사용 가능
체크하는 주기의 상한선을 설정할 수 있음 (timedelta or 초 단위 입력)

 

 

 

BashSensor

 

BashSensor docs를 확인해보면 파라미터가 BashOperator와 비슷한 것을 알 수 있다. Sensor 또한 BaseOperator를 상속받아 만든 것이기 때문이다.

 

또한 Return값을 0으로 했을 때 True로 받아들인다고 한다. 다른 값이 출력되면 False 처리가 될 것이다.

파이썬에서 True 값을 줄 때 return True로 코드를 작성하듯이, 쉘 스크립트에서는 exit 0(0~255까지의 STATUS 값이 있지만 0만 정상의 의미)이 이와 같은 의미이다. 쉘 스크립트에서 마지막 명령 수행의 STATUS를 확인하려면 'echo $?'로 확인 가능하다.

 

 

FileSensor

 

File sensor는 파일 시스템 내에 파일 또는 폴더가 들어왔는지 확인하는 역할을 한다.

파라미터는 fs_conn_id, filepath, recursive, deferrable 총 4가지가 있다. fs_conn_id는 해당 파일에 대한 connection id를 말하는 것으로, FileSensor를 사용하기 전에 Connections에 등록이 필요한 것을 알 수 있다.

 

filepath는 센싱(체크)할 파일(폴더)의 경로(Connection에 입력한 Base 경로 뒤의 상대경로로 입력), recursive는 glob을 이용한 하위 파일 경로들의 list를 확인해 해당 파일이 해당 경로에 제대로 들어갔는지를 확인해줄 지의 여부를 뜻한다.

 

* File Connections 등록

등록하는 방법은 매우 간단하다. connection id를 임의로 설정하고, type을 file(path)로 지정해준 다음, path에 BASE 디렉토리 경로를 그대로 적어준다.

 

실습

from airflow import DAG
from airflow.sensors.filesystem import FileSensor
import pendulum

with DAG(
    dag_id='dags_file_sensor',
    start_date=pendulum.datetime(2023,4,1, tz='Asia/Seoul'),
    schedule='0 7 * * *',
    catchup=False
) as dag:
    tvCorona19VaccinestatNew_sensor = FileSensor(
        task_id='tvCorona19VaccinestatNew_sensor',
        fs_conn_id='conn_file_opt_airflow_files',
        filepath='tvCorona19VaccinestatNew/{{data_interval_end.in_timezone("Asia/Seoul") | ds_nodash }}/tvCorona19VaccinestatNew.csv',
        recursive=False,
        poke_interval=60,
        timeout=60*60*24, # 1일
        mode='reschedule'
    )

지난 번 작성해둔 코로나 현황 DAG을 통해 만들어진 csv파일이 해당 경로에 있는지 확인하는 FileSensor DAG을 만들고 실행해보았다.

 

파일을 확인한 뒤 정상적으로 처리되는 것을 확인할 수 있다.

728x90

'Minding's Programming > Airflow' 카테고리의 다른 글

[Airflow/Slack] Airflow DAG 실패 시 Slack으로 알림 보내기  (0) 2024.11.19
[Airflow] CustomSensor 만들어 활용하기  (0) 2024.08.05
[Airflow] Connection Type 신규 추가하는 방법  (0) 2024.07.31
[Airflow] Postgres Hook bulk_load 문제점, Custom Hook으로 bulk_load() 해보기  (0) 2024.07.29
[Airflow] Connection과 Hook  (0) 2024.07.27
  1. BashSensor
  2. FileSensor
'Minding's Programming/Airflow' 카테고리의 다른 글
  • [Airflow/Slack] Airflow DAG 실패 시 Slack으로 알림 보내기
  • [Airflow] CustomSensor 만들어 활용하기
  • [Airflow] Connection Type 신규 추가하는 방법
  • [Airflow] Postgres Hook bulk_load 문제점, Custom Hook으로 bulk_load() 해보기
Minding
Minding
Today's MindingMinding 님의 블로그입니다.
  • Minding
    Today's Minding
    Minding
  • 전체
    오늘
    어제
    • 울고넘는 딥러닝 (278)
      • Minding's Baseball (57)
        • MLB Statcast (29)
        • 머신러닝으로 홈런왕 예측하기 (3)
        • 야구칼럼 (12)
        • 야구 규칙, 용어 (1)
        • 2022-23 질롱 코리아 (8)
        • 류현진 등판경기 (4)
      • Minding's Programming (185)
        • 프로그래머스 코딩테스트 (21)
        • Knowledge (44)
        • Numpy & Pandas (6)
        • Excel (3)
        • Git (1)
        • Pygame (11)
        • CV (3)
        • Tensorflow tutorial (4)
        • Kaggle and Dacon (4)
        • 에러 코드 (8)
        • FastAPI (8)
        • Airflow (29)
        • Crawling (6)
        • Django (14)
        • AWS (18)
        • Spark (5)
      • Minding's Reading (30)
        • 머신러닝 딥러닝에 필요한 기초 수학 with 파이.. (2)
        • 칼만필터는 어렵지 않아 (11)
        • 밑바닥부터 시작하는 딥러닝 (6)
        • 메이저리그 야구 통계학 2e (8)
        • 논문읽기 (2)
        • 빅데이터를 지탱하는 기술 (1)
      • Minding's Life (5)
        • 주식 (4)
        • 각종 소식 (1)
  • 블로그 메뉴

    • 홈
    • Baseball
    • Programming
    • Reading
  • 링크

  • 공지사항

  • 인기 글

  • 태그

    파이썬
    칼만필터
    프로그래머스
    칼만필터는어렵지않아python
    KalmanFilter
    django
    AWS
    KBO
    파이썬게임개발
    칼만필터는어렵지않아
    야구
    메이저리그
    코딩테스트
    MLB
    mlb stats api
    칼만필터는어렵지않아파이썬
    넘파이
    데이터분석
    데이터 엔지니어
    pygame
    django python
    Python
    머신러닝
    파이게임
    에어플로우
    Airflow
    질롱코리아
    게임개발
    FastAPI
    딥러닝
  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.3
Minding
[Airflow] Airflow의 기본 센서 (BashSensor, FileSensor, PythonSensor)

개인정보

  • 티스토리 홈
  • 포럼
  • 로그인
상단으로

티스토리툴바

단축키

내 블로그

내 블로그 - 관리자 홈 전환
Q
Q
새 글 쓰기
W
W

블로그 게시글

글 수정 (권한 있는 경우)
E
E
댓글 영역으로 이동
C
C

모든 영역

이 페이지의 URL 복사
S
S
맨 위로 이동
T
T
티스토리 홈 이동
H
H
단축키 안내
Shift + /
⇧ + /

* 단축키는 한글/영문 대소문자로 이용 가능하며, 티스토리 기본 도메인에서만 동작합니다.