본문 바로가기

반응형

Minding's Programming/Airflow

[Airflow/Xcom] Airflow에서 Xcom 사용해보기 Xcom? (Cross Communication) Xcom은 Airflow DAG 내 task 간의 데이터 공유를 위해 사용되는 기술이다.ex) task1의 수행 중 내용이나 결과를 task2에서 사용 또는 입력으로 주고 싶은 경우 주로 작은 규모의 데이터 공유를 위해 사용되며, 1GB 이상의 데이터 공유를 위해서는 AWS S3, HDFS 등의 외부 솔루션 사용이 필요하다. (Xcom 내용은 메타 DB(내부 DB)의 테이블에 값이 저장되기 때문)  Python 오퍼레이터에서 Xcom을 사용하는 방법 파이썬 오퍼레이터에서 Xcom을 사용하는 방법에는 크게 2가지가 있다. 1) **kwargs에 존재하는 ti(task_instance) 객체 활용# 데이터 xcom에 업로드@task(task_id = 'pyt.. 더보기
[Airflow/Macros] Airflow에서 macros(매크로) 사용해보기 Macro 변수는? Jinja 템플릿 내에서 날짜 계산을 가능하게 해주는 기능이다. 파이썬 라이브러리의 datetime과 dateutil을 이용해 날짜 계산을 지원한다. Macro 변수는 주기적으로 실행되는 스케줄에 날짜 계산이 필요할 때 필요하다.ex) 매월 말일마다 DB에서 SQL을 통해 전월 마지막일부터 어제 날짜까지의 데이터를 불러와야 할 때sql = f'''SELECT NAME, ADDRESSFROM TBL_REGWHERE REG_DATE BETWEEN ?? AND ??'''배치일이 2월 28일이면 1월 31일부터 2월 27일까지 BETWEEN이 설정 되어야함전 월 마지막 일은 {{ data_interval_start }}로 불러올 수 있지만, '어제 날짜'가 문제매 달마다 '어제 날짜'가 다르.. 더보기
[Airflow] Airflow의 날짜 개념 Airflow 날짜 Template 변수 Airflow는 Airbnb에서 개발 시 ETL 도구로써 개발했기 때문에, Airflow의 날짜 개념을 이해하기 위해서는 '데이터 관점'에서의 날짜를 바라볼 수 있어야 한다. 예를 들어 하루마다 주기적으로 수행되는 일 배치의 워크플로우가 있다고 생각해보자. 이 워크플로우는 배치 실행 일 기준 전 날 00시부터 23시 59분까지의 데이터를 추출하는 task를 가지고 있다.그렇게 됐을 때, 데이터 관점에서의 시작 일은 배치를 실행하고 있는 오늘이 아닌 어제인 것이다.(ex. 25일에 task 실행, 데이터는 24일 00:00~24일 23:59 까지의 데이터이므로 data의 시작 일은 25일이 아닌 24일) 이는 Airflow의 Template 변수를 출력해보아도 알 .. 더보기
[Python/Jinja/Airflow] Jinja 템플릿과 Airflow에서의 사용방법 Jinja 템플릿 Jinja 템플릿은 파이썬 언어에서 사용하는 템플릿 엔진으로, 문서(파일)에서 특정 양식으로 작성된 값을 런타임시 실제 값으로 치환해주는 처리 엔진이다. Jinja 템플릿은 주로 파이썬 기반 웹 프레임워크인 Flask, Django, FastAPI에서 주로 사용한다. 이 경우 html 템플릿에 내용을 저장하고 화면에 보여질 때 실제 값으로 변환해서 출력한다. 각 상황에 맞게 변환해서 html 템플릿을 보여줄 수 있기 때문에, html 파일의 재활용성이 높아져 효율적으로 이용할 수 있다. SQL 작성 시에도 Jinja 템플릿을 활용할 수 있다. (상황에 따라 {{ }}의 값만 바뀌도록 적용 가능)ex) select * from tables where base_dt = {{ }}  Airf.. 더보기
[Airflow] Python Operator의 op_args, op_kwargs *args와 **kwargs의 개념 정리: [Python] 파이썬 함수 파라미터 *args와 **kwargs op_args 파이썬 오퍼레이터를 사용해 파이썬 함수를 실행시킬 경우 해당 함수의 파라미터는 op_args를 통해 전달한다.as dag: def regist(name, sex): print(f'이름은 {name}이고 성별은 {sex}입니다.') py_task_1 = PythonOperator( task_id = 'py_task_1', python_callable=regist, op_args = ['minding', 'man'] # 리스트 형태로 작성 py_task_1 = PythonOperator( # *args가 있을 경우 task_id = 'py_task_1',.. 더보기
[Python] 파이썬 함수 파라미터 *args와 **kwargs 파이썬의 일반적인 함수 인자 보통의 파이썬 함수는 인자(파라미터)를 필요로 하고, 아래와 같이 쓰인다.def regist(name, sex): print(name) print(sex) 위 함수를 기준으로 봤을 때, 이름과 성별 말고도 추가적인 정보(이메일, 전화번호 등)를 받을 수도 있을 때에는 어떻게 해야할까? 저 함수를 그대로 놓고 추가적인 정보를 제공한다면 에러가 나기 때문에, 수정이 필요하다. *args 첫 번째 방법으로는 *args를 사용할 수 있다.def regist(name, sex, *args): print(type(args)) # args는 tuple 형태로 저장됨 email = args[0] if len(args) >= 1 else None # 부가정보 없는 경우 대비해.. 더보기
[Airflow] Python Operator 사용과 Python Decorator Python Operator 파이썬 오퍼레이터: Airflow에서 Python 함수를 실행시키는 오퍼레이터로, 해당 클래스를 이용해 DAG을 만들면 파이썬 함수를 실행시키는 워크플로우를 생성할 수 있다. 1. 내부 함수 실행# 내부에 직접 함수를 만드는 경우from airflow import DAGimport pendulumimport datetimefrom airflow.operators.python import PythonOperatorimport randomwith DAG( dag_id="dags_python_operator", schedule="30 6 * * *", # 매일 6시 30분 마다 start_date=pendulum.datetime(2023, 3, 1, tz="Asia.. 더보기
[Airflow] Email 오퍼레이터 사용해보기 (Gmail 서버 사용) Gmail 사전 설정 작업 Gmail > 설정 > 모든 설정보기 > 전달 및 POP/IMAP > IMAP 사용 구글 계정관리 > 보안 > 2단계 인증 > 앱 비밀번호 세팅여기서 발급받은 비밀번호는 다시 볼 수 없으니 꼭 기억해두어야 한다.  airflow 설정vi 편집기를 이용해 docker-compose.yaml 파일에 해당 내용을 추가해주어야 한다.  AIRFLOW__SMTP__SMTP_HOST: 'smtp.gmail.com' # 사용할 서버 AIRFLOW__SMTP__SMTP_USER: 'tlsfk48@gmail.com' # 로그인할 계정 AIRFLOW__SMTP__SMTP_PASSWORD: '{password}' # 앱 비밀번호 AIRFLOW__SMTP__SMTP_PORT: .. 더보기

728x90