실시간 데이터 처리 플랫폼의 대표라고 할 수 있는 Kafka에 대해 공부하기 위해서, Kafka를 어떻게 설치하는지 알아보려고 한다.
이 글에서는 Kafka를 Docker Container로 실행할 예정이기 때문에, 기본적으로 Docker가 설치되어 있어야 한다. Docker 설치는 아래 글을 참고하길 바란다. (Windows의 경우 WSL2 설치가 필요할 수 있다.)
Kafka 설치하기
kafka는 Docker Compose를 통해 실행할 예정이며, 아래의 Github repo를 이용할 예정이다. (kafka는 기본적으로 오픈 소스로 공식 홈페이지가 따로 있지만, 아래 repo가 docker-compose 형태로 잘 구성해 놓았기 때문에 이를 이용한다.)
https://github.com/conduktor/kafka-stack-docker-compose
git clone을 통해 위 repo의 파일들을 다운로드 받아준 뒤, 해당 폴더로 이동한다.
git clone https://github.com/conduktor/kafka-stack-docker-compose.git
cd kafka-stack-docker-compose
폴더를 살펴보면 여러가지 yml파일이 보이는데, 그 중 컴퓨터 사양에 따라 하나를 골라서 사용하면 된다.
❯ ls -tl
total 60
-rw-r--r-- 1 minding minding 11345 Dec 5 15:36 LICENSE
-rw-r--r-- 1 minding minding 8089 Dec 5 15:36 README.md
-rw-r--r-- 1 minding minding 1043 Dec 5 15:36 conduktor.yml
-rw-r--r-- 1 minding minding 4833 Dec 5 15:36 full-stack.yml
-rw-r--r-- 1 minding minding 2236 Dec 5 15:36 jmx-exporter.yml
-rwxr-xr-x 1 minding minding 2185 Dec 5 15:36 test.sh
-rw-r--r-- 1 minding minding 3934 Dec 5 15:36 zk-multiple-kafka-multiple-schema-registry.yml
-rw-r--r-- 1 minding minding 3396 Dec 5 15:36 zk-multiple-kafka-multiple.yml
-rw-r--r-- 1 minding minding 1954 Dec 5 15:36 zk-multiple-kafka-single.yml
-rw-r--r-- 1 minding minding 2673 Dec 5 15:36 zk-single-kafka-multiple.yml
-rw-r--r-- 1 minding minding 1326 Dec 5 15:36 zk-single-kafka-single.yml
사양에 따른 yml 파일 실행은 아래와 같다. (각 사양의 기준은 따로 없으니, 실행해보고 가장 잘 실행되는 것을 선택하자.)
- full-stack.yml: 최고 사양
- zk-single-kafka-single.yml: zookeeper와 kafka 모두 single로 실행 (저사양)
- zk-single-kafka-multiple.yml: zookeeper는 single, kafka는 multiple로 실행 (중간 사양)
full-stack.yml 파일을 살펴보자. zookeeper와 kafka 이외에도 kafka connect, kafka schema registry, kafka rest proxy 등 여러 라이브러리들이 함께 있는 것을 알 수 있다. 이 repo를 만든 conduktor 회사의 console도 포함되어 있다.
# full stack
version: '2.1'
services:
zoo1:
image: confluentinc/cp-zookeeper:7.3.2
hostname: zoo1
container_name: zoo1
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_SERVERS: zoo1:2888:3888
kafka1:
image: confluentinc/cp-kafka:7.3.2
hostname: kafka1
container_name: kafka1
ports:
- "9092:9092"
- "29092:29092"
- "9999:9999"
environment:
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
KAFKA_BROKER_ID: 1
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_JMX_PORT: 9001
KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
depends_on:
- zoo1
kafka-schema-registry:
image: confluentinc/cp-schema-registry:7.3.2
hostname: kafka-schema-registry
container_name: kafka-schema-registry
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092
SCHEMA_REGISTRY_HOST_NAME: kafka-schema-registry
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
depends_on:
- zoo1
- kafka1
kafka-rest-proxy:
image: confluentinc/cp-kafka-rest:7.3.2
hostname: kafka-rest-proxy
container_name: kafka-rest-proxy
ports:
- "8082:8082"
environment:
# KAFKA_REST_ZOOKEEPER_CONNECT: zoo1:2181
KAFKA_REST_LISTENERS: http://0.0.0.0:8082/
KAFKA_REST_SCHEMA_REGISTRY_URL: http://kafka-schema-registry:8081/
KAFKA_REST_HOST_NAME: kafka-rest-proxy
KAFKA_REST_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092
depends_on:
- zoo1
- kafka1
- kafka-schema-registry
kafka-connect:
image: confluentinc/cp-kafka-connect:7.3.2
hostname: kafka-connect
container_name: kafka-connect
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: "kafka1:19092"
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: compose-connect-group
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://kafka-schema-registry:8081'
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://kafka-schema-registry:8081'
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_PLUGIN_PATH: '/usr/share/java,/etc/kafka-connect/jars,/usr/share/confluent-hub-components'
volumes:
- ./connectors:/etc/kafka-connect/jars/
depends_on:
- zoo1
- kafka1
- kafka-schema-registry
- kafka-rest-proxy
command:
- bash
- -c
- |
confluent-hub install --no-prompt debezium/debezium-connector-mysql:latest
confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:0.4.0
/etc/confluent/docker/run
ksqldb-server:
image: confluentinc/cp-ksqldb-server:7.3.2
hostname: ksqldb-server
container_name: ksqldb-server
ports:
- "8088:8088"
environment:
KSQL_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092
KSQL_LISTENERS: http://0.0.0.0:8088/
KSQL_KSQL_SERVICE_ID: ksqldb-server_
depends_on:
- zoo1
- kafka1
postgresql:
hostname: postgresql
container_name: postgresql
extends:
service: postgresql
file: conduktor.yml
conduktor-console:
hostname: conduktor-console
container_name: conduktor-console
extends:
service: conduktor-console
file: conduktor.yml
volumes:
pg_data: {}
conduktor_data: {}
이제 docker compose 명령어를 통해 해당 파일을 Docker를 통해 실행시켜준다. 실행에 시간이 다소 걸린다.
docker compose -f full-stack.yml up
Docker Desktop 프로그램에서 컨테이너가 실행된 것을 확인할 수 있다.
만약 full-stack.yml을 통해 컨테이너를 실행했다면, http://localhost:8080/를 통해 conduktor console 화면도 확인할 수 있다. 로그인 후 아래 화면이 노출되는 것을 확인할 수 있다. (기본 ID: admin@admin.io / PW: admin), 초기 가입 화면을 통해 자신의 마음대로 입력할 수 있다.
왼쪽 메뉴바를 통해 topic, schema registry, consumer group 등을 확인할 수 있다.
Kafka에서 Python을 사용하기 위한 라이브러리 설치
Kafka를 Python으로 프로그래밍하기 위해서는 별도 라이브러리를 설치해 주어야한다. Python 라이브러리는 크게 2가지가 있다.
- Confluent Kafka Python: Confluent(Kafka 개발자들이 모여만든 회사)에서 개발한 공식 kafka python 라이브러리
- Kafka-Python: 오픈소스 kafka python 라이브러리
이 글에서는 두 번째 Kafka-Python을 사용해 볼 예정이다. 아래 pip 명령어를 통해 라이브러리를 설치한다.
pip install kafka-python
이제 간단한 Producer를 만들어보자.
# producer.py
from time import sleep
from json import dumps
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda x: dumps(x).encode('utf-8')
)
for j in range(999):
print("Iteration", j)
data = {'counter': j}
producer.send('topic_test', value=data) # key와 headers는 지정되어 있지 않은 상태
sleep(0.5)
먼저 로컬 kafka 인스턴스를 연결하는 kafka producer 객체를 생성한다. 전송하려는 데이터는 json문자열로 변환 뒤 utf-8로 인코딩해 직렬화한다. 여기서 bootstrap_servers 파라미터는 리스트 형태로 연결할 1개 이상의 broker들을 지정해주는 역할이다.
아래에 있는 반복문에는 0.5초마다 "topic_test"라는 토픽과 반복 횟수 카운터를 데이터로 포함하는 이벤트를 전송한다.
이제 위 코드의 파일을 한번 실행해보자.
❯ python producer.py
Iteration 0
Iteration 1
Iteration 2
Iteration 3
Iteration 4
Iteration 5
Iteration 6
Iteration 7
Iteration 8
Iteration 9
Iteration 10
Iteration 11
.
.
...
정상적으로 파일이 실행된 것을 확인했다면, 이제 conduktor console로 돌아가 'Topic' 탭을 살펴보자. 'topic_test'라는 항목을 발견할 수 있을 것이다.
이제 consumer도 생성해보자.
from kafka import KafkaConsumer
from json import loads
from time import sleep
consumer = KafkaConsumer(
'topic_test',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group-id',
value_deserializer=lambda x: loads(x.decode('utf-8'))
)
for event in consumer:
event_data = event.value
# Do whatever you want
print(event_data)
sleep(2)
2초마다 'topic_test'라는 이름을 가진 topic에서 빠른 순서(earliest)대로 데이터를 가져오는 코드다. 실행하면, 위 producer에서 전송한대로 데이터를 받아오는 모습을 확인할 수 있다.
'Minding's Programming > Knowledge' 카테고리의 다른 글
[Python/Unittest] Unittest (0) | 2024.12.02 |
---|---|
[Docker] Docker Volume (0) | 2024.11.15 |
[CI/CD] CI, CD 개념 및 Github Actions (1) | 2024.11.14 |
[BI/시각화] Superset (1) | 2024.10.31 |
[Snowflake] Snowflake 알아보기 (설치 방법) (0) | 2024.10.31 |