[Kafka/Python] Kafka 설치 방법(Conduktor)

2024. 12. 5. 16:15·Minding's Programming/Knowledge
목차
  1. Kafka 설치하기
  2. Kafka에서 Python을 사용하기 위한 라이브러리 설치
728x90
반응형

실시간 데이터 처리 플랫폼의 대표라고 할 수 있는 Kafka에 대해 공부하기 위해서, Kafka를 어떻게 설치하는지 알아보려고 한다.

 

이 글에서는 Kafka를 Docker Container로 실행할 예정이기 때문에, 기본적으로 Docker가 설치되어 있어야 한다. Docker 설치는 아래 글을 참고하길 바란다. (Windows의 경우 WSL2 설치가 필요할 수 있다.)

 Docker의 개념 및 기본 실행 명령어

 

[Docker] Docker의 개념 및 기본 실행 명령어

Docker Docker는 애플리케이션을 컨테이너라는 독립된 환경에서 실행할 수 있게 해주는 Linux 컨테이너 기반 플랫폼이다. 애플리케이션과 관련된 라이브러리와 종속성을 하나의 패키지로 묶어 어디

minding-deep-learning.tistory.com

 

Kafka 설치하기

kafka는 Docker Compose를 통해 실행할 예정이며, 아래의 Github repo를 이용할 예정이다. (kafka는 기본적으로 오픈 소스로 공식 홈페이지가 따로 있지만, 아래 repo가 docker-compose 형태로 잘 구성해 놓았기 때문에 이를 이용한다.)

https://github.com/conduktor/kafka-stack-docker-compose

 

GitHub - conduktor/kafka-stack-docker-compose: docker compose files to create a fully working kafka stack

docker compose files to create a fully working kafka stack - conduktor/kafka-stack-docker-compose

github.com

 

git clone을 통해 위 repo의 파일들을 다운로드 받아준 뒤, 해당 폴더로 이동한다.

git clone https://github.com/conduktor/kafka-stack-docker-compose.git

cd kafka-stack-docker-compose

 

폴더를 살펴보면 여러가지 yml파일이 보이는데, 그 중 컴퓨터 사양에 따라 하나를 골라서 사용하면 된다.

❯ ls -tl
total 60
-rw-r--r-- 1 minding minding 11345 Dec  5 15:36 LICENSE
-rw-r--r-- 1 minding minding  8089 Dec  5 15:36 README.md
-rw-r--r-- 1 minding minding  1043 Dec  5 15:36 conduktor.yml
-rw-r--r-- 1 minding minding  4833 Dec  5 15:36 full-stack.yml
-rw-r--r-- 1 minding minding  2236 Dec  5 15:36 jmx-exporter.yml
-rwxr-xr-x 1 minding minding  2185 Dec  5 15:36 test.sh
-rw-r--r-- 1 minding minding  3934 Dec  5 15:36 zk-multiple-kafka-multiple-schema-registry.yml
-rw-r--r-- 1 minding minding  3396 Dec  5 15:36 zk-multiple-kafka-multiple.yml
-rw-r--r-- 1 minding minding  1954 Dec  5 15:36 zk-multiple-kafka-single.yml
-rw-r--r-- 1 minding minding  2673 Dec  5 15:36 zk-single-kafka-multiple.yml
-rw-r--r-- 1 minding minding  1326 Dec  5 15:36 zk-single-kafka-single.yml

사양에 따른 yml 파일 실행은 아래와 같다. (각 사양의 기준은 따로 없으니, 실행해보고 가장 잘 실행되는 것을 선택하자.)

  • full-stack.yml: 최고 사양
  • zk-single-kafka-single.yml: zookeeper와 kafka 모두 single로 실행 (저사양)
  • zk-single-kafka-multiple.yml: zookeeper는 single, kafka는 multiple로 실행 (중간 사양)

full-stack.yml 파일을 살펴보자. zookeeper와 kafka 이외에도 kafka connect, kafka schema registry, kafka rest proxy 등 여러 라이브러리들이 함께 있는 것을 알 수 있다. 이 repo를 만든 conduktor 회사의 console도 포함되어 있다.

# full stack

version: '2.1'

services:
  zoo1:
    image: confluentinc/cp-zookeeper:7.3.2
    hostname: zoo1
    container_name: zoo1
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_SERVERS: zoo1:2888:3888

  kafka1:
    image: confluentinc/cp-kafka:7.3.2
    hostname: kafka1
    container_name: kafka1
    ports:
      - "9092:9092"
      - "29092:29092"
      - "9999:9999"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_JMX_PORT: 9001
      KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
    depends_on:
      - zoo1

  kafka-schema-registry:
    image: confluentinc/cp-schema-registry:7.3.2
    hostname: kafka-schema-registry
    container_name: kafka-schema-registry
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092
      SCHEMA_REGISTRY_HOST_NAME: kafka-schema-registry
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
    depends_on:
      - zoo1
      - kafka1

  
  kafka-rest-proxy:
    image: confluentinc/cp-kafka-rest:7.3.2
    hostname: kafka-rest-proxy
    container_name: kafka-rest-proxy
    ports:
      - "8082:8082"
    environment:
      # KAFKA_REST_ZOOKEEPER_CONNECT: zoo1:2181
      KAFKA_REST_LISTENERS: http://0.0.0.0:8082/
      KAFKA_REST_SCHEMA_REGISTRY_URL: http://kafka-schema-registry:8081/
      KAFKA_REST_HOST_NAME: kafka-rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092
    depends_on:
      - zoo1
      - kafka1
      - kafka-schema-registry

  
  kafka-connect:
    image: confluentinc/cp-kafka-connect:7.3.2
    hostname: kafka-connect
    container_name: kafka-connect
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka1:19092"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://kafka-schema-registry:8081'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://kafka-schema-registry:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_PLUGIN_PATH: '/usr/share/java,/etc/kafka-connect/jars,/usr/share/confluent-hub-components'
    volumes:
      - ./connectors:/etc/kafka-connect/jars/
    depends_on:
      - zoo1
      - kafka1
      - kafka-schema-registry
      - kafka-rest-proxy
    command:
      - bash
      - -c
      - |
        confluent-hub install --no-prompt debezium/debezium-connector-mysql:latest
        confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:0.4.0
        /etc/confluent/docker/run

  
  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:7.3.2
    hostname: ksqldb-server
    container_name: ksqldb-server
    ports:
      - "8088:8088"
    environment:
      KSQL_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092
      KSQL_LISTENERS: http://0.0.0.0:8088/
      KSQL_KSQL_SERVICE_ID: ksqldb-server_
    depends_on:
      - zoo1
      - kafka1

  postgresql:
    hostname: postgresql
    container_name: postgresql
    extends:
      service: postgresql
      file: conduktor.yml 
            
  conduktor-console:
    hostname: conduktor-console
    container_name: conduktor-console
    extends:
      service: conduktor-console
      file: conduktor.yml

volumes:
  pg_data: {}
  conduktor_data: {}

 

이제 docker compose 명령어를 통해 해당 파일을 Docker를 통해 실행시켜준다. 실행에 시간이 다소 걸린다.

docker compose -f full-stack.yml up

 

Docker Desktop 프로그램에서 컨테이너가 실행된 것을 확인할 수 있다.

 

만약 full-stack.yml을 통해 컨테이너를 실행했다면, http://localhost:8080/를 통해 conduktor console 화면도 확인할 수 있다. 로그인 후 아래 화면이 노출되는 것을 확인할 수 있다. (기본 ID: admin@admin.io / PW: admin), 초기 가입 화면을 통해 자신의 마음대로 입력할 수 있다.

 

왼쪽 메뉴바를 통해 topic, schema registry, consumer group 등을 확인할 수 있다.

 

Kafka에서 Python을 사용하기 위한 라이브러리 설치

Kafka를 Python으로 프로그래밍하기 위해서는 별도 라이브러리를 설치해 주어야한다. Python 라이브러리는 크게 2가지가 있다.

  • Confluent Kafka Python: Confluent(Kafka 개발자들이 모여만든 회사)에서 개발한 공식 kafka python 라이브러리
  • Kafka-Python: 오픈소스 kafka python 라이브러리

이 글에서는 두 번째 Kafka-Python을 사용해 볼 예정이다. 아래 pip 명령어를 통해 라이브러리를 설치한다.

pip install kafka-python

 

이제 간단한 Producer를 만들어보자.

# producer.py

from time import sleep
from json import dumps
from kafka import KafkaProducer

producer = KafkaProducer(
   bootstrap_servers=['localhost:9092'],
   value_serializer=lambda x: dumps(x).encode('utf-8')
)

for j in range(999):
   print("Iteration", j)
   data = {'counter': j}
   producer.send('topic_test', value=data) # key와 headers는 지정되어 있지 않은 상태
   sleep(0.5)

먼저 로컬 kafka 인스턴스를 연결하는 kafka producer 객체를 생성한다. 전송하려는 데이터는 json문자열로 변환 뒤 utf-8로 인코딩해 직렬화한다. 여기서 bootstrap_servers 파라미터는 리스트 형태로 연결할 1개 이상의 broker들을 지정해주는 역할이다.

아래에 있는 반복문에는 0.5초마다 "topic_test"라는 토픽과 반복 횟수 카운터를 데이터로 포함하는 이벤트를 전송한다.

 

이제 위 코드의 파일을 한번 실행해보자.

❯ python producer.py
Iteration 0
Iteration 1
Iteration 2
Iteration 3
Iteration 4
Iteration 5
Iteration 6
Iteration 7
Iteration 8
Iteration 9
Iteration 10
Iteration 11
.
.
...

정상적으로 파일이 실행된 것을 확인했다면, 이제 conduktor console로 돌아가 'Topic' 탭을 살펴보자. 'topic_test'라는 항목을 발견할 수 있을 것이다.

 

이제 consumer도 생성해보자.

from kafka import KafkaConsumer
from json import loads
from time import sleep

consumer = KafkaConsumer(
   'topic_test',
   bootstrap_servers=['localhost:9092'],
   auto_offset_reset='earliest',
   enable_auto_commit=True,
   group_id='my-group-id',
   value_deserializer=lambda x: loads(x.decode('utf-8'))
)
for event in consumer:
   event_data = event.value
   # Do whatever you want
   print(event_data)
   sleep(2)

2초마다 'topic_test'라는 이름을 가진 topic에서 빠른 순서(earliest)대로 데이터를 가져오는 코드다. 실행하면, 위 producer에서 전송한대로 데이터를 받아오는 모습을 확인할 수 있다.

728x90

'Minding's Programming > Knowledge' 카테고리의 다른 글

[QA/Testing] Charles Proxy Tool - 테스트 효율을 높일 수 있는 툴  (0) 2025.02.07
[프로젝트] Catch Me My Capital - 합리적인 투자 의사결정을 위한 금융 데이터 파이프라인 및 백테스팅 도구 (프로젝트 소개 편)  (0) 2025.02.06
[Python/Unittest] Unittest  (0) 2024.12.02
[Docker] Docker Volume  (0) 2024.11.15
[CI/CD] CI, CD 개념 및 Github Actions  (1) 2024.11.14
  1. Kafka 설치하기
  2. Kafka에서 Python을 사용하기 위한 라이브러리 설치
'Minding's Programming/Knowledge' 카테고리의 다른 글
  • [QA/Testing] Charles Proxy Tool - 테스트 효율을 높일 수 있는 툴
  • [프로젝트] Catch Me My Capital - 합리적인 투자 의사결정을 위한 금융 데이터 파이프라인 및 백테스팅 도구 (프로젝트 소개 편)
  • [Python/Unittest] Unittest
  • [Docker] Docker Volume
Minding
Minding
  • Minding
    Today's Minding
    Minding
  • 전체
    오늘
    어제
    • 울고넘는 딥러닝 (278)
      • Minding's Baseball (57)
        • MLB Statcast (29)
        • 머신러닝으로 홈런왕 예측하기 (3)
        • 야구칼럼 (12)
        • 야구 규칙, 용어 (1)
        • 2022-23 질롱 코리아 (8)
        • 류현진 등판경기 (4)
      • Minding's Programming (185)
        • 프로그래머스 코딩테스트 (21)
        • Knowledge (44)
        • Numpy & Pandas (6)
        • Excel (3)
        • Git (1)
        • Pygame (11)
        • CV (3)
        • Tensorflow tutorial (4)
        • Kaggle and Dacon (4)
        • 에러 코드 (8)
        • FastAPI (8)
        • Airflow (29)
        • Crawling (6)
        • Django (14)
        • AWS (18)
        • Spark (5)
      • Minding's Reading (30)
        • 머신러닝 딥러닝에 필요한 기초 수학 with 파이.. (2)
        • 칼만필터는 어렵지 않아 (11)
        • 밑바닥부터 시작하는 딥러닝 (6)
        • 메이저리그 야구 통계학 2e (8)
        • 논문읽기 (2)
        • 빅데이터를 지탱하는 기술 (1)
      • Minding's Life (5)
        • 주식 (4)
        • 각종 소식 (1)
  • 블로그 메뉴

    • 홈
    • Baseball
    • Programming
    • Reading
  • 링크

  • 공지사항

  • 인기 글

  • 태그

    mlb stats api
    머신러닝
    Python
    KBO
    FastAPI
    넘파이
    칼만필터는어렵지않아파이썬
    pygame
    프로그래머스
    파이썬게임개발
    게임개발
    데이터 엔지니어
    django python
    django
    딥러닝
    AWS
    Airflow
    야구
    칼만필터는어렵지않아python
    칼만필터
    파이썬
    데이터분석
    메이저리그
    에어플로우
    질롱코리아
    칼만필터는어렵지않아
    코딩테스트
    KalmanFilter
    MLB
    파이게임
  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.3
Minding
[Kafka/Python] Kafka 설치 방법(Conduktor)

개인정보

  • 티스토리 홈
  • 포럼
  • 로그인
상단으로

티스토리툴바

단축키

내 블로그

내 블로그 - 관리자 홈 전환
Q
Q
새 글 쓰기
W
W

블로그 게시글

글 수정 (권한 있는 경우)
E
E
댓글 영역으로 이동
C
C

모든 영역

이 페이지의 URL 복사
S
S
맨 위로 이동
T
T
티스토리 홈 이동
H
H
단축키 안내
Shift + /
⇧ + /

* 단축키는 한글/영문 대소문자로 이용 가능하며, 티스토리 기본 도메인에서만 동작합니다.