본문 바로가기

Minding's Programming/Knowledge

[Kafka/Python] Kafka 설치 방법(Conduktor)

728x90
반응형

실시간 데이터 처리 플랫폼의 대표라고 할 수 있는 Kafka에 대해 공부하기 위해서, Kafka를 어떻게 설치하는지 알아보려고 한다.

 

이 글에서는 Kafka를 Docker Container로 실행할 예정이기 때문에, 기본적으로 Docker가 설치되어 있어야 한다. Docker 설치는 아래 글을 참고하길 바란다. (Windows의 경우 WSL2 설치가 필요할 수 있다.)

 Docker의 개념 및 기본 실행 명령어

 

[Docker] Docker의 개념 및 기본 실행 명령어

Docker Docker는 애플리케이션을 컨테이너라는 독립된 환경에서 실행할 수 있게 해주는 Linux 컨테이너 기반 플랫폼이다. 애플리케이션과 관련된 라이브러리와 종속성을 하나의 패키지로 묶어 어디

minding-deep-learning.tistory.com

 

Kafka 설치하기

kafka는 Docker Compose를 통해 실행할 예정이며, 아래의 Github repo를 이용할 예정이다. (kafka는 기본적으로 오픈 소스로 공식 홈페이지가 따로 있지만, 아래 repo가 docker-compose 형태로 잘 구성해 놓았기 때문에 이를 이용한다.)

https://github.com/conduktor/kafka-stack-docker-compose

 

GitHub - conduktor/kafka-stack-docker-compose: docker compose files to create a fully working kafka stack

docker compose files to create a fully working kafka stack - conduktor/kafka-stack-docker-compose

github.com

 

git clone을 통해 위 repo의 파일들을 다운로드 받아준 뒤, 해당 폴더로 이동한다.

git clone https://github.com/conduktor/kafka-stack-docker-compose.git

cd kafka-stack-docker-compose

 

폴더를 살펴보면 여러가지 yml파일이 보이는데, 그 중 컴퓨터 사양에 따라 하나를 골라서 사용하면 된다.

❯ ls -tl
total 60
-rw-r--r-- 1 minding minding 11345 Dec  5 15:36 LICENSE
-rw-r--r-- 1 minding minding  8089 Dec  5 15:36 README.md
-rw-r--r-- 1 minding minding  1043 Dec  5 15:36 conduktor.yml
-rw-r--r-- 1 minding minding  4833 Dec  5 15:36 full-stack.yml
-rw-r--r-- 1 minding minding  2236 Dec  5 15:36 jmx-exporter.yml
-rwxr-xr-x 1 minding minding  2185 Dec  5 15:36 test.sh
-rw-r--r-- 1 minding minding  3934 Dec  5 15:36 zk-multiple-kafka-multiple-schema-registry.yml
-rw-r--r-- 1 minding minding  3396 Dec  5 15:36 zk-multiple-kafka-multiple.yml
-rw-r--r-- 1 minding minding  1954 Dec  5 15:36 zk-multiple-kafka-single.yml
-rw-r--r-- 1 minding minding  2673 Dec  5 15:36 zk-single-kafka-multiple.yml
-rw-r--r-- 1 minding minding  1326 Dec  5 15:36 zk-single-kafka-single.yml

사양에 따른 yml 파일 실행은 아래와 같다. (각 사양의 기준은 따로 없으니, 실행해보고 가장 잘 실행되는 것을 선택하자.)

  • full-stack.yml: 최고 사양
  • zk-single-kafka-single.yml: zookeeper와 kafka 모두 single로 실행 (저사양)
  • zk-single-kafka-multiple.yml: zookeeper는 single, kafka는 multiple로 실행 (중간 사양)

full-stack.yml 파일을 살펴보자. zookeeper와 kafka 이외에도 kafka connect, kafka schema registry, kafka rest proxy 등 여러 라이브러리들이 함께 있는 것을 알 수 있다. 이 repo를 만든 conduktor 회사의 console도 포함되어 있다.

# full stack

version: '2.1'

services:
  zoo1:
    image: confluentinc/cp-zookeeper:7.3.2
    hostname: zoo1
    container_name: zoo1
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_SERVERS: zoo1:2888:3888

  kafka1:
    image: confluentinc/cp-kafka:7.3.2
    hostname: kafka1
    container_name: kafka1
    ports:
      - "9092:9092"
      - "29092:29092"
      - "9999:9999"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_JMX_PORT: 9001
      KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
    depends_on:
      - zoo1

  kafka-schema-registry:
    image: confluentinc/cp-schema-registry:7.3.2
    hostname: kafka-schema-registry
    container_name: kafka-schema-registry
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092
      SCHEMA_REGISTRY_HOST_NAME: kafka-schema-registry
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
    depends_on:
      - zoo1
      - kafka1

  
  kafka-rest-proxy:
    image: confluentinc/cp-kafka-rest:7.3.2
    hostname: kafka-rest-proxy
    container_name: kafka-rest-proxy
    ports:
      - "8082:8082"
    environment:
      # KAFKA_REST_ZOOKEEPER_CONNECT: zoo1:2181
      KAFKA_REST_LISTENERS: http://0.0.0.0:8082/
      KAFKA_REST_SCHEMA_REGISTRY_URL: http://kafka-schema-registry:8081/
      KAFKA_REST_HOST_NAME: kafka-rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092
    depends_on:
      - zoo1
      - kafka1
      - kafka-schema-registry

  
  kafka-connect:
    image: confluentinc/cp-kafka-connect:7.3.2
    hostname: kafka-connect
    container_name: kafka-connect
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka1:19092"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://kafka-schema-registry:8081'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://kafka-schema-registry:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_PLUGIN_PATH: '/usr/share/java,/etc/kafka-connect/jars,/usr/share/confluent-hub-components'
    volumes:
      - ./connectors:/etc/kafka-connect/jars/
    depends_on:
      - zoo1
      - kafka1
      - kafka-schema-registry
      - kafka-rest-proxy
    command:
      - bash
      - -c
      - |
        confluent-hub install --no-prompt debezium/debezium-connector-mysql:latest
        confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:0.4.0
        /etc/confluent/docker/run

  
  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:7.3.2
    hostname: ksqldb-server
    container_name: ksqldb-server
    ports:
      - "8088:8088"
    environment:
      KSQL_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092
      KSQL_LISTENERS: http://0.0.0.0:8088/
      KSQL_KSQL_SERVICE_ID: ksqldb-server_
    depends_on:
      - zoo1
      - kafka1

  postgresql:
    hostname: postgresql
    container_name: postgresql
    extends:
      service: postgresql
      file: conduktor.yml 
            
  conduktor-console:
    hostname: conduktor-console
    container_name: conduktor-console
    extends:
      service: conduktor-console
      file: conduktor.yml

volumes:
  pg_data: {}
  conduktor_data: {}

 

이제 docker compose 명령어를 통해 해당 파일을 Docker를 통해 실행시켜준다. 실행에 시간이 다소 걸린다.

docker compose -f full-stack.yml up

 

Docker Desktop 프로그램에서 컨테이너가 실행된 것을 확인할 수 있다.

 

만약 full-stack.yml을 통해 컨테이너를 실행했다면, http://localhost:8080/를 통해 conduktor console 화면도 확인할 수 있다. 로그인 후 아래 화면이 노출되는 것을 확인할 수 있다. (기본 ID: admin@admin.io / PW: admin), 초기 가입 화면을 통해 자신의 마음대로 입력할 수 있다.

 

왼쪽 메뉴바를 통해 topic, schema registry, consumer group 등을 확인할 수 있다.

 

Kafka에서 Python을 사용하기 위한 라이브러리 설치

Kafka를 Python으로 프로그래밍하기 위해서는 별도 라이브러리를 설치해 주어야한다. Python 라이브러리는 크게 2가지가 있다.

  • Confluent Kafka Python: Confluent(Kafka 개발자들이 모여만든 회사)에서 개발한 공식 kafka python 라이브러리
  • Kafka-Python: 오픈소스 kafka python 라이브러리

이 글에서는 두 번째 Kafka-Python을 사용해 볼 예정이다. 아래 pip 명령어를 통해 라이브러리를 설치한다.

pip install kafka-python

 

이제 간단한 Producer를 만들어보자.

# producer.py

from time import sleep
from json import dumps
from kafka import KafkaProducer

producer = KafkaProducer(
   bootstrap_servers=['localhost:9092'],
   value_serializer=lambda x: dumps(x).encode('utf-8')
)

for j in range(999):
   print("Iteration", j)
   data = {'counter': j}
   producer.send('topic_test', value=data) # key와 headers는 지정되어 있지 않은 상태
   sleep(0.5)

먼저 로컬 kafka 인스턴스를 연결하는 kafka producer 객체를 생성한다. 전송하려는 데이터는 json문자열로 변환 뒤 utf-8로 인코딩해 직렬화한다. 여기서 bootstrap_servers 파라미터는 리스트 형태로 연결할 1개 이상의 broker들을 지정해주는 역할이다.

아래에 있는 반복문에는 0.5초마다 "topic_test"라는 토픽과 반복 횟수 카운터를 데이터로 포함하는 이벤트를 전송한다.

 

이제 위 코드의 파일을 한번 실행해보자.

❯ python producer.py
Iteration 0
Iteration 1
Iteration 2
Iteration 3
Iteration 4
Iteration 5
Iteration 6
Iteration 7
Iteration 8
Iteration 9
Iteration 10
Iteration 11
.
.
...

정상적으로 파일이 실행된 것을 확인했다면, 이제 conduktor console로 돌아가 'Topic' 탭을 살펴보자. 'topic_test'라는 항목을 발견할 수 있을 것이다.

 

이제 consumer도 생성해보자.

from kafka import KafkaConsumer
from json import loads
from time import sleep

consumer = KafkaConsumer(
   'topic_test',
   bootstrap_servers=['localhost:9092'],
   auto_offset_reset='earliest',
   enable_auto_commit=True,
   group_id='my-group-id',
   value_deserializer=lambda x: loads(x.decode('utf-8'))
)
for event in consumer:
   event_data = event.value
   # Do whatever you want
   print(event_data)
   sleep(2)

2초마다 'topic_test'라는 이름을 가진 topic에서 빠른 순서(earliest)대로 데이터를 가져오는 코드다. 실행하면, 위 producer에서 전송한대로 데이터를 받아오는 모습을 확인할 수 있다.

728x90

'Minding's Programming > Knowledge' 카테고리의 다른 글

[Python/Unittest] Unittest  (0) 2024.12.02
[Docker] Docker Volume  (0) 2024.11.15
[CI/CD] CI, CD 개념 및 Github Actions  (1) 2024.11.14
[BI/시각화] Superset  (1) 2024.10.31
[Snowflake] Snowflake 알아보기 (설치 방법)  (0) 2024.10.31