본문 바로가기

Minding's Programming/Airflow

[Airflow] Airflow DAG 만들기 & DAG 디렉토리 셋팅

728x90
반응형

operator와 관련 용어 개념

오퍼레이터: 특정 행위를 할 수 있는 기능을 모아놓은 클래스(설계도)

Task: 오퍼레이터에서 객체화되어 DAG에서 실행 가능한 오브젝트

Bash 오퍼레이터: 쉘 스크립트 명령을 수행하는 오퍼레이터, 이외에도 Python 오퍼레이터 등 다양함.

 

오퍼레이터를 통해 Task를 만들고, DAG를 통해 각 Task를 실행시킴.

 

Task의 수행 주체는?

 

Airflow의 전체적인 아키텍처는 다음과 같다.

1. 스케줄러에서 DAG파일을 파싱

2. 스케줄러가 해당 정보를 메타DB에 저장

3. DAG의 Start 시간 파악

4. Start 시간 도달 시 워커에 DAG 파일에 의한 워크플로우 시작 지시

5. 워커는 DAG파일을 읽어들인 후 처리 (실행 전에도 메타 DB에 업데이트)

6. 워크플로우 완료 후 결과 메타DB에 업데이트

 

여기서 스케줄러는 일종의 '뇌'같은 존재로, DAG 파일을 읽어 문법적인 오류 및 Task 간 관계 등을 파악하고 시작 시간을 결정한다.

Task를 실제 수행하는 주체는 워커로, DAG 파일을 읽고 작업 전후로 메타 DB에 업데이트한다.

 

 

DAG 샘플 수정해 DAG 만들어보기

docker로 airflow를 설치했을 경우 wsl에서 아래 명령어를 통해 airflow를 실행시켜 준다.

sudo service docker start

sudo docker compose up

 

그런 뒤 http://localhost:8080/ 으로 접속해 airflow UI가 있는 화면으로 접속한 뒤 DAG 하나를 선택한다. 나 같은 경우는 example_bash_operator를 선택했다. 이후 Code 탭으로 가면 python 코드가 나오는데, 해당 코드를 복사해 로컬 컴퓨터(windows)에서 수정해줄 것이다. 나는 강의를 보며 진행했기에, 아래와 같이 수정했다.

from __future__ import annotations

import datetime

import pendulum

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id="dags_bash_operator", # Airflow UI에서 보이는 이름(py 파일 이름과 관계없으나 동일하게 하는 것이 좋음)
    schedule="0 0 * * *", # 분 / 시 / 일 / 월 / 요일
    start_date=pendulum.datetime(2024, 3, 1, tz="Asia/Seoul"),
    catchup=False, # 현재 시점을 기준으로 start_date부터 누락된 workflow의 실행 여부 (True: 누락된 부분 모두 실행)
    # dagrun_timeout=datetime.timedelta(minutes=60), # n분 이상 실행하고 있을 시 실패로 판단
    tags=["practice"], # UI의 DAG 아래에 있는 태그 설정
    # params={"example_key": "example_value"}, # dag 설정 아래에 공통적인 인자가 있을 시
) as dag: # 실제 Task를 아래에 작성
    bash_t1 = BashOperator(
        task_id="bash_t1", # UI에 나타나는 Task의 이름
        bash_command="echo whoami",
    )

    bash_t2 = BashOperator(
        task_id="bash_t2", # UI에 나타나는 Task의 이름
        bash_command="echo $HOSTNAME",
    )

    bash_t1 >> bash_t2 # task의 실행 순서 명시

* 코드 작성 시 주의할 점

- 파이썬 파일 이름과 DAG의 이름은 동일하게 하는 것이 좋다.

- Task 변수 이름과 UI에 노출되는 Task의 이름은 동일하게 하는 것이 좋다.

 

위 DAG에는 두 가지(bash_t1, bash_t2)의 task가 있는데, 한 가지는 'whoami'라는 텍스트를 출력하는 것이고, 하나는 현재 Hostname을 출력하는 간단한 task다.

 

이 파일을 나는 미리 만들어 둔 dags 폴더에 넣은 뒤, git repository에 push해 주었다. wsl 환경에서 pull을 통해 같은 파일을 받아주기 위해서다.

 

DAG 디렉토리 세팅

 

그리고 docker-compose.yaml 파일을 수정해 해당 DAG 파일이 있는 곳으로 연결해 줄 필요가 있다. 그렇지 않으면 해당 파일을 인식하지 못한다.

vi docker-compose.yaml

>>>
- ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags << 이 부분을 아래와 같이 수정해준다.

- ${AIRFLOW_PROJ_DIR:-.}{레포지토리폴더}/dags:/opt/airflow/dags
- ${AIRFLOW_PROJ_DIR:-.}/airflow/dags:/opt/airflow/dags << 나의 경우 repo의 이름이 airflow다.

 

수정할 때에는 vi 명령어를 통해 해당 파일 편집기에 접속한 뒤, 'i'를 눌러 insert 모드 실행 및 수정 진행한뒤, 'esc'를 눌러 모드를 빠져나오고, ':wq!'를 입력해 해당 파일을 저장해주면 된다.

 

 

DAG 실행 및 확인

DAG 폴더가 정상적으로 반영되도록 docker를 다시 실행해준다. (맨 위 airflow를 실행하듯)

그리고 난 뒤 http://localhost:8080/으로 접속해주면, 내가 만든 DAG가 노출되는 것을 확인할 수 있다.

 

맨 아래

위 스크린 샷에는 DAG이 실행상태로 되어있지만 기본적으로는 Pause 상태로 되어있다. 해당 버튼을 눌러 Resume 시켜준 뒤, 왼쪽 상태 창의 초록색 네모 박스를 눌러준다. (실행 뒤 새로고침이 필요할 수 있음)

그 뒤 logs 탭을 누르면, bash operator를 통해 어떤 것들이 출력되는지 알 수 있다.

bash_t1의 경우 내가 의도한대로 'whoami'가 노출되는 것을 알 수 있다.

 

bash_t2도 한번 살펴보자.

hostname을 출력하는 명령어를 적었던 bash_t2 task는 '1d807c567ee6'이라는 문자를 출력했다. 이게 어떤 host의 이름인지 알아보자.

 

sudo docker ps

>>>
CONTAINER ID   IMAGE                  COMMAND                  CREATED          STATUS                   PORTS                                       NAMES
c94dde143bf4   apache/airflow:2.9.3   "/usr/bin/dumb-init …"   3 minutes ago    Up 3 minutes (healthy)   8080/tcp                                    minding-airflow-scheduler-1
5d9dd50c5704   apache/airflow:2.9.3   "/usr/bin/dumb-init …"   3 minutes ago    Up 3 minutes (healthy)   0.0.0.0:8080->8080/tcp, :::8080->8080/tcp   minding-airflow-webserver-1
1d807c567ee6   apache/airflow:2.9.3   "/usr/bin/dumb-init …"   3 minutes ago    Up 3 minutes (healthy)   8080/tcp                                    minding-airflow-worker-1
b35b4bc7b8ad   apache/airflow:2.9.3   "/usr/bin/dumb-init …"   3 minutes ago    Up 3 minutes (healthy)   8080/tcp                                    minding-airflow-triggerer-1
6010d395eda6   postgres:13            "docker-entrypoint.s…"   13 minutes ago   Up 3 minutes (healthy)   5432/tcp                                    minding-postgres-1
1e5851a6223e   redis:7.2-bookworm     "docker-entrypoint.s…"   13 minutes ago   Up 3 minutes (healthy)   6379/tcp                                    minding-redis-1

CONTAINER ID를 살펴보면, 위 bash_t2에서 출력했던 hostname이 worker 컨테이너의 ID와 동일하다는 것을 알 수 있다. 위 개념에서도 살펴볼 수 있었듯이, task를 실행하는 주체는 worker라는 것을 다시 한 번 확인할 수 있었다.

728x90