본문 바로가기
반응형

Minding's Programming/Airflow27

[Airflow] Task 분기 처리 (BranchPythonOperator, @task.branch, BaseBranchOperator) Task 분기 처리는 왜 필요한가? Task 1 다음에 이어지는 3가지의 Task 2-1, 2-2, 2-3이 있다고 가정해보자. task 1이 먼저 수행된 다음, 3가지의 task가 동시에 돌아가는 것이 지금까지 배워온 결과이다. (task 1 >> [task2-1, task2-2, task2-3]) 그러나, task1의 결과에 따라 task 2-1,2-2,2-3 중 한 가지만 수행하도록 설계해야 하는 경우에는 어떻게 해야할까? 그럴 때 필요한 것이 task 분기 처리이다.  Airflow에서 task를 분기처리 하는 방법 1) BranchPythonOperator 사용2) @task.branch 데코레이터 이용3) BaseBranchOperator 상속하여 직접 개발 (클래스 파일을 상속)  Branc.. 2024. 7. 24.
[Airflow] 전역변수 Variable 이용하기 전역 변수 Variable? Xcom은 특정 DAG 또는 특정 Schedule에 수행되는 task 간에만 공유되는 데이터라면, Variable은 모든 DAG에 공유되는 데이터라고 할 수 있다. Variable은 Airflow Webserver(http://localhost:8080/)에서 등록할 수 있다. Admin 메뉴 하단 'Variables'라는 메뉴에서 등록가능하다. (실제 Variable의 Key, Value 값은 메타 DB의 Variable 테이블에 저장된다.)  전역 변수 사용하기 전역 변수를 사용하는 방법에는 크게 두 가지가 있다. 1) Variable 라이브러리를 이용해 가져오기from airflow.models import Variablevar_value = Variable.get("s.. 2024. 7. 23.
[Airflow/Xcom] Airflow에서 Xcom 사용해보기 Xcom? (Cross Communication) Xcom은 Airflow DAG 내 task 간의 데이터 공유를 위해 사용되는 기술이다.ex) task1의 수행 중 내용이나 결과를 task2에서 사용 또는 입력으로 주고 싶은 경우 주로 작은 규모의 데이터 공유를 위해 사용되며, 1GB 이상의 데이터 공유를 위해서는 AWS S3, HDFS 등의 외부 솔루션 사용이 필요하다. (Xcom 내용은 메타 DB(내부 DB)의 테이블에 값이 저장되기 때문)  Python 오퍼레이터에서 Xcom을 사용하는 방법 파이썬 오퍼레이터에서 Xcom을 사용하는 방법에는 크게 2가지가 있다. 1) **kwargs에 존재하는 ti(task_instance) 객체 활용# 데이터 xcom에 업로드@task(task_id = 'pyt.. 2024. 7. 23.
[Airflow/Macros] Airflow에서 macros(매크로) 사용해보기 Macro 변수는? Jinja 템플릿 내에서 날짜 계산을 가능하게 해주는 기능이다. 파이썬 라이브러리의 datetime과 dateutil을 이용해 날짜 계산을 지원한다. Macro 변수는 주기적으로 실행되는 스케줄에 날짜 계산이 필요할 때 필요하다.ex) 매월 말일마다 DB에서 SQL을 통해 전월 마지막일부터 어제 날짜까지의 데이터를 불러와야 할 때sql = f'''SELECT NAME, ADDRESSFROM TBL_REGWHERE REG_DATE BETWEEN ?? AND ??'''배치일이 2월 28일이면 1월 31일부터 2월 27일까지 BETWEEN이 설정 되어야함전 월 마지막 일은 {{ data_interval_start }}로 불러올 수 있지만, '어제 날짜'가 문제매 달마다 '어제 날짜'가 다르.. 2024. 7. 23.
[Airflow] Airflow의 날짜 개념 Airflow 날짜 Template 변수 Airflow는 Airbnb에서 개발 시 ETL 도구로써 개발했기 때문에, Airflow의 날짜 개념을 이해하기 위해서는 '데이터 관점'에서의 날짜를 바라볼 수 있어야 한다. 예를 들어 하루마다 주기적으로 수행되는 일 배치의 워크플로우가 있다고 생각해보자. 이 워크플로우는 배치 실행 일 기준 전 날 00시부터 23시 59분까지의 데이터를 추출하는 task를 가지고 있다.그렇게 됐을 때, 데이터 관점에서의 시작 일은 배치를 실행하고 있는 오늘이 아닌 어제인 것이다.(ex. 25일에 task 실행, 데이터는 24일 00:00~24일 23:59 까지의 데이터이므로 data의 시작 일은 25일이 아닌 24일) 이는 Airflow의 Template 변수를 출력해보아도 알 .. 2024. 7. 23.
[Python/Jinja/Airflow] Jinja 템플릿과 Airflow에서의 사용방법 Jinja 템플릿 Jinja 템플릿은 파이썬 언어에서 사용하는 템플릿 엔진으로, 문서(파일)에서 특정 양식으로 작성된 값을 런타임시 실제 값으로 치환해주는 처리 엔진이다. Jinja 템플릿은 주로 파이썬 기반 웹 프레임워크인 Flask, Django, FastAPI에서 주로 사용한다. 이 경우 html 템플릿에 내용을 저장하고 화면에 보여질 때 실제 값으로 변환해서 출력한다. 각 상황에 맞게 변환해서 html 템플릿을 보여줄 수 있기 때문에, html 파일의 재활용성이 높아져 효율적으로 이용할 수 있다. SQL 작성 시에도 Jinja 템플릿을 활용할 수 있다. (상황에 따라 {{ }}의 값만 바뀌도록 적용 가능)ex) select * from tables where base_dt = {{ }}  Airf.. 2024. 7. 22.
728x90
반응형