Python Operator
파이썬 오퍼레이터: Airflow에서 Python 함수를 실행시키는 오퍼레이터로, 해당 클래스를 이용해 DAG을 만들면 파이썬 함수를 실행시키는 워크플로우를 생성할 수 있다.
1. 내부 함수 실행
# 내부에 직접 함수를 만드는 경우
from airflow import DAG
import pendulum
import datetime
from airflow.operators.python import PythonOperator
import random
with DAG(
dag_id="dags_python_operator",
schedule="30 6 * * *", # 매일 6시 30분 마다
start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
def select_fruit(): # 랜덤으로 과일 리스트 중 하나를 출력하는 함수
fruit = ['APPLE', 'BANANA', "ORANGE", "AVOCADO"]
rand_int = random.randint(0,3)
print(fruit[rand_int])
py_t1 = PythonOperator(
task_id = 'py_t1',
python_callable = select_fruit
)
py_t1
2. 외부 함수 실행
# 외부 함수 import 해 실행하는 경우
from airflow import DAG
import pendulum
import datetime
from airflow.operators.python import PythonOperator
from common.common_func import get_sftp # airflow는 기본적으로 /plugins 폴더까지 path로 인식하기 때문에 그 아래 경로로 import 해줘야 함
with DAG(
dag_id="dags_python_import_func",
schedule="30 6 * * *", # 매일 6시 30분 마다
start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
task_get_sftp = PythonOperator(
task_id = 'task_get_sftp',
python_callable=get_sftp # 짧은 문구를 출력하는 간단한 함수
)
task_get_sftp
위 글에서 주의해야할 점은 파이썬 함수를 Import할 때의 경로이다.
Airflow의 경우 파이썬 함수를 ./plugins 폴더 내에서 바로 찾기 때문에, plugins 폴더 아래부터 경로가 시작해야 한다.
ex) get_stfp 함수가 Airflow/plugins/common 폴더 안에 있는 경우
일반적인 파이썬 인터프리터의 경우: from Airflow.plugins.common .common_func import get_stfp
Airflow의 경우: common.common_func import get_stfp
이후 Airflow 서버에 git을 통해 올리게 될 경우
다음과 같이 잘 노출되는 것을 확인할 수 있다.
파이썬 데코레이터
파이썬 데코레이터는 '함수를 장식하다'라는 의미를 가지고 있는데, 원래의 함수를 감싸서 해당 함수 바깥에 추가 기능을 덧붙이는 방법이다.
'감싼다'라는 것은 파이썬의 아래와 같은 특징으로 설명할 수 있다.
- 함수 안에 함수를 선언할 수 있다.
- 함수의 인자로 함수를 전달하는 것이 가능하다.
- 함수 자체를 리턴하는 것이 가능하다.
여기서 데코레이터는 기존에 함수를 직접 호출하는 부분을 수정할 필요없이 원래의 함수가 정의된 부분 바로 윗줄에 한 줄 추가하는 것만으로 해당 함수를 감쌀 수 있다.
# 데코레이터 적용 전 (함수를 호출하는 부분, 이 부분은 수정할 필요 없음)
get_data1()
get_data2()
get_data3()
# 데코레이터 적용 후 (로그를 남기는 outer_func 이라는 함수를 씌워줌)
@outer_func
def get_data1():
print('함수를 실행합니다.')
@outer_func
def get_data2():
print('함수를 실행합니다.')
@outer_func
def get_data2():
print('함수를 실행합니다.')
Task 데코레이터
Task 데코레이터는 파이썬 함수를 정의하는 것만으로 쉽게 Task를 생성할 수 있는 기능을 말한다. DAG 파일에 PythonOperator를 불러올 필요없이 바로 task로 추가할 수 있다. 코드가 훨씬 간결해진다.
# 기존 방식
def python_func1():
...
py_task_1 = PythonOperator (
Task_id = 'py_task_1',
python_callable=python_func1
)
py_task_1
# task 데코레이터 사용
@task(task_id='py_task_1')
def python_func1():
...
py_task = python_func1()
'Minding's Programming > Airflow' 카테고리의 다른 글
[Airflow] Python Operator의 op_args, op_kwargs (1) | 2024.07.22 |
---|---|
[Python] 파이썬 함수 파라미터 *args와 **kwargs (0) | 2024.07.22 |
[Airflow] Email 오퍼레이터 사용해보기 (Gmail 서버 사용) (0) | 2024.07.19 |
[Airflow] Bash Operator로 쉘 스크립트 파일 실행하기 (0) | 2024.07.19 |
[Airflow] DAG 내 task 연결하기 (순서 지정) (0) | 2024.07.19 |