[Airflow] Custom Operator 개발해보기 (BaseOperator)

2024. 7. 26. 15:15·Minding's Programming/Airflow
728x90
반응형

Airflow는 오퍼레이터를 직접 만들어 사용할 수 있도록 클래스를 제공해주고 있다. 이런 활용성이 Airflow의 가장 큰 장점이라고 할 수 있는 만큼, BaseOperator를 활용한 Custom Operator를 개발에 대해 공부해보고자 한다.

 

BaseOperator 상속 시 재정의할 메서드

 

위에서 말했듯이 Custom Operator를 만들기 위해 Airflow는 BaseOperator라는 클래스를 제공해주고 있다.

BaseOperator를 상속(Overriding)받을 때는 아래의 두 가지 메서드를 재정의해야 한다.

  • def __init__ (객체 생성자 함수): 커스텀 오퍼레이터에 들어갈 파라미터 등을 설정
  • def execute(self, context) : __init__ 생성자로 객체 얻은 후 execute 메서드를 실행시키도록 설정되어 있기 때문에, 비즈니스 로직은 execute에 구현 필요

 

CustomOperator 개발 전 유의할 사항

 

BaseOperator를 통해 Custom Operator를 개발하기 전 유의할 사항이 몇 가지 있다. 이 점들을 확인한 뒤 개발을 진행하자.

  • 오퍼레이터 기능이 제대로 정의되어 있는가?
  • 기존 오퍼레이터로 충분히 대체 가능한가? (가능할 경우 기존 오퍼레이터 사용)
  • 오퍼레이터와 dag의 폴더 위치 확인
    • dag: /dags 폴더
    • 오퍼레이터: /plugins/operators 폴더 (import할 때 경로 확인)

나는 지난 시간 서울시 공공데이터 API 이용해보기 에서 endpoint를 이용해 10개까지 밖에 불러오지 못하는 SimpleHttpOperator를 대신해 전체 데이터를 API를 통해 받은 후 csv파일로 저장하는 Operator를 개발하고자 한다.

 

 

CustomOperator 생성 관련 Airflow 공식 문서 확인

 

https://airflow.apache.org/docs/apache-airflow/stable/howto/custom-operator.html

 

Creating a custom Operator — Airflow Documentation

 

airflow.apache.org

공식문서에서는 예시 코드와 더불어 Custom Operator를 만드는 데 다양한 정보를 얻을 수 있다.

# 예시 코드
from airflow.models.baseoperator import BaseOperator


class HelloOperator(BaseOperator):
    def __init__(self, name: str, **kwargs) -> None:
        super().__init__(**kwargs) # task_id 등 기본 파라미터가 담겨져있는 **kwargs를 super()함수를 통해 BaseOperator에 전달함
        self.name = name # 파라미터 지정

    def execute(self, context):
        message = f"Hello {self.name}"
        print(message)
        return message

 

또한 template을 사용할 수 있는 파라미터도 아래와 같이 지정해줄 수 있다.

class HelloOperator(BaseOperator):
    template_fields: Sequence[str] = ("name",) # Template 사용할 파라미터 지정

    def __init__(self, name: str, world: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.name = name
        self.world = world

    def execute(self, context):
        message = f"Hello {self.world} it's {self.name}!"
        print(message)
        return message

 

 

서울시 공공 데이터 API 활용하는 Custom Operator 개발해보기

class SeoulApiToCsvOperator(BaseOperator): # 클래스 상속
    template_fields = ('endpoint', 'path', 'file_name', 'base_dt') # 템플릿 문법 사용 가능한 파라미터 설정

    def __init__(self, dataset_nm, path, file_name, base_dt=None, **kwargs): # 4개의 파라미터를 User에게 받을 예정
        super().__init__(**kwargs)
        self.http_conn_id = 'openapi.seoul.go.kr' # 커넥션 ID를 미리 지정 (input으로 받지 않고 이것만 사용)
        self.path = path # 파일을 저장할 경로
        self.file_name = file_name
        self.endpoint = '{{var.value.apikey_openapi_seoul_go_kr}}/json/' + dataset_nm
        self.base_dt = base_dt

우선 __init__ 함수부터 재정의해 주었다. 사용자로부터 받아야 할 파라미터는 dataset_nm, path, file_name, base_dt로 총 4가지 이다. (base_dt는 기본값이 None으로 입력하지 않으면 None 처리)

task_id는 **kwargs에 포함된 파라미터이기 때문에, 위에서 설명했듯 super() 함수를 통해 부모 클래스인 BaseOperator에 전달한다.

또한 이 오퍼레이터는 서울시 공공데이터에 한정해 사용하는 것이기 때문에, http_conn_id의 커넥션 ID와 endpoint의 api key를 입력하는 부분은 webserver에서 불러올 수 있도록 했다.

 

    def execute(self, context):
        import os

        connection = BaseHook.get_connection(self.http_conn_id) # BaseHook의 get_connection을 이용해 Webserver의 커넥션 정보를 꺼내올 수 있음
        self.base_url = f'{connection.host}:{connection.port}/{self.endpoint}' # connection에 저장된 값 잘 확인할 것!

        total_row_df = pd.DataFrame()
        start_row = 1
        end_row = 1000

        while True:
            self.log.info(f'시작:{start_row}')
            self.log.info(f'끝:{end_row}')
            row_df = self._call_api(self.base_url, start_row, end_row) # 아래 선언한 _call_api 함수를 통해 api 호출
            total_row_df = pd.concat([total_row_df, row_df])
            if len(row_df) < 1000: # 1000개씩 호출하는데, 1000개 이하라면 더 이상 긁어올 데이터가 없다는 것을 뜻함
                break
            else:
                start_row = end_row + 1
                end_row += 1000 # 1000개 더 호출

        if not os.path.exists(self.path):
            os.system(f'mkdir -p {self.path}')
        total_row_df.to_csv(self.path + '/' + self.file_name, encoding='utf-8', index=False)

그 다음 실질적으로 해당 오퍼레이터가 동작하는 execute 함수에 대한 부분을 작성했다. 우선 BaseHook의 get_connection 함수를 이용해 webserver에 저장된 Connection ID에 대한 정보들을 불러와준다. (host, port 등) 그리고 해당 정보들을 토대로 API 호출을 위한 링크를 만든다.

 

그 뒤 빈 데이터프레임을 만들어준다.(csv 파일로 저장할 데이터프레임) 이후 시작 행 1, 끝 행 1000으로 설정한 API 호출 링크를 _call_api 함수로 보내준다. 이 함수는 직접 작성한 함수로, 아래에서 따로 설명하도록 하겠다.

 

이 함수를 통해 dataframe 형태의 데이터를 받게 된다. 위에서 만들어준 데이터프레임에 concat을 통해 합쳐준다. 이 df의 길이가 1000 미만이라면, 1000개씩 데이터를 호출했지만 더 이상 가져올 데이터가 없다는 뜻으로, 그대로 파일 경로를 설정해준 뒤 csv파일로 저장한다.

 

    def _call_api(self, base_url, start_row, end_row):
        import requests
        import json
        
        headers = {'Content-Type': 'application/json',
                   'charset': 'utf-8',
                   'Accept': '*/*'
                   }
        
        request_url = f'{base_url}/{start_row}/{end_row}/' # 가져올 데이터 수에 대한 링크 추가
        if self.base_dt is not None:
            request_url = f'{base_url}/{start_row}/{end_row}/{self.base_dt}'
        response = requests.get(request_url, headers) # 위 링크를 토대로 get 명령 호출
        contents = json.loads(response.text) # 위에서 받은 데이터를 json으로 load

        key_nm = list(contents.keys())[0]
        row_data = contents.get(key_nm).get('row') # 실질적인 데이터가 있는 row 키의 value값을 불러옴
        row_df = pd.DataFrame(row_data) # 해당 데이터를 dataframe으로 저장

        return row_df

위에서 사용한 _call_api 함수이다. requests 라이브러리를 통해 위 API 호출 링크를 가지고 GET 명령을 통해 데이터를 받아오며, 이를 json 라이브러리를 통해 묶어준 뒤 df 형태로 반환해준다.

 

 

Custom Operator를 이용한 DAG 만들기

from operators.seoul_api_to_csv import SeoulApiToCsvOperator
from airflow import DAG
import pendulum

with DAG(
    dag_id='dags_seoul_api',
    schedule='0 7 * * *',
    start_date=pendulum.datetime(2024,7,1, tz='Asia/Seoul'),
    tags = ['practice'],
    catchup=False
) as dag:
    '''서울시 10분 강우량 동향'''
    seoul_rain_fall_10m = SeoulApiToCsvOperator(
        task_id = 'seoul_rain_fall_10m',
        dataset_nm='ListRainfallService',
        path='/home/minding/airflow/files/seoul_rainfall_10m/{{data_interval_end.in_timezone("Asia/Seoul") | ds_nodash}}', # files 폴더 연결 필요
        file_name='Seoul_Rainfall_10m.csv'
    )

    '''서울시 일일 강수량 현향'''
    ts_rainfall_data = SeoulApiToCsvOperator(
        task_id = 'ts_rainfall_data',
        dataset_nm='tsRainfallData',
        path='/home/minding/airflow/files/ts_rainfall_data/{{data_interval_end.in_timezone("Asia/Seoul") | ds_nodash}}',
        file_name='ts_rainfall_data.csv'
    )

    seoul_rain_fall_10m >> ts_rainfall_data

airflow는 기본적으로 /plugins 경로까지 파악하므로 /operators 폴더 내 존재하는 커스텀 오퍼레이터도 operators.{py 파일 이름}으로 import해줄 수 있다. 이후 다른 DAG을 만들던 것처럼 똑같이 작성해주면 된다.

 

하지만 파일을 저장해주는 경로를 지정할 때 유의할 점이 있다. 이 DAG이 실행되는 주체는 docker 내 worker라는 컨테이너에서 수행되기 때문에, 해당 경로에 파일이 저장되더라도 컨테이너가 종료된 뒤에는 파일은 사라지게 되어있다. 이를 방지하기 위해서는 위 파일 경로에 해당하는 '/files' 폴더를 worker 컨테이너와 연결해줄 필요가 있다.

 

현재 WSL 컴퓨터 내 docker-compose.yaml 파일을 다시 들여다보자.

volumes:
    - ${AIRFLOW_PROJ_DIR:-.}/airflow/dags:/opt/airflow/dags
    - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
    - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
    - ${AIRFLOW_PROJ_DIR:-.}/airflow/plugins:/opt/airflow/plugins

volumes 부분을 들여다보면, 현재 PROJ_DIR로써 연결된 폴더는 총 4개로, dags,logs,config,plugins이다. 이 폴더는 컨테이너 종료 뒤에도 그대로 남지만, 우리가 파일을 저장하고자 하는 files 폴더는 아직 연결이 되어있지 않은 상태이다.

 

  volumes:
    - ${AIRFLOW_PROJ_DIR:-.}/airflow/dags:/opt/airflow/dags
    - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
    - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
    - ${AIRFLOW_PROJ_DIR:-.}/airflow/plugins:/opt/airflow/plugins
    - ${AIRFLOW_PROJ_DIR:-.}/airflow/files:/opt/airflow/files

위와 같이 docker-compose.yaml 파일을 수정해준다. (팁: yy(커서가 있는 줄 복사), p(불여넣기)를 이용하면 더 빨리 줄을 추가할 수 있다.)

 

이후 Webserver에 DAG을 올려 실행시켜본다.

오타를 조심하자.

 

 

혹시 실행과정에서 폴더가 생성되지 않는 현상이 나타난다면,

docker-compose.yaml 파일에서 volumes 아래에 있는 user 부분을 아래와 같이 수정하자.

user: "0:0"

주로 권한이 없어 폴더를 생성하지 못하는 것으로 파악되어 root 권한을 주는 것이다.

 

DAG 실행 뒤 files 폴더를 살펴보면

이렇게 폴더가 생성되고 파일이 정상적으로 노출되는 것을 확인할 수 있다!

728x90

'Minding's Programming > Airflow' 카테고리의 다른 글

[Airflow/Docker] Postgres 컨테이너 추가해 DB 접속하기  (0) 2024.07.26
[Airflow/Docker] Docker-compose.yaml 파일 해석  (0) 2024.07.26
[Airflow] SimpleHttpOperator로 서울시 공공데이터 API 이용해보기  (0) 2024.07.25
[Airflow] TriggerDagRun Operator / ExternalTask Sensor (다른 DAG 실행하기)  (0) 2024.07.25
[Airflow] Task Group  (1) 2024.07.24
'Minding's Programming/Airflow' 카테고리의 다른 글
  • [Airflow/Docker] Postgres 컨테이너 추가해 DB 접속하기
  • [Airflow/Docker] Docker-compose.yaml 파일 해석
  • [Airflow] SimpleHttpOperator로 서울시 공공데이터 API 이용해보기
  • [Airflow] TriggerDagRun Operator / ExternalTask Sensor (다른 DAG 실행하기)
Minding
Minding
  • Minding
    Today's Minding
    Minding
  • 전체
    오늘
    어제
    • 울고넘는 딥러닝 (278)
      • Minding's Baseball (57)
        • MLB Statcast (29)
        • 머신러닝으로 홈런왕 예측하기 (3)
        • 야구칼럼 (12)
        • 야구 규칙, 용어 (1)
        • 2022-23 질롱 코리아 (8)
        • 류현진 등판경기 (4)
      • Minding's Programming (185)
        • 프로그래머스 코딩테스트 (21)
        • Knowledge (44)
        • Numpy & Pandas (6)
        • Excel (3)
        • Git (1)
        • Pygame (11)
        • CV (3)
        • Tensorflow tutorial (4)
        • Kaggle and Dacon (4)
        • 에러 코드 (8)
        • FastAPI (8)
        • Airflow (29)
        • Crawling (6)
        • Django (14)
        • AWS (18)
        • Spark (5)
      • Minding's Reading (30)
        • 머신러닝 딥러닝에 필요한 기초 수학 with 파이.. (2)
        • 칼만필터는 어렵지 않아 (11)
        • 밑바닥부터 시작하는 딥러닝 (6)
        • 메이저리그 야구 통계학 2e (8)
        • 논문읽기 (2)
        • 빅데이터를 지탱하는 기술 (1)
      • Minding's Life (5)
        • 주식 (4)
        • 각종 소식 (1)
  • 블로그 메뉴

    • 홈
    • Baseball
    • Programming
    • Reading
  • 링크

  • 공지사항

  • 인기 글

  • 태그

    django python
    칼만필터
    파이게임
    칼만필터는어렵지않아
    데이터분석
    Python
    메이저리그
    에어플로우
    KalmanFilter
    AWS
    머신러닝
    pygame
    mlb stats api
    넘파이
    데이터 엔지니어
    질롱코리아
    KBO
    칼만필터는어렵지않아파이썬
    파이썬게임개발
    야구
    코딩테스트
    게임개발
    칼만필터는어렵지않아python
    파이썬
    django
    MLB
    FastAPI
    Airflow
    딥러닝
    프로그래머스
  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.3
Minding
[Airflow] Custom Operator 개발해보기 (BaseOperator)
상단으로

티스토리툴바