Kafka 객체 SERIALIZER, DESERIALIZER
개요
Kafka를 사용하다 보면 메시지를 Stirng이 아닌 객체(DTO)로 전송하고 싶을 수 있습니다.
이때 Producer, Consumer Config를 알아보겠습니다.
테스트하기 위한 환경
- local에서 실행할 수 있는 SpringBootApplication consumer 1대, producer 1대
- docker, AWS MSK등으로 구성한 kafka 인프라
TestDTO
data class TestDto(
val col1: String,
val col2: String,
)
@Configuration
@EnableKafka
class KafkaProducerConfig {
@Value("\${spring.kafka.producer.bootstrap-servers}")
private lateinit var BOOTSTRAP_SERVERS: String
@Bean
fun factory(): ProducerFactory<String, TestDto> {
val props: MutableMap<String, Any> = HashMap()
props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = BOOTSTRAP_SERVERS
props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = JsonSerializer::class.java
return DefaultKafkaProducerFactory(props)
}
@Bean
fun kafkaTemplate(): KafkaTemplate<String, TestDto> {
return KafkaTemplate(factory())
}
}
StringSerializer대신 JsonSerializer를 사용합니다.
KafkaTemplate의 Type도 TestDto로 정의해 둡니다.
KafkaConsumerConfig
package com.example.study.config
import com.example.study.request.TestDto
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafka
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
import org.springframework.kafka.support.serializer.JsonDeserializer
@Configuration
@EnableKafka
class KafkaConsumerConfig {
@Value("\${spring.kafka.consumer.bootstrap-servers}")
private lateinit var BOOTSTRAP_ADDRESS: String
@Value("\${spring.kafka.consumer.auto-offset-reset}")
private lateinit var AUTO_OFFSET_RESET: String
@Value("\${spring.kafka.consumer.enable-auto-commit}")
private lateinit var AUTO_COMMIT: String
@Bean
fun consumerFactory(): ConsumerFactory<String, TestDto> {
val deserializer: JsonDeserializer<TestDto> = JsonDeserializer(TestDto::class.java)
// deserializer.setRemoveTypeHeaders(false);
// deserializer.addTrustedPackages("*");
// deserializer.setUseTypeMapperForKey(true);
val errorHandlingDeserializer = ErrorHandlingDeserializer(deserializer)
val props: MutableMap<String, Any> = HashMap()
props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = BOOTSTRAP_ADDRESS
props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = AUTO_OFFSET_RESET
props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = AUTO_COMMIT
return DefaultKafkaConsumerFactory(
props,
StringDeserializer(),
deserializer
// errorHandlingDeserializer
)
}
@Bean
fun myContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, TestDto> {
val factory = ConcurrentKafkaListenerContainerFactory<String, TestDto>()
factory.consumerFactory = consumerFactory()
return factory
}
}
deserializer와 errorHandlingDeserializer는 주석처리 해두었습니다.
우선 주석을 하여 다음과 같이 설정한 후 deserializer의 옵션과 ErrorHandlingDeserializer를 알아보겠습니다.
KafkaConsumer
@Component
class KafkaConsumer {
val logger = KotlinLogging.logger {}
@KafkaListener(
topics = ["testTopic"],
groupId = "#{ T(java.util.UUID).randomUUID().toString() }",
containerFactory = "myContainerFactory"
)
fun consume(
@Payload payload: TestDto
) {
logger.info("Consumer start: $payload")
}
}
ConsumerConfig에서 정의한 myContainerFactory Bean 이름을 넣어줍니다.
Producer에 메시지를 발행하여 Consumer 부분 관찰
Consume는 계속적으로 메시지를 consume 하면서 실패합니다.
이때 실패하기 때문에 무한적으로 consume을 수행합니다.
에러 원인은 다음과 같습니다.
Caused by: java.lang.IllegalArgumentException:
The class 'com.example.study.producer.TestDto' is not in the trusted packages:
[java.util, java.lang, com.example.study.request, com.example.study.request.*].
If you believe this class is safe to deserialize, please provide its name.
If the serialization is only done by a trusted source, you can also enable trust all (*).
원인을 해결하기 전 ErrorHandlingDeserializer적용
return DefaultKafkaConsumerFactory(
props,
StringDeserializer(),
errorHandlingDeserializer
)
이제 예외가 단건으로 발생하고 끝납니다.
Caused by: org.springframework.kafka.support.serializer.DeserializationException:
failed to deserialize
DeserializationException 해결
다음 옵션을 활성화하여 해결할 수 있습니다.
deserializer.addTrustedPackages("*");
TestDto의 내부 property를 동일하게 사용할지라도 패키지가 다른 경우에는 직렬화/역직렬화 시에 예외가 발생합니다.
package이름까지 직렬화과정에 포함되기 때문에 주의해야 합니다.
다시 예외 발생
Caused by: org.springframework.messaging.converter.MessageConversionException:
failed to resolve class name. Class not found [com.example.study.producer.TestDto]
Class Name Not Fond 예외가 발생합니다.
다음 옵션을 활성화시킵니다.
deserializer.setUseTypeMapperForKey(true);
kafka는 header에서 값을 꺼내서 Class를 찾습니다.
이때 해당 Class가 없으면 null을 할당하게 되고 해당 Class가 없으면 ClassNotFoundException을 발생시킵니다.
해당 옵션을 활성화하면 Consumer에서 객체를 받을 때 Publisher에서 보낸 오브젝트의 키 값을 기준으로 값을 받습니다.
메서드를 타고 가 보면 다음과 같이 true일 경우에 여러 필드네임을 세팅합니다.
public void setUseForKey(boolean isKey) {
if (isKey) {
setClassIdFieldName(AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME);
setContentClassIdFieldName(AbstractJavaTypeMapper.KEY_DEFAULT_CONTENT_CLASSID_FIELD_NAME);
setKeyClassIdFieldName(AbstractJavaTypeMapper.KEY_DEFAULT_KEY_CLASSID_FIELD_NAME);
}
}
이제 Consumer는 잘 동작합니다.
2023-07-07T14:18:23.830+09:00 INFO 58242 --- [ntainer#0-0-C-1]
c.example.study.consumer.KafkaConsumer :
Consumer start: TestDto(col1=1, col2=2)
AckMode 옵션이 yml에서 잘 안 먹는다면..
@Bean
fun myContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, TestDto> {
val factory = ConcurrentKafkaListenerContainerFactory<String, TestDto>()
factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL_IMMEDIATE
factory.consumerFactory = consumerFactory()
return factory
}
Factory에서 ContainerProperties.AckMode를 강제로 지정