Spring 15: 메시징/Kafka
이벤트 기반 아키텍처
1. 메시징 개념
1.1 메시지 큐 (Message Queue)
메시지 큐란?
메시지 큐는 애플리케이션 간 비동기 통신을 가능하게 하는 미들웨어입니다. Producer가 메시지를 큐에 전송하면, Consumer가 나중에 해당 메시지를 처리할 수 있습니다.
메시지 큐의 핵심 특징
- 비동기 처리: 송신자와 수신자가 동시에 활성화될 필요 없음
- 느슨한 결합: 시스템 간 의존성 최소화
- 확장성: 메시지 처리량에 따른 유연한 확장
- 내결함성: 메시지 손실 방지 및 재처리 지원
- 순서 보장: 메시지 처리 순서 제어
전통적인 메시지 큐 vs 현대적 스트리밍
| 구분 | 전통적 큐 (RabbitMQ) | 스트리밍 (Kafka) |
|---|---|---|
| 메시지 저장 | 소비 후 삭제 | 설정된 기간 동안 보관 |
| 처리량 | 중간 수준 | 매우 높음 |
| 복제 소비 | 제한적 | 여러 Consumer Group 지원 |
| 순서 보장 | 큐 단위 | 파티션 단위 |
1.2 비동기 통신 (Asynchronous Communication)
동기 vs 비동기 통신
동기 통신 (Synchronous)
- 요청 후 응답을 기다림
- 블로킹 방식
- 즉시 결과 확인 가능
- 시스템 간 강한 결합
- 장애 전파 위험
Client → Server (요청)
Client ← Server (응답) // 대기
Client 처리 계속비동기 통신 (Asynchronous)
- 요청 후 즉시 다른 작업 수행
- 논블로킹 방식
- 콜백/이벤트로 결과 처리
- 시스템 간 느슨한 결합
- 높은 처리량과 확장성
Client → Queue (메시지)
Client 처리 계속
Queue → Consumer (처리)비동기 통신의 장점
성능 향상
- 높은 처리량 (Throughput)
- 낮은 지연시간 (Latency)
- 리소스 효율적 사용
- 병렬 처리 가능
시스템 안정성
- 장애 격리 (Fault Isolation)
- 백프레셔 처리
- 재시도 메커니즘
- 부분 장애 허용
1.3 Apache Kafka 소개
Kafka란?
Apache Kafka는 LinkedIn에서 개발한 분산 스트리밍 플랫폼으로, 대용량 실시간 데이터 파이프라인과 스트리밍 애플리케이션을 구축하기 위해 설계되었습니다.
Kafka의 핵심 특징
- 높은 처리량: 초당 수백만 메시지 처리
- 낮은 지연시간: 밀리초 단위 응답
- 내구성: 디스크 기반 메시지 저장
- 확장성: 수평적 확장 지원
- 분산 처리: 클러스터 환경 지원
- 복제: 데이터 복제를 통한 가용성
- 순서 보장: 파티션 내 메시지 순서
- 스트림 처리: Kafka Streams 지원
Kafka 아키텍처 개요
Producer
메시지 생성 및 전송
Broker
메시지 저장 및 관리
Consumer
메시지 소비 및 처리
Kafka vs 다른 메시징 시스템
| 특징 | Kafka | RabbitMQ | ActiveMQ | Redis Pub/Sub |
|---|---|---|---|---|
| 처리량 | 매우 높음 | 중간 | 중간 | 높음 |
| 지연시간 | 낮음 | 중간 | 높음 | 매우 낮음 |
| 내구성 | 높음 | 높음 | 높음 | 낮음 |
| 복제 소비 | 우수 | 제한적 | 제한적 | 우수 |
| 운영 복잡도 | 높음 | 중간 | 중간 | 낮음 |
1.4 Kafka 사용 사례
실시간 데이터 파이프라인
- 로그 수집 및 분석
- 메트릭 모니터링
- ETL 파이프라인
- 데이터 레이크 구축
이벤트 기반 아키텍처
- 마이크로서비스 통신
- 도메인 이벤트 발행
- CQRS 패턴 구현
- 이벤트 소싱
스트림 처리
- 실시간 분석
- 복잡 이벤트 처리 (CEP)
- 윈도우 기반 집계
- 패턴 매칭
메시징 시스템
- 비동기 작업 처리
- 알림 시스템
- 배치 작업 큐
- 워크플로우 관리
산업별 활용 사례
금융
- 실시간 거래 처리
- 사기 탐지
- 리스크 관리
- 규제 보고
전자상거래
- 재고 관리
- 주문 처리
- 개인화 추천
- 가격 최적화
IoT
- 센서 데이터 수집
- 디바이스 모니터링
- 예측 유지보수
- 스마트 시티
1.5 핵심 요약
메시징의 핵심 가치
- 시스템 간 느슨한 결합
- 비동기 처리를 통한 성능 향상
- 확장성과 내결함성 확보
- 실시간 데이터 처리 가능
- 복잡한 워크플로우 관리
- 이벤트 기반 아키텍처 구현
Kafka 선택 이유
Kafka는 높은 처리량, 낮은 지연시간, 강력한 내구성을 제공하며, 대규모 분산 시스템에서 안정적인 메시징 플랫폼으로 검증되었습니다. 특히 실시간 스트리밍과 이벤트 기반 아키텍처 구현에 최적화되어 있습니다.
2. Kafka 기초
2.1 Topic과 Partition
Topic (토픽)
Topic은 Kafka에서 메시지를 분류하는 논리적 단위입니다. 데이터베이스의 테이블이나 파일 시스템의 폴더와 유사한 개념으로, 관련된 메시지들을 그룹화하여 관리합니다.
Topic 특징
- 논리적 분류: 메시지 유형별 분리 (예: user-events, order-events)
- 다중 Producer/Consumer: 여러 애플리케이션이 동시 접근 가능
- 영구 저장: 설정된 보존 기간 동안 메시지 유지
- 순서 보장: 파티션 내에서 메시지 순서 유지
Partition (파티션)
Partition은 Topic을 물리적으로 분할한 단위입니다. 각 파티션은 순서가 보장되는 불변의 메시지 시퀀스로, Kafka의 확장성과 병렬 처리의 핵심입니다.
파티션 구조 시각화
파티션의 장점
- 병렬 처리: 여러 Consumer가 동시 처리
- 확장성: 파티션 수만큼 처리량 증가
- 분산 저장: 여러 브로커에 분산 배치
- 내결함성: 복제를 통한 데이터 보호
- 순서 보장: 파티션 내 메시지 순서 유지
- 로드 밸런싱: 메시지 분산 처리
파티션 키 (Partition Key)
파티션 키는 메시지가 어느 파티션으로 전송될지 결정하는 값입니다. 동일한 키를 가진 메시지는 항상 같은 파티션으로 전송되어 순서가 보장됩니다.
파티션 할당 예시
2.2 Consumer Group
Consumer Group이란?
Consumer Group은 동일한 Topic을 소비하는 Consumer들의 논리적 그룹입니다. 그룹 내의 각 Consumer는 서로 다른 파티션을 담당하여 병렬 처리를 가능하게 합니다.
Consumer Group 특징
- 파티션 독점: 각 파티션은 그룹 내 하나의 Consumer만 소비
- 자동 리밸런싱: Consumer 추가/제거 시 파티션 재할당
- 오프셋 관리: 그룹별로 독립적인 오프셋 관리
- 확장성: Consumer 수를 조정하여 처리량 제어
Consumer Group 동작 방식
시나리오 1: Consumer 수 = 파티션 수
최적 상태: 모든 Consumer가 활성화
시나리오 2: Consumer 수 > 파티션 수
비효율: 일부 Consumer가 유휴 상태
리밸런싱 (Rebalancing)
리밸런싱 발생 조건
- Consumer Group에 새로운 Consumer 추가
- 기존 Consumer가 그룹에서 제거 (장애, 종료)
- Topic에 새로운 파티션 추가
- Consumer의 하트비트 타임아웃
리밸런싱 과정
- Group Coordinator가 리밸런싱 시작 신호
- 모든 Consumer가 현재 파티션 소비 중단
- 파티션 재할당 계산 및 통지
- Consumer들이 새로운 파티션으로 소비 재시작
2.3 Offset
Offset이란?
Offset은 파티션 내에서 각 메시지의 고유한 순차 번호입니다. Consumer가 어디까지 메시지를 읽었는지 추적하는 포인터 역할을 합니다.
Offset 구조
Offset 관리 방식
자동 커밋 (Auto Commit)
- 설정된 간격으로 자동 오프셋 커밋
- 간단한 설정, 개발 편의성
- 메시지 중복/손실 가능성
- 기본값: 5초 간격
enable.auto.commit=true
auto.commit.interval.ms=5000수동 커밋 (Manual Commit)
- 애플리케이션에서 직접 커밋 제어
- 정확한 처리 보장
- 복잡한 구현 필요
- At-least-once 보장
enable.auto.commit=false
consumer.commitSync()Offset 저장소
__consumer_offsets Topic
Kafka는 Consumer Group별 오프셋 정보를 내부 토픽인 __consumer_offsets에 저장합니다.
저장 정보
- Consumer Group ID
- Topic 이름
- 파티션 번호
- 커밋된 오프셋
- 메타데이터 (타임스탬프 등)
2.4 Kafka 아키텍처
전체 아키텍처 구조
Producer
메시지 생성자
- • 메시지 발행
- • 파티션 선택
- • 배치 처리
- • 압축
Kafka Cluster
브로커 집합
- • 메시지 저장
- • 복제 관리
- • 리더 선출
- • 메타데이터 관리
Consumer
메시지 소비자
- • 메시지 소비
- • 오프셋 관리
- • 그룹 관리
- • 리밸런싱
핵심 컴포넌트
Broker
- Kafka 서버 인스턴스
- 파티션 데이터 저장
- Producer/Consumer 요청 처리
- 복제 및 동기화
ZooKeeper
- 클러스터 메타데이터 관리
- 브로커 상태 모니터링
- 리더 선출 조정
- 설정 정보 저장
Controller
- 클러스터 관리 담당 브로커
- 파티션 리더 선출
- 브로커 추가/제거 처리
- 메타데이터 변경 전파
Group Coordinator
- Consumer Group 관리
- 오프셋 커밋 처리
- 리밸런싱 조정
- 멤버십 관리
데이터 흐름
1. 메시지 발행 (Produce)
2. 메시지 소비 (Consume)
2.5 핵심 요약
Kafka 핵심 개념
데이터 구조
- Topic: 메시지 분류 단위
- Partition: 물리적 분할 단위
- Offset: 메시지 위치 식별자
처리 방식
- Consumer Group: 병렬 처리
- Rebalancing: 동적 할당
- Commit: 진행 상황 저장
설계 고려사항
파티션 수는 예상 처리량과 Consumer 수를 고려하여 결정하고, Consumer Group 설계 시 리밸런싱 비용과 처리 순서 요구사항을 균형있게 고려해야 합니다. 오프셋 관리 방식은 데이터 일관성 요구사항에 따라 선택합니다.
3. Spring Kafka 설정
3.1 Spring Kafka 기본 설정
의존성 추가
<!-- Maven -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- Gradle -->
implementation 'org.springframework.kafka:spring-kafka'application.yml 설정
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
acks: all
retries: 3
consumer:
group-id: my-group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
auto-offset-reset: earliest
enable-auto-commit: false3.2 KafkaTemplate 설정
기본 KafkaTemplate
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.ACKS_CONFIG, "all");
configProps.put(ProducerConfig.RETRIES_CONFIG, 3);
configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
configProps.put(ProducerConfig.LINGER_MS_CONFIG, 1);
configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}JSON 직렬화 설정
@Bean
public ProducerFactory<String, Object> jsonProducerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
configProps.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Object> jsonKafkaTemplate() {
return new KafkaTemplate<>(jsonProducerFactory());
}3.3 @KafkaListener 설정
Consumer Factory 설정
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3); // 동시 처리 스레드 수
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}기본 리스너 구현
@Component
public class MessageListener {
@KafkaListener(topics = "user-events", groupId = "user-service")
public void listen(String message) {
log.info("Received message: {}", message);
// 메시지 처리 로직
}
@KafkaListener(topics = "order-events", groupId = "order-service")
public void listenWithHeaders(
@Payload String message,
@Header("eventType") String eventType,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.OFFSET) long offset) {
log.info("Received message: {} from topic: {}, partition: {}, offset: {}",
message, topic, partition, offset);
}
}3.4 설정 최적화
Producer 최적화
성능 최적화 설정
// 처리량 최적화
configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);
configProps.put(ProducerConfig.LINGER_MS_CONFIG, 10);
configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864);
// 안정성 최적화
configProps.put(ProducerConfig.ACKS_CONFIG, "all");
configProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
configProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);Consumer 최적화
처리량 최적화 설정
// 배치 처리 최적화
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
// 메모리 최적화
props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536);
props.put(ConsumerConfig.SEND_BUFFER_CONFIG, 131072);
// 세션 관리
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);환경별 설정 관리
개발 환경
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
acks: 1
retries: 0
consumer:
auto-offset-reset: latest
enable-auto-commit: true운영 환경
spring:
kafka:
bootstrap-servers: kafka1:9092,kafka2:9092,kafka3:9092
producer:
acks: all
retries: 2147483647
enable-idempotence: true
consumer:
auto-offset-reset: earliest
enable-auto-commit: false3.5 핵심 요약
Spring Kafka 설정 포인트
Producer 설정
- KafkaTemplate을 통한 메시지 발송
- 직렬화 방식 선택 (String, JSON)
- 배치 처리 및 압축 설정
Consumer 설정
- @KafkaListener 어노테이션 활용
- 동시성 및 오프셋 관리
- 에러 처리 및 재시도 설정
4. Producer 구현
4.1 메시지 전송
기본 메시지 전송
@Service
public class MessageProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public MessageProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
// 단순 전송
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
}
// 키와 함께 전송
public void sendMessage(String topic, String key, String message) {
kafkaTemplate.send(topic, key, message);
}
// 파티션 지정 전송
public void sendMessage(String topic, int partition, String key, String message) {
kafkaTemplate.send(topic, partition, key, message);
}
}헤더와 함께 전송
public void sendMessageWithHeaders(String topic, String key, String message) {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, message);
// 헤더 추가
record.headers().add("eventType", "USER_CREATED".getBytes());
record.headers().add("version", "1.0".getBytes());
record.headers().add("timestamp", String.valueOf(System.currentTimeMillis()).getBytes());
kafkaTemplate.send(record);
}
// JSON 객체 전송
public void sendJsonMessage(String topic, String key, Object payload) {
ProducerRecord<String, Object> record = new ProducerRecord<>(topic, key, payload);
record.headers().add("contentType", "application/json".getBytes());
jsonKafkaTemplate.send(record);
}4.2 콜백 처리
ListenableFuture 콜백
public void sendMessageWithCallback(String topic, String key, String message) {
ListenableFuture<SendResult<String, String>> future =
kafkaTemplate.send(topic, key, message);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
RecordMetadata metadata = result.getRecordMetadata();
log.info("Message sent successfully to topic: {}, partition: {}, offset: {}",
metadata.topic(), metadata.partition(), metadata.offset());
}
@Override
public void onFailure(Throwable ex) {
log.error("Failed to send message to topic: {}", topic, ex);
// 실패 처리 로직 (재시도, 알림 등)
}
});
}CompletableFuture 활용
public CompletableFuture<SendResult<String, String>> sendMessageAsync(
String topic, String key, String message) {
return kafkaTemplate.send(topic, key, message)
.completable()
.whenComplete((result, ex) -> {
if (ex != null) {
log.error("Failed to send message", ex);
// 메트릭 업데이트, 알림 등
updateFailureMetrics(topic);
} else {
log.info("Message sent: {}", result.getRecordMetadata());
updateSuccessMetrics(topic);
}
});
}
// 배치 전송
public List<CompletableFuture<SendResult<String, String>>> sendBatchMessages(
String topic, List<MessageDto> messages) {
return messages.stream()
.map(msg -> sendMessageAsync(topic, msg.getKey(), msg.getValue()))
.collect(Collectors.toList());
}4.3 트랜잭션 처리
트랜잭션 설정
@Configuration
public class KafkaTransactionConfig {
@Bean
public ProducerFactory<String, String> transactionalProducerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 트랜잭션 설정
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-producer-1");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public KafkaTemplate<String, String> transactionalKafkaTemplate() {
return new KafkaTemplate<>(transactionalProducerFactory());
}
}트랜잭션 사용
@Service
@Transactional
public class TransactionalMessageService {
private final KafkaTemplate<String, String> kafkaTemplate;
private final UserRepository userRepository;
// 데이터베이스와 Kafka 트랜잭션 연동
@KafkaTransactionManager
public void createUserWithEvent(UserDto userDto) {
// 1. 데이터베이스 저장
User user = userRepository.save(new User(userDto));
// 2. Kafka 메시지 전송 (같은 트랜잭션)
kafkaTemplate.send("user-events", user.getId(),
createUserEvent(user));
// 3. 추가 이벤트 전송
kafkaTemplate.send("audit-events", user.getId(),
createAuditEvent("USER_CREATED", user));
}
// 수동 트랜잭션 관리
public void sendMessagesInTransaction(List<MessageDto> messages) {
kafkaTemplate.executeInTransaction(template -> {
for (MessageDto msg : messages) {
template.send(msg.getTopic(), msg.getKey(), msg.getValue());
}
return null;
});
}
}4.4 직렬화 (Serialization)
커스텀 직렬화
public class UserEventSerializer implements Serializer<UserEvent> {
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public byte[] serialize(String topic, UserEvent data) {
try {
return objectMapper.writeValueAsBytes(data);
} catch (JsonProcessingException e) {
throw new SerializationException("Error serializing UserEvent", e);
}
}
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// 설정 초기화
}
@Override
public void close() {
// 리소스 정리
}
}
// Avro 직렬화 예시
public class AvroUserEventSerializer implements Serializer<UserEvent> {
private final SpecificDatumWriter<UserEvent> writer =
new SpecificDatumWriter<>(UserEvent.class);
@Override
public byte[] serialize(String topic, UserEvent data) {
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
writer.write(data, encoder);
encoder.flush();
return out.toByteArray();
} catch (IOException e) {
throw new SerializationException("Error serializing Avro UserEvent", e);
}
}
}스키마 레지스트리 연동
Confluent Schema Registry
@Configuration
public class SchemaRegistryConfig {
@Bean
public ProducerFactory<String, UserEvent> avroProducerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://localhost:8081");
props.put("auto.register.schemas", false);
props.put("use.latest.version", true);
return new DefaultKafkaProducerFactory<>(props);
}
}4.5 핵심 요약
Producer 구현 포인트
메시지 전송
- KafkaTemplate을 통한 다양한 전송 방식
- 헤더와 메타데이터 활용
- 비동기 콜백 처리
고급 기능
- 트랜잭션을 통한 일관성 보장
- 커스텀 직렬화 구현
- 스키마 레지스트리 연동
5. Consumer 구현
5.1 메시지 수신
기본 Consumer 구현
@Component
public class UserEventConsumer {
private static final Logger log = LoggerFactory.getLogger(UserEventConsumer.class);
@KafkaListener(topics = "user-events", groupId = "user-service")
public void handleUserEvent(String message) {
log.info("Received user event: {}", message);
// 메시지 처리 로직
processUserEvent(message);
}
@KafkaListener(topics = "order-events", groupId = "order-service")
public void handleOrderEvent(
@Payload String message,
@Header("eventType") String eventType,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.OFFSET) long offset,
Acknowledgment ack) {
try {
log.info("Processing {} event from {}:{} at offset {}",
eventType, topic, partition, offset);
processOrderEvent(message, eventType);
// 수동 커밋
ack.acknowledge();
} catch (Exception e) {
log.error("Failed to process order event", e);
// 에러 처리 로직
}
}
}배치 처리
@KafkaListener(topics = "batch-events",
groupId = "batch-processor",
containerFactory = "batchKafkaListenerContainerFactory")
public void handleBatchEvents(List<ConsumerRecord<String, String>> records,
Acknowledgment ack) {
log.info("Received batch of {} messages", records.size());
try {
List<EventDto> events = records.stream()
.map(record -> parseEvent(record.value()))
.collect(Collectors.toList());
// 배치 처리
processBatchEvents(events);
// 배치 커밋
ack.acknowledge();
} catch (Exception e) {
log.error("Failed to process batch events", e);
// 배치 실패 처리
}
}
// JSON 역직렬화
@KafkaListener(topics = "json-events", groupId = "json-processor")
public void handleJsonEvent(@Payload UserEvent event,
ConsumerRecord<String, UserEvent> record) {
log.info("Received user event: {} for user: {}",
event.getEventType(), event.getUserId());
// 타입 안전한 처리
switch (event.getEventType()) {
case USER_CREATED:
handleUserCreated(event);
break;
case USER_UPDATED:
handleUserUpdated(event);
break;
case USER_DELETED:
handleUserDeleted(event);
break;
}
}5.2 에러 처리
에러 핸들러 설정
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// 에러 핸들러 설정
factory.setCommonErrorHandler(new DefaultErrorHandler(
new FixedBackOff(1000L, 3L))); // 1초 간격으로 3번 재시도
return factory;
}
// 커스텀 에러 핸들러
@Component
public class KafkaErrorHandler implements CommonErrorHandler {
@Override
public void handleOtherException(Exception thrownException,
Consumer<?, ?> consumer,
MessageListenerContainer container,
boolean batchListener) {
log.error("Kafka consumer error", thrownException);
// 메트릭 업데이트
updateErrorMetrics(thrownException);
// 알림 발송
sendErrorNotification(thrownException);
}
}예외별 처리 전략
예외 분류 및 처리
@KafkaListener(topics = "critical-events", groupId = "critical-processor")
public void handleCriticalEvent(String message, Acknowledgment ack) {
try {
processCriticalEvent(message);
ack.acknowledge();
} catch (ValidationException e) {
// 검증 오류 - 재시도 불필요
log.warn("Invalid message format: {}", message, e);
sendToDeadLetterQueue(message, e);
ack.acknowledge(); // 스킵
} catch (BusinessException e) {
// 비즈니스 로직 오류 - 재시도 가능
log.error("Business logic error: {}", message, e);
throw e; // 재시도 트리거
} catch (ExternalServiceException e) {
// 외부 서비스 오류 - 지연 후 재시도
log.error("External service error: {}", message, e);
scheduleDelayedRetry(message, e);
ack.acknowledge();
} catch (Exception e) {
// 예상치 못한 오류
log.error("Unexpected error: {}", message, e);
sendToDeadLetterQueue(message, e);
ack.acknowledge();
}
}5.3 재시도 메커니즘
재시도 설정
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
retryKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// 지수 백오프 재시도
ExponentialBackOff backOff = new ExponentialBackOff();
backOff.setInitialInterval(1000L); // 1초
backOff.setMultiplier(2.0); // 2배씩 증가
backOff.setMaxInterval(10000L); // 최대 10초
backOff.setMaxElapsedTime(60000L); // 최대 1분
DefaultErrorHandler errorHandler = new DefaultErrorHandler(
(record, exception) -> {
// 최종 실패 시 처리
log.error("Final retry failed for record: {}", record, exception);
sendToDeadLetterQueue(record, exception);
}, backOff);
// 재시도하지 않을 예외 설정
errorHandler.addNotRetryableExceptions(ValidationException.class);
factory.setCommonErrorHandler(errorHandler);
return factory;
}
// 조건부 재시도
@RetryableTopic(
attempts = "3",
backoff = @Backoff(delay = 1000, multiplier = 2.0),
dltStrategy = DltStrategy.FAIL_ON_ERROR,
include = {TransientException.class}
)
@KafkaListener(topics = "retry-events", groupId = "retry-processor")
public void handleRetryableEvent(String message) {
log.info("Processing retryable event: {}", message);
if (shouldFail()) {
throw new TransientException("Temporary failure");
}
processEvent(message);
}커스텀 재시도 로직
@Component
public class RetryableMessageProcessor {
private final RetryTemplate retryTemplate;
public RetryableMessageProcessor() {
this.retryTemplate = RetryTemplate.builder()
.maxAttempts(3)
.exponentialBackoff(1000, 2, 10000)
.retryOn(TransientException.class)
.build();
}
@KafkaListener(topics = "custom-retry-events", groupId = "custom-retry")
public void handleMessage(String message, Acknowledgment ack) {
try {
retryTemplate.execute(context -> {
log.info("Attempt {} for message: {}",
context.getRetryCount() + 1, message);
processMessage(message);
return null;
});
ack.acknowledge();
} catch (Exception e) {
log.error("All retry attempts failed for message: {}", message, e);
handleFinalFailure(message, e);
ack.acknowledge();
}
}
private void handleFinalFailure(String message, Exception e) {
// DLQ 전송
kafkaTemplate.send("failed-events-dlq", message);
// 알림 발송
notificationService.sendFailureAlert(message, e);
// 메트릭 업데이트
meterRegistry.counter("kafka.consumer.final_failure").increment();
}
}5.4 DLQ (Dead Letter Queue)
DLQ 설정
DLQ 구현
@Component
public class DeadLetterQueueHandler {
private final KafkaTemplate<String, String> kafkaTemplate;
public void sendToDeadLetterQueue(ConsumerRecord<String, String> record,
Exception exception) {
String dlqTopic = record.topic() + ".DLQ";
ProducerRecord<String, String> dlqRecord =
new ProducerRecord<>(dlqTopic, record.key(), record.value());
// 원본 메타데이터 보존
dlqRecord.headers().add("original.topic", record.topic().getBytes());
dlqRecord.headers().add("original.partition",
String.valueOf(record.partition()).getBytes());
dlqRecord.headers().add("original.offset",
String.valueOf(record.offset()).getBytes());
dlqRecord.headers().add("failure.reason",
exception.getMessage().getBytes());
dlqRecord.headers().add("failure.timestamp",
String.valueOf(System.currentTimeMillis()).getBytes());
dlqRecord.headers().add("failure.class",
exception.getClass().getName().getBytes());
kafkaTemplate.send(dlqRecord);
log.warn("Message sent to DLQ: topic={}, key={}, reason={}",
dlqTopic, record.key(), exception.getMessage());
}
}
// DLQ 메시지 처리
@KafkaListener(topics = "*.DLQ", groupId = "dlq-processor")
public void handleDeadLetterMessage(ConsumerRecord<String, String> record) {
String originalTopic = new String(record.headers().lastHeader("original.topic").value());
String failureReason = new String(record.headers().lastHeader("failure.reason").value());
log.info("Processing DLQ message from original topic: {}, reason: {}",
originalTopic, failureReason);
// DLQ 메시지 분석 및 처리
analyzeDlqMessage(record);
// 필요시 수동 재처리
if (canRetryMessage(record)) {
retryOriginalMessage(record);
}
}DLQ 모니터링
@Component
public class DlqMonitoringService {
private final MeterRegistry meterRegistry;
@EventListener
public void handleDlqEvent(DlqMessageEvent event) {
// 메트릭 업데이트
meterRegistry.counter("kafka.dlq.messages",
"topic", event.getOriginalTopic(),
"reason", event.getFailureReason())
.increment();
// 임계치 확인
checkDlqThreshold(event.getOriginalTopic());
}
@Scheduled(fixedRate = 60000) // 1분마다
public void monitorDlqSize() {
// DLQ 크기 모니터링
Map<String, Long> dlqSizes = getDlqTopicSizes();
dlqSizes.forEach((topic, size) -> {
meterRegistry.gauge("kafka.dlq.size", Tags.of("topic", topic), size);
if (size > DLQ_SIZE_THRESHOLD) {
alertService.sendDlqSizeAlert(topic, size);
}
});
}
}5.5 핵심 요약
Consumer 구현 포인트
메시지 처리
- @KafkaListener를 통한 선언적 처리
- 배치 처리 및 수동 커밋
- 헤더와 메타데이터 활용
안정성 확보
- 예외별 차별화된 에러 처리
- 지능적 재시도 메커니즘
- DLQ를 통한 실패 메시지 관리
6. 실전 활용
6.1 이벤트 기반 아키텍처
도메인 이벤트 설계
// 도메인 이벤트 정의
public abstract class DomainEvent {
private final String eventId;
private final LocalDateTime occurredAt;
private final String aggregateId;
private final String eventType;
protected DomainEvent(String aggregateId, String eventType) {
this.eventId = UUID.randomUUID().toString();
this.occurredAt = LocalDateTime.now();
this.aggregateId = aggregateId;
this.eventType = eventType;
}
}
// 구체적인 이벤트
public class UserCreatedEvent extends DomainEvent {
private final String userId;
private final String email;
private final String name;
public UserCreatedEvent(String userId, String email, String name) {
super(userId, "USER_CREATED");
this.userId = userId;
this.email = email;
this.name = name;
}
}
public class OrderPlacedEvent extends DomainEvent {
private final String orderId;
private final String customerId;
private final List<OrderItem> items;
private final BigDecimal totalAmount;
public OrderPlacedEvent(String orderId, String customerId,
List<OrderItem> items, BigDecimal totalAmount) {
super(orderId, "ORDER_PLACED");
this.orderId = orderId;
this.customerId = customerId;
this.items = items;
this.totalAmount = totalAmount;
}
}이벤트 발행자
@Component
public class DomainEventPublisher {
private final KafkaTemplate<String, Object> kafkaTemplate;
private final ObjectMapper objectMapper;
public void publishEvent(DomainEvent event) {
try {
String topic = getTopicForEvent(event.getEventType());
String key = event.getAggregateId();
ProducerRecord<String, Object> record =
new ProducerRecord<>(topic, key, event);
// 이벤트 메타데이터 추가
record.headers().add("eventId", event.getEventId().getBytes());
record.headers().add("eventType", event.getEventType().getBytes());
record.headers().add("occurredAt",
event.getOccurredAt().toString().getBytes());
record.headers().add("aggregateId", event.getAggregateId().getBytes());
kafkaTemplate.send(record)
.addCallback(
result -> log.info("Event published: {}", event.getEventId()),
failure -> log.error("Failed to publish event: {}",
event.getEventId(), failure)
);
} catch (Exception e) {
log.error("Error publishing event: {}", event, e);
throw new EventPublishingException("Failed to publish event", e);
}
}
private String getTopicForEvent(String eventType) {
return switch (eventType) {
case "USER_CREATED", "USER_UPDATED", "USER_DELETED" -> "user-events";
case "ORDER_PLACED", "ORDER_CANCELLED", "ORDER_SHIPPED" -> "order-events";
case "PAYMENT_PROCESSED", "PAYMENT_FAILED" -> "payment-events";
default -> "domain-events";
};
}
}6.2 주문 처리 시스템
주문 서비스
@Service
@Transactional
public class OrderService {
private final OrderRepository orderRepository;
private final DomainEventPublisher eventPublisher;
public Order createOrder(CreateOrderRequest request) {
// 1. 주문 생성
Order order = Order.builder()
.customerId(request.getCustomerId())
.items(request.getItems())
.totalAmount(calculateTotal(request.getItems()))
.status(OrderStatus.PENDING)
.build();
Order savedOrder = orderRepository.save(order);
// 2. 도메인 이벤트 발행
OrderPlacedEvent event = new OrderPlacedEvent(
savedOrder.getId(),
savedOrder.getCustomerId(),
savedOrder.getItems(),
savedOrder.getTotalAmount()
);
eventPublisher.publishEvent(event);
return savedOrder;
}
public void cancelOrder(String orderId, String reason) {
Order order = orderRepository.findById(orderId)
.orElseThrow(() -> new OrderNotFoundException(orderId));
order.cancel(reason);
orderRepository.save(order);
// 주문 취소 이벤트 발행
OrderCancelledEvent event = new OrderCancelledEvent(
orderId, reason, LocalDateTime.now()
);
eventPublisher.publishEvent(event);
}
}재고 서비스
@Component
public class InventoryEventHandler {
private final InventoryService inventoryService;
private final DomainEventPublisher eventPublisher;
@KafkaListener(topics = "order-events", groupId = "inventory-service")
public void handleOrderEvent(OrderPlacedEvent event) {
try {
log.info("Processing inventory for order: {}", event.getOrderId());
// 재고 확인 및 차감
List<InventoryReservation> reservations =
inventoryService.reserveItems(event.getItems());
if (reservations.stream().allMatch(InventoryReservation::isSuccessful)) {
// 재고 확보 성공
InventoryReservedEvent reservedEvent = new InventoryReservedEvent(
event.getOrderId(), reservations
);
eventPublisher.publishEvent(reservedEvent);
} else {
// 재고 부족
InventoryReservationFailedEvent failedEvent =
new InventoryReservationFailedEvent(
event.getOrderId(),
getFailedItems(reservations)
);
eventPublisher.publishEvent(failedEvent);
}
} catch (Exception e) {
log.error("Failed to process inventory for order: {}",
event.getOrderId(), e);
InventoryProcessingFailedEvent errorEvent =
new InventoryProcessingFailedEvent(
event.getOrderId(), e.getMessage()
);
eventPublisher.publishEvent(errorEvent);
}
}
@KafkaListener(topics = "order-events", groupId = "inventory-service")
public void handleOrderCancellation(OrderCancelledEvent event) {
log.info("Releasing inventory for cancelled order: {}", event.getOrderId());
inventoryService.releaseReservation(event.getOrderId());
InventoryReleasedEvent releasedEvent = new InventoryReleasedEvent(
event.getOrderId(), LocalDateTime.now()
);
eventPublisher.publishEvent(releasedEvent);
}
}6.3 SAGA 패턴
SAGA 오케스트레이터
주문 처리 SAGA
@Component
public class OrderProcessingSaga {
private final SagaStateRepository sagaStateRepository;
private final DomainEventPublisher eventPublisher;
@KafkaListener(topics = "order-events", groupId = "order-saga")
public void handleOrderPlaced(OrderPlacedEvent event) {
// SAGA 상태 초기화
SagaState sagaState = SagaState.builder()
.sagaId(UUID.randomUUID().toString())
.orderId(event.getOrderId())
.currentStep(SagaStep.INVENTORY_RESERVATION)
.status(SagaStatus.IN_PROGRESS)
.build();
sagaStateRepository.save(sagaState);
// 첫 번째 단계: 재고 예약 요청
RequestInventoryReservationCommand command =
new RequestInventoryReservationCommand(
sagaState.getSagaId(),
event.getOrderId(),
event.getItems()
);
eventPublisher.publishEvent(command);
}
@KafkaListener(topics = "inventory-events", groupId = "order-saga")
public void handleInventoryReserved(InventoryReservedEvent event) {
SagaState sagaState = sagaStateRepository.findByOrderId(event.getOrderId())
.orElseThrow(() -> new SagaNotFoundException(event.getOrderId()));
// 다음 단계: 결제 처리
sagaState.moveToNextStep(SagaStep.PAYMENT_PROCESSING);
sagaStateRepository.save(sagaState);
ProcessPaymentCommand command = new ProcessPaymentCommand(
sagaState.getSagaId(),
event.getOrderId(),
sagaState.getPaymentAmount()
);
eventPublisher.publishEvent(command);
}
@KafkaListener(topics = "payment-events", groupId = "order-saga")
public void handlePaymentProcessed(PaymentProcessedEvent event) {
SagaState sagaState = sagaStateRepository.findByOrderId(event.getOrderId())
.orElseThrow(() -> new SagaNotFoundException(event.getOrderId()));
// 최종 단계: 주문 확정
sagaState.complete();
sagaStateRepository.save(sagaState);
ConfirmOrderCommand command = new ConfirmOrderCommand(
sagaState.getSagaId(),
event.getOrderId()
);
eventPublisher.publishEvent(command);
}
// 보상 트랜잭션 처리
@KafkaListener(topics = "inventory-events", groupId = "order-saga")
public void handleInventoryReservationFailed(InventoryReservationFailedEvent event) {
SagaState sagaState = sagaStateRepository.findByOrderId(event.getOrderId())
.orElseThrow(() -> new SagaNotFoundException(event.getOrderId()));
// SAGA 실패 처리
sagaState.fail("Inventory reservation failed");
sagaStateRepository.save(sagaState);
// 주문 취소
CancelOrderCommand command = new CancelOrderCommand(
sagaState.getSagaId(),
event.getOrderId(),
"Insufficient inventory"
);
eventPublisher.publishEvent(command);
}
}보상 트랜잭션
실패 시 롤백 처리
@Component
public class CompensationHandler {
@KafkaListener(topics = "payment-events", groupId = "compensation-handler")
public void handlePaymentFailed(PaymentFailedEvent event) {
log.info("Starting compensation for failed payment: {}", event.getOrderId());
// 1. 재고 예약 해제
ReleaseInventoryReservationCommand releaseCommand =
new ReleaseInventoryReservationCommand(
event.getOrderId(),
"Payment failed"
);
eventPublisher.publishEvent(releaseCommand);
// 2. 주문 상태 변경
CancelOrderCommand cancelCommand = new CancelOrderCommand(
event.getOrderId(),
"Payment processing failed"
);
eventPublisher.publishEvent(cancelCommand);
// 3. 고객 알림
NotifyCustomerCommand notifyCommand = new NotifyCustomerCommand(
event.getCustomerId(),
"ORDER_PAYMENT_FAILED",
Map.of("orderId", event.getOrderId(), "reason", event.getFailureReason())
);
eventPublisher.publishEvent(notifyCommand);
}
@KafkaListener(topics = "saga-events", groupId = "compensation-handler")
public void handleSagaTimeout(SagaTimeoutEvent event) {
log.warn("SAGA timeout detected: {}", event.getSagaId());
SagaState sagaState = sagaStateRepository.findById(event.getSagaId())
.orElseThrow(() -> new SagaNotFoundException(event.getSagaId()));
// 현재 단계에 따른 보상 처리
switch (sagaState.getCurrentStep()) {
case INVENTORY_RESERVATION:
// 재고 예약 중 타임아웃 - 주문 취소만
cancelOrder(sagaState.getOrderId(), "Timeout during inventory reservation");
break;
case PAYMENT_PROCESSING:
// 결제 중 타임아웃 - 재고 해제 + 주문 취소
releaseInventoryAndCancelOrder(sagaState.getOrderId(), "Timeout during payment");
break;
case ORDER_CONFIRMATION:
// 확정 중 타임아웃 - 수동 확인 필요
escalateToManualReview(sagaState.getOrderId(), "Timeout during confirmation");
break;
}
sagaState.timeout();
sagaStateRepository.save(sagaState);
}
}SAGA 상태 관리
@Entity
public class SagaState {
@Id
private String sagaId;
private String orderId;
private SagaStep currentStep;
private SagaStatus status;
private LocalDateTime startedAt;
private LocalDateTime completedAt;
private String failureReason;
@ElementCollection
@MapKeyColumn(name = "step_name")
@Column(name = "step_status")
private Map<String, String> stepStatuses = new HashMap<>();
public void moveToNextStep(SagaStep nextStep) {
this.stepStatuses.put(this.currentStep.name(), "COMPLETED");
this.currentStep = nextStep;
this.stepStatuses.put(nextStep.name(), "IN_PROGRESS");
}
public void complete() {
this.status = SagaStatus.COMPLETED;
this.completedAt = LocalDateTime.now();
this.stepStatuses.put(this.currentStep.name(), "COMPLETED");
}
public void fail(String reason) {
this.status = SagaStatus.FAILED;
this.failureReason = reason;
this.completedAt = LocalDateTime.now();
this.stepStatuses.put(this.currentStep.name(), "FAILED");
}
}
// SAGA 모니터링
@Component
public class SagaMonitor {
@Scheduled(fixedRate = 30000) // 30초마다
public void checkSagaTimeouts() {
LocalDateTime timeoutThreshold = LocalDateTime.now().minusMinutes(5);
List<SagaState> timedOutSagas = sagaStateRepository
.findByStatusAndStartedAtBefore(SagaStatus.IN_PROGRESS, timeoutThreshold);
timedOutSagas.forEach(saga -> {
log.warn("SAGA timeout detected: {}", saga.getSagaId());
SagaTimeoutEvent timeoutEvent = new SagaTimeoutEvent(
saga.getSagaId(),
saga.getOrderId(),
saga.getCurrentStep()
);
eventPublisher.publishEvent(timeoutEvent);
});
}
}6.4 핵심 요약
실전 활용 패턴
이벤트 기반 설계
- 도메인 이벤트를 통한 서비스 간 통신
- 느슨한 결합과 높은 확장성
- 이벤트 소싱과 CQRS 패턴
분산 트랜잭션
- SAGA 패턴을 통한 일관성 보장
- 보상 트랜잭션으로 롤백 처리
- 상태 관리와 타임아웃 처리
7. 정리
7.1 Kafka 설계 가이드
토픽 설계 원칙
토픽 명명 규칙
user-events, order-events, payment-eventsprod.user-events, dev.user-eventsuser-events-v1, user-events-v2user-events.DLQ, user-events.retry파티션 수 결정
- 처리량 기준: 예상 TPS × 안전 계수 (2-3배)
- Consumer 수: 최대 Consumer 수와 동일하게 설정
- 순서 보장: 순서가 중요한 경우 파티션 수 최소화
- 확장성: 향후 확장을 고려하여 여유있게 설정
성능 최적화 가이드
Producer 최적화
- 배치 크기: 16KB-64KB 권장
- 압축: snappy 또는 lz4 사용
- Linger 시간: 지연 허용 범위 내에서 설정
- Idempotence: 중복 방지를 위해 활성화
Consumer 최적화
- Fetch 크기: 네트워크 대역폭에 맞게 조정
- Poll 레코드: 처리 능력에 맞게 제한
- 세션 타임아웃: 리밸런싱 빈도 고려
- 동시성: CPU 코어 수에 맞게 설정
7.2 모니터링
핵심 메트릭
| 구분 | 메트릭 | 설명 | 임계값 |
|---|---|---|---|
| Producer | record-send-rate | 초당 전송 메시지 수 | 예상 TPS 대비 모니터링 |
| Producer | record-error-rate | 전송 실패율 | < 1% |
| Consumer | records-consumed-rate | 초당 소비 메시지 수 | Producer 대비 균형 |
| Consumer | records-lag-max | 최대 지연 메시지 수 | < 1000 |
| Broker | UnderReplicatedPartitions | 복제 부족 파티션 | = 0 |
모니터링 구현
@Component
public class KafkaMetricsCollector {
private final MeterRegistry meterRegistry;
private final KafkaTemplate<String, String> kafkaTemplate;
@EventListener
public void handleProducerMetrics(ProducerMetricsEvent event) {
// Producer 메트릭 수집
meterRegistry.counter("kafka.producer.records.sent",
"topic", event.getTopic())
.increment();
meterRegistry.timer("kafka.producer.send.duration",
"topic", event.getTopic())
.record(event.getDuration(), TimeUnit.MILLISECONDS);
}
@EventListener
public void handleConsumerMetrics(ConsumerMetricsEvent event) {
// Consumer 메트릭 수집
meterRegistry.counter("kafka.consumer.records.consumed",
"topic", event.getTopic(),
"group", event.getGroupId())
.increment();
meterRegistry.gauge("kafka.consumer.lag",
Tags.of("topic", event.getTopic(),
"partition", String.valueOf(event.getPartition())),
event.getLag());
}
@Scheduled(fixedRate = 30000) // 30초마다
public void collectBrokerMetrics() {
// JMX를 통한 브로커 메트릭 수집
try {
MBeanServer server = ManagementFactory.getPlatformMBeanServer();
// Under-replicated partitions
ObjectName objectName = new ObjectName(
"kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions");
Integer underReplicated = (Integer) server.getAttribute(objectName, "Value");
meterRegistry.gauge("kafka.broker.under_replicated_partitions", underReplicated);
// Request rate
ObjectName requestName = new ObjectName(
"kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce");
Double requestRate = (Double) server.getAttribute(requestName, "OneMinuteRate");
meterRegistry.gauge("kafka.broker.produce_requests_per_sec", requestRate);
} catch (Exception e) {
log.error("Failed to collect broker metrics", e);
}
}
}7.3 베스트 프랙티스
개발 가이드라인
✅ 권장사항
- 멱등성 보장을 위한 고유 키 사용
- 스키마 진화를 고려한 메시지 설계
- 적절한 파티션 키 선택으로 순서 보장
- DLQ를 통한 실패 메시지 처리
- 환경별 설정 분리
- 메트릭과 로깅을 통한 모니터링
- 트랜잭션 범위 최소화
- 배치 처리로 성능 최적화
❌ 주의사항
- 과도한 파티션 수로 인한 오버헤드
- 큰 메시지 크기로 인한 성능 저하
- 동기식 처리로 인한 블로킹
- 무한 재시도로 인한 리소스 낭비
- 스키마 호환성 무시
- 에러 처리 로직 누락
- Consumer Group 설계 미흡
- 오프셋 관리 소홀
운영 가이드라인
운영 체크리스트
배포 전 확인
- 토픽 생성 및 파티션 수 확인
- Consumer Group 설정 검증
- 스키마 호환성 테스트
- 성능 테스트 수행
- 장애 시나리오 테스트
운영 중 모니터링
- Consumer Lag 모니터링
- 에러율 및 DLQ 크기 확인
- 브로커 리소스 사용률
- 네트워크 및 디스크 I/O
- 리밸런싱 빈도 확인
트러블슈팅 가이드
일반적인 문제와 해결책
7.4 최종 요약
Spring Kafka 마스터리 로드맵
기초 단계
- • Kafka 개념 이해
- • Spring Kafka 설정
- • 기본 Producer/Consumer
- • 에러 처리 구현
중급 단계
- • 트랜잭션 처리
- • 커스텀 직렬화
- • 배치 처리 최적화
- • 모니터링 구현
고급 단계
- • 이벤트 기반 아키텍처
- • SAGA 패턴 구현
- • 스키마 레지스트리
- • 운영 최적화
핵심 가치
Spring Kafka는 단순한 메시징 도구를 넘어 현대적인 분산 시스템의 핵심 인프라입니다. 이벤트 기반 아키텍처를 통해 시스템 간 느슨한 결합을 실현하고, 높은 처리량과 확장성을 제공하며, 복잡한 비즈니스 워크플로우를 안정적으로 관리할 수 있습니다. 올바른 설계와 구현을 통해 견고하고 확장 가능한 시스템을 구축할 수 있습니다.
다음 단계
- 실제 프로젝트에 Spring Kafka 적용
- 성능 테스트 및 최적화 경험
- Kafka Streams 학습
- Confluent Platform 활용
- 클라우드 환경 운영 경험
- 커뮤니티 참여 및 기여