Slack은 Webhook이라는 기능을 통해 Post 명령으로 특정 채널에 메시지를 보낼 수 있는 기능을 지원한다. 이를 이용해 Airflow DAG에서 에러가 발생할 때, Slack에 메시지를 보내는 기능을 구현해보자.
1. 알림을 전달할 채널 생성(또는 선택)
나는 test할 워크스페이스와 채널을 미리 만들어두었다.
2. Slack app 생성
위 링크에 접속한 뒤, 'Create Your Slack app' 버튼을 눌러 아래와 같이 app을 생성한다.
1. Create an App 클릭
2. From scratch 선택
3. 앱 이름과 워크스페이스 선택 후 Create App 클릭
4. 좌측 메뉴 바에서 Features의 'Incoming Webhooks' 선택 및 기능 ON으로 설정
5. 하단 Webhook URL 섹션에서 'Add New Webhook to Workspace' 클릭
6. 채널 선택 및 엑세스 허용
7. 생성된 Webhook URL 확인
3. Webhook URL로 메시지 보내기
위에서 생성한 URL을 이용해 아래와 같이 터미널에서 메시지를 보내보자.
curl -X POST -H 'Content-type: application/json' --data '{"text":"Hello, World!"}' {URL}
Data-Alert 앱이 채널에 메시지를 전달한 것을 확인할 수 있다.
4. Airflow 에러를 Slack 메시지로 보내기
위에서 얻은 링크를 Variables로 저장해준 뒤, 이를 이용해 slack에 에러가 발생할 경우 메시지를 보내는 모듈을 개발한다. 그리고 DAG 인스턴스를 만들 때 해당 모듈을 에러 콜백으로 지정해주면 된다.
Variables에 'slack_url'이라는 이름으로 URL(뒷부분만)을 입력해준 뒤, 아래와 같은 모듈로 메시지를 보내게 된다. 이 모듈은 slack.py라는 이름으로 plugins 폴더에 배치했다.
from airflow.models import Variable
import logging
import requests
def on_failure_callback(context):
Define the callback to post on Slack if a failure is detected in the Workflow
:return: operator.execute
text = str(context['task_instance'])
text += "```" + str(context.get('exception')) +"```"
send_message_to_a_slack_channel(text, ":scream:")
# def send_message_to_a_slack_channel(message, emoji, channel, access_token):
def send_message_to_a_slack_channel(message, emoji):
# url = ""
# ""까지는 공통적이기 때문에, 뒷부분만 저장함
url = ""+Variable.get("slack_url")
headers = {
'content-type': 'application/json',
data = { "username": "Data GOD", "text": message, "icon_emoji": emoji }
r =, json=data, headers=headers)
return r
그리고 이를 DAG에 적용해보자.
from plugins import slack
dag = DAG(
dag_id = 'name_gender_v4',
start_date = datetime(2023,4,6), # 날짜가 미래인 경우 실행이 안됨
schedule = '0 2 * * *', # 적당히 조절
max_active_runs = 1,
catchup = False,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
# failure_collback: 모든 task에 대해 실패 시 사용할 함수 - Slack 메시지 전송
'on_failure_callback': slack.on_failure_callback,
이제 원래 잘 실행되던 DAG을 살짝 수정해 일부러 에러가 나도록 해보았다.
위와 같이 에러메시지가 Slack으로 전송되는 것을 확인할 수 있다.
