Python 썸네일형 리스트형 [Airflow] TriggerDagRun Operator / ExternalTask Sensor (다른 DAG 실행하기) DAG 내에서 task의 순서를 지정하고, 특정 조건 별로 실행시키는 법을 알아보았다. 하지만 task보다 더 큰 범주인 DAG을 순서에 따라 실행시키려면 어떻게 해야할까? DAG 간 의존관계를 설정하는 방법에는 크게 두 가지가 있다고 한다. 1) TriggerDagRun Operator 사용2) ExternalTask Sensor 사용 이 두가지 방법의 차이점은 아래 표와 같이 정리할 수 있다. TriggerDagRun OperatorExternal Task Sensor방식실행할 다른 DAG의 ID를 지정해 수행본 Task가 수행되기 전 다른 DAG의 완료를 기다린 후 수행사용시점(권고)Trigger되는 DAG의 선행 DAG이 하나일 경우Trigger되는 DAG의 선행 DAG이 2개 이상일 경우위와 같.. 더보기 [Airflow] Task Group Task Group이란 Task Group은 task들을 모아 그룹 형태로 관리할 수 있는 기능이다. UI Graph 탭에서 Task들을 Group화하여 보여준다.DAG 내 task가 많아졌을 때, 관리하기 쉽도록 해주는 편의 기능이며 task group 안에 task group을 중첩해 관리할 수도 있다. 위 영상을 보면 여러 개의 task들을 하나의 section으로 만들어 관리할 수 있는 것을 볼 수 있다. 자세히 보면 section 내 inner_section과 같이 group 안에 또 다른 group을 만들어 설정할 수 있는 것을 확인할 수 있다. Task Group을 꼭 사용할 필요는 없지만, 관리의 용이성이 올라가기 때문에 설정해놓는다면 편하게 관리할 수 있을 것이다. Task Group .. 더보기 [Airflow] Trigger Rule 이전에 branch, 즉 분기처리 개념을 통해 상위 task에서 상황에 따라 분기 처리하여 여러 개의 하위 task 중 선택 실행하는 방법을 배워봤다.(링크) [Airflow] Task 분기 처리 (BranchPythonOperator, @task.branch, BaseBranchOperator)Task 분기 처리는 왜 필요한가? Task 1 다음에 이어지는 3가지의 Task 2-1, 2-2, 2-3이 있다고 가정해보자. task 1이 먼저 수행된 다음, 3가지의 task가 동시에 돌아가는 것이 지금까지 배워온 결과이다. (tasminding-deep-learning.tistory.com 이번에는 상위 task가 여러 개이고, 특정 조건에서만 하위 task를 실행하는 조건의 개념인 Trigger Rule.. 더보기 [Airflow] 전역변수 Variable 이용하기 전역 변수 Variable? Xcom은 특정 DAG 또는 특정 Schedule에 수행되는 task 간에만 공유되는 데이터라면, Variable은 모든 DAG에 공유되는 데이터라고 할 수 있다. Variable은 Airflow Webserver(http://localhost:8080/)에서 등록할 수 있다. Admin 메뉴 하단 'Variables'라는 메뉴에서 등록가능하다. (실제 Variable의 Key, Value 값은 메타 DB의 Variable 테이블에 저장된다.) 전역 변수 사용하기 전역 변수를 사용하는 방법에는 크게 두 가지가 있다. 1) Variable 라이브러리를 이용해 가져오기from airflow.models import Variablevar_value = Variable.get("s.. 더보기 [Airflow/Xcom] Airflow에서 Xcom 사용해보기 Xcom? (Cross Communication) Xcom은 Airflow DAG 내 task 간의 데이터 공유를 위해 사용되는 기술이다.ex) task1의 수행 중 내용이나 결과를 task2에서 사용 또는 입력으로 주고 싶은 경우 주로 작은 규모의 데이터 공유를 위해 사용되며, 1GB 이상의 데이터 공유를 위해서는 AWS S3, HDFS 등의 외부 솔루션 사용이 필요하다. (Xcom 내용은 메타 DB(내부 DB)의 테이블에 값이 저장되기 때문) Python 오퍼레이터에서 Xcom을 사용하는 방법 파이썬 오퍼레이터에서 Xcom을 사용하는 방법에는 크게 2가지가 있다. 1) **kwargs에 존재하는 ti(task_instance) 객체 활용# 데이터 xcom에 업로드@task(task_id = 'pyt.. 더보기 [Airflow/Macros] Airflow에서 macros(매크로) 사용해보기 Macro 변수는? Jinja 템플릿 내에서 날짜 계산을 가능하게 해주는 기능이다. 파이썬 라이브러리의 datetime과 dateutil을 이용해 날짜 계산을 지원한다. Macro 변수는 주기적으로 실행되는 스케줄에 날짜 계산이 필요할 때 필요하다.ex) 매월 말일마다 DB에서 SQL을 통해 전월 마지막일부터 어제 날짜까지의 데이터를 불러와야 할 때sql = f'''SELECT NAME, ADDRESSFROM TBL_REGWHERE REG_DATE BETWEEN ?? AND ??'''배치일이 2월 28일이면 1월 31일부터 2월 27일까지 BETWEEN이 설정 되어야함전 월 마지막 일은 {{ data_interval_start }}로 불러올 수 있지만, '어제 날짜'가 문제매 달마다 '어제 날짜'가 다르.. 더보기 [Python/Jinja/Airflow] Jinja 템플릿과 Airflow에서의 사용방법 Jinja 템플릿 Jinja 템플릿은 파이썬 언어에서 사용하는 템플릿 엔진으로, 문서(파일)에서 특정 양식으로 작성된 값을 런타임시 실제 값으로 치환해주는 처리 엔진이다. Jinja 템플릿은 주로 파이썬 기반 웹 프레임워크인 Flask, Django, FastAPI에서 주로 사용한다. 이 경우 html 템플릿에 내용을 저장하고 화면에 보여질 때 실제 값으로 변환해서 출력한다. 각 상황에 맞게 변환해서 html 템플릿을 보여줄 수 있기 때문에, html 파일의 재활용성이 높아져 효율적으로 이용할 수 있다. SQL 작성 시에도 Jinja 템플릿을 활용할 수 있다. (상황에 따라 {{ }}의 값만 바뀌도록 적용 가능)ex) select * from tables where base_dt = {{ }} Airf.. 더보기 [Airflow] Python Operator의 op_args, op_kwargs *args와 **kwargs의 개념 정리: [Python] 파이썬 함수 파라미터 *args와 **kwargs op_args 파이썬 오퍼레이터를 사용해 파이썬 함수를 실행시킬 경우 해당 함수의 파라미터는 op_args를 통해 전달한다.as dag: def regist(name, sex): print(f'이름은 {name}이고 성별은 {sex}입니다.') py_task_1 = PythonOperator( task_id = 'py_task_1', python_callable=regist, op_args = ['minding', 'man'] # 리스트 형태로 작성 py_task_1 = PythonOperator( # *args가 있을 경우 task_id = 'py_task_1',.. 더보기 이전 1 2 3 4 5 6 7 ··· 12 다음