[Airflow] Connection과 Hook

2024. 7. 27. 18:43·Minding's Programming/Airflow
728x90
반응형

지난 포스팅에서 Docker를 통해 custom postgresDB 연결을 위한 컨테이너를 만들고 해당 DB에 연결하는 것까지 실습해보았다. ( Postgres 컨테이너 추가해 DB 접속하기 글 링크 )

 

[Airflow/Docker] Postgres 컨테이너 추가해 DB 접속하기

docker-compose.yaml 파일 수정해 Postgres 컨테이너 추가하기 먼저 docker-compose.yaml 파일을 열어 services 항목에 아래 내용을 추가한다.services postgres_custom: image: postgres:13 environment: POSTGRES_USER: minding POSTGRES_

minding-deep-learning.tistory.com

 

이제 연결한 PostgresDB에 데이터를 Insert하는 DAG을 실행 시켜보려고 하는데, 어떤 문제가 발생한다.

# Postgres 데이터 Insert하는 DAG

from airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator

with DAG(
    dag_id='dags_python_with_postgres',
    start_date=pendulum.datetime(2023,4,1, tz='Asia/Seoul'),
    schedule=None,
    catchup=False
) as dag:

    
    def insrt_postgres(ip, port, dbname, user, passwd, **kwargs):
        import psycopg2
        from contextlib import closing

		# closing은 with문을 벗어난 순간 자동으로 conn.close() 함수를 실행시켜 주는 역할
        with closing(psycopg2.connect(host=ip, dbname=dbname, user=user, password=passwd, port=int(port))) as conn: # DB서버와 연결하는 Session 열기
            with closing(conn.cursor()) as cursor: # cursor: Session을 통해서 서버 간 데이터를 옮기는 객체
                dag_id = kwargs.get('ti').dag_id
                task_id = kwargs.get('ti').task_id
                run_id = kwargs.get('ti').run_id
                msg = 'insrt 수행'
                sql = 'insert into py_opr_drct_insrt values (%s,%s,%s,%s);' # SQL문 입력
                cursor.execute(sql,(dag_id,task_id,run_id,msg))
                conn.commit()

    insrt_postgres = PythonOperator(
        task_id='insrt_postgres',
        python_callable=insrt_postgres,
        op_args=['172.28.0.3', '5432', 'minding', 'minding', 'minding']
    )
        
    insrt_postgres

psycopg2를 이용해 Postgres DB와 연결하는 insrt_postgres()라는 함수를 만들어 PythonOperator로 실행시키는 DAG을 만들어 실행 시켜보았다. (DAG을 실행하기 전에 미리 TABLE을 만들어두어야 한다!)

 

DAG 자체는 정상적으로 실행되었고, 데이터 자체도 DB에 정상적으로 반영된 것을 확인할 수 있었다.

DBeaver로 확인한 모습

 

그런데 어떤게 문제인걸까?

 

먼저, DAG 코드 상 접속정보가 노출된다는 것이다. 개인적으로만 사용하는 코드라면 상관 없겠지만, 회사 내 서비스 개발용이라면 이야기가 달라진다.

 

또한 위 접속정보가 변경된다면 대응이 어렵다는 것이다. 만약 직접 접속하는 DAG을 수백개 만들어뒀다면, 일일이 수정할 수도 없는 노릇이다.

 

이를 해결할 방법은 두 가지 정도가 떠오른다. User이름, Password 등을 Variable(전역변수)에 등록하고 꺼내오는 것 또는 Hook을 이용하는 것이다. 많은 Airflow 이용자들은 Variable 보다는 Hook 사용을 권장한다. 왜냐하면 Variable 등록이 필요없는 간단한 방법이기 때문이다.

 

 

Hook? (+ Connection)

 

개념

Connection:  Airflow webserver UI 화면에서 등록할 수 있는 커넥션 정보
Hook:  Airflow에서 외부 솔루셤의 기능을 사용할 수 있도록 미리 구현된 메서드를 가진 클래스

 

Hook은 Connection 정보를 통해 생성되는 객체이기 때문에, Connection과 뗄레야 뗄 수 없는 사이라고 할 수 있다.(따라서 Hook을 사용하려면 Connection을 먼저 등록해주어야 한다.) Hook이 위와 같은 보안 문제를 해결할 수 있는 이유는 접속 정보를 Connection을 통해 받아오기 때문에, 이 접속 정보가 코드상으로 노출되지 않게 만들 수 있다.

 

또한 Hook은 특정 솔루션을 다룰 수 있는 메서드가 미리 구현되어 있다는 장점이 있다. Postgres, redis 등의 외부 서비스를 보다 손쉽게 다룰 수 있다.

 

그러나 Hook은 오퍼레이터/센서와는 달리 task를 만드는 역할은 하지 못하기 때문에, Custom Operator 또는 Python Operator 안의 함수로 사용할 수 있다.

 

 

Hook 사용해보기

 

1) Connection 등록 (Postgres)

Admin 아래에 있는 Connections 항목에 들어가 아래와 같이 Connection을 등록해준다.

Connection_id 자유로운 이름
Connection_type Postgres
Host docker-compose.yaml 파일에 등록한 해당 DB 컨테이너의 고정 IP
Database DB 이름
Login 유저 ID
Password 유저 비밀번호
Port docker-compose.yaml 파일에 등록한 해당 DB 컨테이너의 포트

 

이후 Hook을 이용한 DAG을 작성해준다.

) as dag:
    def insrt_postgres(postgres_conn_id, **kwargs):
        from airflow.providers.postgres.hooks.postgres import PostgresHook
        from contextlib import closing
        
        postgres_hook = PostgresHook(postgres_conn_id)
        with closing(postgres_hook.get_conn()) as conn:
            with closing(conn.cursor()) as cursor:
                dag_id = kwargs.get('ti').dag_id
                task_id = kwargs.get('ti').task_id
                run_id = kwargs.get('ti').run_id
                msg = 'hook insrt 수행'
                sql = 'insert into py_opr_drct_insrt values (%s,%s,%s,%s);'
                cursor.execute(sql, (dag_id, task_id, run_id, msg))
                conn.commit()

    insrt_postgres_with_hook = PythonOperator(
        task_id='insrt_postgres_with_hook',
        python_callable=insrt_postgres,
        op_kwargs={'postgres_conn_id':'conn-db-postgres-custom'}
    )
    insrt_postgres_with_hook

위에서 작성한 DAG과 크게 다르진 않지만, PostgresHook 이라는 클래스를 불러와 .get_conn()이라는 함수를 사용해 DB와 연결을 시도하고 있다. get_conn() 함수는 위에서 등록한 Connection 정보를 get_connection() 메서드를 통해 불러와서, psycopg2 라이브러리를 이용해 DB와 연결하는 역할을 한다. 이 방법을 쓰면, 위에서 직접 DB를 연결한 것과 같은 방법이지만, 보안 정보를 숨겨줄 수 있다.

 

이제 위 DAG을 실행해 DB에 정상적으로 데이터가 INSERT되는지 확인해보자.

'hook insrt 수행' 이라는 메세지가 담긴 데이터가 정상적으로 반영된 것을 확인할 수 있다.

 

참고: https://airflow.apache.org/docs/apache-airflow-providers-postgres/5.11.2/_api/airflow/providers/postgres/hooks/postgres/index.html

 

airflow.providers.postgres.hooks.postgres — apache-airflow-providers-postgres Documentation

 

airflow.apache.org

 

728x90

'Minding's Programming > Airflow' 카테고리의 다른 글

[Airflow] Connection Type 신규 추가하는 방법  (0) 2024.07.31
[Airflow] Postgres Hook bulk_load 문제점, Custom Hook으로 bulk_load() 해보기  (0) 2024.07.29
[Airflow/Docker] Postgres 컨테이너 추가해 DB 접속하기  (0) 2024.07.26
[Airflow/Docker] Docker-compose.yaml 파일 해석  (0) 2024.07.26
[Airflow] Custom Operator 개발해보기 (BaseOperator)  (0) 2024.07.26
'Minding's Programming/Airflow' 카테고리의 다른 글
  • [Airflow] Connection Type 신규 추가하는 방법
  • [Airflow] Postgres Hook bulk_load 문제점, Custom Hook으로 bulk_load() 해보기
  • [Airflow/Docker] Postgres 컨테이너 추가해 DB 접속하기
  • [Airflow/Docker] Docker-compose.yaml 파일 해석
Minding
Minding
  • Minding
    Today's Minding
    Minding
  • 전체
    오늘
    어제
    • 울고넘는 딥러닝 (278)
      • Minding's Baseball (57)
        • MLB Statcast (29)
        • 머신러닝으로 홈런왕 예측하기 (3)
        • 야구칼럼 (12)
        • 야구 규칙, 용어 (1)
        • 2022-23 질롱 코리아 (8)
        • 류현진 등판경기 (4)
      • Minding's Programming (185)
        • 프로그래머스 코딩테스트 (21)
        • Knowledge (44)
        • Numpy & Pandas (6)
        • Excel (3)
        • Git (1)
        • Pygame (11)
        • CV (3)
        • Tensorflow tutorial (4)
        • Kaggle and Dacon (4)
        • 에러 코드 (8)
        • FastAPI (8)
        • Airflow (29)
        • Crawling (6)
        • Django (14)
        • AWS (18)
        • Spark (5)
      • Minding's Reading (30)
        • 머신러닝 딥러닝에 필요한 기초 수학 with 파이.. (2)
        • 칼만필터는 어렵지 않아 (11)
        • 밑바닥부터 시작하는 딥러닝 (6)
        • 메이저리그 야구 통계학 2e (8)
        • 논문읽기 (2)
        • 빅데이터를 지탱하는 기술 (1)
      • Minding's Life (5)
        • 주식 (4)
        • 각종 소식 (1)
  • 블로그 메뉴

    • 홈
    • Baseball
    • Programming
    • Reading
  • 링크

  • 공지사항

  • 인기 글

  • 태그

    FastAPI
    에어플로우
    칼만필터는어렵지않아파이썬
    프로그래머스
    KalmanFilter
    칼만필터는어렵지않아
    django python
    pygame
    KBO
    파이썬
    메이저리그
    django
    딥러닝
    데이터 엔지니어
    머신러닝
    칼만필터는어렵지않아python
    넘파이
    Python
    코딩테스트
    질롱코리아
    Airflow
    mlb stats api
    파이썬게임개발
    파이게임
    데이터분석
    MLB
    칼만필터
    AWS
    게임개발
    야구
  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.3
Minding
[Airflow] Connection과 Hook
상단으로

티스토리툴바