본문 바로가기

Minding's Programming/Airflow

[Airflow] Python Operator 사용과 Python Decorator

728x90
반응형

Python Operator

 

파이썬 오퍼레이터: Airflow에서 Python 함수를 실행시키는 오퍼레이터로, 해당 클래스를 이용해 DAG을 만들면 파이썬 함수를 실행시키는 워크플로우를 생성할 수 있다.

 

1. 내부 함수 실행

# 내부에 직접 함수를 만드는 경우

from airflow import DAG
import pendulum
import datetime
from airflow.operators.python import PythonOperator
import random

with DAG(
    dag_id="dags_python_operator",
    schedule="30 6 * * *", # 매일 6시 30분 마다
    start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
    catchup=False
) as dag:
    def select_fruit(): # 랜덤으로 과일 리스트 중 하나를 출력하는 함수
        fruit = ['APPLE', 'BANANA', "ORANGE", "AVOCADO"]
        rand_int = random.randint(0,3)
        print(fruit[rand_int])

    py_t1 = PythonOperator(
        task_id = 'py_t1',
        python_callable = select_fruit
    )

    py_t1

 

2. 외부 함수 실행

# 외부 함수 import 해 실행하는 경우

from airflow import DAG
import pendulum
import datetime
from airflow.operators.python import PythonOperator
from common.common_func import get_sftp # airflow는 기본적으로 /plugins 폴더까지 path로 인식하기 때문에 그 아래 경로로 import 해줘야 함

with DAG(
    dag_id="dags_python_import_func",
    schedule="30 6 * * *", # 매일 6시 30분 마다
    start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
    catchup=False
) as dag:
    task_get_sftp = PythonOperator(
        task_id = 'task_get_sftp',
        python_callable=get_sftp # 짧은 문구를 출력하는 간단한 함수
    )

    task_get_sftp

위 글에서 주의해야할 점은 파이썬 함수를 Import할 때의 경로이다.

Airflow의 경우 파이썬 함수를 ./plugins 폴더 내에서 바로 찾기 때문에, plugins 폴더 아래부터 경로가 시작해야 한다.

 

ex) get_stfp 함수가 Airflow/plugins/common 폴더 안에 있는 경우

일반적인 파이썬 인터프리터의 경우: from Airflow.plugins.common .common_func import get_stfp

Airflow의 경우: common.common_func import get_stfp

 

이후 Airflow 서버에 git을 통해 올리게 될 경우

다음과 같이 잘 노출되는 것을 확인할 수 있다.

 

 

파이썬 데코레이터

 

파이썬 데코레이터는 '함수를 장식하다'라는 의미를 가지고 있는데, 원래의 함수를 감싸서 해당 함수 바깥에 추가 기능을 덧붙이는 방법이다.

 

'감싼다'라는 것은 파이썬의 아래와 같은 특징으로 설명할 수 있다.

  • 함수 안에 함수를 선언할 수 있다.
  • 함수의 인자로 함수를 전달하는 것이 가능하다.
  • 함수 자체를 리턴하는 것이 가능하다.

여기서 데코레이터는 기존에 함수를 직접 호출하는 부분을 수정할 필요없이 원래의 함수가 정의된 부분 바로 윗줄에 한 줄 추가하는 것만으로 해당 함수를 감쌀 수 있다.

# 데코레이터 적용 전 (함수를 호출하는 부분, 이 부분은 수정할 필요 없음)

get_data1()
get_data2()
get_data3()


# 데코레이터 적용 후 (로그를 남기는 outer_func 이라는 함수를 씌워줌)

@outer_func
def get_data1():
	print('함수를 실행합니다.')
    
@outer_func
def get_data2():
	print('함수를 실행합니다.')
    
@outer_func
def get_data2():
	print('함수를 실행합니다.')

 

 

Task 데코레이터

 

Task 데코레이터는 파이썬 함수를 정의하는 것만으로 쉽게 Task를 생성할 수 있는 기능을 말한다. DAG 파일에 PythonOperator를 불러올 필요없이 바로 task로 추가할 수 있다. 코드가 훨씬 간결해진다.

# 기존 방식
def python_func1():
	...
    
py_task_1 = PythonOperator (
	Task_id = 'py_task_1',
    python_callable=python_func1
)

py_task_1
# task 데코레이터 사용

@task(task_id='py_task_1')
def python_func1():
	...
    
py_task = python_func1()
728x90