본문 바로가기

반응형

Airflow

[Airflow/Macros] Airflow에서 macros(매크로) 사용해보기 Macro 변수는? Jinja 템플릿 내에서 날짜 계산을 가능하게 해주는 기능이다. 파이썬 라이브러리의 datetime과 dateutil을 이용해 날짜 계산을 지원한다. Macro 변수는 주기적으로 실행되는 스케줄에 날짜 계산이 필요할 때 필요하다.ex) 매월 말일마다 DB에서 SQL을 통해 전월 마지막일부터 어제 날짜까지의 데이터를 불러와야 할 때sql = f'''SELECT NAME, ADDRESSFROM TBL_REGWHERE REG_DATE BETWEEN ?? AND ??'''배치일이 2월 28일이면 1월 31일부터 2월 27일까지 BETWEEN이 설정 되어야함전 월 마지막 일은 {{ data_interval_start }}로 불러올 수 있지만, '어제 날짜'가 문제매 달마다 '어제 날짜'가 다르.. 더보기
[Airflow] Airflow의 날짜 개념 Airflow 날짜 Template 변수 Airflow는 Airbnb에서 개발 시 ETL 도구로써 개발했기 때문에, Airflow의 날짜 개념을 이해하기 위해서는 '데이터 관점'에서의 날짜를 바라볼 수 있어야 한다. 예를 들어 하루마다 주기적으로 수행되는 일 배치의 워크플로우가 있다고 생각해보자. 이 워크플로우는 배치 실행 일 기준 전 날 00시부터 23시 59분까지의 데이터를 추출하는 task를 가지고 있다.그렇게 됐을 때, 데이터 관점에서의 시작 일은 배치를 실행하고 있는 오늘이 아닌 어제인 것이다.(ex. 25일에 task 실행, 데이터는 24일 00:00~24일 23:59 까지의 데이터이므로 data의 시작 일은 25일이 아닌 24일) 이는 Airflow의 Template 변수를 출력해보아도 알 .. 더보기
[Python/Jinja/Airflow] Jinja 템플릿과 Airflow에서의 사용방법 Jinja 템플릿 Jinja 템플릿은 파이썬 언어에서 사용하는 템플릿 엔진으로, 문서(파일)에서 특정 양식으로 작성된 값을 런타임시 실제 값으로 치환해주는 처리 엔진이다. Jinja 템플릿은 주로 파이썬 기반 웹 프레임워크인 Flask, Django, FastAPI에서 주로 사용한다. 이 경우 html 템플릿에 내용을 저장하고 화면에 보여질 때 실제 값으로 변환해서 출력한다. 각 상황에 맞게 변환해서 html 템플릿을 보여줄 수 있기 때문에, html 파일의 재활용성이 높아져 효율적으로 이용할 수 있다. SQL 작성 시에도 Jinja 템플릿을 활용할 수 있다. (상황에 따라 {{ }}의 값만 바뀌도록 적용 가능)ex) select * from tables where base_dt = {{ }}  Airf.. 더보기
[Airflow] Python Operator의 op_args, op_kwargs *args와 **kwargs의 개념 정리: [Python] 파이썬 함수 파라미터 *args와 **kwargs op_args 파이썬 오퍼레이터를 사용해 파이썬 함수를 실행시킬 경우 해당 함수의 파라미터는 op_args를 통해 전달한다.as dag: def regist(name, sex): print(f'이름은 {name}이고 성별은 {sex}입니다.') py_task_1 = PythonOperator( task_id = 'py_task_1', python_callable=regist, op_args = ['minding', 'man'] # 리스트 형태로 작성 py_task_1 = PythonOperator( # *args가 있을 경우 task_id = 'py_task_1',.. 더보기
[Airflow] Python Operator 사용과 Python Decorator Python Operator 파이썬 오퍼레이터: Airflow에서 Python 함수를 실행시키는 오퍼레이터로, 해당 클래스를 이용해 DAG을 만들면 파이썬 함수를 실행시키는 워크플로우를 생성할 수 있다. 1. 내부 함수 실행# 내부에 직접 함수를 만드는 경우from airflow import DAGimport pendulumimport datetimefrom airflow.operators.python import PythonOperatorimport randomwith DAG( dag_id="dags_python_operator", schedule="30 6 * * *", # 매일 6시 30분 마다 start_date=pendulum.datetime(2023, 3, 1, tz="Asia.. 더보기
[Airflow] Email 오퍼레이터 사용해보기 (Gmail 서버 사용) Gmail 사전 설정 작업 Gmail > 설정 > 모든 설정보기 > 전달 및 POP/IMAP > IMAP 사용 구글 계정관리 > 보안 > 2단계 인증 > 앱 비밀번호 세팅여기서 발급받은 비밀번호는 다시 볼 수 없으니 꼭 기억해두어야 한다.  airflow 설정vi 편집기를 이용해 docker-compose.yaml 파일에 해당 내용을 추가해주어야 한다.  AIRFLOW__SMTP__SMTP_HOST: 'smtp.gmail.com' # 사용할 서버 AIRFLOW__SMTP__SMTP_USER: 'tlsfk48@gmail.com' # 로그인할 계정 AIRFLOW__SMTP__SMTP_PASSWORD: '{password}' # 앱 비밀번호 AIRFLOW__SMTP__SMTP_PORT: .. 더보기
[Airflow] Bash Operator로 쉘 스크립트 파일 실행하기 쉘 스크립트는?Unix/Linux Shell 명령을 이용해 만들어지고 인터프리터에 의해 한 줄씩 처리되는 파일(ex. 컴파일 방식: C, Java / 인터프리터 방식: Python, Shell)echo, mkdir 등 기본적인 쉘 명령어를 입력해 작성하며, 변수를 입력받거나 for/if문 및 함수도 사용 가능하다.확장자가 없어도 동작하지만 주로 파일명에 .sh 확장자를 붙임 왜 쉡 스크립트가 필요한가?쉘 명령어를 이용해 복잡한 로직을 처리하는 경우 함수화하여 간단히 하기 위해(ex. sftp를 통해 파일 받은 후 DB에 Insert & tar.gz로 압축해두기)쉘 명령어 재사용을 위해서 Worker 컨테이너에서 쉘 스크립트를 수행하려면?일반적으로 컨테이너에서 쉘 스크립트를 이용하지 못하는 이유컨테이너는 .. 더보기
[Airflow] DAG 내 task 연결하기 (순서 지정) DAG 내에서 각 Task들의 순서를 연결해주는 방법에는 크게 2가지가 있다. 1. >>, 이 방법은 Airflow에서 공식적으로 추천하는 방법이다. >>, # task 하나씩 연결해 줄 경우t1 >> t2t1 >> t3t2 >> t4t3 >> t4t5 >> t4# 여러 task를 한꺼번에 나타내 줄 경우t1 >> [t2, t3] >> t4 (> t4위와 같이 >> 화살표의 진행 방향대로 task의 순서를 지정해줄 수 있다. 동시에 수행해야 할 task의 경우 리스트에 넣어 표현해줄 수 있다. 위 처럼 task의 순서를 연결해서 Airflow 웹 서버에서 확인해본다면 아래와 같은 그래프를 확인할 수 있다.  2. 함수 사용해 연결아래 공식문서를 통해 함수를 통한 연결 방법도 알 수 있다.https://a.. 더보기

728x90