Gom3rye

팀 프로젝트) AWS Lambda 함수 본문

현대 오토에버 클라우드 스쿨

팀 프로젝트) AWS Lambda 함수

Gom3rye 2025. 11. 11. 09:39
728x90
반응형

 

ElasticAlert가 API gateway로 요청을 보내면 그걸 받아서 SNS를 콜해주는 친구이다.

 

  • serviceAlertHandler
import json
import boto3
import os

# boto3 SNS 클라이언트 초기화
sns_client = boto3.client('sns')
# 환경 변수에서 SNS Topic ARN 가져오기
# 하드코딩보다 환경 변수 사용이 훨씬 안전하고 유연합니다.
SNS_TOPIC_ARN = os.environ.get('SNS_TOPIC_ARN')

def lambda_handler(event, context):
    # API Gateway를 통해 전달된 ElastAlert2의 데이터는 'body'에 담겨 있습니다.
    try:
        # event['body']는 문자열이므로 json 객체로 파싱합니다.
        alert_data = json.loads(event.get('body', '{}'))

        # 이메일 제목과 내용 구성
        rule_name = alert_data.get('rule_name', 'N/A')
        subject = f"🚨  [ElastAlert] 서비스 에러가 발생했습니다 !!!"

        # pre-formatted text로 이쁘게 만듭니다.
        message = f"""
        - 감지된 API : {alert_data.get('API', 'N/A')}
        - 발생 시각: {alert_data.get('@timestamp', 'N/A')}
        - SourceIP : {alert_data.get('sourceIP', 'N/A')}
        - {alert_data.get('result', 'N/A')} 에러
        빠른 조치 바랍니다 꾸벅 -.-
        """

        # SNS Topic으로 메시지 발행
        response = sns_client.publish(
            TopicArn=SNS_TOPIC_ARN,
            Subject=subject,
            Message=message
        )

        print("Message sent to SNS. Message ID: ", response['MessageId'])

        # API Gateway에 성공 응답 반환
        return {
            'statusCode': 200,
            'body': json.dumps({'message': 'Alert successfully sent to SNS!'})
        }

    except Exception as e:
        print(f"Error processing alert: {e}")
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }
 
  • systemKmsgAlertHandler
 
import json
import boto3
import os

sns_client = boto3.client('sns')
SNS_TOPIC_ARN = os.environ.get('SNS_TOPIC_ARN')

PRIORITY_MAP = {
 0: "emergency",
 1: "alert",
 2: "critical",
 3: "error", 
 4: "warning",
 5: "notice",
 6: "informational",
 7: "debug"
}

def lambda_handler(event, context):
 try:
     # event['body']는 문자열 → JSON 변환
     alert_data = json.loads(event.get('body', '{}'))
     subject = "🚨  [ElastAlert] 커널 수준의 이벤트가 발생했습니다 !!!"

     # priority 숫자를 의미로 변환
     priority_num = int(alert_data.get("priority", -1))
     severity = PRIORITY_MAP.get(priority_num, str(priority_num))

     # 이메일 메시지 구성
     message = f"""
    {alert_data.get('timestamp', 'N/A')}에 서버 {alert_data.get('host', 'N/A')}에서 
    심각도 [{severity}] 이벤트가 발생했습니다.
    log: {alert_data.get('message', 'N/A')}
    빠른 조치 바랍니다 꾸벅 -.- 
"""

     # SNS 발행
     response = sns_client.publish(
         TopicArn=SNS_TOPIC_ARN,
         Subject=subject,
         Message=message
     )

     print("Message sent to SNS. Message ID:", response['MessageId'])

     return {
         'statusCode': 200,
         'body': json.dumps({'message': 'Alert successfully sent to SNS!'})
     }

 except Exception as e:
     print(f"Error processing alert: {e}")
     return {
         'statusCode': 500,
         'body': json.dumps({'error': str(e)})
     }​
  • systemAuthAlertHandler
import json
import boto3
import os

sns_client = boto3.client('sns')
SNS_TOPIC_ARN = os.environ.get('SNS_TOPIC_ARN')

def lambda_handler(event, context):
    try:
        alert_list = json.loads(event.get('body', '[]'))
        if not isinstance(alert_list, list):
            alert_list = [alert_list]

        subject = "🚨 [ElastAlert] 비정상적인 로그인 시도가 감지되었습니다 !!!"

        # 중복 제거한 사용자 아이디 수집
        users_set = set()
        ip_set = set()
        timestamps = []

        for alert in alert_list:
            users_set.add(alert.get('user', 'N/A'))
            ip_set.add(alert.get('ip', 'N/A'))
            timestamps.append(alert.get('timestamp', 'N/A'))

        # 첫 timestamp ~ 마지막 timestamp 범위 표시
        start_time = timestamps[0] if timestamps else 'N/A'
        end_time = timestamps[-1] if timestamps else 'N/A'

        # IP, user 배열 출력
        message = f"""
        {start_time} ~ {end_time} 동안 
        IP - {', '.join(ip_set)} 에서 10회 이상의 로그인 실패가 발생했습니다. 
        대상 ID: [{', '.join(users_set)}] 
        빠른 조치 바랍니다 꾸벅 -.-
        """

        response = sns_client.publish(
            TopicArn=SNS_TOPIC_ARN,
            Subject=subject,
            Message=message
        )

        print("Message sent to SNS. Message ID:", response['MessageId'])

        return {
            'statusCode': 200,
            'body': json.dumps({'message': f'{len(users_set)} unique users successfully sent to SNS!'})
        }

    except Exception as e:
        print(f"Error processing alert: {e}")
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }
 

 

728x90
반응형