현대 오토에버 클라우드 스쿨

데이터 센터 이상치 알림 프로젝트

Gom3rye 2025. 9. 4. 10:32
728x90
반응형

데이터센터에서 발생하는 모든 로그들을 분석해서 이상치(ex, 잘못된 접근으로 계속 요청이 들어오는 경우, cpu, 메모리 사용량 피크 등등)를 알람으로 보여줘서 관리자가 보다 쉽게 서버들을 관리할 수 있도록 도와주는 서비스를 만들기로 했다.

 

현재 실제 서버 정보를 받아올 수 없으니 가상의 Input Generator를 생성해서 로그들을 출력하도록 만들었다.

(vm으로 쿠버네티스 클러스터는 구축해놓은 상태)

import time
import random
import json
import sys
import threading
from flask import Flask, Response
from datetime import datetime

# --- 프로메테우스를 위한 메트릭 값 저장 변수 ---
simulated_cpu_usage = 0.0
simulated_memory_usage = 0.0
failed_logins_total = 0
http_requests_total = 0

# --- Flask 웹 애플리케이션 생성 (Prometheus 메트릭 노출용) ---
app = Flask(__name__)

@app.route('/metrics')
def metrics():
    """프로메테우스가 수집할 메트릭을 형식에 맞게 노출하는 엔드포인트"""
    metric_data = []
    metric_data.append('# HELP simulated_cpu_usage_percent The simulated CPU usage of the server.')
    metric_data.append('# TYPE simulated_cpu_usage_percent gauge')
    metric_data.append(f'simulated_cpu_usage_percent {simulated_cpu_usage}')
    
    metric_data.append('\n# HELP simulated_memory_usage_percent The simulated memory usage of the server.')
    metric_data.append('# TYPE simulated_memory_usage_percent gauge')
    metric_data.append(f'simulated_memory_usage_percent {simulated_memory_usage}')

    metric_data.append('\n# HELP failed_logins_total Total number of failed login attempts.')
    metric_data.append('# TYPE failed_logins_total counter')
    metric_data.append(f'failed_logins_total {failed_logins_total}')

    metric_data.append('\n# HELP http_requests_total Total number of http requests.')
    metric_data.append('# TYPE http_requests_total counter')
    metric_data.append(f'http_requests_total {http_requests_total}')
    
    return Response("\n".join(metric_data), mimetype='text/plain')

def log_generator():
    """백그라운드에서 계속해서 JSON 형식의 로그를 생성하는 함수"""
    global simulated_cpu_usage, simulated_memory_usage, failed_logins_total, http_requests_total
    
    users = ["user-a", "user-b", "admin", "guest"]
    ips = ["192.168.1.10", "10.0.0.5", "172.16.0.8", "203.0.113.25"]
    
    while True:
    	# 0.1초에서 1.5초 사이의 랜덤한 간격으로 로그 생성
        time.sleep(random.uniform(0.1, 1.5))
        
        log_type = random.choices(
            population=["login_success", "login_fail", "request", "system_metric", "error"],
            weights=[0.3, 0.1, 0.4, 0.15, 0.05],
            k=1
        )[0]

        # 기본 로그 구조
        log_entry = {
            "timestamp": datetime.utcnow().isoformat() + "Z",
            "log_type": log_type
        }

        # 시나리오에 따라 로그 필드 추가 및 메트릭 값 업데이트
        if log_type == "login_success":
            log_entry.update({"level": "info", "event": "success", "user": random.choice(users), "ip": random.choice(ips)})
            
        elif log_type == "login_fail":
            log_entry.update({"level": "warning", "event": "failed", "user": random.choice(users), "ip": random.choice(ips), "message": "Failed login attempt"})
            failed_logins_total += 1

        elif log_type == "request":
            status = random.choices(population=[200, 404, 500], weights=[0.9, 0.07, 0.03], k=1)[0]
            log_entry.update({"level": "info", "ip": random.choice(ips), "http_status": status, "path": "/api/data"})
            http_requests_total += 1
            
        elif log_type == "system_metric":
            simulated_cpu_usage = round(random.uniform(10.0, 99.9), 2)
            simulated_memory_usage = round(random.uniform(20.0, 95.0), 2)
            level = "info" if simulated_cpu_usage <= 90.0 else "warning"
            log_entry.update({"level": level, "metrics": {"cpu_usage": simulated_cpu_usage, "memory_usage": simulated_memory_usage}})

        elif log_type == "error":
            log_entry.update({"level": "error", "message": "Database connection timeout"})

        # 최종 JSON 로그를 한 줄로 출력
        # sys.stdout.flush()는 버퍼링 없이 즉시 출력되도록 보장
        print(json.dumps(log_entry))
        sys.stdout.flush()

if __name__ == '__main__':
	# 로그 생성 스레드를 데몬으로 설정하여 메인 스레드 종료 시 함께 종료되도록 함
    log_thread = threading.Thread(target=log_generator, daemon=True)
    log_thread.start()
    # Flask 웹 서버 실행 (메트릭 노출)
    # host='0.0.0.0' 은 컨테이너 외부에서 접근 가능하도록 설정
    app.run(host='0.0.0.0', port=8080)

 

 

로그 수집 및 Kafka 버퍼링

Phase 1: Fluent-bit 배포 (각 노드의 로그 수집기)

Fluent-bit는 DaemonSet으로 배포해 모든 노드에 하나씩 자동으로 실행되도록 한다.

Fluent-bit이 쿠버네티스 클러스터의 로그를 읽으려면 적절한 권한이 필요하기 때문에 fluentbit-rbac.yaml 파일을 생성한다.

# fluentbit-rbac.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: fluent-bit
  namespace: efkstack
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: fluent-bit-read
rules:
- apiGroups: [""]
  resources: ["namespaces", "pods", "pods/logs"]
  verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: fluent-bit-read
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: fluent-bit-read
subjects:
- kind: ServiceAccount
  name: fluent-bit
  namespace: efkstack

 

Phase 2: Fluent-bit 설정 파일 (ConfigMap)

다음으로는 Fluent-bit이 어디서 로그를 수집해서 어디로 보낼지 정의하는 설정파일 fluentbit-config.yaml을 만들어야 한다.

# fluentbit-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: fluent-bit-config
  namespace: efkstack
data:
  fluent-bit.conf: |
    [SERVICE]
        Flush        1
        Daemon       Off
        Log_Level    info

    [INPUT]
        Name         tail
        Path         /var/log/containers/*.log
        Parser       docker
        Tag          kube.*
        Mem_Buf_Limit 5MB

    [OUTPUT]
        Name         forward
        Match        *
        Host         fluentd-aggregator.efkstack.svc.cluster.local # fluentd 서비스의 전체 DNS 주소로 변경
        Port         24224
        Retry_Limit  False

 

이제 실제 Fluent-bit을 DaemonSet으로 배포하는 fluentbit-daemonset.yaml 파일을 만든다.

# fluentbit-daemonset.yaml
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluent-bit
  namespace: efkstack
spec:
  selector:
    matchLabels:
      name: fluent-bit
  template:
    metadata:
      labels:
        name: fluent-bit
    spec:
      # RBAC에서 만든 efkstack 네임스페이스의 serviceAccountName을 사용
      serviceAccountName: fluent-bit
      containers:
      - name: fluent-bit
        image: fluent/fluent-bit:2.2.2
        volumeMounts:
        - name: varlog
          mountPath: /var/log
        - name: fluent-bit-config
          mountPath: /fluent-bit/etc/
      volumes:
      - name: varlog
        hostPath:
          path: /var/log
      - name: fluent-bit-config
        configMap:
          name: fluent-bit-config

 

작성한 yaml 파일들을 쿠버네티스에 적용한다.

kubectl apply -f fluentbit-rbac.yaml
kubectl apply -f fluentbit-config.yaml
kubectl apply -f fluentbit-daemonset.yaml

Phase 3: Fluentd 배포 (중앙 집계기)

Fluent-bit로부터 로그를 받아 Kafka로 전송하는 중앙 집계기 Fluentd를 배포한다.

# fluentd-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: fluentd-config
  namespace: efkstack
data:
  fluent.conf: |
    # 1. Fluent-bit으로부터 로그를 수신 (Input)
    <source>
      @type forward
      port 24224
      bind 0.0.0.0
    </source>
    
    # 2. 수신한 모든 로그를 처리 (Match)
    <match **>
      @type copy
      # (Store 1) Kafka로 전송
      <store>
        @type kafka2
        # 다른 네임스페이스에 있는 kafka 서비스의 전체 DNS 주소
        # kafkapod 이름.kafkasvc 이름.kafkans 이름
        brokers kafka-0.kafka.kafka.svc.cluster.local:9092
        default_topic log-stream
        # Kafka로 보낼 데이터 형식을 JSON으로 지정
        <format>
          @type json
        </format>
      </store>
      # (Store 2) Fluentd로그를 직접 확인하며 디버깅하기 위해 표준출력으로 표시
      <store>
        @type stdout
      </store>
    </match>

 

fluentd 파드를 관리하는 Deployment와 Fluent-bit가 접속할 수 있도록 네트워크를 열어주는 Service를 생성한다.

fluentd-deployment.yaml 파일로 저장

# fluentd-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: fluentd-aggregator
  namespace: efkstack # efkstack 네임스페이스에 배포
spec:
  replicas: 2 # 안정성을 위해 2개의 파드로 실행
  selector:
    matchLabels:
      app: fluentd-aggregator
  template:
    metadata:
      labels:
        app: fluentd-aggregator
    spec:
      containers:
      - name: fluentd
        # Kafka 플러그인이 포함된 공식 이미지 사용
        image: fluent/fluentd-kubernetes-daemonset:v1-debian-kafka
        ports:
        - containerPort: 24224
        volumeMounts:
        - name: config
          mountPath: /fluentd/etc
      volumes:
      - name: config
        configMap:
          name: fluentd-config
---
apiVersion: v1
kind: Service
metadata:
  name: fluentd-aggregator # Fluent-bit 설정의 Host와 이름이 같아야 함
  namespace: efkstack
spec:
  selector:
    app: fluentd-aggregator
  ports:
  - protocol: TCP
    port: 24224
    targetPort: 24224

Phase 4: 지금까지의 파이프라인 동작 확인

STEP 1: 소스(Source) 확인 - Input Generator는 로그를 잘 생성하고 있는가?

가장 먼저, 데이터의 시작점인 Input Generator가 정상적으로 로그를 뿜어내고 있는지 확인합니다.

  1. Input Generator 파드 상태 확인: service 네임스페이스의 파드들이 모두 Running 상태인지 확인합니다.
  2.  
    kubectl get pods -n service
    
  3. Bash
     
  4. 실제 로그 출력 확인: 파드 중 하나의 로그를 실시간으로 확인하여, 이전에 만들었던 JSON 형식의 로그가 터미널에 계속해서 출력되는지 봅니다.[성공 조건] 터미널에 JSON 로그가 계속 올라오면 성공입니다. Ctrl+C로 종료합니다. [문제 발생 시] 파드가 Error 또는 CrashLoopBackOff 상태라면 kubectl describe pod -n service <pod-name> 명령어로 원인을 확인해야 합니다. (예: 이미지 다운로드 실패, 코드 에러 등)
  5. Bash
     
    # 위에서 확인한 파드 이름 중 하나를 복사하여 실행
    kubectl logs -n service <input-generator-pod-name> -f
    

STEP 2: 1차 수집 확인 - Fluent-bit는 로그를 잘 읽고 있는가?

이제 각 노드의 Fluent-bit가 Input Generator의 로그를 잘 수집하고 있는지 확인합니다.

  1. Fluent-bit 파드 상태 확인: efkstack 네임스페이스의 fluent-bit 파드들이 모든 노드에서 Running 상태인지 확인합니다.
  2. Bash
     
    kubectl get pods -n efkstack
    
  3. Fluent-bit 로그에서 전송 기록 확인: Input Generator 파드가 실행되고 있는 노드와 동일한 노드에서 실행 중인 Fluent-bit 파드를 찾아 로그를 확인합니다. 로그에 fluentd-aggregator.efkstack.svc.cluster.local:24224로 데이터를 보냈다는 기록이 있는지 봅니다.[성공 조건] 에러 메시지 없이, 주기적으로 flush 또는 전송 관련 로그가 보이면 성공입니다. [문제 발생 시] Connection refused 와 같은 에러가 보인다면, Fluentd 서비스의 이름이나 네트워크 설정에 문제가 있을 수 있습니다.
  4. Bash
     
    # 먼저 Input Generator 파드가 어느 노드에서 실행 중인지 확인
    kubectl get pods -n service -o wide
    
    # 위에서 찾은 노드 이름으로 fluent-bit 파드를 찾음
    kubectl get pods -n efkstack -o wide | grep <node-name>
    
    # 해당 fluent-bit 파드의 로그를 확인
    kubectl logs -n efkstack <fluent-bit-pod-on-that-node> -f
    

STEP 3: 중앙 집계 확인 - Fluentd는 로그를 잘 수신하고 있는가?

모든 Fluent-bit로부터 로그가 중앙 Fluentd로 잘 모이는지 확인합니다. 이 단계는 디버깅에 가장 유용한 단계입니다.

  1. Fluentd 파드 상태 확인: efkstack 네임스페이스의 fluentd-aggregator 파드 2개가 Running 상태인지 확인합니다.
  2.  
    kubectl get pods -n efkstack
    
  3. Bash
     
  4. Fluentd 실시간 로그 확인 (가장 확실한 방법): 우리는 fluentd.conf 설정에서 <store> @type stdout</store> 옵션을 추가했기 때문에, Fluentd로 들어오는 모든 로그가 파드의 표준 출력으로도 표시됩니다.[성공 조건] 터미널에 Input Generator가 생성한 것과 동일한 JSON 로그들이 쏟아져 나오면 Fluent-bit -> Fluentd 구간이 완벽하게 작동하는 것입니다. [문제 발생 시] 아무 로그도 보이지 않는다면, Fluent-bit에서 Fluentd로 오는 네트워크 경로에 문제가 있는 것입니다. (Service 이름, 포트, NetworkPolicy 등 확인) Kafka connection error가 보인다면 다음 단계의 문제입니다.
  5. Bash
     
    # fluentd-aggregator 파드 중 하나의 로그를 실시간으로 확인
    kubectl logs -n efkstack <fluentd-aggregator-pod-name> -f
    

여기서

Input Generator -> Fluent-bit [ ? -> ] Fluentd -> Kafka

Fluent-bit와 Fluentd 사이의 연결 어딘가에서 문제가 발생하여 데이터가 전달되지 않고 있는 상황 발생

- 트러블 슈팅

fluentd.netpol.yaml

# fluentd-netpol.yaml
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: allow-fluentbit-to-fluentd
  namespace: efkstack # Fluentd가 있는 네임스페이스
spec:
  # 이 정책은 fluentd-aggregator 레이블을 가진 파드에 적용됩니다.
  podSelector:
    matchLabels:
      app: fluentd-aggregator
  policyTypes:
  - Ingress
  # Ingress (들어오는 트래픽) 규칙
  ingress:
  # 아래 조건을 만족하는 트래픽을 허용합니다.
  - from:
    # 1. efkstack 네임스페이스 내부의 모든 파드
    - podSelector: {}
    # 2. (선택적) 다른 네임스페이스의 fluent-bit를 위해 추가
    - namespaceSelector:
        matchLabels:
          # 모든 네임스페이스에서 들어오는 것을 허용하려면, 
          # 각 네임스페이스에 특별한 레이블을 추가하고 여기에 명시하거나, 
          # 더 간단하게는 from 필드를 비워두어 모든 소스를 허용할 수 있습니다.
          # 여기서는 우선 같은 네임스페이스 내 통신을 보장합니다.
          kubernetes.io/metadata.name: efkstack
    ports:
    # fluentd의 24224 포트로 들어오는 TCP 트래픽을 허용합니다.
    - protocol: TCP
      port: 24224
kubectl apply -f fluentd-netpol.yaml

kubectl logs -n efkstack <fluentd-aggregator-pod-name> -f 명령어로 로그가 들어오는지 확인

 

STEP 4: 최종 목적지 확인 - Kafka는 로그를 잘 받고 있는가?

마지막으로, Fluentd가 보낸 로그가 Kafka 토픽에 잘 쌓이고 있는지 최종 확인합니다.

  1. Kafka 파드 상태 확인: kafka 네임스페이스의 kafka-0와 주키퍼 파드가 Running 상태인지 확인합니다.
  2.  
    kubectl get pods -n kafka
    
  3. Bash
     
  4. Kafka 토픽 직접 구독 (가장 완벽한 증명): 임시 Kafka 클라이언트 파드를 실행하여 log-stream 토픽의 내용을 실시간으로 확인합니다. 이전에 안내해 드린 명령어와 동일합니다.[성공 조건] 잠시 기다렸을 때, 터미널에 JSON 로그가 실시간으로 출력되기 시작하면, Input Generator부터 Kafka까지의 전체 파이프라인이 성공적으로 구축된 것입니다! [문제 발생 시] Fluentd 로그(STEP 3)에서는 보이는데 여기서 보이지 않는다면, Fluentd의 brokers 주소 설정이나 Kafka 자체의 문제일 가능성이 높습니다. kubectl logs -n efkstack <fluentd-pod>에서 Kafka 관련 에러를 확인해 보세요.
  5. Bash
     
    # kafka-0.kafka.kafka.svc.cluster.local 부분은 본인의 환경에 맞게 확인
    kubectl run kafka-consumer -n kafka -ti --image=bitnami/kafka:3.7.0 --rm --restart=Never -- /opt/bitnami/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka-0.kafka.kafka.svc.cluster.local:9092 --topic log-stream
    

 

 

 

 

 

 

 

 

 

 

 

 

 

 

- Fluentd 파드 확인: kubectl get pods -n efkstack 명령어를 실행하여 fluentd-aggregator-xxxxx 형태의 파드 2개가 Running 상태인지 확인

- Fluentd 로그 확인 (디버깅) :

# 파드 이름 중 하나를 복사하여 실행
kubectl logs -n efkstack <fluentd-aggregator-pod-name> -f 

fluentd 파드의 로그를 직접 확인해 Input Generator가 생성한 JSON 로그들이 정상적으로 수신되고 있는지 볼 수 있다. (ConfigMap의 <store> @type stdout 설정 덕분)

 

최종 동작 확인

# --bootstrap-server 주소를 수정한 최종 확인 명령어
kubectl run kafka-consumer -n kafka -ti --image=bitnami/kafka:3.7.0 --rm --restart=Never -- /opt/bitnami/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka-0.kafka.kafka.svc.cluster.local:9092 --topic log-stream --from-beginning

728x90
반응형