현대 오토에버 클라우드 스쿨

팀 프로젝트) Kafka

Gom3rye 2025. 11. 11. 09:32
728x90
반응형

참조: https://nangman14.tistory.com/80

strimzi 공식문서: Deploying and Managing (0.48.0)

awsome strimzi: GitHub - strimzi/awesome-strimzi: A curated list of resources about the Strimzi project

 

kafka namespace 만들기

kubectl create ns kafka
 

helm으로 strimzi 설치

helm install strimzi-cluster-operator oci://quay.io/strimzi-helm/strimzi-kafka-operator -n kafka
 

localpath 설정하고 PV 만들기

localpath-storageclass.yaml

  • 왜 localpath냐??
    • kafka는 disk io 기반이라 읽고 쓰는 작업이 빈번하기 때문에, NFS같은걸 쓰면 네트워크 지연 등의 외부 오버헤드로 성능 저하가 일어날 수 있기 때문.
    • kafka는 기본적을 복제 기능(replication)이 있어 데이터가 다른 노드에도 남아있기 때문에 어느 노드가 죽어도 복사본이 있어 안전하다.
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: local-storage
provisioner: kubernetes.io/no-provisioner
volumeBindingMode: WaitForFirstConsumer
 

pv를 위치시킬 실제 노드에서 경로 만들고 권한 주기 (전부 해줘야 함!!)

sudo mkdir -p /var/lib/kafka/data
sudo chmod 777 /var/lib/kafka/data

 

pv.yaml

  • 일단 노드별로 하나씩 pv를 만들어줌 → 3개
  • 기본적으로 localpath는 동적 프로비저닝(pvc가 요청할때 pv만들어주는거)이 불가능하기 때문에 미리 만들어줘야 함.
  • localpath provisioner를 설치하면 pv에 대한 매니페스트를 따로 만들지 않아도 됨
apiVersion: v1
kind: PersistentVolume
metadata:
  name: kafka-pv-worker1
spec:
  capacity:
    storage: 50Gi
  accessModes: [ "ReadWriteOnce" ]
  persistentVolumeReclaimPolicy: Retain
  storageClassName: local-storage
  local:
    path: /var/lib/kafka/data
  nodeAffinity:
    required:
      nodeSelectorTerms:
      - matchExpressions:
        - key: kubernetes.io/hostname
          operator: In
          values:
            - kafka-worker1
---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: kafka-pv-worker2
spec:
  capacity:
    storage: 50Gi
  accessModes: [ "ReadWriteOnce" ]
  persistentVolumeReclaimPolicy: Retain
  storageClassName: local-storage
  local:
    path: /var/lib/kafka/data
  nodeAffinity:
    required:
      nodeSelectorTerms:
      - matchExpressions:
        - key: kubernetes.io/hostname
          operator: In
          values:
            - kafka-worker2
---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: kafka-pv-worker3
spec:
  capacity:
    storage: 50Gi
  accessModes: [ "ReadWriteOnce" ]
  persistentVolumeReclaimPolicy: Retain
  storageClassName: local-storage
  local:
    path: /var/lib/kafka/data
  nodeAffinity:
    required:
      nodeSelectorTerms:
      - matchExpressions:
        - key: kubernetes.io/hostname
          operator: In
          values:
            - kafka-worker3
 

kafka+kafkaNodePool 만들기

kafka.yaml

  • kafka 버전을 4.0.0으로 해줘야 함!!! 정확히는 3.9.x 이상. 왜냐면 주키퍼 안쓰고 KRaft를 쓸건데 이 버전부터 지원되기 때문임. (3.7.x는 안됨)
  • 왜 nodepool같은거 달고다님? → 미래의 유연성과 확장성을 위하야.
  • spec.resource.reuqest.cpu 등의 설정은 브로커 파드 한놈이 점유할 자원인데, 이게 노드 전체 자원보다 작아야 함!! 노드의 다른 기능들도 먹고 살아야 하기 때문에… 이걸 딱맞춰 해버리거나 초과하면 노드 자원이 부족해서 영원한 pending형에 처해지게 됨.
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: kafka-cluster
  namespace: kafka
  annotations:
    strimzi.io/kraft: "enabled"
    strimzi.io/node-pools: "enabled"
spec:
  kafka:
    version: 4.0.0
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: external
        port: 9094
        type: loadbalancer
        tls: false
    config:
      auto.create.topics.enable: "false"
      default.replication.factor: 3              # ✅ 브로커 수에 맞춰 조정
      min.insync.replicas: 2                     # ✅ 최소 ISR 2 (실무 환경에 맞춤)
      num.partitions: 6
      log.retention.hours: 72
      offsets.topic.replication.factor: 3        # ✅ 내부 토픽도 2
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 1
  entityOperator:
    topicOperator: {}
    userOperator: {}
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: knp
  namespace: kafka
  labels:
    strimzi.io/cluster: kafka-cluster
spec:
  replicas: 3
  roles:
    - broker
    - controller
  resources:
    requests:
      cpu: "1" # 실제 점유하는 리소스이므로 무조건 노드에 할당한 리소스보다 적어야 한다. 안그럼 무한 pending
      memory: "4Gi"
    limits:
      cpu: "1.5"
      memory: "4Gi"
  storage:
    type: persistent-claim
    class: local-storage
 

외부 접속을 위한 LoadBalancer 설정하기

MetalLB 설치

  • 클라우드 환경 등에서는 제공해주는 LB 쓰면 되지만 온프레미스에는 그런거 없으므로 외부 솔루션을 갖다 쓰도록 하자.
kubectl apply -f https://raw.githubusercontent.com/metallb/metallb/v0.14.8/config/manifests/metallb-native.yaml

 

kafka/ipaddresspool.yaml

  • load balancer가 갖다 쓸 IP 범위를 지정해주는 친구. 브로커별로 하나씩, 그리고 대표IP 하나 필요하므로 최소 해당 갯수들 +@의 범위를 잡아줘야함.
  • 해당 IP 범위는 사용중인 네트워크 대역에서 아무도 쓰고있지 않고, 앞으로도 아무도 쓰지 않을 것이 확실해야 함!!!
apiVersion: metallb.io/v1beta1
kind: IPAddressPool
metadata:
  namespace: metallb-system
  name: kafka-pool
spec:
  addresses:
    - 192.168.1.210-192.168.1.220
---
apiVersion: metallb.io/v1beta1
kind: L2Advertisement
metadata:
  namespace: metallb-system
  name: kafka-advertisement
spec:
  ipAddressPools:
    - kafka-pool
 

이제 apply해주고 get services를 확인하면 예쁜 LoadBalancer 친구들을 확인할 수 있을 것.

 

Topic 생성하기

strimzi가 제공하는 강려크한 기능을 활용하면 topic을 직접 브로커에 CLI로 접속해서 만들고 확인하는 대신 무려무려 쿠버네티스 리소스의 형태로, yaml을 이용해 만들고 확인할 수 있음!!!

topic/topics.yaml

  • 로그 종류별로 하나씩 토픽 만들어주도록 yaml 작성
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: service-topic          # 토픽 이름
  namespace: kafka          # Kafka가 설치된 네임스페이스
  labels:
    strimzi.io/cluster: kafka-cluster   # 클러스터 이름 (Kafka 리소스 metadata.name)
spec:
  partitions: 3             # 파티션 수
  replicas: 3               # 복제본 수 (브로커 수와 맞추는 게 안전)
  config:
    retention.ms: 60000     # 메시지 보관 시간 (1분, 테스트용)
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: system-oom-topic          # 토픽 이름
  namespace: kafka          # Kafka가 설치된 네임스페이스
  labels:
    strimzi.io/cluster: kafka-cluster   # 클러스터 이름 (Kafka 리소스 metadata.name)
spec:
  partitions: 3             # 파티션 수
  replicas: 3               # 복제본 수 (브로커 수와 맞추는 게 안전)
  config:
    retention.ms: 60000     # 메시지 보관 시간 (1분, 테스트용)

---

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: system-mce-topic          # 토픽 이름
  namespace: kafka          # Kafka가 설치된 네임스페이스
  labels:
    strimzi.io/cluster: kafka-cluster   # 클러스터 이름 (Kafka 리소스 metadata.name)
spec:
  partitions: 3             # 파티션 수
  replicas: 3               # 복제본 수 (브로커 수와 맞추는 게 안전)
  config:
    retention.ms: 60000     # 메시지 보관 시간 (1분, 테스트용)

---

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: system-disk-topic          # 토픽 이름
  namespace: kafka          # Kafka가 설치된 네임스페이스
  labels:
    strimzi.io/cluster: kafka-cluster   # 클러스터 이름 (Kafka 리소스 metadata.name)
spec:
  partitions: 3             # 파티션 수
  replicas: 3               # 복제본 수 (브로커 수와 맞추는 게 안전)
  config:
    retention.ms: 60000     # 메시지 보관 시간 (1분, 테스트용)

---

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: system-auth-topic          # 토픽 이름
  namespace: kafka          # Kafka가 설치된 네임스페이스
  labels:
    strimzi.io/cluster: kafka-cluster   # 클러스터 이름 (Kafka 리소스 metadata.name)
spec:
  partitions: 3             # 파티션 수
  replicas: 3               # 복제본 수 (브로커 수와 맞추는 게 안전)
  config:
    retention.ms: 60000     # 메시지 보관 시간 (1분, 테스트용)


---


apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: system-kmsg-topic          # 토픽 이름
  namespace: kafka          # Kafka가 설치된 네임스페이스
  labels:
    strimzi.io/cluster: kafka-cluster   # 클러스터 이름 (Kafka 리소스 metadata.name)
spec:
  partitions: 3             # 파티션 수
  replicas: 3               # 복제본 수 (브로커 수와 맞추는 게 안전)
  config:
    retention.ms: 60000     # 메시지 보관 시간 (1분, 테스트용)
 

apply한 후 kubectl get kafkatopics -n kafka 해서 토픽들을 확인해보자.

 

 

준비 완뇨~

이제 kubectl get services -n kafka 에서 나온 LoadBalancer 대표 IP와 Port를 이용해 같은 NAT Network상의 다른 클러스터에서 자유롭게 메시지를 produce하고 consume할 수 있다!!

 

모니터링하기

kubectl get pods -n kafka

-> 파드 이름 킵하기

kubectl exec -it [파드이름] -n kafka -- /bin/bash

-> 브로커 한놈 골라서 속에 들어가기 (kafka-cluster-knp-0 이런 이름일거임)

bin/kafka-console-consumer.sh --bootstrap-server localhost:9094 --topic [원하는 로그 토픽] --from-beginning

-> 파드 속에서 위 명령어로 구독해두고 기다리면 해당 토픽에 produce된 메시지들이 실시간으로 올라올 것!

728x90
반응형