ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Kafka 객체 SERIALIZER, DESERIALIZER
    프로젝트/kafka 2023. 8. 5. 00:01

    개요

    Kafka를 사용하다 보면 메시지를 Stirng이 아닌 객체(DTO)로 전송하고 싶을 수 있습니다.

    이때 Producer, Consumer Config를 알아보겠습니다.

     

    테스트하기 위한 환경

    • local에서 실행할 수 있는 SpringBootApplication consumer 1대, producer 1대
    • docker, AWS MSK등으로 구성한 kafka 인프라

    TestDTO

    data class TestDto(
        val col1: String,
        val col2: String,
    )

     

    @Configuration
    @EnableKafka
    class KafkaProducerConfig {
        @Value("\${spring.kafka.producer.bootstrap-servers}")
        private lateinit var BOOTSTRAP_SERVERS: String
    
        @Bean
        fun factory(): ProducerFactory<String, TestDto> {
            val props: MutableMap<String, Any> = HashMap()
            props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = BOOTSTRAP_SERVERS
            props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
            props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = JsonSerializer::class.java
            return DefaultKafkaProducerFactory(props)
    
        }
    
        @Bean
        fun kafkaTemplate(): KafkaTemplate<String, TestDto> {
            return KafkaTemplate(factory())
        }
    }

    StringSerializer대신 JsonSerializer를 사용합니다.

    KafkaTemplate의 Type도 TestDto로 정의해 둡니다.

     

    KafkaConsumerConfig

    package com.example.study.config
    
    import com.example.study.request.TestDto
    import org.apache.kafka.clients.consumer.ConsumerConfig
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.springframework.beans.factory.annotation.Value
    import org.springframework.context.annotation.Bean
    import org.springframework.context.annotation.Configuration
    import org.springframework.kafka.annotation.EnableKafka
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
    import org.springframework.kafka.core.ConsumerFactory
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory
    import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
    import org.springframework.kafka.support.serializer.JsonDeserializer
    
    @Configuration
    @EnableKafka
    class KafkaConsumerConfig {
        @Value("\${spring.kafka.consumer.bootstrap-servers}")
        private lateinit var BOOTSTRAP_ADDRESS: String
    
        @Value("\${spring.kafka.consumer.auto-offset-reset}")
        private lateinit var AUTO_OFFSET_RESET: String
    
        @Value("\${spring.kafka.consumer.enable-auto-commit}")
        private lateinit var AUTO_COMMIT: String
    
        @Bean
        fun consumerFactory(): ConsumerFactory<String, TestDto> {
    
            val deserializer: JsonDeserializer<TestDto> = JsonDeserializer(TestDto::class.java)
    //        deserializer.setRemoveTypeHeaders(false);
    //        deserializer.addTrustedPackages("*");
    //        deserializer.setUseTypeMapperForKey(true);
    
            val errorHandlingDeserializer = ErrorHandlingDeserializer(deserializer)
    
            val props: MutableMap<String, Any> = HashMap()
            props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = BOOTSTRAP_ADDRESS
            props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = AUTO_OFFSET_RESET
            props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = AUTO_COMMIT
            return DefaultKafkaConsumerFactory(
                props,
                StringDeserializer(),
                deserializer
    //            errorHandlingDeserializer
            )
        }
    
        @Bean
        fun myContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, TestDto> {
            val factory = ConcurrentKafkaListenerContainerFactory<String, TestDto>()
            factory.consumerFactory = consumerFactory()
            return factory
        }
    }

    deserializer와 errorHandlingDeserializer는 주석처리 해두었습니다.

    우선 주석을 하여 다음과 같이 설정한 후 deserializer의 옵션과 ErrorHandlingDeserializer를 알아보겠습니다.

     

    KafkaConsumer

    @Component
    class KafkaConsumer {
        val logger = KotlinLogging.logger {}
    
        @KafkaListener(
            topics = ["testTopic"],
            groupId = "#{ T(java.util.UUID).randomUUID().toString() }",
            containerFactory = "myContainerFactory"
        )
        fun consume(
            @Payload payload: TestDto
        ) {
            logger.info("Consumer start: $payload")
        }
    }

    ConsumerConfig에서 정의한 myContainerFactory Bean 이름을 넣어줍니다.

     

    Producer에 메시지를 발행하여 Consumer 부분 관찰

    Consume는 계속적으로 메시지를 consume 하면서 실패합니다.

    이때 실패하기 때문에 무한적으로 consume을 수행합니다.

     

    에러 원인은 다음과 같습니다.

    Caused by: java.lang.IllegalArgumentException: 
    The class 'com.example.study.producer.TestDto' is not in the trusted packages: 
    [java.util, java.lang, com.example.study.request, com.example.study.request.*]. 
    If you believe this class is safe to deserialize, please provide its name. 
    If the serialization is only done by a trusted source, you can also enable trust all (*).

     

    원인을 해결하기 전 ErrorHandlingDeserializer적용

    return DefaultKafkaConsumerFactory(
                props,
                StringDeserializer(),
                errorHandlingDeserializer
            )

    이제 예외가 단건으로 발생하고 끝납니다.

     

    Caused by: org.springframework.kafka.support.serializer.DeserializationException:
     failed to deserialize

     

    DeserializationException 해결

    다음 옵션을 활성화하여 해결할 수 있습니다.

    deserializer.addTrustedPackages("*");

     

    TestDto의 내부 property를 동일하게 사용할지라도 패키지가 다른 경우에는 직렬화/역직렬화 시에 예외가 발생합니다.

    package이름까지 직렬화과정에 포함되기 때문에 주의해야 합니다.

     

     

    다시 예외 발생

    Caused by: org.springframework.messaging.converter.MessageConversionException:
     failed to resolve class name. Class not found [com.example.study.producer.TestDto]

    Class Name Not Fond 예외가 발생합니다.

     

    다음 옵션을 활성화시킵니다.

    deserializer.setUseTypeMapperForKey(true);

     

     

    kafka는 header에서 값을 꺼내서 Class를 찾습니다.

    이때 해당 Class가 없으면 null을 할당하게 되고 해당 Class가 없으면 ClassNotFoundException을 발생시킵니다.

    해당 옵션을 활성화하면 Consumer에서 객체를 받을 때 Publisher에서 보낸 오브젝트의 키 값을 기준으로 값을 받습니다.

     

    메서드를 타고 가 보면 다음과 같이 true일 경우에 여러 필드네임을 세팅합니다.

    public void setUseForKey(boolean isKey) {
    		if (isKey) {
    			setClassIdFieldName(AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME);
    			setContentClassIdFieldName(AbstractJavaTypeMapper.KEY_DEFAULT_CONTENT_CLASSID_FIELD_NAME);
    			setKeyClassIdFieldName(AbstractJavaTypeMapper.KEY_DEFAULT_KEY_CLASSID_FIELD_NAME);
    		}
    	}

     

    이제 Consumer는 잘 동작합니다.

    2023-07-07T14:18:23.830+09:00  INFO 58242 --- [ntainer#0-0-C-1] 
    c.example.study.consumer.KafkaConsumer   : 
    Consumer start: TestDto(col1=1, col2=2)

     

    AckMode 옵션이 yml에서 잘 안 먹는다면..

    @Bean
        fun myContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, TestDto> {
            val factory = ConcurrentKafkaListenerContainerFactory<String, TestDto>()
            factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL_IMMEDIATE
            factory.consumerFactory = consumerFactory()
            return factory
        }

    Factory에서 ContainerProperties.AckMode를 강제로 지정

     

    댓글

Designed by Tistory.