[Airflow] TriggerDagRun Operator / ExternalTask Sensor (다른 DAG 실행하기)
·
Minding's Programming/Airflow
DAG 내에서 task의 순서를 지정하고, 특정 조건 별로 실행시키는 법을 알아보았다. 하지만 task보다 더 큰 범주인 DAG을 순서에 따라 실행시키려면 어떻게 해야할까? DAG 간 의존관계를 설정하는 방법에는 크게 두 가지가 있다고 한다. 1) TriggerDagRun Operator 사용2) ExternalTask Sensor 사용 이 두가지 방법의 차이점은 아래 표와 같이 정리할 수 있다. TriggerDagRun OperatorExternal Task Sensor방식실행할 다른 DAG의 ID를 지정해 수행본 Task가 수행되기 전 다른 DAG의 완료를 기다린 후 수행사용시점(권고)Trigger되는 DAG의 선행 DAG이 하나일 경우Trigger되는 DAG의 선행 DAG이 2개 이상일 경우위와 같..
[Airflow] Task Group
·
Minding's Programming/Airflow
Task Group이란 Task Group은 task들을 모아 그룹 형태로 관리할 수 있는 기능이다. UI Graph 탭에서 Task들을 Group화하여 보여준다.DAG 내 task가 많아졌을 때, 관리하기 쉽도록 해주는 편의 기능이며 task group 안에 task group을 중첩해 관리할 수도 있다. 위 영상을 보면 여러 개의 task들을 하나의 section으로 만들어 관리할 수 있는 것을 볼 수 있다. 자세히 보면 section 내 inner_section과 같이 group 안에 또 다른 group을 만들어 설정할 수 있는 것을 확인할 수 있다. Task Group을 꼭 사용할 필요는 없지만, 관리의 용이성이 올라가기 때문에 설정해놓는다면 편하게 관리할 수 있을 것이다.  Task Group ..
[Airflow] Trigger Rule
·
Minding's Programming/Airflow
이전에 branch, 즉 분기처리 개념을 통해 상위 task에서 상황에 따라 분기 처리하여 여러 개의 하위 task 중 선택 실행하는 방법을 배워봤다.(링크) [Airflow] Task 분기 처리 (BranchPythonOperator, @task.branch, BaseBranchOperator)Task 분기 처리는 왜 필요한가? Task 1 다음에 이어지는 3가지의 Task 2-1, 2-2, 2-3이 있다고 가정해보자. task 1이 먼저 수행된 다음, 3가지의 task가 동시에 돌아가는 것이 지금까지 배워온 결과이다. (tasminding-deep-learning.tistory.com 이번에는 상위 task가 여러 개이고, 특정 조건에서만 하위 task를 실행하는 조건의 개념인 Trigger Rule..
[Airflow] Task 분기 처리 (BranchPythonOperator, @task.branch, BaseBranchOperator)
·
Minding's Programming/Airflow
Task 분기 처리는 왜 필요한가? Task 1 다음에 이어지는 3가지의 Task 2-1, 2-2, 2-3이 있다고 가정해보자. task 1이 먼저 수행된 다음, 3가지의 task가 동시에 돌아가는 것이 지금까지 배워온 결과이다. (task 1 >> [task2-1, task2-2, task2-3]) 그러나, task1의 결과에 따라 task 2-1,2-2,2-3 중 한 가지만 수행하도록 설계해야 하는 경우에는 어떻게 해야할까? 그럴 때 필요한 것이 task 분기 처리이다.  Airflow에서 task를 분기처리 하는 방법 1) BranchPythonOperator 사용2) @task.branch 데코레이터 이용3) BaseBranchOperator 상속하여 직접 개발 (클래스 파일을 상속)  Branc..
[Airflow] 전역변수 Variable 이용하기
·
Minding's Programming/Airflow
전역 변수 Variable? Xcom은 특정 DAG 또는 특정 Schedule에 수행되는 task 간에만 공유되는 데이터라면, Variable은 모든 DAG에 공유되는 데이터라고 할 수 있다. Variable은 Airflow Webserver(http://localhost:8080/)에서 등록할 수 있다. Admin 메뉴 하단 'Variables'라는 메뉴에서 등록가능하다. (실제 Variable의 Key, Value 값은 메타 DB의 Variable 테이블에 저장된다.)  전역 변수 사용하기 전역 변수를 사용하는 방법에는 크게 두 가지가 있다. 1) Variable 라이브러리를 이용해 가져오기from airflow.models import Variablevar_value = Variable.get("s..
[Airflow/Xcom] Airflow에서 Xcom 사용해보기
·
Minding's Programming/Airflow
Xcom? (Cross Communication) Xcom은 Airflow DAG 내 task 간의 데이터 공유를 위해 사용되는 기술이다.ex) task1의 수행 중 내용이나 결과를 task2에서 사용 또는 입력으로 주고 싶은 경우 주로 작은 규모의 데이터 공유를 위해 사용되며, 1GB 이상의 데이터 공유를 위해서는 AWS S3, HDFS 등의 외부 솔루션 사용이 필요하다. (Xcom 내용은 메타 DB(내부 DB)의 테이블에 값이 저장되기 때문)  Python 오퍼레이터에서 Xcom을 사용하는 방법 파이썬 오퍼레이터에서 Xcom을 사용하는 방법에는 크게 2가지가 있다. 1) **kwargs에 존재하는 ti(task_instance) 객체 활용# 데이터 xcom에 업로드@task(task_id = 'pyt..
[Airflow/Macros] Airflow에서 macros(매크로) 사용해보기
·
Minding's Programming/Airflow
Macro 변수는? Jinja 템플릿 내에서 날짜 계산을 가능하게 해주는 기능이다. 파이썬 라이브러리의 datetime과 dateutil을 이용해 날짜 계산을 지원한다. Macro 변수는 주기적으로 실행되는 스케줄에 날짜 계산이 필요할 때 필요하다.ex) 매월 말일마다 DB에서 SQL을 통해 전월 마지막일부터 어제 날짜까지의 데이터를 불러와야 할 때sql = f'''SELECT NAME, ADDRESSFROM TBL_REGWHERE REG_DATE BETWEEN ?? AND ??'''배치일이 2월 28일이면 1월 31일부터 2월 27일까지 BETWEEN이 설정 되어야함전 월 마지막 일은 {{ data_interval_start }}로 불러올 수 있지만, '어제 날짜'가 문제매 달마다 '어제 날짜'가 다르..
[Airflow] Airflow의 날짜 개념
·
Minding's Programming/Airflow
Airflow 날짜 Template 변수 Airflow는 Airbnb에서 개발 시 ETL 도구로써 개발했기 때문에, Airflow의 날짜 개념을 이해하기 위해서는 '데이터 관점'에서의 날짜를 바라볼 수 있어야 한다. 예를 들어 하루마다 주기적으로 수행되는 일 배치의 워크플로우가 있다고 생각해보자. 이 워크플로우는 배치 실행 일 기준 전 날 00시부터 23시 59분까지의 데이터를 추출하는 task를 가지고 있다.그렇게 됐을 때, 데이터 관점에서의 시작 일은 배치를 실행하고 있는 오늘이 아닌 어제인 것이다.(ex. 25일에 task 실행, 데이터는 24일 00:00~24일 23:59 까지의 데이터이므로 data의 시작 일은 25일이 아닌 24일) 이는 Airflow의 Template 변수를 출력해보아도 알 ..