지난 포스팅에서 Docker를 통해 custom postgresDB 연결을 위한 컨테이너를 만들고 해당 DB에 연결하는 것까지 실습해보았다. ( Postgres 컨테이너 추가해 DB 접속하기 글 링크 )
이제 연결한 PostgresDB에 데이터를 Insert하는 DAG을 실행 시켜보려고 하는데, 어떤 문제가 발생한다.
# Postgres 데이터 Insert하는 DAG
from airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator
with DAG(
dag_id='dags_python_with_postgres',
start_date=pendulum.datetime(2023,4,1, tz='Asia/Seoul'),
schedule=None,
catchup=False
) as dag:
def insrt_postgres(ip, port, dbname, user, passwd, **kwargs):
import psycopg2
from contextlib import closing
# closing은 with문을 벗어난 순간 자동으로 conn.close() 함수를 실행시켜 주는 역할
with closing(psycopg2.connect(host=ip, dbname=dbname, user=user, password=passwd, port=int(port))) as conn: # DB서버와 연결하는 Session 열기
with closing(conn.cursor()) as cursor: # cursor: Session을 통해서 서버 간 데이터를 옮기는 객체
dag_id = kwargs.get('ti').dag_id
task_id = kwargs.get('ti').task_id
run_id = kwargs.get('ti').run_id
msg = 'insrt 수행'
sql = 'insert into py_opr_drct_insrt values (%s,%s,%s,%s);' # SQL문 입력
cursor.execute(sql,(dag_id,task_id,run_id,msg))
conn.commit()
insrt_postgres = PythonOperator(
task_id='insrt_postgres',
python_callable=insrt_postgres,
op_args=['172.28.0.3', '5432', 'minding', 'minding', 'minding']
)
insrt_postgres
psycopg2를 이용해 Postgres DB와 연결하는 insrt_postgres()라는 함수를 만들어 PythonOperator로 실행시키는 DAG을 만들어 실행 시켜보았다. (DAG을 실행하기 전에 미리 TABLE을 만들어두어야 한다!)
DAG 자체는 정상적으로 실행되었고, 데이터 자체도 DB에 정상적으로 반영된 것을 확인할 수 있었다.
그런데 어떤게 문제인걸까?
먼저, DAG 코드 상 접속정보가 노출된다는 것이다. 개인적으로만 사용하는 코드라면 상관 없겠지만, 회사 내 서비스 개발용이라면 이야기가 달라진다.
또한 위 접속정보가 변경된다면 대응이 어렵다는 것이다. 만약 직접 접속하는 DAG을 수백개 만들어뒀다면, 일일이 수정할 수도 없는 노릇이다.
이를 해결할 방법은 두 가지 정도가 떠오른다. User이름, Password 등을 Variable(전역변수)에 등록하고 꺼내오는 것 또는 Hook을 이용하는 것이다. 많은 Airflow 이용자들은 Variable 보다는 Hook 사용을 권장한다. 왜냐하면 Variable 등록이 필요없는 간단한 방법이기 때문이다.
Hook? (+ Connection)
개념
Connection: Airflow webserver UI 화면에서 등록할 수 있는 커넥션 정보
Hook: Airflow에서 외부 솔루셤의 기능을 사용할 수 있도록 미리 구현된 메서드를 가진 클래스
Hook은 Connection 정보를 통해 생성되는 객체이기 때문에, Connection과 뗄레야 뗄 수 없는 사이라고 할 수 있다.(따라서 Hook을 사용하려면 Connection을 먼저 등록해주어야 한다.) Hook이 위와 같은 보안 문제를 해결할 수 있는 이유는 접속 정보를 Connection을 통해 받아오기 때문에, 이 접속 정보가 코드상으로 노출되지 않게 만들 수 있다.
또한 Hook은 특정 솔루션을 다룰 수 있는 메서드가 미리 구현되어 있다는 장점이 있다. Postgres, redis 등의 외부 서비스를 보다 손쉽게 다룰 수 있다.
그러나 Hook은 오퍼레이터/센서와는 달리 task를 만드는 역할은 하지 못하기 때문에, Custom Operator 또는 Python Operator 안의 함수로 사용할 수 있다.
Hook 사용해보기
1) Connection 등록 (Postgres)
Admin 아래에 있는 Connections 항목에 들어가 아래와 같이 Connection을 등록해준다.
Connection_id | 자유로운 이름 |
Connection_type | Postgres |
Host | docker-compose.yaml 파일에 등록한 해당 DB 컨테이너의 고정 IP |
Database | DB 이름 |
Login | 유저 ID |
Password | 유저 비밀번호 |
Port | docker-compose.yaml 파일에 등록한 해당 DB 컨테이너의 포트 |
이후 Hook을 이용한 DAG을 작성해준다.
) as dag:
def insrt_postgres(postgres_conn_id, **kwargs):
from airflow.providers.postgres.hooks.postgres import PostgresHook
from contextlib import closing
postgres_hook = PostgresHook(postgres_conn_id)
with closing(postgres_hook.get_conn()) as conn:
with closing(conn.cursor()) as cursor:
dag_id = kwargs.get('ti').dag_id
task_id = kwargs.get('ti').task_id
run_id = kwargs.get('ti').run_id
msg = 'hook insrt 수행'
sql = 'insert into py_opr_drct_insrt values (%s,%s,%s,%s);'
cursor.execute(sql, (dag_id, task_id, run_id, msg))
conn.commit()
insrt_postgres_with_hook = PythonOperator(
task_id='insrt_postgres_with_hook',
python_callable=insrt_postgres,
op_kwargs={'postgres_conn_id':'conn-db-postgres-custom'}
)
insrt_postgres_with_hook
위에서 작성한 DAG과 크게 다르진 않지만, PostgresHook 이라는 클래스를 불러와 .get_conn()이라는 함수를 사용해 DB와 연결을 시도하고 있다. get_conn() 함수는 위에서 등록한 Connection 정보를 get_connection() 메서드를 통해 불러와서, psycopg2 라이브러리를 이용해 DB와 연결하는 역할을 한다. 이 방법을 쓰면, 위에서 직접 DB를 연결한 것과 같은 방법이지만, 보안 정보를 숨겨줄 수 있다.
이제 위 DAG을 실행해 DB에 정상적으로 데이터가 INSERT되는지 확인해보자.
'hook insrt 수행' 이라는 메세지가 담긴 데이터가 정상적으로 반영된 것을 확인할 수 있다.
'Minding's Programming > Airflow' 카테고리의 다른 글
[Airflow] Connection Type 신규 추가하는 방법 (0) | 2024.07.31 |
---|---|
[Airflow] Postgres Hook bulk_load 문제점, Custom Hook으로 bulk_load() 해보기 (0) | 2024.07.29 |
[Airflow/Docker] Postgres 컨테이너 추가해 DB 접속하기 (0) | 2024.07.26 |
[Airflow/Docker] Docker-compose.yaml 파일 해석 (0) | 2024.07.26 |
[Airflow] Custom Operator 개발해보기 (BaseOperator) (0) | 2024.07.26 |