본문 바로가기

Minding's Programming/Airflow

[Airflow] Connection과 Hook

728x90
반응형

지난 포스팅에서 Docker를 통해 custom postgresDB 연결을 위한 컨테이너를 만들고 해당 DB에 연결하는 것까지 실습해보았다. ( Postgres 컨테이너 추가해 DB 접속하기 글 링크 )

 

[Airflow/Docker] Postgres 컨테이너 추가해 DB 접속하기

docker-compose.yaml 파일 수정해 Postgres 컨테이너 추가하기 먼저 docker-compose.yaml 파일을 열어 services 항목에 아래 내용을 추가한다.services postgres_custom: image: postgres:13 environment: POSTGRES_USER: minding POSTGRES_

minding-deep-learning.tistory.com

 

이제 연결한 PostgresDB에 데이터를 Insert하는 DAG을 실행 시켜보려고 하는데, 어떤 문제가 발생한다.

# Postgres 데이터 Insert하는 DAG

from airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator

with DAG(
    dag_id='dags_python_with_postgres',
    start_date=pendulum.datetime(2023,4,1, tz='Asia/Seoul'),
    schedule=None,
    catchup=False
) as dag:

    
    def insrt_postgres(ip, port, dbname, user, passwd, **kwargs):
        import psycopg2
        from contextlib import closing

		# closing은 with문을 벗어난 순간 자동으로 conn.close() 함수를 실행시켜 주는 역할
        with closing(psycopg2.connect(host=ip, dbname=dbname, user=user, password=passwd, port=int(port))) as conn: # DB서버와 연결하는 Session 열기
            with closing(conn.cursor()) as cursor: # cursor: Session을 통해서 서버 간 데이터를 옮기는 객체
                dag_id = kwargs.get('ti').dag_id
                task_id = kwargs.get('ti').task_id
                run_id = kwargs.get('ti').run_id
                msg = 'insrt 수행'
                sql = 'insert into py_opr_drct_insrt values (%s,%s,%s,%s);' # SQL문 입력
                cursor.execute(sql,(dag_id,task_id,run_id,msg))
                conn.commit()

    insrt_postgres = PythonOperator(
        task_id='insrt_postgres',
        python_callable=insrt_postgres,
        op_args=['172.28.0.3', '5432', 'minding', 'minding', 'minding']
    )
        
    insrt_postgres

psycopg2를 이용해 Postgres DB와 연결하는 insrt_postgres()라는 함수를 만들어 PythonOperator로 실행시키는 DAG을 만들어 실행 시켜보았다. (DAG을 실행하기 전에 미리 TABLE을 만들어두어야 한다!)

 

DAG 자체는 정상적으로 실행되었고, 데이터 자체도 DB에 정상적으로 반영된 것을 확인할 수 있었다.

DBeaver로 확인한 모습

 

그런데 어떤게 문제인걸까?

 

먼저, DAG 코드 상 접속정보가 노출된다는 것이다. 개인적으로만 사용하는 코드라면 상관 없겠지만, 회사 내 서비스 개발용이라면 이야기가 달라진다.

 

또한 위 접속정보가 변경된다면 대응이 어렵다는 것이다. 만약 직접 접속하는 DAG을 수백개 만들어뒀다면, 일일이 수정할 수도 없는 노릇이다.

 

이를 해결할 방법은 두 가지 정도가 떠오른다. User이름, Password 등을 Variable(전역변수)에 등록하고 꺼내오는 것 또는 Hook을 이용하는 것이다. 많은 Airflow 이용자들은 Variable 보다는 Hook 사용을 권장한다. 왜냐하면 Variable 등록이 필요없는 간단한 방법이기 때문이다.

 

 

Hook? (+ Connection)

 

개념

Connection:  Airflow webserver UI 화면에서 등록할 수 있는 커넥션 정보
Hook:  Airflow에서 외부 솔루셤의 기능을 사용할 수 있도록 미리 구현된 메서드를 가진 클래스

 

Hook은 Connection 정보를 통해 생성되는 객체이기 때문에, Connection과 뗄레야 뗄 수 없는 사이라고 할 수 있다.(따라서 Hook을 사용하려면 Connection을 먼저 등록해주어야 한다.) Hook이 위와 같은 보안 문제를 해결할 수 있는 이유는 접속 정보를 Connection을 통해 받아오기 때문에, 이 접속 정보가 코드상으로 노출되지 않게 만들 수 있다.

 

또한 Hook은 특정 솔루션을 다룰 수 있는 메서드가 미리 구현되어 있다는 장점이 있다. Postgres, redis 등의 외부 서비스를 보다 손쉽게 다룰 수 있다.

 

그러나 Hook은 오퍼레이터/센서와는 달리 task를 만드는 역할은 하지 못하기 때문에, Custom Operator 또는 Python Operator 안의 함수로 사용할 수 있다.

 

 

Hook 사용해보기

 

1) Connection 등록 (Postgres)

Admin 아래에 있는 Connections 항목에 들어가 아래와 같이 Connection을 등록해준다.

Connection_id 자유로운 이름
Connection_type Postgres
Host docker-compose.yaml 파일에 등록한 해당 DB 컨테이너의 고정 IP
Database DB 이름
Login 유저 ID
Password 유저 비밀번호
Port docker-compose.yaml 파일에 등록한 해당 DB 컨테이너의 포트

 

이후 Hook을 이용한 DAG을 작성해준다.

) as dag:
    def insrt_postgres(postgres_conn_id, **kwargs):
        from airflow.providers.postgres.hooks.postgres import PostgresHook
        from contextlib import closing
        
        postgres_hook = PostgresHook(postgres_conn_id)
        with closing(postgres_hook.get_conn()) as conn:
            with closing(conn.cursor()) as cursor:
                dag_id = kwargs.get('ti').dag_id
                task_id = kwargs.get('ti').task_id
                run_id = kwargs.get('ti').run_id
                msg = 'hook insrt 수행'
                sql = 'insert into py_opr_drct_insrt values (%s,%s,%s,%s);'
                cursor.execute(sql, (dag_id, task_id, run_id, msg))
                conn.commit()

    insrt_postgres_with_hook = PythonOperator(
        task_id='insrt_postgres_with_hook',
        python_callable=insrt_postgres,
        op_kwargs={'postgres_conn_id':'conn-db-postgres-custom'}
    )
    insrt_postgres_with_hook

위에서 작성한 DAG과 크게 다르진 않지만, PostgresHook 이라는 클래스를 불러와 .get_conn()이라는 함수를 사용해 DB와 연결을 시도하고 있다. get_conn() 함수는 위에서 등록한 Connection 정보를 get_connection() 메서드를 통해 불러와서, psycopg2 라이브러리를 이용해 DB와 연결하는 역할을 한다. 이 방법을 쓰면, 위에서 직접 DB를 연결한 것과 같은 방법이지만, 보안 정보를 숨겨줄 수 있다.

 

이제 위 DAG을 실행해 DB에 정상적으로 데이터가 INSERT되는지 확인해보자.

'hook insrt 수행' 이라는 메세지가 담긴 데이터가 정상적으로 반영된 것을 확인할 수 있다.

 

참고: https://airflow.apache.org/docs/apache-airflow-providers-postgres/5.11.2/_api/airflow/providers/postgres/hooks/postgres/index.html

 

airflow.providers.postgres.hooks.postgres — apache-airflow-providers-postgres Documentation

 

airflow.apache.org

 

728x90