kafka Dead Letter Queue(DLQ, DLT)
개요
AWS SQS에서는 Dead Letter Queue(DLQ)를 손쉽게 제공하고 연결할 수 있도록 합니다.
메시지가 실패하는 경우 재처리를 몇 번 할지도 console에서 간편하게 설정할 수 있습니다.
Kafka의 경우 메시지 처리에 실패하면 Dead Letter Queue(DLQ)를 어떻게 관리할 수 있을까요?
Kafka에서는 DLQ대신 DLT라는 개념을 사용합니다.
DLT는 Dead Letter Topic의 약자입니다.
SQS DLQ에 대해서는 다음글을 참고해 주세요.
https://junuuu.tistory.com/740
Kafka Message Consume에 실패하는 경우
@Service
class KafkaConsumerService {
fun messageOccurredException(){
logger.info{ "KafkaConsumerService Start"}
throw RuntimeException()
}
}
Consumer에서 KafkaConsumerService를 호출하여 메시지를 처리하는 과정에서 예외가 발생했다고 가정해 보겠습니다.
@Component
class KafkaConsumer(
private val kafkaConsumerService: KafkaConsumerService,
) {
val logger = KotlinLogging.logger {}
@KafkaListener(
topics = ["test"],
groupId = "#{ T(java.util.UUID).randomUUID().toString() }",
containerFactory = "myContainerFactory"
)
fun consume(
@Payload payload: TestDto,
acknowledgment: Acknowledgment,
) {
logger.info("Consumer start: $payload")
kafkaConsumerService.messageOccurredException()
acknowledgment.acknowledge()
}
메시지 수신정책에 따라 다르겠지만 Acknowledgment를 통해 메시지 수신을 체크하는 경우로 가정해 보겠습니다.
Consumer는 무한적으로 재처리를 수행할까요?
2023-07-09T17:10:55.534+09:00 ERROR 10537 --- [ntainer#0-0-C-1] o.s.kafka.listener.DefaultErrorHandler :
Backoff FixedBackOff{interval=0, currentAttempts=10, maxAttempts=9} exhausted for test-0@1
실제로 호출해 보면 10번 정도 Consume을 시도한 뒤 더 이상 시도하지 않습니다.
에러메시지를 살펴보면 DefaultErrorHandler가 maxAttempts(최대 시도 횟수)와 currentAttepts(현재 시도 횟수)를 보여줍니다.
이때도 실패하는 경우 해당 메시지를 skip 합니다.
FixedBackOff는 interval(재시도 사이에 대기할 시간), maxAttempts(작업을 포기하기 전 재시도하는 최대 횟수)를 가집니다.
kafka Dead Letter Queue(DLQ) 구현방법
@Bean
fun myContainerFactory(
kafkaTemplate: KafkaTemplate<String, TestDto>
): ConcurrentKafkaListenerContainerFactory<String, TestDto> {
val factory = ConcurrentKafkaListenerContainerFactory<String, TestDto>()
val recover = DeadLetterPublishingRecoverer(kafkaTemplate)
val fixedBackOff = FixedBackOff(1000L, 2L)
val defaultErrorHandler = DefaultErrorHandler(recover, fixedBackOff)
factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL_IMMEDIATE
factory.consumerFactory = consumerFactory()
factory.setCommonErrorHandler(defaultErrorHandler)
return factory
}
DeafultErrorHandelr를 구현하며 CommonErrorHandler로 지정해 주면 됩니다.
KafkaTemplate도 객체타입을 잘 지정해주어야 DLQ에서 파싱을 할때 문제가 발생하지 않습니다.
이때 1초 간격으로 2회 재시도하고 실패 시 Dead Letter Topic으로 이동합니다.
Topic의 이름은 기본값으로 test.DLT로 들어가게 됩니다.
DEFAULT_DESTINATION_RESOLVER = (cr, e) -> new TopicPartition(cr.topic() + ".DLT", cr.partition());
이때 파티션도 지정해주기 때문에 DLQ 파티션과 Queue의 파티션 개수가 다르다면 잘 고려해야 할 것 같습니다.
DLT Consumer 구현
@Component
class KafkaDeadLetterQueueConsumer{
val logger = KotlinLogging.logger {}
@KafkaListener(
topics = ["test.DLT"],
groupId = "myGroupId",
containerFactory = "myContainerFactory"
)
fun consume(
@Payload payload: TestDto,
acknowledgment: Acknowledgment,
) {
logger.info("KafkaDeadLetterQueueConsumer start: $payload")
acknowledgment.acknowledge()
}
}
이후 Producer에서 메시지를 전송하면 1초 간격으로 2회 동안 재시도를 수행하고 Kafka DLQ consumer가 메시지를 읽어 들입니다.
.KafkaDeadLetterQueueConsumer : KafkaDeadLetterQueueConsumer start:
TestDto(col1=test, col2=test)
DLT가 발생하면 error 로그를 발생시키거나, 개발자에게 알림을 주도록 하여 처리할 수 있을 것 같습니다.
참고자료
https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/
https://www.baeldung.com/spring-retry-kafka-consumer
https://docs.spring.io/spring-kafka/api/org/springframework/kafka/listener/DefaultErrorHandler.html
https://kimmayer.tistory.com/entry/Kafka-offset%EC%97%90-%EB%8C%80%ED%95%B4%EC%84%9C