[Spark/pySpark] SparkSQL UDF(User Define Function)
·
Minding's Programming/Spark
UDF?UDF(User Define Function)는 SQL(Spark에서는 DataFrame까지)에서 적용할 수 있는 사용자 정의 함수다. 일반적으로 SQL에서 Scalar함수(UPPER, LOWER 등), Aggregation함수(SUM, MIN, AVG 등)를 제공하고 있지만, 상황에 따라서 특정 계산식이 반복해서 필요할 때가 있다. 그럴 때 유용하게 사용할 수 있는 것이 UDF이다. pySpark에서 UDF를 사용해보기UDF는 크게 두 가지의 종류가 있다.Transformation 함수UDAF(User Define Aggregation Function): Aggregation 환경에서 사용하는 함수(Pyspark에서는 미지원)UDAF의 경우는 스칼라 또는 자바로 구현해야한다. 그렇다면, 다른 함..
[Spark] Spark의 개념, 구조, 프로그램 실행 옵션
·
Minding's Programming/Spark
Spark?Spark는 2013년에 출시된 Scala 기반의 빅데이터 처리 기술로, YARN 등을 분산환경으로 사용한다. 최근엔 MapReduce와 Hive 대신 Spark가 많이 선택받고 있다. Spark의 구성 (3.0 기준)Spark 3.0 기준 아래와 같은 라이브러리로 구성되어 있다.Spark Core: 각 App을 실행시키는 엔진 역할Spark SQL: SQL 등으로 DB, Dataframe 등을 조작할 수 있는 기능Spark ML: 머신러닝 기능을 사용할 수 있음Spark StreamingSpark GraphXSpark의 특징 (vs MapReduce)Spark는 메모리를 우선 사용하며, 메모리가 부족해지면 디스크를 사용한다.MapReduce의 경우 디스크를 사용해 Spark에 비해 속도가 느..
[Hadoop] MapReduce 프로그래밍이란?
·
Minding's Programming/Spark
MapReduce 프로그래밍의 특징MapReduce 프로그래밍은 기본적으로 빅 데이터 처리를 위해 만들어졌기 때문에, 일반 데이터 처리와는 다른 특징이 있다. 큰 특징은 아래와 같다.데이터 셋은 Key, Value의 집합이며 변경 불가(immutable) - 포맷은 하나로 고정데이터 조작은 map과 reduce 2개의 오퍼레이션으로만 가능이 2개의 오퍼레이션은 항상 하나의 쌍으로 연속 실행이 두 오퍼레이션 코드를 개발자가 채워야 함MapReduce 시스템이 Map의 결과를 Reduce단으로 모아줌위 단계를 셔플링이라고 부르며, Network단을 통한 데이터 이동이 발생Map의 결과 중 key가 같은 것을 모아주고 Reduce로 보냄 Map과 ReduceMap: (k, v) --> [(k', v')*]입력..
[Hadoop] 하둡의 분산처리 시스템, YARN 개념 정리
·
Minding's Programming/Spark
YARN은 Hadoop 2.0에서부터 지원되는 하둡의 Resourc Management Layer로, 세부 리소스 관리가 가능한 범용 컴퓨팅 프레임워크이다. HDFS 위에서 실행되며, YARN을 통해 Spark, MapReduce, Tez 등의 다양한 애플리케이션이 실행된다. YARN의 구조 YARN의 구조는 위 그림과 같다. 마스터 노드라고 할 수 있는 'Resource Manager'와 그 아래 슬레이브 노드인 'Node Manager'가 있다. 노드 매니저들은 리소스 매니저의 요구에 따라 자신이 가지고 있는 자원을 일부 넘겨주는 역할을 한다. 노드 매니저가 넘겨준다는 자원은 '컨테이너'라고 불린다. 컨테이너는 어떤 모듈을 실행시키는 구성 요소로, Java의 JVM과 비슷한 존재라고 생각할 수 있다...
[Airflow] Airflow API를 통해 모니터링하기
·
Minding's Programming/Airflow
Airflow APIAirflow는 현재 실행되고 있는 airflow의 상태 등을 알 수 있고, 외부에서 조작이 가능하도록 하는 API를 제공하고 있다. 이 API를 사용하기 위해서는 몇 가지 설정이 필요하다. Airflow API 활성화airflow.cfg의 api 섹션에서 auth_backend의 값을 변경해야 한다. 일반적으로 docker-compose.yaml파일을 사용할 경우 이미 설정이 되어있는 것을 확인할 수 있다.# docker-compose.yaml 파일 &airflow-common-env ... AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend...
[Airflow/Slack] Airflow DAG 실패 시 Slack으로 알림 보내기
·
Minding's Programming/Airflow
Slack은 Webhook이라는 기능을 통해 Post 명령으로 특정 채널에 메시지를 보낼 수 있는 기능을 지원한다. 이를 이용해 Airflow DAG에서 에러가 발생할 때, Slack에 메시지를 보내는 기능을 구현해보자. 1. 알림을 전달할 채널 생성(또는 선택)나는 test할 워크스페이스와 채널을 미리 만들어두었다. 2. Slack app 생성https://api.slack.com/messaging/webhooks Sending messages using incoming webhooksCreate an incoming webhook with a unique URL to which you send a JSON payload with message text and options.api.slack.com위 ..
[Docker] Docker Volume
·
Minding's Programming/Knowledge
Docker Volume?Docker Container가 실행되었다가 중단된다면, 그 안에 있는 데이터들은 일반적으로 유실된다. 하지만 Container에서 DB같은 프로그램이 동작하는거라면, 그 데이터가 유실되면 안될 것이다. 그 데이터를 보장하는 기능이 Docker Volume이라고 할 수 있다. Docker Volume은 Docker Container의 가상 파일 시스템과 호스트 시스템(OS)의 파일 시스템을 맵핑해 기록을 남기는 방식으로 데이터를 저장한다. 위와 같은 방식으로 Container의 특정 폴더 경로를 OS 시스템의 포더 경로와 마운트해 해당 폴더를 공유하는 것이다. Docker Volume 타입Host Volumes: docker run -v를 실행할 때 페어로 지정docker run..
[CI/CD] CI, CD 개념 및 Github Actions
·
Minding's Programming/Knowledge
SW 빌드란? 개발한 소프트웨어를 최종적으로 출시하기 위한 형태로 만드는 것이다. 참여 개발자들이 많을수록 이 과정은 더더욱 중요해지며, 개발이 끝나기 전부터 빌드를 해서 테스트를 진행하면 SW의 안정성이 증대된다. CI (Continuous Integration)CI는 Software Engineering Practice의 하나로, 아래와 같은 기본 원칙을 가지고 있다.코드 Repo는 하나만 유지한다. (Master or Main)코드변경을 최대한 자주 반영테스트를 최대한 추가빌드를 계속적으로 수행 (자동화)성공한 빌드의 프로덕션을 릴리스 (자동화)CD (Continuous Delivery): 배포만약 빌드가 실패할 경우새 코드의 commit으로 테스트가 실패하는 경우, 많은 회사들이 다시 빌드가 성공..