본문 바로가기
Minding's Programming/Airflow

[Airflow] Custom Operator 개발해보기 (BaseOperator)

by Minding 2024. 7. 26.
728x90
반응형

Airflow는 오퍼레이터를 직접 만들어 사용할 수 있도록 클래스를 제공해주고 있다. 이런 활용성이 Airflow의 가장 큰 장점이라고 할 수 있는 만큼, BaseOperator를 활용한 Custom Operator를 개발에 대해 공부해보고자 한다.

 

BaseOperator 상속 시 재정의할 메서드

 

위에서 말했듯이 Custom Operator를 만들기 위해 Airflow는 BaseOperator라는 클래스를 제공해주고 있다.

BaseOperator를 상속(Overriding)받을 때는 아래의 두 가지 메서드를 재정의해야 한다.

  • def __init__ (객체 생성자 함수): 커스텀 오퍼레이터에 들어갈 파라미터 등을 설정
  • def execute(self, context) : __init__ 생성자로 객체 얻은 후 execute 메서드를 실행시키도록 설정되어 있기 때문에, 비즈니스 로직은 execute에 구현 필요

 

CustomOperator 개발 전 유의할 사항

 

BaseOperator를 통해 Custom Operator를 개발하기 전 유의할 사항이 몇 가지 있다. 이 점들을 확인한 뒤 개발을 진행하자.

  • 오퍼레이터 기능이 제대로 정의되어 있는가?
  • 기존 오퍼레이터로 충분히 대체 가능한가? (가능할 경우 기존 오퍼레이터 사용)
  • 오퍼레이터와 dag의 폴더 위치 확인
    • dag: /dags 폴더
    • 오퍼레이터: /plugins/operators 폴더 (import할 때 경로 확인)

나는 지난 시간 서울시 공공데이터 API 이용해보기 에서 endpoint를 이용해 10개까지 밖에 불러오지 못하는 SimpleHttpOperator를 대신해 전체 데이터를 API를 통해 받은 후 csv파일로 저장하는 Operator를 개발하고자 한다.

 

 

CustomOperator 생성 관련 Airflow 공식 문서 확인

 

https://airflow.apache.org/docs/apache-airflow/stable/howto/custom-operator.html

 

Creating a custom Operator — Airflow Documentation

 

airflow.apache.org

공식문서에서는 예시 코드와 더불어 Custom Operator를 만드는 데 다양한 정보를 얻을 수 있다.

# 예시 코드
from airflow.models.baseoperator import BaseOperator


class HelloOperator(BaseOperator):
    def __init__(self, name: str, **kwargs) -> None:
        super().__init__(**kwargs) # task_id 등 기본 파라미터가 담겨져있는 **kwargs를 super()함수를 통해 BaseOperator에 전달함
        self.name = name # 파라미터 지정

    def execute(self, context):
        message = f"Hello {self.name}"
        print(message)
        return message

 

또한 template을 사용할 수 있는 파라미터도 아래와 같이 지정해줄 수 있다.

class HelloOperator(BaseOperator):
    template_fields: Sequence[str] = ("name",) # Template 사용할 파라미터 지정

    def __init__(self, name: str, world: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.name = name
        self.world = world

    def execute(self, context):
        message = f"Hello {self.world} it's {self.name}!"
        print(message)
        return message

 

 

서울시 공공 데이터 API 활용하는 Custom Operator 개발해보기

class SeoulApiToCsvOperator(BaseOperator): # 클래스 상속
    template_fields = ('endpoint', 'path', 'file_name', 'base_dt') # 템플릿 문법 사용 가능한 파라미터 설정

    def __init__(self, dataset_nm, path, file_name, base_dt=None, **kwargs): # 4개의 파라미터를 User에게 받을 예정
        super().__init__(**kwargs)
        self.http_conn_id = 'openapi.seoul.go.kr' # 커넥션 ID를 미리 지정 (input으로 받지 않고 이것만 사용)
        self.path = path # 파일을 저장할 경로
        self.file_name = file_name
        self.endpoint = '{{var.value.apikey_openapi_seoul_go_kr}}/json/' + dataset_nm
        self.base_dt = base_dt

우선 __init__ 함수부터 재정의해 주었다. 사용자로부터 받아야 할 파라미터는 dataset_nm, path, file_name, base_dt로 총 4가지 이다. (base_dt는 기본값이 None으로 입력하지 않으면 None 처리)

task_id는 **kwargs에 포함된 파라미터이기 때문에, 위에서 설명했듯 super() 함수를 통해 부모 클래스인 BaseOperator에 전달한다.

또한 이 오퍼레이터는 서울시 공공데이터에 한정해 사용하는 것이기 때문에, http_conn_id의 커넥션 ID와 endpoint의 api key를 입력하는 부분은 webserver에서 불러올 수 있도록 했다.

 

    def execute(self, context):
        import os

        connection = BaseHook.get_connection(self.http_conn_id) # BaseHook의 get_connection을 이용해 Webserver의 커넥션 정보를 꺼내올 수 있음
        self.base_url = f'{connection.host}:{connection.port}/{self.endpoint}' # connection에 저장된 값 잘 확인할 것!

        total_row_df = pd.DataFrame()
        start_row = 1
        end_row = 1000

        while True:
            self.log.info(f'시작:{start_row}')
            self.log.info(f'끝:{end_row}')
            row_df = self._call_api(self.base_url, start_row, end_row) # 아래 선언한 _call_api 함수를 통해 api 호출
            total_row_df = pd.concat([total_row_df, row_df])
            if len(row_df) < 1000: # 1000개씩 호출하는데, 1000개 이하라면 더 이상 긁어올 데이터가 없다는 것을 뜻함
                break
            else:
                start_row = end_row + 1
                end_row += 1000 # 1000개 더 호출

        if not os.path.exists(self.path):
            os.system(f'mkdir -p {self.path}')
        total_row_df.to_csv(self.path + '/' + self.file_name, encoding='utf-8', index=False)

그 다음 실질적으로 해당 오퍼레이터가 동작하는 execute 함수에 대한 부분을 작성했다. 우선 BaseHook의 get_connection 함수를 이용해 webserver에 저장된 Connection ID에 대한 정보들을 불러와준다. (host, port 등) 그리고 해당 정보들을 토대로 API 호출을 위한 링크를 만든다.

 

그 뒤 빈 데이터프레임을 만들어준다.(csv 파일로 저장할 데이터프레임) 이후 시작 행 1, 끝 행 1000으로 설정한 API 호출 링크를 _call_api 함수로 보내준다. 이 함수는 직접 작성한 함수로, 아래에서 따로 설명하도록 하겠다.

 

이 함수를 통해 dataframe 형태의 데이터를 받게 된다. 위에서 만들어준 데이터프레임에 concat을 통해 합쳐준다. 이 df의 길이가 1000 미만이라면, 1000개씩 데이터를 호출했지만 더 이상 가져올 데이터가 없다는 뜻으로, 그대로 파일 경로를 설정해준 뒤 csv파일로 저장한다.

 

    def _call_api(self, base_url, start_row, end_row):
        import requests
        import json
        
        headers = {'Content-Type': 'application/json',
                   'charset': 'utf-8',
                   'Accept': '*/*'
                   }
        
        request_url = f'{base_url}/{start_row}/{end_row}/' # 가져올 데이터 수에 대한 링크 추가
        if self.base_dt is not None:
            request_url = f'{base_url}/{start_row}/{end_row}/{self.base_dt}'
        response = requests.get(request_url, headers) # 위 링크를 토대로 get 명령 호출
        contents = json.loads(response.text) # 위에서 받은 데이터를 json으로 load

        key_nm = list(contents.keys())[0]
        row_data = contents.get(key_nm).get('row') # 실질적인 데이터가 있는 row 키의 value값을 불러옴
        row_df = pd.DataFrame(row_data) # 해당 데이터를 dataframe으로 저장

        return row_df

위에서 사용한 _call_api 함수이다. requests 라이브러리를 통해 위 API 호출 링크를 가지고 GET 명령을 통해 데이터를 받아오며, 이를 json 라이브러리를 통해 묶어준 뒤 df 형태로 반환해준다.

 

 

Custom Operator를 이용한 DAG 만들기

from operators.seoul_api_to_csv import SeoulApiToCsvOperator
from airflow import DAG
import pendulum

with DAG(
    dag_id='dags_seoul_api',
    schedule='0 7 * * *',
    start_date=pendulum.datetime(2024,7,1, tz='Asia/Seoul'),
    tags = ['practice'],
    catchup=False
) as dag:
    '''서울시 10분 강우량 동향'''
    seoul_rain_fall_10m = SeoulApiToCsvOperator(
        task_id = 'seoul_rain_fall_10m',
        dataset_nm='ListRainfallService',
        path='/home/minding/airflow/files/seoul_rainfall_10m/{{data_interval_end.in_timezone("Asia/Seoul") | ds_nodash}}', # files 폴더 연결 필요
        file_name='Seoul_Rainfall_10m.csv'
    )

    '''서울시 일일 강수량 현향'''
    ts_rainfall_data = SeoulApiToCsvOperator(
        task_id = 'ts_rainfall_data',
        dataset_nm='tsRainfallData',
        path='/home/minding/airflow/files/ts_rainfall_data/{{data_interval_end.in_timezone("Asia/Seoul") | ds_nodash}}',
        file_name='ts_rainfall_data.csv'
    )

    seoul_rain_fall_10m >> ts_rainfall_data

airflow는 기본적으로 /plugins 경로까지 파악하므로 /operators 폴더 내 존재하는 커스텀 오퍼레이터도 operators.{py 파일 이름}으로 import해줄 수 있다. 이후 다른 DAG을 만들던 것처럼 똑같이 작성해주면 된다.

 

하지만 파일을 저장해주는 경로를 지정할 때 유의할 점이 있다. 이 DAG이 실행되는 주체는 docker 내 worker라는 컨테이너에서 수행되기 때문에, 해당 경로에 파일이 저장되더라도 컨테이너가 종료된 뒤에는 파일은 사라지게 되어있다. 이를 방지하기 위해서는 위 파일 경로에 해당하는 '/files' 폴더를 worker 컨테이너와 연결해줄 필요가 있다.

 

현재 WSL 컴퓨터 내 docker-compose.yaml 파일을 다시 들여다보자.

volumes:
    - ${AIRFLOW_PROJ_DIR:-.}/airflow/dags:/opt/airflow/dags
    - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
    - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
    - ${AIRFLOW_PROJ_DIR:-.}/airflow/plugins:/opt/airflow/plugins

volumes 부분을 들여다보면, 현재 PROJ_DIR로써 연결된 폴더는 총 4개로, dags,logs,config,plugins이다. 이 폴더는 컨테이너 종료 뒤에도 그대로 남지만, 우리가 파일을 저장하고자 하는 files 폴더는 아직 연결이 되어있지 않은 상태이다.

 

  volumes:
    - ${AIRFLOW_PROJ_DIR:-.}/airflow/dags:/opt/airflow/dags
    - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
    - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
    - ${AIRFLOW_PROJ_DIR:-.}/airflow/plugins:/opt/airflow/plugins
    - ${AIRFLOW_PROJ_DIR:-.}/airflow/files:/opt/airflow/files

위와 같이 docker-compose.yaml 파일을 수정해준다. (팁: yy(커서가 있는 줄 복사), p(불여넣기)를 이용하면 더 빨리 줄을 추가할 수 있다.)

 

이후 Webserver에 DAG을 올려 실행시켜본다.

오타를 조심하자.

 

 

혹시 실행과정에서 폴더가 생성되지 않는 현상이 나타난다면,

docker-compose.yaml 파일에서 volumes 아래에 있는 user 부분을 아래와 같이 수정하자.

user: "0:0"

주로 권한이 없어 폴더를 생성하지 못하는 것으로 파악되어 root 권한을 주는 것이다.

 

DAG 실행 뒤 files 폴더를 살펴보면

이렇게 폴더가 생성되고 파일이 정상적으로 노출되는 것을 확인할 수 있다!

728x90
반응형

댓글