Airflow는 오퍼레이터를 직접 만들어 사용할 수 있도록 클래스를 제공해주고 있다. 이런 활용성이 Airflow의 가장 큰 장점이라고 할 수 있는 만큼, BaseOperator를 활용한 Custom Operator를 개발에 대해 공부해보고자 한다.
BaseOperator 상속 시 재정의할 메서드
위에서 말했듯이 Custom Operator를 만들기 위해 Airflow는 BaseOperator라는 클래스를 제공해주고 있다.
BaseOperator를 상속(Overriding)받을 때는 아래의 두 가지 메서드를 재정의해야 한다.
- def __init__ (객체 생성자 함수): 커스텀 오퍼레이터에 들어갈 파라미터 등을 설정
- def execute(self, context) : __init__ 생성자로 객체 얻은 후 execute 메서드를 실행시키도록 설정되어 있기 때문에, 비즈니스 로직은 execute에 구현 필요
CustomOperator 개발 전 유의할 사항
BaseOperator를 통해 Custom Operator를 개발하기 전 유의할 사항이 몇 가지 있다. 이 점들을 확인한 뒤 개발을 진행하자.
- 오퍼레이터 기능이 제대로 정의되어 있는가?
- 기존 오퍼레이터로 충분히 대체 가능한가? (가능할 경우 기존 오퍼레이터 사용)
- 오퍼레이터와 dag의 폴더 위치 확인
- dag: /dags 폴더
- 오퍼레이터: /plugins/operators 폴더 (import할 때 경로 확인)
나는 지난 시간 서울시 공공데이터 API 이용해보기 에서 endpoint를 이용해 10개까지 밖에 불러오지 못하는 SimpleHttpOperator를 대신해 전체 데이터를 API를 통해 받은 후 csv파일로 저장하는 Operator를 개발하고자 한다.
CustomOperator 생성 관련 Airflow 공식 문서 확인
https://airflow.apache.org/docs/apache-airflow/stable/howto/custom-operator.html
공식문서에서는 예시 코드와 더불어 Custom Operator를 만드는 데 다양한 정보를 얻을 수 있다.
# 예시 코드
from airflow.models.baseoperator import BaseOperator
class HelloOperator(BaseOperator):
def __init__(self, name: str, **kwargs) -> None:
super().__init__(**kwargs) # task_id 등 기본 파라미터가 담겨져있는 **kwargs를 super()함수를 통해 BaseOperator에 전달함
self.name = name # 파라미터 지정
def execute(self, context):
message = f"Hello {self.name}"
print(message)
return message
또한 template을 사용할 수 있는 파라미터도 아래와 같이 지정해줄 수 있다.
class HelloOperator(BaseOperator):
template_fields: Sequence[str] = ("name",) # Template 사용할 파라미터 지정
def __init__(self, name: str, world: str, **kwargs) -> None:
super().__init__(**kwargs)
self.name = name
self.world = world
def execute(self, context):
message = f"Hello {self.world} it's {self.name}!"
print(message)
return message
서울시 공공 데이터 API 활용하는 Custom Operator 개발해보기
class SeoulApiToCsvOperator(BaseOperator): # 클래스 상속
template_fields = ('endpoint', 'path', 'file_name', 'base_dt') # 템플릿 문법 사용 가능한 파라미터 설정
def __init__(self, dataset_nm, path, file_name, base_dt=None, **kwargs): # 4개의 파라미터를 User에게 받을 예정
super().__init__(**kwargs)
self.http_conn_id = 'openapi.seoul.go.kr' # 커넥션 ID를 미리 지정 (input으로 받지 않고 이것만 사용)
self.path = path # 파일을 저장할 경로
self.file_name = file_name
self.endpoint = '{{var.value.apikey_openapi_seoul_go_kr}}/json/' + dataset_nm
self.base_dt = base_dt
우선 __init__ 함수부터 재정의해 주었다. 사용자로부터 받아야 할 파라미터는 dataset_nm, path, file_name, base_dt로 총 4가지 이다. (base_dt는 기본값이 None으로 입력하지 않으면 None 처리)
task_id는 **kwargs에 포함된 파라미터이기 때문에, 위에서 설명했듯 super() 함수를 통해 부모 클래스인 BaseOperator에 전달한다.
또한 이 오퍼레이터는 서울시 공공데이터에 한정해 사용하는 것이기 때문에, http_conn_id의 커넥션 ID와 endpoint의 api key를 입력하는 부분은 webserver에서 불러올 수 있도록 했다.
def execute(self, context):
import os
connection = BaseHook.get_connection(self.http_conn_id) # BaseHook의 get_connection을 이용해 Webserver의 커넥션 정보를 꺼내올 수 있음
self.base_url = f'{connection.host}:{connection.port}/{self.endpoint}' # connection에 저장된 값 잘 확인할 것!
total_row_df = pd.DataFrame()
start_row = 1
end_row = 1000
while True:
self.log.info(f'시작:{start_row}')
self.log.info(f'끝:{end_row}')
row_df = self._call_api(self.base_url, start_row, end_row) # 아래 선언한 _call_api 함수를 통해 api 호출
total_row_df = pd.concat([total_row_df, row_df])
if len(row_df) < 1000: # 1000개씩 호출하는데, 1000개 이하라면 더 이상 긁어올 데이터가 없다는 것을 뜻함
break
else:
start_row = end_row + 1
end_row += 1000 # 1000개 더 호출
if not os.path.exists(self.path):
os.system(f'mkdir -p {self.path}')
total_row_df.to_csv(self.path + '/' + self.file_name, encoding='utf-8', index=False)
그 다음 실질적으로 해당 오퍼레이터가 동작하는 execute 함수에 대한 부분을 작성했다. 우선 BaseHook의 get_connection 함수를 이용해 webserver에 저장된 Connection ID에 대한 정보들을 불러와준다. (host, port 등) 그리고 해당 정보들을 토대로 API 호출을 위한 링크를 만든다.
그 뒤 빈 데이터프레임을 만들어준다.(csv 파일로 저장할 데이터프레임) 이후 시작 행 1, 끝 행 1000으로 설정한 API 호출 링크를 _call_api 함수로 보내준다. 이 함수는 직접 작성한 함수로, 아래에서 따로 설명하도록 하겠다.
이 함수를 통해 dataframe 형태의 데이터를 받게 된다. 위에서 만들어준 데이터프레임에 concat을 통해 합쳐준다. 이 df의 길이가 1000 미만이라면, 1000개씩 데이터를 호출했지만 더 이상 가져올 데이터가 없다는 뜻으로, 그대로 파일 경로를 설정해준 뒤 csv파일로 저장한다.
def _call_api(self, base_url, start_row, end_row):
import requests
import json
headers = {'Content-Type': 'application/json',
'charset': 'utf-8',
'Accept': '*/*'
}
request_url = f'{base_url}/{start_row}/{end_row}/' # 가져올 데이터 수에 대한 링크 추가
if self.base_dt is not None:
request_url = f'{base_url}/{start_row}/{end_row}/{self.base_dt}'
response = requests.get(request_url, headers) # 위 링크를 토대로 get 명령 호출
contents = json.loads(response.text) # 위에서 받은 데이터를 json으로 load
key_nm = list(contents.keys())[0]
row_data = contents.get(key_nm).get('row') # 실질적인 데이터가 있는 row 키의 value값을 불러옴
row_df = pd.DataFrame(row_data) # 해당 데이터를 dataframe으로 저장
return row_df
위에서 사용한 _call_api 함수이다. requests 라이브러리를 통해 위 API 호출 링크를 가지고 GET 명령을 통해 데이터를 받아오며, 이를 json 라이브러리를 통해 묶어준 뒤 df 형태로 반환해준다.
Custom Operator를 이용한 DAG 만들기
from operators.seoul_api_to_csv import SeoulApiToCsvOperator
from airflow import DAG
import pendulum
with DAG(
dag_id='dags_seoul_api',
schedule='0 7 * * *',
start_date=pendulum.datetime(2024,7,1, tz='Asia/Seoul'),
tags = ['practice'],
catchup=False
) as dag:
'''서울시 10분 강우량 동향'''
seoul_rain_fall_10m = SeoulApiToCsvOperator(
task_id = 'seoul_rain_fall_10m',
dataset_nm='ListRainfallService',
path='/home/minding/airflow/files/seoul_rainfall_10m/{{data_interval_end.in_timezone("Asia/Seoul") | ds_nodash}}', # files 폴더 연결 필요
file_name='Seoul_Rainfall_10m.csv'
)
'''서울시 일일 강수량 현향'''
ts_rainfall_data = SeoulApiToCsvOperator(
task_id = 'ts_rainfall_data',
dataset_nm='tsRainfallData',
path='/home/minding/airflow/files/ts_rainfall_data/{{data_interval_end.in_timezone("Asia/Seoul") | ds_nodash}}',
file_name='ts_rainfall_data.csv'
)
seoul_rain_fall_10m >> ts_rainfall_data
airflow는 기본적으로 /plugins 경로까지 파악하므로 /operators 폴더 내 존재하는 커스텀 오퍼레이터도 operators.{py 파일 이름}으로 import해줄 수 있다. 이후 다른 DAG을 만들던 것처럼 똑같이 작성해주면 된다.
하지만 파일을 저장해주는 경로를 지정할 때 유의할 점이 있다. 이 DAG이 실행되는 주체는 docker 내 worker라는 컨테이너에서 수행되기 때문에, 해당 경로에 파일이 저장되더라도 컨테이너가 종료된 뒤에는 파일은 사라지게 되어있다. 이를 방지하기 위해서는 위 파일 경로에 해당하는 '/files' 폴더를 worker 컨테이너와 연결해줄 필요가 있다.
현재 WSL 컴퓨터 내 docker-compose.yaml 파일을 다시 들여다보자.
volumes:
- ${AIRFLOW_PROJ_DIR:-.}/airflow/dags:/opt/airflow/dags
- ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
- ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
- ${AIRFLOW_PROJ_DIR:-.}/airflow/plugins:/opt/airflow/plugins
volumes 부분을 들여다보면, 현재 PROJ_DIR로써 연결된 폴더는 총 4개로, dags,logs,config,plugins이다. 이 폴더는 컨테이너 종료 뒤에도 그대로 남지만, 우리가 파일을 저장하고자 하는 files 폴더는 아직 연결이 되어있지 않은 상태이다.
volumes:
- ${AIRFLOW_PROJ_DIR:-.}/airflow/dags:/opt/airflow/dags
- ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
- ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
- ${AIRFLOW_PROJ_DIR:-.}/airflow/plugins:/opt/airflow/plugins
- ${AIRFLOW_PROJ_DIR:-.}/airflow/files:/opt/airflow/files
위와 같이 docker-compose.yaml 파일을 수정해준다. (팁: yy(커서가 있는 줄 복사), p(불여넣기)를 이용하면 더 빨리 줄을 추가할 수 있다.)
이후 Webserver에 DAG을 올려 실행시켜본다.
혹시 실행과정에서 폴더가 생성되지 않는 현상이 나타난다면,
docker-compose.yaml 파일에서 volumes 아래에 있는 user 부분을 아래와 같이 수정하자.
user: "0:0"
주로 권한이 없어 폴더를 생성하지 못하는 것으로 파악되어 root 권한을 주는 것이다.
DAG 실행 뒤 files 폴더를 살펴보면
이렇게 폴더가 생성되고 파일이 정상적으로 노출되는 것을 확인할 수 있다!
'Minding's Programming > Airflow' 카테고리의 다른 글
[Airflow/Docker] Postgres 컨테이너 추가해 DB 접속하기 (0) | 2024.07.26 |
---|---|
[Airflow/Docker] Docker-compose.yaml 파일 해석 (0) | 2024.07.26 |
[Airflow] SimpleHttpOperator로 서울시 공공데이터 API 이용해보기 (0) | 2024.07.25 |
[Airflow] TriggerDagRun Operator / ExternalTask Sensor (다른 DAG 실행하기) (0) | 2024.07.25 |
[Airflow] Task Group (1) | 2024.07.24 |