본문 바로가기

반응형

Airflow

[Airflow] Custom Operator 개발해보기 (BaseOperator) Airflow는 오퍼레이터를 직접 만들어 사용할 수 있도록 클래스를 제공해주고 있다. 이런 활용성이 Airflow의 가장 큰 장점이라고 할 수 있는 만큼, BaseOperator를 활용한 Custom Operator를 개발에 대해 공부해보고자 한다. BaseOperator 상속 시 재정의할 메서드 위에서 말했듯이 Custom Operator를 만들기 위해 Airflow는 BaseOperator라는 클래스를 제공해주고 있다.BaseOperator를 상속(Overriding)받을 때는 아래의 두 가지 메서드를 재정의해야 한다.def __init__ (객체 생성자 함수): 커스텀 오퍼레이터에 들어갈 파라미터 등을 설정def execute(self, context) : __init__ 생성자로 객체 얻은 후 e.. 더보기
[Airflow] SimpleHttpOperator로 서울시 공공데이터 API 이용해보기 서울시 공공데이터 API를 이용하기에 앞서 서울시 데이터광장 (https://data.seoul.go.kr/)에서 로그인 후 api 인증키를 발급받아야 한다.나는 좌측 아래 '일반 인증키'를 신청했다. 그리고 원하는 데이터 페이지에 들어가 OpenAPI 탭에 들어가보면 아래와 같은 정보를 확인할 수 있다.샘플 URL을 참고해서 원하는 데이터 파일 형식과 행 수를 설정해 데이터를 다운로드 받아보자. 나는 최근 장마로 인해 동남아의 '스콜'처럼 비가 내리는 것을 보고 서울시의 강수량 데이터가 궁금해 해당 데이터를 API로 받아보고자 한다. Airflow에서는 이와 같이 API 호출을 통해 데이터를 받기 위해서 SimpleHttpOperator를 사용한다. 말그대로 간단한 http 요청을 하는 오퍼레이터인데,.. 더보기
[Airflow] TriggerDagRun Operator / ExternalTask Sensor (다른 DAG 실행하기) DAG 내에서 task의 순서를 지정하고, 특정 조건 별로 실행시키는 법을 알아보았다. 하지만 task보다 더 큰 범주인 DAG을 순서에 따라 실행시키려면 어떻게 해야할까? DAG 간 의존관계를 설정하는 방법에는 크게 두 가지가 있다고 한다. 1) TriggerDagRun Operator 사용2) ExternalTask Sensor 사용 이 두가지 방법의 차이점은 아래 표와 같이 정리할 수 있다. TriggerDagRun OperatorExternal Task Sensor방식실행할 다른 DAG의 ID를 지정해 수행본 Task가 수행되기 전 다른 DAG의 완료를 기다린 후 수행사용시점(권고)Trigger되는 DAG의 선행 DAG이 하나일 경우Trigger되는 DAG의 선행 DAG이 2개 이상일 경우위와 같.. 더보기
[Airflow] Task Group Task Group이란 Task Group은 task들을 모아 그룹 형태로 관리할 수 있는 기능이다. UI Graph 탭에서 Task들을 Group화하여 보여준다.DAG 내 task가 많아졌을 때, 관리하기 쉽도록 해주는 편의 기능이며 task group 안에 task group을 중첩해 관리할 수도 있다. 위 영상을 보면 여러 개의 task들을 하나의 section으로 만들어 관리할 수 있는 것을 볼 수 있다. 자세히 보면 section 내 inner_section과 같이 group 안에 또 다른 group을 만들어 설정할 수 있는 것을 확인할 수 있다. Task Group을 꼭 사용할 필요는 없지만, 관리의 용이성이 올라가기 때문에 설정해놓는다면 편하게 관리할 수 있을 것이다.  Task Group .. 더보기
[Airflow] Trigger Rule 이전에 branch, 즉 분기처리 개념을 통해 상위 task에서 상황에 따라 분기 처리하여 여러 개의 하위 task 중 선택 실행하는 방법을 배워봤다.(링크) [Airflow] Task 분기 처리 (BranchPythonOperator, @task.branch, BaseBranchOperator)Task 분기 처리는 왜 필요한가? Task 1 다음에 이어지는 3가지의 Task 2-1, 2-2, 2-3이 있다고 가정해보자. task 1이 먼저 수행된 다음, 3가지의 task가 동시에 돌아가는 것이 지금까지 배워온 결과이다. (tasminding-deep-learning.tistory.com 이번에는 상위 task가 여러 개이고, 특정 조건에서만 하위 task를 실행하는 조건의 개념인 Trigger Rule.. 더보기
[Airflow] Task 분기 처리 (BranchPythonOperator, @task.branch, BaseBranchOperator) Task 분기 처리는 왜 필요한가? Task 1 다음에 이어지는 3가지의 Task 2-1, 2-2, 2-3이 있다고 가정해보자. task 1이 먼저 수행된 다음, 3가지의 task가 동시에 돌아가는 것이 지금까지 배워온 결과이다. (task 1 >> [task2-1, task2-2, task2-3]) 그러나, task1의 결과에 따라 task 2-1,2-2,2-3 중 한 가지만 수행하도록 설계해야 하는 경우에는 어떻게 해야할까? 그럴 때 필요한 것이 task 분기 처리이다.  Airflow에서 task를 분기처리 하는 방법 1) BranchPythonOperator 사용2) @task.branch 데코레이터 이용3) BaseBranchOperator 상속하여 직접 개발 (클래스 파일을 상속)  Branc.. 더보기
[Airflow] 전역변수 Variable 이용하기 전역 변수 Variable? Xcom은 특정 DAG 또는 특정 Schedule에 수행되는 task 간에만 공유되는 데이터라면, Variable은 모든 DAG에 공유되는 데이터라고 할 수 있다. Variable은 Airflow Webserver(http://localhost:8080/)에서 등록할 수 있다. Admin 메뉴 하단 'Variables'라는 메뉴에서 등록가능하다. (실제 Variable의 Key, Value 값은 메타 DB의 Variable 테이블에 저장된다.)  전역 변수 사용하기 전역 변수를 사용하는 방법에는 크게 두 가지가 있다. 1) Variable 라이브러리를 이용해 가져오기from airflow.models import Variablevar_value = Variable.get("s.. 더보기
[Airflow/Xcom] Airflow에서 Xcom 사용해보기 Xcom? (Cross Communication) Xcom은 Airflow DAG 내 task 간의 데이터 공유를 위해 사용되는 기술이다.ex) task1의 수행 중 내용이나 결과를 task2에서 사용 또는 입력으로 주고 싶은 경우 주로 작은 규모의 데이터 공유를 위해 사용되며, 1GB 이상의 데이터 공유를 위해서는 AWS S3, HDFS 등의 외부 솔루션 사용이 필요하다. (Xcom 내용은 메타 DB(내부 DB)의 테이블에 값이 저장되기 때문)  Python 오퍼레이터에서 Xcom을 사용하는 방법 파이썬 오퍼레이터에서 Xcom을 사용하는 방법에는 크게 2가지가 있다. 1) **kwargs에 존재하는 ti(task_instance) 객체 활용# 데이터 xcom에 업로드@task(task_id = 'pyt.. 더보기

728x90