[Airflow/Slack] Airflow DAG 실패 시 Slack으로 알림 보내기

2024. 11. 19. 10:48·Minding's Programming/Airflow
728x90
반응형

Slack은 Webhook이라는 기능을 통해 Post 명령으로 특정 채널에 메시지를 보낼 수 있는 기능을 지원한다. 이를 이용해 Airflow DAG에서 에러가 발생할 때, Slack에 메시지를 보내는 기능을 구현해보자.

 

1. 알림을 전달할 채널 생성(또는 선택)

나는 test할 워크스페이스와 채널을 미리 만들어두었다.

 

2. Slack app 생성

https://api.slack.com/messaging/webhooks

 

Sending messages using incoming webhooks

Create an incoming webhook with a unique URL to which you send a JSON payload with message text and options.

api.slack.com

위 링크에 접속한 뒤, 'Create Your Slack app' 버튼을 눌러 아래와 같이 app을 생성한다.

 

1. Create an App 클릭

 

2. From scratch 선택

 

3. 앱 이름과 워크스페이스 선택 후 Create App 클릭

 

4. 좌측 메뉴 바에서 Features의 'Incoming Webhooks' 선택 및 기능 ON으로 설정

 

5. 하단 Webhook URL 섹션에서 'Add New Webhook to Workspace' 클릭

 

6. 채널 선택 및 엑세스 허용

7. 생성된 Webhook URL 확인

 

3. Webhook URL로 메시지 보내기

위에서 생성한 URL을 이용해 아래와 같이 터미널에서 메시지를 보내보자.

curl -X POST -H 'Content-type: application/json' --data '{"text":"Hello, World!"}' {URL}

 

Data-Alert 앱이 채널에 메시지를 전달한 것을 확인할 수 있다.

 

4. Airflow 에러를 Slack 메시지로 보내기

위에서 얻은 링크를 Variables로 저장해준 뒤, 이를 이용해 slack에 에러가 발생할 경우 메시지를 보내는 모듈을 개발한다. 그리고 DAG 인스턴스를 만들 때 해당 모듈을 에러 콜백으로 지정해주면 된다.

 

Variables에 'slack_url'이라는 이름으로 URL(뒷부분만)을 입력해준 뒤, 아래와 같은 모듈로 메시지를 보내게 된다. 이 모듈은 slack.py라는 이름으로 plugins 폴더에 배치했다.

from airflow.models import Variable

import logging
import requests

def on_failure_callback(context):
    """
    https://airflow.apache.org/_modules/airflow/operators/slack_operator.html
    Define the callback to post on Slack if a failure is detected in the Workflow
    :return: operator.execute
    """
    text = str(context['task_instance'])
    text += "```" + str(context.get('exception')) +"```"
    send_message_to_a_slack_channel(text, ":scream:")


# def send_message_to_a_slack_channel(message, emoji, channel, access_token):
def send_message_to_a_slack_channel(message, emoji):
    # url = "https://slack.com/api/chat.postMessage"
    # "https://hooks.slack.com/services/"까지는 공통적이기 때문에, 뒷부분만 저장함
    url = "https://hooks.slack.com/services/"+Variable.get("slack_url")
    headers = {
        'content-type': 'application/json',
    }
    data = { "username": "Data GOD", "text": message, "icon_emoji": emoji }
    r = requests.post(url, json=data, headers=headers)
    return r

 

그리고 이를 DAG에 적용해보자.

from plugins import slack

...

dag = DAG(
    dag_id = 'name_gender_v4',
    start_date = datetime(2023,4,6), # 날짜가 미래인 경우 실행이 안됨
    schedule = '0 2 * * *',  # 적당히 조절
    max_active_runs = 1,
    catchup = False,
    default_args = {
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
        # failure_collback: 모든 task에 대해 실패 시 사용할 함수 - Slack 메시지 전송
        'on_failure_callback': slack.on_failure_callback,
    }
)

 

이제 원래 잘 실행되던 DAG을 살짝 수정해 일부러 에러가 나도록 해보았다.

위와 같이 에러메시지가 Slack으로 전송되는 것을 확인할 수 있다.

728x90

'Minding's Programming > Airflow' 카테고리의 다른 글

[Airflow] Airflow API를 통해 모니터링하기  (0) 2024.11.19
[Airflow] CustomSensor 만들어 활용하기  (0) 2024.08.05
[Airflow] Airflow의 기본 센서 (BashSensor, FileSensor, PythonSensor)  (0) 2024.07.31
[Airflow] Connection Type 신규 추가하는 방법  (0) 2024.07.31
[Airflow] Postgres Hook bulk_load 문제점, Custom Hook으로 bulk_load() 해보기  (0) 2024.07.29
'Minding's Programming/Airflow' 카테고리의 다른 글
  • [Airflow] Airflow API를 통해 모니터링하기
  • [Airflow] CustomSensor 만들어 활용하기
  • [Airflow] Airflow의 기본 센서 (BashSensor, FileSensor, PythonSensor)
  • [Airflow] Connection Type 신규 추가하는 방법
Minding
Minding
  • Minding
    Today's Minding
    Minding
  • 전체
    오늘
    어제
    • 울고넘는 딥러닝 (278)
      • Minding's Baseball (57)
        • MLB Statcast (29)
        • 머신러닝으로 홈런왕 예측하기 (3)
        • 야구칼럼 (12)
        • 야구 규칙, 용어 (1)
        • 2022-23 질롱 코리아 (8)
        • 류현진 등판경기 (4)
      • Minding's Programming (185)
        • 프로그래머스 코딩테스트 (21)
        • Knowledge (44)
        • Numpy & Pandas (6)
        • Excel (3)
        • Git (1)
        • Pygame (11)
        • CV (3)
        • Tensorflow tutorial (4)
        • Kaggle and Dacon (4)
        • 에러 코드 (8)
        • FastAPI (8)
        • Airflow (29)
        • Crawling (6)
        • Django (14)
        • AWS (18)
        • Spark (5)
      • Minding's Reading (30)
        • 머신러닝 딥러닝에 필요한 기초 수학 with 파이.. (2)
        • 칼만필터는 어렵지 않아 (11)
        • 밑바닥부터 시작하는 딥러닝 (6)
        • 메이저리그 야구 통계학 2e (8)
        • 논문읽기 (2)
        • 빅데이터를 지탱하는 기술 (1)
      • Minding's Life (5)
        • 주식 (4)
        • 각종 소식 (1)
  • 블로그 메뉴

    • 홈
    • Baseball
    • Programming
    • Reading
  • 링크

  • 공지사항

  • 인기 글

  • 태그

    프로그래머스
    AWS
    칼만필터는어렵지않아
    Python
    MLB
    django python
    mlb stats api
    코딩테스트
    파이게임
    데이터분석
    칼만필터는어렵지않아파이썬
    메이저리그
    데이터 엔지니어
    Airflow
    KBO
    KalmanFilter
    pygame
    파이썬게임개발
    질롱코리아
    야구
    넘파이
    django
    에어플로우
    칼만필터
    파이썬
    FastAPI
    게임개발
    머신러닝
    딥러닝
    칼만필터는어렵지않아python
  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.3
Minding
[Airflow/Slack] Airflow DAG 실패 시 Slack으로 알림 보내기
상단으로

티스토리툴바