Theory
고급 2-3시간 이론 + 실습

Spring 15: 메시징/Kafka

이벤트 기반 아키텍처

KafkaMessage QueueEvent DrivenProducerConsumer

1. 메시징 개념

1.1 메시지 큐 (Message Queue)

메시지 큐란?

메시지 큐는 애플리케이션 간 비동기 통신을 가능하게 하는 미들웨어입니다. Producer가 메시지를 큐에 전송하면, Consumer가 나중에 해당 메시지를 처리할 수 있습니다.

메시지 큐의 핵심 특징
  • 비동기 처리: 송신자와 수신자가 동시에 활성화될 필요 없음
  • 느슨한 결합: 시스템 간 의존성 최소화
  • 확장성: 메시지 처리량에 따른 유연한 확장
  • 내결함성: 메시지 손실 방지 및 재처리 지원
  • 순서 보장: 메시지 처리 순서 제어

전통적인 메시지 큐 vs 현대적 스트리밍

구분전통적 큐 (RabbitMQ)스트리밍 (Kafka)
메시지 저장소비 후 삭제설정된 기간 동안 보관
처리량중간 수준매우 높음
복제 소비제한적여러 Consumer Group 지원
순서 보장큐 단위파티션 단위

1.2 비동기 통신 (Asynchronous Communication)

동기 vs 비동기 통신

동기 통신 (Synchronous)
  • 요청 후 응답을 기다림
  • 블로킹 방식
  • 즉시 결과 확인 가능
  • 시스템 간 강한 결합
  • 장애 전파 위험
Client → Server (요청)
Client ← Server (응답) // 대기
Client 처리 계속
비동기 통신 (Asynchronous)
  • 요청 후 즉시 다른 작업 수행
  • 논블로킹 방식
  • 콜백/이벤트로 결과 처리
  • 시스템 간 느슨한 결합
  • 높은 처리량과 확장성
Client → Queue (메시지)
Client 처리 계속
Queue → Consumer (처리)

비동기 통신의 장점

성능 향상
  • 높은 처리량 (Throughput)
  • 낮은 지연시간 (Latency)
  • 리소스 효율적 사용
  • 병렬 처리 가능
시스템 안정성
  • 장애 격리 (Fault Isolation)
  • 백프레셔 처리
  • 재시도 메커니즘
  • 부분 장애 허용

1.3 Apache Kafka 소개

Kafka란?

Apache Kafka는 LinkedIn에서 개발한 분산 스트리밍 플랫폼으로, 대용량 실시간 데이터 파이프라인과 스트리밍 애플리케이션을 구축하기 위해 설계되었습니다.

Kafka의 핵심 특징
  • 높은 처리량: 초당 수백만 메시지 처리
  • 낮은 지연시간: 밀리초 단위 응답
  • 내구성: 디스크 기반 메시지 저장
  • 확장성: 수평적 확장 지원
  • 분산 처리: 클러스터 환경 지원
  • 복제: 데이터 복제를 통한 가용성
  • 순서 보장: 파티션 내 메시지 순서
  • 스트림 처리: Kafka Streams 지원

Kafka 아키텍처 개요

Producer
Broker
Consumer

Producer

메시지 생성 및 전송

Broker

메시지 저장 및 관리

Consumer

메시지 소비 및 처리

Kafka vs 다른 메시징 시스템

특징KafkaRabbitMQActiveMQRedis Pub/Sub
처리량매우 높음중간중간높음
지연시간낮음중간높음매우 낮음
내구성높음높음높음낮음
복제 소비우수제한적제한적우수
운영 복잡도높음중간중간낮음

1.4 Kafka 사용 사례

실시간 데이터 파이프라인

  • 로그 수집 및 분석
  • 메트릭 모니터링
  • ETL 파이프라인
  • 데이터 레이크 구축
예시: 웹 서버 로그 → Kafka → Elasticsearch → Kibana

이벤트 기반 아키텍처

  • 마이크로서비스 통신
  • 도메인 이벤트 발행
  • CQRS 패턴 구현
  • 이벤트 소싱
예시: 주문 생성 → 재고 차감 → 결제 처리 → 배송 준비

스트림 처리

  • 실시간 분석
  • 복잡 이벤트 처리 (CEP)
  • 윈도우 기반 집계
  • 패턴 매칭
예시: 클릭스트림 → 실시간 추천 → 개인화 콘텐츠

메시징 시스템

  • 비동기 작업 처리
  • 알림 시스템
  • 배치 작업 큐
  • 워크플로우 관리
예시: 이메일 발송 → 이미지 처리 → 보고서 생성

산업별 활용 사례

금융
  • 실시간 거래 처리
  • 사기 탐지
  • 리스크 관리
  • 규제 보고
전자상거래
  • 재고 관리
  • 주문 처리
  • 개인화 추천
  • 가격 최적화
IoT
  • 센서 데이터 수집
  • 디바이스 모니터링
  • 예측 유지보수
  • 스마트 시티

1.5 핵심 요약

메시징의 핵심 가치

  • 시스템 간 느슨한 결합
  • 비동기 처리를 통한 성능 향상
  • 확장성과 내결함성 확보
  • 실시간 데이터 처리 가능
  • 복잡한 워크플로우 관리
  • 이벤트 기반 아키텍처 구현

Kafka 선택 이유

Kafka는 높은 처리량, 낮은 지연시간, 강력한 내구성을 제공하며, 대규모 분산 시스템에서 안정적인 메시징 플랫폼으로 검증되었습니다. 특히 실시간 스트리밍과 이벤트 기반 아키텍처 구현에 최적화되어 있습니다.

2. Kafka 기초

2.1 Topic과 Partition

Topic (토픽)

Topic은 Kafka에서 메시지를 분류하는 논리적 단위입니다. 데이터베이스의 테이블이나 파일 시스템의 폴더와 유사한 개념으로, 관련된 메시지들을 그룹화하여 관리합니다.

Topic 특징
  • 논리적 분류: 메시지 유형별 분리 (예: user-events, order-events)
  • 다중 Producer/Consumer: 여러 애플리케이션이 동시 접근 가능
  • 영구 저장: 설정된 보존 기간 동안 메시지 유지
  • 순서 보장: 파티션 내에서 메시지 순서 유지

Partition (파티션)

Partition은 Topic을 물리적으로 분할한 단위입니다. 각 파티션은 순서가 보장되는 불변의 메시지 시퀀스로, Kafka의 확장성과 병렬 처리의 핵심입니다.

파티션 구조 시각화
Topic: user-events
Partition 0
[msg0] [msg3] [msg6]
Partition 1
[msg1] [msg4] [msg7]
Partition 2
[msg2] [msg5] [msg8]
파티션의 장점
  • 병렬 처리: 여러 Consumer가 동시 처리
  • 확장성: 파티션 수만큼 처리량 증가
  • 분산 저장: 여러 브로커에 분산 배치
  • 내결함성: 복제를 통한 데이터 보호
  • 순서 보장: 파티션 내 메시지 순서 유지
  • 로드 밸런싱: 메시지 분산 처리

파티션 키 (Partition Key)

파티션 키는 메시지가 어느 파티션으로 전송될지 결정하는 값입니다. 동일한 키를 가진 메시지는 항상 같은 파티션으로 전송되어 순서가 보장됩니다.

파티션 할당 예시
key="user123" → hash(user123) % 3 = 1 → Partition 1
key="user456" → hash(user456) % 3 = 0 → Partition 0
key="user789" → hash(user789) % 3 = 2 → Partition 2

2.2 Consumer Group

Consumer Group이란?

Consumer Group은 동일한 Topic을 소비하는 Consumer들의 논리적 그룹입니다. 그룹 내의 각 Consumer는 서로 다른 파티션을 담당하여 병렬 처리를 가능하게 합니다.

Consumer Group 특징
  • 파티션 독점: 각 파티션은 그룹 내 하나의 Consumer만 소비
  • 자동 리밸런싱: Consumer 추가/제거 시 파티션 재할당
  • 오프셋 관리: 그룹별로 독립적인 오프셋 관리
  • 확장성: Consumer 수를 조정하여 처리량 제어

Consumer Group 동작 방식

시나리오 1: Consumer 수 = 파티션 수
Partition 0
Consumer A
Partition 1
Consumer B
Partition 2
Consumer C

최적 상태: 모든 Consumer가 활성화

시나리오 2: Consumer 수 > 파티션 수
Partition 0
Consumer A
Partition 1
Consumer B
Partition 2
Consumer C
-
Consumer D (유휴)

비효율: 일부 Consumer가 유휴 상태

리밸런싱 (Rebalancing)

리밸런싱 발생 조건
  • Consumer Group에 새로운 Consumer 추가
  • 기존 Consumer가 그룹에서 제거 (장애, 종료)
  • Topic에 새로운 파티션 추가
  • Consumer의 하트비트 타임아웃
리밸런싱 과정
  1. Group Coordinator가 리밸런싱 시작 신호
  2. 모든 Consumer가 현재 파티션 소비 중단
  3. 파티션 재할당 계산 및 통지
  4. Consumer들이 새로운 파티션으로 소비 재시작

2.3 Offset

Offset이란?

Offset은 파티션 내에서 각 메시지의 고유한 순차 번호입니다. Consumer가 어디까지 메시지를 읽었는지 추적하는 포인터 역할을 합니다.

Offset 구조
0
1
2
3
4
5
...
■ 읽은 메시지 |■ 현재 위치 |■ 읽지 않은 메시지

Offset 관리 방식

자동 커밋 (Auto Commit)
  • 설정된 간격으로 자동 오프셋 커밋
  • 간단한 설정, 개발 편의성
  • 메시지 중복/손실 가능성
  • 기본값: 5초 간격
enable.auto.commit=true
auto.commit.interval.ms=5000
수동 커밋 (Manual Commit)
  • 애플리케이션에서 직접 커밋 제어
  • 정확한 처리 보장
  • 복잡한 구현 필요
  • At-least-once 보장
enable.auto.commit=false
consumer.commitSync()

Offset 저장소

__consumer_offsets Topic

Kafka는 Consumer Group별 오프셋 정보를 내부 토픽인 __consumer_offsets에 저장합니다.

저장 정보
  • Consumer Group ID
  • Topic 이름
  • 파티션 번호
  • 커밋된 오프셋
  • 메타데이터 (타임스탬프 등)

2.4 Kafka 아키텍처

전체 아키텍처 구조

Producer

메시지 생성자

  • • 메시지 발행
  • • 파티션 선택
  • • 배치 처리
  • • 압축
Kafka Cluster

브로커 집합

  • • 메시지 저장
  • • 복제 관리
  • • 리더 선출
  • • 메타데이터 관리
Consumer

메시지 소비자

  • • 메시지 소비
  • • 오프셋 관리
  • • 그룹 관리
  • • 리밸런싱

핵심 컴포넌트

Broker
  • Kafka 서버 인스턴스
  • 파티션 데이터 저장
  • Producer/Consumer 요청 처리
  • 복제 및 동기화
ZooKeeper
  • 클러스터 메타데이터 관리
  • 브로커 상태 모니터링
  • 리더 선출 조정
  • 설정 정보 저장
Controller
  • 클러스터 관리 담당 브로커
  • 파티션 리더 선출
  • 브로커 추가/제거 처리
  • 메타데이터 변경 전파
Group Coordinator
  • Consumer Group 관리
  • 오프셋 커밋 처리
  • 리밸런싱 조정
  • 멤버십 관리

데이터 흐름

1. 메시지 발행 (Produce)
Producer
Partition Leader
Replicas
2. 메시지 소비 (Consume)
Partition
Consumer
Offset Commit

2.5 핵심 요약

Kafka 핵심 개념

데이터 구조
  • Topic: 메시지 분류 단위
  • Partition: 물리적 분할 단위
  • Offset: 메시지 위치 식별자
처리 방식
  • Consumer Group: 병렬 처리
  • Rebalancing: 동적 할당
  • Commit: 진행 상황 저장

설계 고려사항

파티션 수는 예상 처리량과 Consumer 수를 고려하여 결정하고, Consumer Group 설계 시 리밸런싱 비용과 처리 순서 요구사항을 균형있게 고려해야 합니다. 오프셋 관리 방식은 데이터 일관성 요구사항에 따라 선택합니다.

3. Spring Kafka 설정

3.1 Spring Kafka 기본 설정

의존성 추가

<!-- Maven -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

<!-- Gradle -->
implementation 'org.springframework.kafka:spring-kafka'

application.yml 설정

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all
      retries: 3
    consumer:
      group-id: my-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      enable-auto-commit: false

3.2 KafkaTemplate 설정

기본 KafkaTemplate

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.ACKS_CONFIG, "all");
        configProps.put(ProducerConfig.RETRIES_CONFIG, 3);
        configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        configProps.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

JSON 직렬화 설정

@Bean
public ProducerFactory<String, Object> jsonProducerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    configProps.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
    
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, Object> jsonKafkaTemplate() {
    return new KafkaTemplate<>(jsonProducerFactory());
}

3.3 @KafkaListener 설정

Consumer Factory 설정

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
    
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = 
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(3); // 동시 처리 스레드 수
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    return factory;
}

기본 리스너 구현

@Component
public class MessageListener {

    @KafkaListener(topics = "user-events", groupId = "user-service")
    public void listen(String message) {
        log.info("Received message: {}", message);
        // 메시지 처리 로직
    }

    @KafkaListener(topics = "order-events", groupId = "order-service")
    public void listenWithHeaders(
            @Payload String message,
            @Header("eventType") String eventType,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
            @Header(KafkaHeaders.OFFSET) long offset) {
        
        log.info("Received message: {} from topic: {}, partition: {}, offset: {}", 
                message, topic, partition, offset);
    }
}

3.4 설정 최적화

Producer 최적화

성능 최적화 설정
// 처리량 최적화
configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);
configProps.put(ProducerConfig.LINGER_MS_CONFIG, 10);
configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864);

// 안정성 최적화
configProps.put(ProducerConfig.ACKS_CONFIG, "all");
configProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
configProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

Consumer 최적화

처리량 최적화 설정
// 배치 처리 최적화
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);

// 메모리 최적화
props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536);
props.put(ConsumerConfig.SEND_BUFFER_CONFIG, 131072);

// 세션 관리
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);

환경별 설정 관리

개발 환경
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      acks: 1
      retries: 0
    consumer:
      auto-offset-reset: latest
      enable-auto-commit: true
운영 환경
spring:
  kafka:
    bootstrap-servers: kafka1:9092,kafka2:9092,kafka3:9092
    producer:
      acks: all
      retries: 2147483647
      enable-idempotence: true
    consumer:
      auto-offset-reset: earliest
      enable-auto-commit: false

3.5 핵심 요약

Spring Kafka 설정 포인트

Producer 설정
  • KafkaTemplate을 통한 메시지 발송
  • 직렬화 방식 선택 (String, JSON)
  • 배치 처리 및 압축 설정
Consumer 설정
  • @KafkaListener 어노테이션 활용
  • 동시성 및 오프셋 관리
  • 에러 처리 및 재시도 설정

4. Producer 구현

4.1 메시지 전송

기본 메시지 전송

@Service
public class MessageProducer {
    
    private final KafkaTemplate<String, String> kafkaTemplate;
    
    public MessageProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
    
    // 단순 전송
    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
    
    // 키와 함께 전송
    public void sendMessage(String topic, String key, String message) {
        kafkaTemplate.send(topic, key, message);
    }
    
    // 파티션 지정 전송
    public void sendMessage(String topic, int partition, String key, String message) {
        kafkaTemplate.send(topic, partition, key, message);
    }
}

헤더와 함께 전송

public void sendMessageWithHeaders(String topic, String key, String message) {
    ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, message);
    
    // 헤더 추가
    record.headers().add("eventType", "USER_CREATED".getBytes());
    record.headers().add("version", "1.0".getBytes());
    record.headers().add("timestamp", String.valueOf(System.currentTimeMillis()).getBytes());
    
    kafkaTemplate.send(record);
}

// JSON 객체 전송
public void sendJsonMessage(String topic, String key, Object payload) {
    ProducerRecord<String, Object> record = new ProducerRecord<>(topic, key, payload);
    record.headers().add("contentType", "application/json".getBytes());
    
    jsonKafkaTemplate.send(record);
}

4.2 콜백 처리

ListenableFuture 콜백

public void sendMessageWithCallback(String topic, String key, String message) {
    ListenableFuture<SendResult<String, String>> future = 
        kafkaTemplate.send(topic, key, message);
    
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
        @Override
        public void onSuccess(SendResult<String, String> result) {
            RecordMetadata metadata = result.getRecordMetadata();
            log.info("Message sent successfully to topic: {}, partition: {}, offset: {}", 
                    metadata.topic(), metadata.partition(), metadata.offset());
        }
        
        @Override
        public void onFailure(Throwable ex) {
            log.error("Failed to send message to topic: {}", topic, ex);
            // 실패 처리 로직 (재시도, 알림 등)
        }
    });
}

CompletableFuture 활용

public CompletableFuture<SendResult<String, String>> sendMessageAsync(
        String topic, String key, String message) {
    
    return kafkaTemplate.send(topic, key, message)
        .completable()
        .whenComplete((result, ex) -> {
            if (ex != null) {
                log.error("Failed to send message", ex);
                // 메트릭 업데이트, 알림 등
                updateFailureMetrics(topic);
            } else {
                log.info("Message sent: {}", result.getRecordMetadata());
                updateSuccessMetrics(topic);
            }
        });
}

// 배치 전송
public List<CompletableFuture<SendResult<String, String>>> sendBatchMessages(
        String topic, List<MessageDto> messages) {
    
    return messages.stream()
        .map(msg -> sendMessageAsync(topic, msg.getKey(), msg.getValue()))
        .collect(Collectors.toList());
}

4.3 트랜잭션 처리

트랜잭션 설정

@Configuration
public class KafkaTransactionConfig {
    
    @Bean
    public ProducerFactory<String, String> transactionalProducerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        
        // 트랜잭션 설정
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-producer-1");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        
        return new DefaultKafkaProducerFactory<>(props);
    }
    
    @Bean
    public KafkaTemplate<String, String> transactionalKafkaTemplate() {
        return new KafkaTemplate<>(transactionalProducerFactory());
    }
}

트랜잭션 사용

@Service
@Transactional
public class TransactionalMessageService {
    
    private final KafkaTemplate<String, String> kafkaTemplate;
    private final UserRepository userRepository;
    
    // 데이터베이스와 Kafka 트랜잭션 연동
    @KafkaTransactionManager
    public void createUserWithEvent(UserDto userDto) {
        // 1. 데이터베이스 저장
        User user = userRepository.save(new User(userDto));
        
        // 2. Kafka 메시지 전송 (같은 트랜잭션)
        kafkaTemplate.send("user-events", user.getId(), 
            createUserEvent(user));
        
        // 3. 추가 이벤트 전송
        kafkaTemplate.send("audit-events", user.getId(), 
            createAuditEvent("USER_CREATED", user));
    }
    
    // 수동 트랜잭션 관리
    public void sendMessagesInTransaction(List<MessageDto> messages) {
        kafkaTemplate.executeInTransaction(template -> {
            for (MessageDto msg : messages) {
                template.send(msg.getTopic(), msg.getKey(), msg.getValue());
            }
            return null;
        });
    }
}

4.4 직렬화 (Serialization)

커스텀 직렬화

public class UserEventSerializer implements Serializer<UserEvent> {
    
    private final ObjectMapper objectMapper = new ObjectMapper();
    
    @Override
    public byte[] serialize(String topic, UserEvent data) {
        try {
            return objectMapper.writeValueAsBytes(data);
        } catch (JsonProcessingException e) {
            throw new SerializationException("Error serializing UserEvent", e);
        }
    }
    
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // 설정 초기화
    }
    
    @Override
    public void close() {
        // 리소스 정리
    }
}

// Avro 직렬화 예시
public class AvroUserEventSerializer implements Serializer<UserEvent> {
    
    private final SpecificDatumWriter<UserEvent> writer = 
        new SpecificDatumWriter<>(UserEvent.class);
    
    @Override
    public byte[] serialize(String topic, UserEvent data) {
        try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            writer.write(data, encoder);
            encoder.flush();
            return out.toByteArray();
        } catch (IOException e) {
            throw new SerializationException("Error serializing Avro UserEvent", e);
        }
    }
}

스키마 레지스트리 연동

Confluent Schema Registry
@Configuration
public class SchemaRegistryConfig {
    
    @Bean
    public ProducerFactory<String, UserEvent> avroProducerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
                  KafkaAvroSerializer.class);
        props.put("schema.registry.url", "http://localhost:8081");
        props.put("auto.register.schemas", false);
        props.put("use.latest.version", true);
        
        return new DefaultKafkaProducerFactory<>(props);
    }
}

4.5 핵심 요약

Producer 구현 포인트

메시지 전송
  • KafkaTemplate을 통한 다양한 전송 방식
  • 헤더와 메타데이터 활용
  • 비동기 콜백 처리
고급 기능
  • 트랜잭션을 통한 일관성 보장
  • 커스텀 직렬화 구현
  • 스키마 레지스트리 연동

5. Consumer 구현

5.1 메시지 수신

기본 Consumer 구현

@Component
public class UserEventConsumer {
    
    private static final Logger log = LoggerFactory.getLogger(UserEventConsumer.class);
    
    @KafkaListener(topics = "user-events", groupId = "user-service")
    public void handleUserEvent(String message) {
        log.info("Received user event: {}", message);
        // 메시지 처리 로직
        processUserEvent(message);
    }
    
    @KafkaListener(topics = "order-events", groupId = "order-service")
    public void handleOrderEvent(
            @Payload String message,
            @Header("eventType") String eventType,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
            @Header(KafkaHeaders.OFFSET) long offset,
            Acknowledgment ack) {
        
        try {
            log.info("Processing {} event from {}:{} at offset {}", 
                    eventType, topic, partition, offset);
            
            processOrderEvent(message, eventType);
            
            // 수동 커밋
            ack.acknowledge();
            
        } catch (Exception e) {
            log.error("Failed to process order event", e);
            // 에러 처리 로직
        }
    }
}

배치 처리

@KafkaListener(topics = "batch-events", 
               groupId = "batch-processor",
               containerFactory = "batchKafkaListenerContainerFactory")
public void handleBatchEvents(List<ConsumerRecord<String, String>> records,
                             Acknowledgment ack) {
    
    log.info("Received batch of {} messages", records.size());
    
    try {
        List<EventDto> events = records.stream()
            .map(record -> parseEvent(record.value()))
            .collect(Collectors.toList());
        
        // 배치 처리
        processBatchEvents(events);
        
        // 배치 커밋
        ack.acknowledge();
        
    } catch (Exception e) {
        log.error("Failed to process batch events", e);
        // 배치 실패 처리
    }
}

// JSON 역직렬화
@KafkaListener(topics = "json-events", groupId = "json-processor")
public void handleJsonEvent(@Payload UserEvent event,
                           ConsumerRecord<String, UserEvent> record) {
    
    log.info("Received user event: {} for user: {}", 
            event.getEventType(), event.getUserId());
    
    // 타입 안전한 처리
    switch (event.getEventType()) {
        case USER_CREATED:
            handleUserCreated(event);
            break;
        case USER_UPDATED:
            handleUserUpdated(event);
            break;
        case USER_DELETED:
            handleUserDeleted(event);
            break;
    }
}

5.2 에러 처리

에러 핸들러 설정

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> 
    kafkaListenerContainerFactory() {
    
    ConcurrentKafkaListenerContainerFactory<String, String> factory = 
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    
    // 에러 핸들러 설정
    factory.setCommonErrorHandler(new DefaultErrorHandler(
        new FixedBackOff(1000L, 3L))); // 1초 간격으로 3번 재시도
    
    return factory;
}

// 커스텀 에러 핸들러
@Component
public class KafkaErrorHandler implements CommonErrorHandler {
    
    @Override
    public void handleOtherException(Exception thrownException, 
                                   Consumer<?, ?> consumer, 
                                   MessageListenerContainer container, 
                                   boolean batchListener) {
        
        log.error("Kafka consumer error", thrownException);
        
        // 메트릭 업데이트
        updateErrorMetrics(thrownException);
        
        // 알림 발송
        sendErrorNotification(thrownException);
    }
}

예외별 처리 전략

예외 분류 및 처리
@KafkaListener(topics = "critical-events", groupId = "critical-processor")
public void handleCriticalEvent(String message, Acknowledgment ack) {
    try {
        processCriticalEvent(message);
        ack.acknowledge();
        
    } catch (ValidationException e) {
        // 검증 오류 - 재시도 불필요
        log.warn("Invalid message format: {}", message, e);
        sendToDeadLetterQueue(message, e);
        ack.acknowledge(); // 스킵
        
    } catch (BusinessException e) {
        // 비즈니스 로직 오류 - 재시도 가능
        log.error("Business logic error: {}", message, e);
        throw e; // 재시도 트리거
        
    } catch (ExternalServiceException e) {
        // 외부 서비스 오류 - 지연 후 재시도
        log.error("External service error: {}", message, e);
        scheduleDelayedRetry(message, e);
        ack.acknowledge();
        
    } catch (Exception e) {
        // 예상치 못한 오류
        log.error("Unexpected error: {}", message, e);
        sendToDeadLetterQueue(message, e);
        ack.acknowledge();
    }
}

5.3 재시도 메커니즘

재시도 설정

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> 
    retryKafkaListenerContainerFactory() {
    
    ConcurrentKafkaListenerContainerFactory<String, String> factory = 
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    
    // 지수 백오프 재시도
    ExponentialBackOff backOff = new ExponentialBackOff();
    backOff.setInitialInterval(1000L);      // 1초
    backOff.setMultiplier(2.0);             // 2배씩 증가
    backOff.setMaxInterval(10000L);         // 최대 10초
    backOff.setMaxElapsedTime(60000L);      // 최대 1분
    
    DefaultErrorHandler errorHandler = new DefaultErrorHandler(
        (record, exception) -> {
            // 최종 실패 시 처리
            log.error("Final retry failed for record: {}", record, exception);
            sendToDeadLetterQueue(record, exception);
        }, backOff);
    
    // 재시도하지 않을 예외 설정
    errorHandler.addNotRetryableExceptions(ValidationException.class);
    
    factory.setCommonErrorHandler(errorHandler);
    return factory;
}

// 조건부 재시도
@RetryableTopic(
    attempts = "3",
    backoff = @Backoff(delay = 1000, multiplier = 2.0),
    dltStrategy = DltStrategy.FAIL_ON_ERROR,
    include = {TransientException.class}
)
@KafkaListener(topics = "retry-events", groupId = "retry-processor")
public void handleRetryableEvent(String message) {
    log.info("Processing retryable event: {}", message);
    
    if (shouldFail()) {
        throw new TransientException("Temporary failure");
    }
    
    processEvent(message);
}

커스텀 재시도 로직

@Component
public class RetryableMessageProcessor {
    
    private final RetryTemplate retryTemplate;
    
    public RetryableMessageProcessor() {
        this.retryTemplate = RetryTemplate.builder()
            .maxAttempts(3)
            .exponentialBackoff(1000, 2, 10000)
            .retryOn(TransientException.class)
            .build();
    }
    
    @KafkaListener(topics = "custom-retry-events", groupId = "custom-retry")
    public void handleMessage(String message, Acknowledgment ack) {
        try {
            retryTemplate.execute(context -> {
                log.info("Attempt {} for message: {}", 
                        context.getRetryCount() + 1, message);
                
                processMessage(message);
                return null;
            });
            
            ack.acknowledge();
            
        } catch (Exception e) {
            log.error("All retry attempts failed for message: {}", message, e);
            handleFinalFailure(message, e);
            ack.acknowledge();
        }
    }
    
    private void handleFinalFailure(String message, Exception e) {
        // DLQ 전송
        kafkaTemplate.send("failed-events-dlq", message);
        
        // 알림 발송
        notificationService.sendFailureAlert(message, e);
        
        // 메트릭 업데이트
        meterRegistry.counter("kafka.consumer.final_failure").increment();
    }
}

5.4 DLQ (Dead Letter Queue)

DLQ 설정

DLQ 구현
@Component
public class DeadLetterQueueHandler {
    
    private final KafkaTemplate<String, String> kafkaTemplate;
    
    public void sendToDeadLetterQueue(ConsumerRecord<String, String> record, 
                                     Exception exception) {
        
        String dlqTopic = record.topic() + ".DLQ";
        
        ProducerRecord<String, String> dlqRecord = 
            new ProducerRecord<>(dlqTopic, record.key(), record.value());
        
        // 원본 메타데이터 보존
        dlqRecord.headers().add("original.topic", record.topic().getBytes());
        dlqRecord.headers().add("original.partition", 
                               String.valueOf(record.partition()).getBytes());
        dlqRecord.headers().add("original.offset", 
                               String.valueOf(record.offset()).getBytes());
        dlqRecord.headers().add("failure.reason", 
                               exception.getMessage().getBytes());
        dlqRecord.headers().add("failure.timestamp", 
                               String.valueOf(System.currentTimeMillis()).getBytes());
        dlqRecord.headers().add("failure.class", 
                               exception.getClass().getName().getBytes());
        
        kafkaTemplate.send(dlqRecord);
        
        log.warn("Message sent to DLQ: topic={}, key={}, reason={}", 
                dlqTopic, record.key(), exception.getMessage());
    }
}

// DLQ 메시지 처리
@KafkaListener(topics = "*.DLQ", groupId = "dlq-processor")
public void handleDeadLetterMessage(ConsumerRecord<String, String> record) {
    
    String originalTopic = new String(record.headers().lastHeader("original.topic").value());
    String failureReason = new String(record.headers().lastHeader("failure.reason").value());
    
    log.info("Processing DLQ message from original topic: {}, reason: {}", 
            originalTopic, failureReason);
    
    // DLQ 메시지 분석 및 처리
    analyzeDlqMessage(record);
    
    // 필요시 수동 재처리
    if (canRetryMessage(record)) {
        retryOriginalMessage(record);
    }
}

DLQ 모니터링

@Component
public class DlqMonitoringService {
    
    private final MeterRegistry meterRegistry;
    
    @EventListener
    public void handleDlqEvent(DlqMessageEvent event) {
        // 메트릭 업데이트
        meterRegistry.counter("kafka.dlq.messages", 
                             "topic", event.getOriginalTopic(),
                             "reason", event.getFailureReason())
                     .increment();
        
        // 임계치 확인
        checkDlqThreshold(event.getOriginalTopic());
    }
    
    @Scheduled(fixedRate = 60000) // 1분마다
    public void monitorDlqSize() {
        // DLQ 크기 모니터링
        Map<String, Long> dlqSizes = getDlqTopicSizes();
        
        dlqSizes.forEach((topic, size) -> {
            meterRegistry.gauge("kafka.dlq.size", Tags.of("topic", topic), size);
            
            if (size > DLQ_SIZE_THRESHOLD) {
                alertService.sendDlqSizeAlert(topic, size);
            }
        });
    }
}

5.5 핵심 요약

Consumer 구현 포인트

메시지 처리
  • @KafkaListener를 통한 선언적 처리
  • 배치 처리 및 수동 커밋
  • 헤더와 메타데이터 활용
안정성 확보
  • 예외별 차별화된 에러 처리
  • 지능적 재시도 메커니즘
  • DLQ를 통한 실패 메시지 관리

6. 실전 활용

6.1 이벤트 기반 아키텍처

도메인 이벤트 설계

// 도메인 이벤트 정의
public abstract class DomainEvent {
    private final String eventId;
    private final LocalDateTime occurredAt;
    private final String aggregateId;
    private final String eventType;
    
    protected DomainEvent(String aggregateId, String eventType) {
        this.eventId = UUID.randomUUID().toString();
        this.occurredAt = LocalDateTime.now();
        this.aggregateId = aggregateId;
        this.eventType = eventType;
    }
}

// 구체적인 이벤트
public class UserCreatedEvent extends DomainEvent {
    private final String userId;
    private final String email;
    private final String name;
    
    public UserCreatedEvent(String userId, String email, String name) {
        super(userId, "USER_CREATED");
        this.userId = userId;
        this.email = email;
        this.name = name;
    }
}

public class OrderPlacedEvent extends DomainEvent {
    private final String orderId;
    private final String customerId;
    private final List<OrderItem> items;
    private final BigDecimal totalAmount;
    
    public OrderPlacedEvent(String orderId, String customerId, 
                           List<OrderItem> items, BigDecimal totalAmount) {
        super(orderId, "ORDER_PLACED");
        this.orderId = orderId;
        this.customerId = customerId;
        this.items = items;
        this.totalAmount = totalAmount;
    }
}

이벤트 발행자

@Component
public class DomainEventPublisher {
    
    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final ObjectMapper objectMapper;
    
    public void publishEvent(DomainEvent event) {
        try {
            String topic = getTopicForEvent(event.getEventType());
            String key = event.getAggregateId();
            
            ProducerRecord<String, Object> record = 
                new ProducerRecord<>(topic, key, event);
            
            // 이벤트 메타데이터 추가
            record.headers().add("eventId", event.getEventId().getBytes());
            record.headers().add("eventType", event.getEventType().getBytes());
            record.headers().add("occurredAt", 
                               event.getOccurredAt().toString().getBytes());
            record.headers().add("aggregateId", event.getAggregateId().getBytes());
            
            kafkaTemplate.send(record)
                .addCallback(
                    result -> log.info("Event published: {}", event.getEventId()),
                    failure -> log.error("Failed to publish event: {}", 
                                       event.getEventId(), failure)
                );
                
        } catch (Exception e) {
            log.error("Error publishing event: {}", event, e);
            throw new EventPublishingException("Failed to publish event", e);
        }
    }
    
    private String getTopicForEvent(String eventType) {
        return switch (eventType) {
            case "USER_CREATED", "USER_UPDATED", "USER_DELETED" -> "user-events";
            case "ORDER_PLACED", "ORDER_CANCELLED", "ORDER_SHIPPED" -> "order-events";
            case "PAYMENT_PROCESSED", "PAYMENT_FAILED" -> "payment-events";
            default -> "domain-events";
        };
    }
}

6.2 주문 처리 시스템

주문 서비스

@Service
@Transactional
public class OrderService {
    
    private final OrderRepository orderRepository;
    private final DomainEventPublisher eventPublisher;
    
    public Order createOrder(CreateOrderRequest request) {
        // 1. 주문 생성
        Order order = Order.builder()
            .customerId(request.getCustomerId())
            .items(request.getItems())
            .totalAmount(calculateTotal(request.getItems()))
            .status(OrderStatus.PENDING)
            .build();
        
        Order savedOrder = orderRepository.save(order);
        
        // 2. 도메인 이벤트 발행
        OrderPlacedEvent event = new OrderPlacedEvent(
            savedOrder.getId(),
            savedOrder.getCustomerId(),
            savedOrder.getItems(),
            savedOrder.getTotalAmount()
        );
        
        eventPublisher.publishEvent(event);
        
        return savedOrder;
    }
    
    public void cancelOrder(String orderId, String reason) {
        Order order = orderRepository.findById(orderId)
            .orElseThrow(() -> new OrderNotFoundException(orderId));
        
        order.cancel(reason);
        orderRepository.save(order);
        
        // 주문 취소 이벤트 발행
        OrderCancelledEvent event = new OrderCancelledEvent(
            orderId, reason, LocalDateTime.now()
        );
        
        eventPublisher.publishEvent(event);
    }
}

재고 서비스

@Component
public class InventoryEventHandler {
    
    private final InventoryService inventoryService;
    private final DomainEventPublisher eventPublisher;
    
    @KafkaListener(topics = "order-events", groupId = "inventory-service")
    public void handleOrderEvent(OrderPlacedEvent event) {
        try {
            log.info("Processing inventory for order: {}", event.getOrderId());
            
            // 재고 확인 및 차감
            List<InventoryReservation> reservations = 
                inventoryService.reserveItems(event.getItems());
            
            if (reservations.stream().allMatch(InventoryReservation::isSuccessful)) {
                // 재고 확보 성공
                InventoryReservedEvent reservedEvent = new InventoryReservedEvent(
                    event.getOrderId(), reservations
                );
                eventPublisher.publishEvent(reservedEvent);
                
            } else {
                // 재고 부족
                InventoryReservationFailedEvent failedEvent = 
                    new InventoryReservationFailedEvent(
                        event.getOrderId(), 
                        getFailedItems(reservations)
                    );
                eventPublisher.publishEvent(failedEvent);
            }
            
        } catch (Exception e) {
            log.error("Failed to process inventory for order: {}", 
                     event.getOrderId(), e);
            
            InventoryProcessingFailedEvent errorEvent = 
                new InventoryProcessingFailedEvent(
                    event.getOrderId(), e.getMessage()
                );
            eventPublisher.publishEvent(errorEvent);
        }
    }
    
    @KafkaListener(topics = "order-events", groupId = "inventory-service")
    public void handleOrderCancellation(OrderCancelledEvent event) {
        log.info("Releasing inventory for cancelled order: {}", event.getOrderId());
        
        inventoryService.releaseReservation(event.getOrderId());
        
        InventoryReleasedEvent releasedEvent = new InventoryReleasedEvent(
            event.getOrderId(), LocalDateTime.now()
        );
        eventPublisher.publishEvent(releasedEvent);
    }
}

6.3 SAGA 패턴

SAGA 오케스트레이터

주문 처리 SAGA
@Component
public class OrderProcessingSaga {
    
    private final SagaStateRepository sagaStateRepository;
    private final DomainEventPublisher eventPublisher;
    
    @KafkaListener(topics = "order-events", groupId = "order-saga")
    public void handleOrderPlaced(OrderPlacedEvent event) {
        // SAGA 상태 초기화
        SagaState sagaState = SagaState.builder()
            .sagaId(UUID.randomUUID().toString())
            .orderId(event.getOrderId())
            .currentStep(SagaStep.INVENTORY_RESERVATION)
            .status(SagaStatus.IN_PROGRESS)
            .build();
        
        sagaStateRepository.save(sagaState);
        
        // 첫 번째 단계: 재고 예약 요청
        RequestInventoryReservationCommand command = 
            new RequestInventoryReservationCommand(
                sagaState.getSagaId(),
                event.getOrderId(),
                event.getItems()
            );
        
        eventPublisher.publishEvent(command);
    }
    
    @KafkaListener(topics = "inventory-events", groupId = "order-saga")
    public void handleInventoryReserved(InventoryReservedEvent event) {
        SagaState sagaState = sagaStateRepository.findByOrderId(event.getOrderId())
            .orElseThrow(() -> new SagaNotFoundException(event.getOrderId()));
        
        // 다음 단계: 결제 처리
        sagaState.moveToNextStep(SagaStep.PAYMENT_PROCESSING);
        sagaStateRepository.save(sagaState);
        
        ProcessPaymentCommand command = new ProcessPaymentCommand(
            sagaState.getSagaId(),
            event.getOrderId(),
            sagaState.getPaymentAmount()
        );
        
        eventPublisher.publishEvent(command);
    }
    
    @KafkaListener(topics = "payment-events", groupId = "order-saga")
    public void handlePaymentProcessed(PaymentProcessedEvent event) {
        SagaState sagaState = sagaStateRepository.findByOrderId(event.getOrderId())
            .orElseThrow(() -> new SagaNotFoundException(event.getOrderId()));
        
        // 최종 단계: 주문 확정
        sagaState.complete();
        sagaStateRepository.save(sagaState);
        
        ConfirmOrderCommand command = new ConfirmOrderCommand(
            sagaState.getSagaId(),
            event.getOrderId()
        );
        
        eventPublisher.publishEvent(command);
    }
    
    // 보상 트랜잭션 처리
    @KafkaListener(topics = "inventory-events", groupId = "order-saga")
    public void handleInventoryReservationFailed(InventoryReservationFailedEvent event) {
        SagaState sagaState = sagaStateRepository.findByOrderId(event.getOrderId())
            .orElseThrow(() -> new SagaNotFoundException(event.getOrderId()));
        
        // SAGA 실패 처리
        sagaState.fail("Inventory reservation failed");
        sagaStateRepository.save(sagaState);
        
        // 주문 취소
        CancelOrderCommand command = new CancelOrderCommand(
            sagaState.getSagaId(),
            event.getOrderId(),
            "Insufficient inventory"
        );
        
        eventPublisher.publishEvent(command);
    }
}

보상 트랜잭션

실패 시 롤백 처리
@Component
public class CompensationHandler {
    
    @KafkaListener(topics = "payment-events", groupId = "compensation-handler")
    public void handlePaymentFailed(PaymentFailedEvent event) {
        log.info("Starting compensation for failed payment: {}", event.getOrderId());
        
        // 1. 재고 예약 해제
        ReleaseInventoryReservationCommand releaseCommand = 
            new ReleaseInventoryReservationCommand(
                event.getOrderId(),
                "Payment failed"
            );
        eventPublisher.publishEvent(releaseCommand);
        
        // 2. 주문 상태 변경
        CancelOrderCommand cancelCommand = new CancelOrderCommand(
            event.getOrderId(),
            "Payment processing failed"
        );
        eventPublisher.publishEvent(cancelCommand);
        
        // 3. 고객 알림
        NotifyCustomerCommand notifyCommand = new NotifyCustomerCommand(
            event.getCustomerId(),
            "ORDER_PAYMENT_FAILED",
            Map.of("orderId", event.getOrderId(), "reason", event.getFailureReason())
        );
        eventPublisher.publishEvent(notifyCommand);
    }
    
    @KafkaListener(topics = "saga-events", groupId = "compensation-handler")
    public void handleSagaTimeout(SagaTimeoutEvent event) {
        log.warn("SAGA timeout detected: {}", event.getSagaId());
        
        SagaState sagaState = sagaStateRepository.findById(event.getSagaId())
            .orElseThrow(() -> new SagaNotFoundException(event.getSagaId()));
        
        // 현재 단계에 따른 보상 처리
        switch (sagaState.getCurrentStep()) {
            case INVENTORY_RESERVATION:
                // 재고 예약 중 타임아웃 - 주문 취소만
                cancelOrder(sagaState.getOrderId(), "Timeout during inventory reservation");
                break;
                
            case PAYMENT_PROCESSING:
                // 결제 중 타임아웃 - 재고 해제 + 주문 취소
                releaseInventoryAndCancelOrder(sagaState.getOrderId(), "Timeout during payment");
                break;
                
            case ORDER_CONFIRMATION:
                // 확정 중 타임아웃 - 수동 확인 필요
                escalateToManualReview(sagaState.getOrderId(), "Timeout during confirmation");
                break;
        }
        
        sagaState.timeout();
        sagaStateRepository.save(sagaState);
    }
}

SAGA 상태 관리

@Entity
public class SagaState {
    @Id
    private String sagaId;
    private String orderId;
    private SagaStep currentStep;
    private SagaStatus status;
    private LocalDateTime startedAt;
    private LocalDateTime completedAt;
    private String failureReason;
    
    @ElementCollection
    @MapKeyColumn(name = "step_name")
    @Column(name = "step_status")
    private Map<String, String> stepStatuses = new HashMap<>();
    
    public void moveToNextStep(SagaStep nextStep) {
        this.stepStatuses.put(this.currentStep.name(), "COMPLETED");
        this.currentStep = nextStep;
        this.stepStatuses.put(nextStep.name(), "IN_PROGRESS");
    }
    
    public void complete() {
        this.status = SagaStatus.COMPLETED;
        this.completedAt = LocalDateTime.now();
        this.stepStatuses.put(this.currentStep.name(), "COMPLETED");
    }
    
    public void fail(String reason) {
        this.status = SagaStatus.FAILED;
        this.failureReason = reason;
        this.completedAt = LocalDateTime.now();
        this.stepStatuses.put(this.currentStep.name(), "FAILED");
    }
}

// SAGA 모니터링
@Component
public class SagaMonitor {
    
    @Scheduled(fixedRate = 30000) // 30초마다
    public void checkSagaTimeouts() {
        LocalDateTime timeoutThreshold = LocalDateTime.now().minusMinutes(5);
        
        List<SagaState> timedOutSagas = sagaStateRepository
            .findByStatusAndStartedAtBefore(SagaStatus.IN_PROGRESS, timeoutThreshold);
        
        timedOutSagas.forEach(saga -> {
            log.warn("SAGA timeout detected: {}", saga.getSagaId());
            
            SagaTimeoutEvent timeoutEvent = new SagaTimeoutEvent(
                saga.getSagaId(),
                saga.getOrderId(),
                saga.getCurrentStep()
            );
            
            eventPublisher.publishEvent(timeoutEvent);
        });
    }
}

6.4 핵심 요약

실전 활용 패턴

이벤트 기반 설계
  • 도메인 이벤트를 통한 서비스 간 통신
  • 느슨한 결합과 높은 확장성
  • 이벤트 소싱과 CQRS 패턴
분산 트랜잭션
  • SAGA 패턴을 통한 일관성 보장
  • 보상 트랜잭션으로 롤백 처리
  • 상태 관리와 타임아웃 처리

7. 정리

7.1 Kafka 설계 가이드

토픽 설계 원칙

토픽 명명 규칙
도메인 기반: user-events, order-events, payment-events
환경 구분: prod.user-events, dev.user-events
버전 관리: user-events-v1, user-events-v2
DLQ 구분: user-events.DLQ, user-events.retry
파티션 수 결정
  • 처리량 기준: 예상 TPS × 안전 계수 (2-3배)
  • Consumer 수: 최대 Consumer 수와 동일하게 설정
  • 순서 보장: 순서가 중요한 경우 파티션 수 최소화
  • 확장성: 향후 확장을 고려하여 여유있게 설정

성능 최적화 가이드

Producer 최적화
  • 배치 크기: 16KB-64KB 권장
  • 압축: snappy 또는 lz4 사용
  • Linger 시간: 지연 허용 범위 내에서 설정
  • Idempotence: 중복 방지를 위해 활성화
Consumer 최적화
  • Fetch 크기: 네트워크 대역폭에 맞게 조정
  • Poll 레코드: 처리 능력에 맞게 제한
  • 세션 타임아웃: 리밸런싱 빈도 고려
  • 동시성: CPU 코어 수에 맞게 설정

7.2 모니터링

핵심 메트릭

구분메트릭설명임계값
Producerrecord-send-rate초당 전송 메시지 수예상 TPS 대비 모니터링
Producerrecord-error-rate전송 실패율< 1%
Consumerrecords-consumed-rate초당 소비 메시지 수Producer 대비 균형
Consumerrecords-lag-max최대 지연 메시지 수< 1000
BrokerUnderReplicatedPartitions복제 부족 파티션= 0

모니터링 구현

@Component
public class KafkaMetricsCollector {
    
    private final MeterRegistry meterRegistry;
    private final KafkaTemplate<String, String> kafkaTemplate;
    
    @EventListener
    public void handleProducerMetrics(ProducerMetricsEvent event) {
        // Producer 메트릭 수집
        meterRegistry.counter("kafka.producer.records.sent",
                             "topic", event.getTopic())
                     .increment();
        
        meterRegistry.timer("kafka.producer.send.duration",
                           "topic", event.getTopic())
                     .record(event.getDuration(), TimeUnit.MILLISECONDS);
    }
    
    @EventListener
    public void handleConsumerMetrics(ConsumerMetricsEvent event) {
        // Consumer 메트릭 수집
        meterRegistry.counter("kafka.consumer.records.consumed",
                             "topic", event.getTopic(),
                             "group", event.getGroupId())
                     .increment();
        
        meterRegistry.gauge("kafka.consumer.lag",
                           Tags.of("topic", event.getTopic(),
                                  "partition", String.valueOf(event.getPartition())),
                           event.getLag());
    }
    
    @Scheduled(fixedRate = 30000) // 30초마다
    public void collectBrokerMetrics() {
        // JMX를 통한 브로커 메트릭 수집
        try {
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            
            // Under-replicated partitions
            ObjectName objectName = new ObjectName(
                "kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions");
            Integer underReplicated = (Integer) server.getAttribute(objectName, "Value");
            
            meterRegistry.gauge("kafka.broker.under_replicated_partitions", underReplicated);
            
            // Request rate
            ObjectName requestName = new ObjectName(
                "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce");
            Double requestRate = (Double) server.getAttribute(requestName, "OneMinuteRate");
            
            meterRegistry.gauge("kafka.broker.produce_requests_per_sec", requestRate);
            
        } catch (Exception e) {
            log.error("Failed to collect broker metrics", e);
        }
    }
}

7.3 베스트 프랙티스

개발 가이드라인

✅ 권장사항
  • 멱등성 보장을 위한 고유 키 사용
  • 스키마 진화를 고려한 메시지 설계
  • 적절한 파티션 키 선택으로 순서 보장
  • DLQ를 통한 실패 메시지 처리
  • 환경별 설정 분리
  • 메트릭과 로깅을 통한 모니터링
  • 트랜잭션 범위 최소화
  • 배치 처리로 성능 최적화
❌ 주의사항
  • 과도한 파티션 수로 인한 오버헤드
  • 큰 메시지 크기로 인한 성능 저하
  • 동기식 처리로 인한 블로킹
  • 무한 재시도로 인한 리소스 낭비
  • 스키마 호환성 무시
  • 에러 처리 로직 누락
  • Consumer Group 설계 미흡
  • 오프셋 관리 소홀

운영 가이드라인

운영 체크리스트
배포 전 확인
  • 토픽 생성 및 파티션 수 확인
  • Consumer Group 설정 검증
  • 스키마 호환성 테스트
  • 성능 테스트 수행
  • 장애 시나리오 테스트
운영 중 모니터링
  • Consumer Lag 모니터링
  • 에러율 및 DLQ 크기 확인
  • 브로커 리소스 사용률
  • 네트워크 및 디스크 I/O
  • 리밸런싱 빈도 확인

트러블슈팅 가이드

일반적인 문제와 해결책
Consumer Lag 증가:Consumer 인스턴스 증가, 배치 크기 조정, 처리 로직 최적화
메시지 중복:멱등성 키 구현, 중복 제거 로직 추가
순서 보장 실패:파티션 키 검증, 단일 파티션 사용 고려
리밸런싱 빈발:세션 타임아웃 조정, 하트비트 간격 최적화

7.4 최종 요약

Spring Kafka 마스터리 로드맵

기초 단계
  • • Kafka 개념 이해
  • • Spring Kafka 설정
  • • 기본 Producer/Consumer
  • • 에러 처리 구현
중급 단계
  • • 트랜잭션 처리
  • • 커스텀 직렬화
  • • 배치 처리 최적화
  • • 모니터링 구현
고급 단계
  • • 이벤트 기반 아키텍처
  • • SAGA 패턴 구현
  • • 스키마 레지스트리
  • • 운영 최적화

핵심 가치

Spring Kafka는 단순한 메시징 도구를 넘어 현대적인 분산 시스템의 핵심 인프라입니다. 이벤트 기반 아키텍처를 통해 시스템 간 느슨한 결합을 실현하고, 높은 처리량과 확장성을 제공하며, 복잡한 비즈니스 워크플로우를 안정적으로 관리할 수 있습니다. 올바른 설계와 구현을 통해 견고하고 확장 가능한 시스템을 구축할 수 있습니다.

다음 단계

  • 실제 프로젝트에 Spring Kafka 적용
  • 성능 테스트 및 최적화 경험
  • Kafka Streams 학습
  • Confluent Platform 활용
  • 클라우드 환경 운영 경험
  • 커뮤니티 참여 및 기여