본문 바로가기

Minding's Programming/Airflow

[Airflow/Slack] Airflow DAG 실패 시 Slack으로 알림 보내기

728x90
반응형

Slack은 Webhook이라는 기능을 통해 Post 명령으로 특정 채널에 메시지를 보낼 수 있는 기능을 지원한다. 이를 이용해 Airflow DAG에서 에러가 발생할 때, Slack에 메시지를 보내는 기능을 구현해보자.

 

1. 알림을 전달할 채널 생성(또는 선택)

나는 test할 워크스페이스와 채널을 미리 만들어두었다.

 

2. Slack app 생성

https://api.slack.com/messaging/webhooks

 

Sending messages using incoming webhooks

Create an incoming webhook with a unique URL to which you send a JSON payload with message text and options.

api.slack.com

위 링크에 접속한 뒤, 'Create Your Slack app' 버튼을 눌러 아래와 같이 app을 생성한다.

 

1. Create an App 클릭

 

2. From scratch 선택

 

3. 앱 이름과 워크스페이스 선택 후 Create App 클릭

 

4. 좌측 메뉴 바에서 Features의 'Incoming Webhooks' 선택 및 기능 ON으로 설정

 

5. 하단 Webhook URL 섹션에서 'Add New Webhook to Workspace' 클릭

 

6. 채널 선택 및 엑세스 허용

7. 생성된 Webhook URL 확인

 

3. Webhook URL로 메시지 보내기

위에서 생성한 URL을 이용해 아래와 같이 터미널에서 메시지를 보내보자.

curl -X POST -H 'Content-type: application/json' --data '{"text":"Hello, World!"}' {URL}

 

Data-Alert 앱이 채널에 메시지를 전달한 것을 확인할 수 있다.

 

4. Airflow 에러를 Slack 메시지로 보내기

위에서 얻은 링크를 Variables로 저장해준 뒤, 이를 이용해 slack에 에러가 발생할 경우 메시지를 보내는 모듈을 개발한다. 그리고 DAG 인스턴스를 만들 때 해당 모듈을 에러 콜백으로 지정해주면 된다.

 

Variables에 'slack_url'이라는 이름으로 URL(뒷부분만)을 입력해준 뒤, 아래와 같은 모듈로 메시지를 보내게 된다. 이 모듈은 slack.py라는 이름으로 plugins 폴더에 배치했다.

from airflow.models import Variable

import logging
import requests

def on_failure_callback(context):
    """
    https://airflow.apache.org/_modules/airflow/operators/slack_operator.html
    Define the callback to post on Slack if a failure is detected in the Workflow
    :return: operator.execute
    """
    text = str(context['task_instance'])
    text += "```" + str(context.get('exception')) +"```"
    send_message_to_a_slack_channel(text, ":scream:")


# def send_message_to_a_slack_channel(message, emoji, channel, access_token):
def send_message_to_a_slack_channel(message, emoji):
    # url = "https://slack.com/api/chat.postMessage"
    # "https://hooks.slack.com/services/"까지는 공통적이기 때문에, 뒷부분만 저장함
    url = "https://hooks.slack.com/services/"+Variable.get("slack_url")
    headers = {
        'content-type': 'application/json',
    }
    data = { "username": "Data GOD", "text": message, "icon_emoji": emoji }
    r = requests.post(url, json=data, headers=headers)
    return r

 

그리고 이를 DAG에 적용해보자.

from plugins import slack

...

dag = DAG(
    dag_id = 'name_gender_v4',
    start_date = datetime(2023,4,6), # 날짜가 미래인 경우 실행이 안됨
    schedule = '0 2 * * *',  # 적당히 조절
    max_active_runs = 1,
    catchup = False,
    default_args = {
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
        # failure_collback: 모든 task에 대해 실패 시 사용할 함수 - Slack 메시지 전송
        'on_failure_callback': slack.on_failure_callback,
    }
)

 

이제 원래 잘 실행되던 DAG을 살짝 수정해 일부러 에러가 나도록 해보았다.

위와 같이 에러메시지가 Slack으로 전송되는 것을 확인할 수 있다.

728x90