프로젝트/kafka

Kafka 객체 SERIALIZER, DESERIALIZER

Junuuu 2023. 8. 5. 00:01
728x90

개요

Kafka를 사용하다 보면 메시지를 Stirng이 아닌 객체(DTO)로 전송하고 싶을 수 있습니다.

이때 Producer, Consumer Config를 알아보겠습니다.

 

테스트하기 위한 환경

  • local에서 실행할 수 있는 SpringBootApplication consumer 1대, producer 1대
  • docker, AWS MSK등으로 구성한 kafka 인프라

TestDTO

data class TestDto(
    val col1: String,
    val col2: String,
)

 

@Configuration
@EnableKafka
class KafkaProducerConfig {
    @Value("\${spring.kafka.producer.bootstrap-servers}")
    private lateinit var BOOTSTRAP_SERVERS: String

    @Bean
    fun factory(): ProducerFactory<String, TestDto> {
        val props: MutableMap<String, Any> = HashMap()
        props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = BOOTSTRAP_SERVERS
        props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = JsonSerializer::class.java
        return DefaultKafkaProducerFactory(props)

    }

    @Bean
    fun kafkaTemplate(): KafkaTemplate<String, TestDto> {
        return KafkaTemplate(factory())
    }
}

StringSerializer대신 JsonSerializer를 사용합니다.

KafkaTemplate의 Type도 TestDto로 정의해 둡니다.

 

KafkaConsumerConfig

package com.example.study.config

import com.example.study.request.TestDto
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafka
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
import org.springframework.kafka.support.serializer.JsonDeserializer

@Configuration
@EnableKafka
class KafkaConsumerConfig {
    @Value("\${spring.kafka.consumer.bootstrap-servers}")
    private lateinit var BOOTSTRAP_ADDRESS: String

    @Value("\${spring.kafka.consumer.auto-offset-reset}")
    private lateinit var AUTO_OFFSET_RESET: String

    @Value("\${spring.kafka.consumer.enable-auto-commit}")
    private lateinit var AUTO_COMMIT: String

    @Bean
    fun consumerFactory(): ConsumerFactory<String, TestDto> {

        val deserializer: JsonDeserializer<TestDto> = JsonDeserializer(TestDto::class.java)
//        deserializer.setRemoveTypeHeaders(false);
//        deserializer.addTrustedPackages("*");
//        deserializer.setUseTypeMapperForKey(true);

        val errorHandlingDeserializer = ErrorHandlingDeserializer(deserializer)

        val props: MutableMap<String, Any> = HashMap()
        props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = BOOTSTRAP_ADDRESS
        props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = AUTO_OFFSET_RESET
        props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = AUTO_COMMIT
        return DefaultKafkaConsumerFactory(
            props,
            StringDeserializer(),
            deserializer
//            errorHandlingDeserializer
        )
    }

    @Bean
    fun myContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, TestDto> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, TestDto>()
        factory.consumerFactory = consumerFactory()
        return factory
    }
}

deserializer와 errorHandlingDeserializer는 주석처리 해두었습니다.

우선 주석을 하여 다음과 같이 설정한 후 deserializer의 옵션과 ErrorHandlingDeserializer를 알아보겠습니다.

 

KafkaConsumer

@Component
class KafkaConsumer {
    val logger = KotlinLogging.logger {}

    @KafkaListener(
        topics = ["testTopic"],
        groupId = "#{ T(java.util.UUID).randomUUID().toString() }",
        containerFactory = "myContainerFactory"
    )
    fun consume(
        @Payload payload: TestDto
    ) {
        logger.info("Consumer start: $payload")
    }
}

ConsumerConfig에서 정의한 myContainerFactory Bean 이름을 넣어줍니다.

 

Producer에 메시지를 발행하여 Consumer 부분 관찰

Consume는 계속적으로 메시지를 consume 하면서 실패합니다.

이때 실패하기 때문에 무한적으로 consume을 수행합니다.

 

에러 원인은 다음과 같습니다.

Caused by: java.lang.IllegalArgumentException: 
The class 'com.example.study.producer.TestDto' is not in the trusted packages: 
[java.util, java.lang, com.example.study.request, com.example.study.request.*]. 
If you believe this class is safe to deserialize, please provide its name. 
If the serialization is only done by a trusted source, you can also enable trust all (*).

 

원인을 해결하기 전 ErrorHandlingDeserializer적용

return DefaultKafkaConsumerFactory(
            props,
            StringDeserializer(),
            errorHandlingDeserializer
        )

이제 예외가 단건으로 발생하고 끝납니다.

 

Caused by: org.springframework.kafka.support.serializer.DeserializationException:
 failed to deserialize

 

DeserializationException 해결

다음 옵션을 활성화하여 해결할 수 있습니다.

deserializer.addTrustedPackages("*");

 

TestDto의 내부 property를 동일하게 사용할지라도 패키지가 다른 경우에는 직렬화/역직렬화 시에 예외가 발생합니다.

package이름까지 직렬화과정에 포함되기 때문에 주의해야 합니다.

 

 

다시 예외 발생

Caused by: org.springframework.messaging.converter.MessageConversionException:
 failed to resolve class name. Class not found [com.example.study.producer.TestDto]

Class Name Not Fond 예외가 발생합니다.

 

다음 옵션을 활성화시킵니다.

deserializer.setUseTypeMapperForKey(true);

 

 

kafka는 header에서 값을 꺼내서 Class를 찾습니다.

이때 해당 Class가 없으면 null을 할당하게 되고 해당 Class가 없으면 ClassNotFoundException을 발생시킵니다.

해당 옵션을 활성화하면 Consumer에서 객체를 받을 때 Publisher에서 보낸 오브젝트의 키 값을 기준으로 값을 받습니다.

 

메서드를 타고 가 보면 다음과 같이 true일 경우에 여러 필드네임을 세팅합니다.

public void setUseForKey(boolean isKey) {
		if (isKey) {
			setClassIdFieldName(AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME);
			setContentClassIdFieldName(AbstractJavaTypeMapper.KEY_DEFAULT_CONTENT_CLASSID_FIELD_NAME);
			setKeyClassIdFieldName(AbstractJavaTypeMapper.KEY_DEFAULT_KEY_CLASSID_FIELD_NAME);
		}
	}

 

이제 Consumer는 잘 동작합니다.

2023-07-07T14:18:23.830+09:00  INFO 58242 --- [ntainer#0-0-C-1] 
c.example.study.consumer.KafkaConsumer   : 
Consumer start: TestDto(col1=1, col2=2)

 

AckMode 옵션이 yml에서 잘 안 먹는다면..

@Bean
    fun myContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, TestDto> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, TestDto>()
        factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL_IMMEDIATE
        factory.consumerFactory = consumerFactory()
        return factory
    }

Factory에서 ContainerProperties.AckMode를 강제로 지정