본문 바로가기

Minding's Programming/Spark

[Spark/pySpark] SparkSQL UDF(User Define Function)

728x90
반응형

UDF?

UDF(User Define Function)는 SQL(Spark에서는 DataFrame까지)에서 적용할 수 있는 사용자 정의 함수다. 일반적으로 SQL에서 Scalar함수(UPPER, LOWER 등), Aggregation함수(SUM, MIN, AVG 등)를 제공하고 있지만, 상황에 따라서 특정 계산식이 반복해서 필요할 때가 있다. 그럴 때 유용하게 사용할 수 있는 것이 UDF이다.

 

pySpark에서 UDF를 사용해보기

UDF는 크게 두 가지의 종류가 있다.

  • Transformation 함수
  • UDAF(User Define Aggregation Function): Aggregation 환경에서 사용하는 함수(Pyspark에서는 미지원)

UDAF의 경우는 스칼라 또는 자바로 구현해야한다. 그렇다면, 다른 함수는 파이썬 환경에서 어떻게 구현할 수 있을까?

  • Python lambda 함수
  • Python 일반 함수
  • Python Pandas 함수

위와 같이 일반적으로 사용되는 함수를 지정한 뒤, 해당 함수를 UDF로 등록해주면 된다. 3가지 방법 중에서는 Pandas함수가 가장 추천되는 방식이다. 레코드 하나씩 함수를 적용하는 것이 아닌, 데이터 전체를 Series 형태로 받아 처리할 수 있기 때문이다. UDF는 아래 메서드를 이용해 등록 가능하다.

  • pyspark.sql.functions.udf - 데이터프레임에서만 사용 가능
  • spark.udf.register - SQL 모두에서 사용 가능
  • pyspark.sql.functions.pandas_udf - Pandas 함수 등록 가능(데이터프레임에서만 사용 가능)

아래는 UDF를 파이썬 함수로 정의한 뒤, 등록과 사용하는 예시이다.

# UDF 등록 및 사용 예시
import pyspark.sql.functions as F
from pyspark.sql.types import *
 
upperUDF = F.udf(lambda z:z.upper())
# 사용 시 withColumn() 메서드 사용
df.withColumn("Curated Name", upperUDF("Name"))
# 파이썬 일반 함수 등록해 SQL로 사용하기
def upper(s):
   return s.upper()
   
# 먼저 테스트
upperUDF = spark.udf.register("upper", upper)
spark.sql("SELECT upper('aBcD')").show()

 # DataFrame 기반 SQL에 적용
df.createOrReplaceTempView("test")
spark.sql("""SELECT name, upper(name) "Curated Name" FROM test""").show()
# Pandas UDF Scalar 함수 - 가장 추천되는 방식!
from pyspark.sql.functions import pandas_udf
import pandas as pd

# 데코레이터를 통해 udf를 annotation, Series 형태로 데이터를 처리(벌크 처리)
@pandas_udf(StringType())
def upper_udf2(s: pd.Series) -> pd.Series:
	return s.str.upper()
    
# upper_udf라는 이름으로 함수 등록
upperUDF = spark.udf.register("upper_udf", upper_udf2)
df.select("Name", upperUDF("Name")).show()
spark.sql("""SELECT name, upper_udf(name) `Curated Name` FROM test""").show()

 

위 3가지 방법 중 pandad udf가 좀 더 추천되는 이유는 데이터 처리를 Series 단위로 한꺼번에 처리할 수 있다는 장점과 함께, Apache Arrow를 통한 데이터 전송이 가능하기 때문이다. Apache Arrow를 통해 데이터 전송할 경우, python 객체를 java 객체로 바꿔 처리할 수 있어 좀 더 효율적이고 빠른 처리가 가능하다.

 

덤으로, 이 경우는 pyspark에서는 할 수 없었던 Aggregation 함수도 등록할 수 있다.

 # Pandas UDF로 Spark DataFrame/SQL에 Aggregation 함수 사용
 from pyspark.sql.functions import pandas_udf
 import pandas as pd
 
 @pandas_udf(FloatType())
 def average(v: pd.Series) -> float:
   return v.mean()
   
 averageUDF = spark.udf.register('average', average)
 spark.sql('SELECT average(b) FROM test').show()
 df.agg(averageUDF("b").alias("count")).show()

 

 

 

728x90