본문 바로가기

Minding's Programming/Airflow

[Airflow/Xcom] Airflow에서 Xcom 사용해보기

728x90
반응형

Xcom? (Cross Communication)

 

Xcom은 Airflow DAG 내 task 간의 데이터 공유를 위해 사용되는 기술이다.

ex) task1의 수행 중 내용이나 결과를 task2에서 사용 또는 입력으로 주고 싶은 경우

 

주로 작은 규모의 데이터 공유를 위해 사용되며, 1GB 이상의 데이터 공유를 위해서는 AWS S3, HDFS 등의 외부 솔루션 사용이 필요하다. (Xcom 내용은 메타 DB(내부 DB)의 테이블에 값이 저장되기 때문)

 

 

Python 오퍼레이터에서 Xcom을 사용하는 방법

 

파이썬 오퍼레이터에서 Xcom을 사용하는 방법에는 크게 2가지가 있다.

 

1) **kwargs에 존재하는 ti(task_instance) 객체 활용

# 데이터 xcom에 업로드
@task(task_id = 'python_xcom_push_task')
def xcom_push(**kwargs):
    ti = kwargs['ti']
    ti.xcom_push(key="result1", value="value_1")
    ti.xcom_push(key="result2", value=[1,2,3])
    
# 데이터 xcom에서 꺼내와 사용
@task(task_id = 'python_xcom_pull_task')
def xcom_pull(**kwargs):
    ti = kwargs['ti']
    value_key1 = ti.xcom_pull(key="result1") # value_1 이라는 스트링이 해당 변수에 할당
    value_key2 = ti.xcom_pull(key="result2", task_ids='python_xcom_push_task') # key값이 겹칠 경우 task 이름 명시
    print(value_key1) # 'value_1'
    print(value_key2) # [1,2,3]

template 변수에서 task_instance라는 객체를 얻을 수 있는데, task_instance 객체가 가진 .xcom_push() 메서드를 활용할 수 있다. key/value 형태로 xcom에 데이터를 업로드할 수 있다.

 

해당 데이터를 사용할 때에는 xcom_pull() 메서드를 활용해 해당 데이터를 이용할 수 있다. 이전에 저장한 key값을 이용해 value값을 저장한다.

 

2) Python 함수의 return 값 활용(1)

# Xcom에 데이터 업로드
@task(task_id='xcom_push_by_return')
def xcom_push_by_return(**kwargs):
    transaction_value = 'status Good'
    return transaction_value
    
# Xcom에서 데이터 가져오기
@task(task_id='xcom_pull_by_return')
def xcom_pull_by_return(status, **kwargs):
    print(status)
    
# task 데코레이터 사용 시 함수 입/출력 관계만으로 task flow 정의 가능
# return값이 자연스럽게 다음 task로 전달되어 사용 가능
xcom_pull_by_return(xcom_push_by_return())

특정 task의 return값을 또 다른 task의 인자값으로 받는 형식으로 코드를 구성하면 Xcom을 이용해 데이터를 공유할 수 있다. Airflow는 return값이 있을 경우에 자동으로 메타 DB에 저장되기 때문이다. 이 경우에는 @task 데코레이터를 이용해 함수 입/출력 관계만으로 task flow를 정의해야 한다.

 

Python 함수의 return 값 활용(2)

# Xcom에 데이터 업로드
@task(task_id='xcom_push_by_return')
def xcom_push_by_return(**kwargs):
    transaction_value = 'status Good'
    return transaction_value
    
# Xcom에서 데이터 불러오기
@task(task_id='xcom_pull_by_return')
def xcom_pull_return_by_method(**kwargs):
    ti = kwargs['ti']
    pull_value = ti.xcom_pull(key='return_value', task_ids='xcom_push_by_return')
    print(pull_value)
    
xcom_push_by_return() >> xcom_pull_by_return()

위에서 보았듯이 return값은 자동으로 Xcom에 저장된다.(key='return_value', task_ids=task_id로 저장됨)

따라서, 이 경우에도 **kwargs에 존재하는 ti 객체를 활용하여 xcom_pull() 메서드를 활용해 데이터를 불러올 수 있다. return한 값을 꺼낼 때에는 key를 명시하지 않고 task_ids에 해당하는 파라미터만 전달해도 사용할 수 있다.

 

 

Bash 오퍼레이터에서 Xcom을 사용하는 방법

 

Bash 오퍼레이터는 template을 이용할 수 있는 env, bash_command 파라미터에서 xocm을 사용할 수 있다. template을 이용하여 Xcom에 push/pull을 할 수 있기 때문이다.

bash_push = BashOperator(
    task_id='bash_push',
    bash_command="echo START && "
    			"echo XCOM PUSHED "
                "{{ ti.xcom_push (key='bash_pushed', value='first_bash_message') }} && "
                "echo COMPLETE" # 마지막 출력문은 자동으로 return_value에 저장된다.
)                

bash_pull = BashOperator(
    task_id = 'bash_pull',
    env={'PUSHED_VALUE':"{{ ti.xcom_pull (key='bash_pushed') }}", 
    	'RETURN_VALUE':"{{ ti.xcom_pull (task_ids='bash_push') }}"}, # task_ids만 지정하면 key= 'return_value'를 의미
    bash_command="echo $PUSHED_VALUE && echo $RETURN_VALUE ",
    do_xcom_push=False # 마지막 출력문을 return_value에 저장하지 않음
)

파이썬 오퍼레이터에서 ti 객체를 불러와서 사용했듯이 template 문법에서도 ti객체의 xcom_push() 메서드를 이용해 Xcom을 사용할 수 있다. Bash 오퍼레이터에서는 마지막 출력문이 자동으로 return_value에 저장되어 이 또한 Xcom에서 불러와 사용할 수 있다.

XCom에 저장된 값
위 DAG의 로그

 

 

Python & Bash 오퍼레이터 간 Xcom 사용하는 방법 (오퍼레이터 혼합)

 

1) Python --> Bash 오퍼레이터 Xcom 전달

@task(task_id='python_push')
def python_push_xcom():
    result_dict = {'status':'Good','data':[1,2,3],'options_cnt':100}
    return result_dict
    
bash_pull = BashOperator(
    task_id='bash_pull',
    env={
        'STATUS':'{{ti.xcom_pull(task_ids="python_push")["status"]}}',
        'DATA':'{{ti.xcom_pull(task_ids="python_push")["data"]}}',
        'OPTIONS_CNT':'{{ti.xcom_pull(task_ids="python_push")["options_cnt"]}}'
        },
    bash_command='echo $STATUS && echo $DATA && echo $OPTIONS_CNT'
)

python_push_xcom() >> bash_pull

파이썬 오퍼레이터에서 result_dict라는 딕셔너리를 리턴을 통해 Xcom에 저장한 후, bash 오퍼레이터에서 이전에 사용했던 것처럼 ti 객체를 이용해 위 task_id에 해당하는 리턴값의 데이터를 받을 수 있다. 이후 env에 키 값에 따른 value값을 각각의 변수에 저장해준 뒤 출력하는 코드이다.

 

2) Bash --> Python 오퍼레이터 Xcom 전달

bash_push = BashOperator(
task_id = 'bash_push',
bash_command='echo PUSH_START '
             '{{ti.xcom_push(key="bash_pushed", value=200)}} && '
             'echo PUSH_COMPLETE'
)

@task(task_id='python_pull')
def python_pull_xcom(**kwargs):
    ti = kwargs['ti']
    status_value = ti.xcom_pull(key='bash_pushed')
    return_value = ti.xcom_pull(task_ids='bash_push')
    print(str(status_value))
    print(return_value)
    
bash_push >> python_pull_xcom()

Bash 오퍼레이터에서 python 오퍼레이터로 전달해줄 때에도 마찬가지이다. 불러올 때 kwargs 안에 있는 ti 객체를 이용해 bash에서 xcom에 저장된 값을 이용할 수 있다.

 

이처럼 다른 오퍼레이터에서도 template을 사용할 수 있는 파라미터에 한해서 XCom을 이용한 데이터 공유를 할 수 있다.

728x90