-
kafka Dead Letter Queue(DLQ, DLT)프로젝트/kafka 2023. 8. 7. 00:01728x90
개요
AWS SQS에서는 Dead Letter Queue(DLQ)를 손쉽게 제공하고 연결할 수 있도록 합니다.
메시지가 실패하는 경우 재처리를 몇 번 할지도 console에서 간편하게 설정할 수 있습니다.
Kafka의 경우 메시지 처리에 실패하면 Dead Letter Queue(DLQ)를 어떻게 관리할 수 있을까요?
Kafka에서는 DLQ대신 DLT라는 개념을 사용합니다.
DLT는 Dead Letter Topic의 약자입니다.
SQS DLQ에 대해서는 다음글을 참고해 주세요.
https://junuuu.tistory.com/740
Kafka Message Consume에 실패하는 경우
@Service class KafkaConsumerService { fun messageOccurredException(){ logger.info{ "KafkaConsumerService Start"} throw RuntimeException() } }
Consumer에서 KafkaConsumerService를 호출하여 메시지를 처리하는 과정에서 예외가 발생했다고 가정해 보겠습니다.
@Component class KafkaConsumer( private val kafkaConsumerService: KafkaConsumerService, ) { val logger = KotlinLogging.logger {} @KafkaListener( topics = ["test"], groupId = "#{ T(java.util.UUID).randomUUID().toString() }", containerFactory = "myContainerFactory" ) fun consume( @Payload payload: TestDto, acknowledgment: Acknowledgment, ) { logger.info("Consumer start: $payload") kafkaConsumerService.messageOccurredException() acknowledgment.acknowledge() }
메시지 수신정책에 따라 다르겠지만 Acknowledgment를 통해 메시지 수신을 체크하는 경우로 가정해 보겠습니다.
Consumer는 무한적으로 재처리를 수행할까요?
2023-07-09T17:10:55.534+09:00 ERROR 10537 --- [ntainer#0-0-C-1] o.s.kafka.listener.DefaultErrorHandler : Backoff FixedBackOff{interval=0, currentAttempts=10, maxAttempts=9} exhausted for test-0@1
실제로 호출해 보면 10번 정도 Consume을 시도한 뒤 더 이상 시도하지 않습니다.
에러메시지를 살펴보면 DefaultErrorHandler가 maxAttempts(최대 시도 횟수)와 currentAttepts(현재 시도 횟수)를 보여줍니다.
이때도 실패하는 경우 해당 메시지를 skip 합니다.
FixedBackOff는 interval(재시도 사이에 대기할 시간), maxAttempts(작업을 포기하기 전 재시도하는 최대 횟수)를 가집니다.
kafka Dead Letter Queue(DLQ) 구현방법
@Bean fun myContainerFactory( kafkaTemplate: KafkaTemplate<String, TestDto> ): ConcurrentKafkaListenerContainerFactory<String, TestDto> { val factory = ConcurrentKafkaListenerContainerFactory<String, TestDto>() val recover = DeadLetterPublishingRecoverer(kafkaTemplate) val fixedBackOff = FixedBackOff(1000L, 2L) val defaultErrorHandler = DefaultErrorHandler(recover, fixedBackOff) factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL_IMMEDIATE factory.consumerFactory = consumerFactory() factory.setCommonErrorHandler(defaultErrorHandler) return factory }
DeafultErrorHandelr를 구현하며 CommonErrorHandler로 지정해 주면 됩니다.
KafkaTemplate도 객체타입을 잘 지정해주어야 DLQ에서 파싱을 할때 문제가 발생하지 않습니다.
이때 1초 간격으로 2회 재시도하고 실패 시 Dead Letter Topic으로 이동합니다.
Topic의 이름은 기본값으로 test.DLT로 들어가게 됩니다.
DEFAULT_DESTINATION_RESOLVER = (cr, e) -> new TopicPartition(cr.topic() + ".DLT", cr.partition());
이때 파티션도 지정해주기 때문에 DLQ 파티션과 Queue의 파티션 개수가 다르다면 잘 고려해야 할 것 같습니다.
DLT Consumer 구현
@Component class KafkaDeadLetterQueueConsumer{ val logger = KotlinLogging.logger {} @KafkaListener( topics = ["test.DLT"], groupId = "myGroupId", containerFactory = "myContainerFactory" ) fun consume( @Payload payload: TestDto, acknowledgment: Acknowledgment, ) { logger.info("KafkaDeadLetterQueueConsumer start: $payload") acknowledgment.acknowledge() } }
이후 Producer에서 메시지를 전송하면 1초 간격으로 2회 동안 재시도를 수행하고 Kafka DLQ consumer가 메시지를 읽어 들입니다.
.KafkaDeadLetterQueueConsumer : KafkaDeadLetterQueueConsumer start: TestDto(col1=test, col2=test)
DLT가 발생하면 error 로그를 발생시키거나, 개발자에게 알림을 주도록 하여 처리할 수 있을 것 같습니다.
참고자료
https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/
https://www.baeldung.com/spring-retry-kafka-consumer
https://docs.spring.io/spring-kafka/api/org/springframework/kafka/listener/DefaultErrorHandler.html
https://kimmayer.tistory.com/entry/Kafka-offset%EC%97%90-%EB%8C%80%ED%95%B4%EC%84%9C
'프로젝트 > kafka' 카테고리의 다른 글
Kafka Consumer 성능 올리기 (0) 2023.08.11 kafka Consumer 그룹 리밸런싱 (0) 2023.08.08 kafka 메시지 삭제 정책 (0) 2023.08.06 Kafka 객체 SERIALIZER, DESERIALIZER (0) 2023.08.05 kafka Consumer group id 랜덤으로 부여 (0) 2023.08.04