프로젝트/kafka

kafka Dead Letter Queue(DLQ, DLT)

Junuuu 2023. 8. 7. 00:01
728x90

개요

AWS SQS에서는 Dead Letter Queue(DLQ)를 손쉽게 제공하고 연결할 수 있도록 합니다.

메시지가 실패하는 경우 재처리를 몇 번 할지도 console에서 간편하게 설정할 수 있습니다.

Kafka의 경우 메시지 처리에 실패하면 Dead Letter Queue(DLQ)를 어떻게 관리할 수 있을까요?

Kafka에서는 DLQ대신 DLT라는 개념을 사용합니다.

DLT는 Dead Letter Topic의 약자입니다.

 

SQS DLQ에 대해서는 다음글을 참고해 주세요.

https://junuuu.tistory.com/740

 

[AWS] SQS DLQ 설정하기

DLQ란 무엇인가? DLQ(Dead Letter Queue)는 소프트웨어 시스템에서 오류로 인해 처리할 수 없는 메시지를 임시로 저장하는 특수한 유형의 메시지 대기열입니다. DLQ는 왜 사용할까요? 전송된 메시지가

junuuu.tistory.com

 

 

Kafka Message Consume에 실패하는 경우

@Service
class KafkaConsumerService {

    fun messageOccurredException(){
        logger.info{ "KafkaConsumerService Start"}
        throw RuntimeException()
    }
}

Consumer에서 KafkaConsumerService를 호출하여 메시지를 처리하는 과정에서 예외가 발생했다고 가정해 보겠습니다.

 

 

@Component
class KafkaConsumer(
    private val kafkaConsumerService: KafkaConsumerService,
) {
    val logger = KotlinLogging.logger {}

    @KafkaListener(
        topics = ["test"],
        groupId = "#{ T(java.util.UUID).randomUUID().toString() }",
        containerFactory = "myContainerFactory"
    )
    fun consume(
        @Payload payload: TestDto,
        acknowledgment: Acknowledgment,
    ) {
        logger.info("Consumer start: $payload")
        kafkaConsumerService.messageOccurredException()
        acknowledgment.acknowledge()
    }

메시지 수신정책에 따라 다르겠지만 Acknowledgment를 통해 메시지 수신을 체크하는 경우로 가정해 보겠습니다.

 

 

Consumer는 무한적으로 재처리를 수행할까요?

2023-07-09T17:10:55.534+09:00 ERROR 10537 --- [ntainer#0-0-C-1] o.s.kafka.listener.DefaultErrorHandler   :
Backoff FixedBackOff{interval=0, currentAttempts=10, maxAttempts=9} exhausted for test-0@1

실제로 호출해 보면 10번 정도 Consume을 시도한 뒤 더 이상 시도하지 않습니다.

에러메시지를 살펴보면 DefaultErrorHandler가 maxAttempts(최대 시도 횟수)와 currentAttepts(현재 시도 횟수)를 보여줍니다.

이때도 실패하는 경우 해당 메시지를 skip 합니다.

 

FixedBackOff는 interval(재시도 사이에 대기할 시간), maxAttempts(작업을 포기하기 전 재시도하는 최대 횟수)를 가집니다.

 

 

kafka Dead Letter Queue(DLQ) 구현방법

@Bean
    fun myContainerFactory(
        kafkaTemplate: KafkaTemplate<String, TestDto>
    ): ConcurrentKafkaListenerContainerFactory<String, TestDto> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, TestDto>()
        val recover = DeadLetterPublishingRecoverer(kafkaTemplate)
        val fixedBackOff = FixedBackOff(1000L, 2L)
        val defaultErrorHandler = DefaultErrorHandler(recover, fixedBackOff)
        factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL_IMMEDIATE
        factory.consumerFactory = consumerFactory()
        factory.setCommonErrorHandler(defaultErrorHandler)
        return factory
    }

DeafultErrorHandelr를 구현하며 CommonErrorHandler로 지정해 주면 됩니다.

KafkaTemplate도 객체타입을 잘 지정해주어야 DLQ에서 파싱을 할때 문제가 발생하지 않습니다.

 

이때 1초 간격으로 2회 재시도하고 실패 시 Dead Letter Topic으로 이동합니다.

Topic의 이름은 기본값으로 test.DLT로 들어가게 됩니다.

DEFAULT_DESTINATION_RESOLVER = (cr, e) -> new TopicPartition(cr.topic() + ".DLT", cr.partition());

 

이때 파티션도 지정해주기 때문에 DLQ 파티션과 Queue의 파티션 개수가 다르다면 잘 고려해야 할 것 같습니다.

 

 

DLT Consumer 구현

@Component
class KafkaDeadLetterQueueConsumer{
    val logger = KotlinLogging.logger {}

    @KafkaListener(
        topics = ["test.DLT"],
        groupId = "myGroupId",
        containerFactory = "myContainerFactory"
    )
    fun consume(
        @Payload payload: TestDto,
        acknowledgment: Acknowledgment,
    ) {
        logger.info("KafkaDeadLetterQueueConsumer start: $payload")
        acknowledgment.acknowledge()
    }
}

이후 Producer에서 메시지를 전송하면 1초 간격으로 2회 동안 재시도를 수행하고 Kafka DLQ consumer가 메시지를 읽어 들입니다.

.KafkaDeadLetterQueueConsumer     : KafkaDeadLetterQueueConsumer start:
TestDto(col1=test, col2=test)

DLT가 발생하면 error 로그를 발생시키거나, 개발자에게 알림을 주도록 하여 처리할 수 있을 것 같습니다.

 

 

참고자료

https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/

https://www.baeldung.com/spring-retry-kafka-consumer

https://docs.spring.io/spring-kafka/api/org/springframework/kafka/listener/DefaultErrorHandler.html

https://kimmayer.tistory.com/entry/Kafka-offset%EC%97%90-%EB%8C%80%ED%95%B4%EC%84%9C