본문 바로가기

반응형

에어플로우

[Airflow] CustomSensor 만들어 활용하기 Sensor는 지난 포스팅에서 알 수 있듯 특정 DAG 또는 task의 실행 여부를 판단하는 역할을 한다. (참고: Airflow의 기본 센서 (BashSensor, FileSensor, PythonSensor) [Airflow] Airflow의 기본 센서 (BashSensor, FileSensor, PythonSensor)Airflow에서의 센서란?(공식문서: https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/sensors.html) 센서는 오퍼레이터의 일종으로 특정 조건을 파악하는 것에 특화된 오퍼레이터라고 할 수 있다. 미minding-deep-learning.tistory.com 이번에는 Custom Sensor를 만들어 활용.. 더보기
[Airflow] Task 분기 처리 (BranchPythonOperator, @task.branch, BaseBranchOperator) Task 분기 처리는 왜 필요한가? Task 1 다음에 이어지는 3가지의 Task 2-1, 2-2, 2-3이 있다고 가정해보자. task 1이 먼저 수행된 다음, 3가지의 task가 동시에 돌아가는 것이 지금까지 배워온 결과이다. (task 1 >> [task2-1, task2-2, task2-3]) 그러나, task1의 결과에 따라 task 2-1,2-2,2-3 중 한 가지만 수행하도록 설계해야 하는 경우에는 어떻게 해야할까? 그럴 때 필요한 것이 task 분기 처리이다.  Airflow에서 task를 분기처리 하는 방법 1) BranchPythonOperator 사용2) @task.branch 데코레이터 이용3) BaseBranchOperator 상속하여 직접 개발 (클래스 파일을 상속)  Branc.. 더보기
[Airflow] 전역변수 Variable 이용하기 전역 변수 Variable? Xcom은 특정 DAG 또는 특정 Schedule에 수행되는 task 간에만 공유되는 데이터라면, Variable은 모든 DAG에 공유되는 데이터라고 할 수 있다. Variable은 Airflow Webserver(http://localhost:8080/)에서 등록할 수 있다. Admin 메뉴 하단 'Variables'라는 메뉴에서 등록가능하다. (실제 Variable의 Key, Value 값은 메타 DB의 Variable 테이블에 저장된다.)  전역 변수 사용하기 전역 변수를 사용하는 방법에는 크게 두 가지가 있다. 1) Variable 라이브러리를 이용해 가져오기from airflow.models import Variablevar_value = Variable.get("s.. 더보기
[Airflow/Macros] Airflow에서 macros(매크로) 사용해보기 Macro 변수는? Jinja 템플릿 내에서 날짜 계산을 가능하게 해주는 기능이다. 파이썬 라이브러리의 datetime과 dateutil을 이용해 날짜 계산을 지원한다. Macro 변수는 주기적으로 실행되는 스케줄에 날짜 계산이 필요할 때 필요하다.ex) 매월 말일마다 DB에서 SQL을 통해 전월 마지막일부터 어제 날짜까지의 데이터를 불러와야 할 때sql = f'''SELECT NAME, ADDRESSFROM TBL_REGWHERE REG_DATE BETWEEN ?? AND ??'''배치일이 2월 28일이면 1월 31일부터 2월 27일까지 BETWEEN이 설정 되어야함전 월 마지막 일은 {{ data_interval_start }}로 불러올 수 있지만, '어제 날짜'가 문제매 달마다 '어제 날짜'가 다르.. 더보기
[Airflow] Airflow의 날짜 개념 Airflow 날짜 Template 변수 Airflow는 Airbnb에서 개발 시 ETL 도구로써 개발했기 때문에, Airflow의 날짜 개념을 이해하기 위해서는 '데이터 관점'에서의 날짜를 바라볼 수 있어야 한다. 예를 들어 하루마다 주기적으로 수행되는 일 배치의 워크플로우가 있다고 생각해보자. 이 워크플로우는 배치 실행 일 기준 전 날 00시부터 23시 59분까지의 데이터를 추출하는 task를 가지고 있다.그렇게 됐을 때, 데이터 관점에서의 시작 일은 배치를 실행하고 있는 오늘이 아닌 어제인 것이다.(ex. 25일에 task 실행, 데이터는 24일 00:00~24일 23:59 까지의 데이터이므로 data의 시작 일은 25일이 아닌 24일) 이는 Airflow의 Template 변수를 출력해보아도 알 .. 더보기
[Python/Jinja/Airflow] Jinja 템플릿과 Airflow에서의 사용방법 Jinja 템플릿 Jinja 템플릿은 파이썬 언어에서 사용하는 템플릿 엔진으로, 문서(파일)에서 특정 양식으로 작성된 값을 런타임시 실제 값으로 치환해주는 처리 엔진이다. Jinja 템플릿은 주로 파이썬 기반 웹 프레임워크인 Flask, Django, FastAPI에서 주로 사용한다. 이 경우 html 템플릿에 내용을 저장하고 화면에 보여질 때 실제 값으로 변환해서 출력한다. 각 상황에 맞게 변환해서 html 템플릿을 보여줄 수 있기 때문에, html 파일의 재활용성이 높아져 효율적으로 이용할 수 있다. SQL 작성 시에도 Jinja 템플릿을 활용할 수 있다. (상황에 따라 {{ }}의 값만 바뀌도록 적용 가능)ex) select * from tables where base_dt = {{ }}  Airf.. 더보기
[Airflow] Python Operator의 op_args, op_kwargs *args와 **kwargs의 개념 정리: [Python] 파이썬 함수 파라미터 *args와 **kwargs op_args 파이썬 오퍼레이터를 사용해 파이썬 함수를 실행시킬 경우 해당 함수의 파라미터는 op_args를 통해 전달한다.as dag: def regist(name, sex): print(f'이름은 {name}이고 성별은 {sex}입니다.') py_task_1 = PythonOperator( task_id = 'py_task_1', python_callable=regist, op_args = ['minding', 'man'] # 리스트 형태로 작성 py_task_1 = PythonOperator( # *args가 있을 경우 task_id = 'py_task_1',.. 더보기
[Airflow] Python Operator 사용과 Python Decorator Python Operator 파이썬 오퍼레이터: Airflow에서 Python 함수를 실행시키는 오퍼레이터로, 해당 클래스를 이용해 DAG을 만들면 파이썬 함수를 실행시키는 워크플로우를 생성할 수 있다. 1. 내부 함수 실행# 내부에 직접 함수를 만드는 경우from airflow import DAGimport pendulumimport datetimefrom airflow.operators.python import PythonOperatorimport randomwith DAG( dag_id="dags_python_operator", schedule="30 6 * * *", # 매일 6시 30분 마다 start_date=pendulum.datetime(2023, 3, 1, tz="Asia.. 더보기

728x90