Factory에서 ContainerProperties.AckMode를 강제로 지정
-
Kafka 객체 SERIALIZER, DESERIALIZER프로젝트/kafka 2023. 8. 5. 00:01
개요
Kafka를 사용하다 보면 메시지를 Stirng이 아닌 객체(DTO)로 전송하고 싶을 수 있습니다.
이때 Producer, Consumer Config를 알아보겠습니다.
테스트하기 위한 환경
- local에서 실행할 수 있는 SpringBootApplication consumer 1대, producer 1대
- docker, AWS MSK등으로 구성한 kafka 인프라
TestDTO
data class TestDto( val col1: String, val col2: String, )
@Configuration @EnableKafka class KafkaProducerConfig { @Value("\${spring.kafka.producer.bootstrap-servers}") private lateinit var BOOTSTRAP_SERVERS: String @Bean fun factory(): ProducerFactory<String, TestDto> { val props: MutableMap<String, Any> = HashMap() props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = BOOTSTRAP_SERVERS props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = JsonSerializer::class.java return DefaultKafkaProducerFactory(props) } @Bean fun kafkaTemplate(): KafkaTemplate<String, TestDto> { return KafkaTemplate(factory()) } }
StringSerializer대신 JsonSerializer를 사용합니다.
KafkaTemplate의 Type도 TestDto로 정의해 둡니다.
KafkaConsumerConfig
package com.example.study.config import com.example.study.request.TestDto import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.serialization.StringDeserializer import org.springframework.beans.factory.annotation.Value import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.kafka.annotation.EnableKafka import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory import org.springframework.kafka.core.ConsumerFactory import org.springframework.kafka.core.DefaultKafkaConsumerFactory import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer import org.springframework.kafka.support.serializer.JsonDeserializer @Configuration @EnableKafka class KafkaConsumerConfig { @Value("\${spring.kafka.consumer.bootstrap-servers}") private lateinit var BOOTSTRAP_ADDRESS: String @Value("\${spring.kafka.consumer.auto-offset-reset}") private lateinit var AUTO_OFFSET_RESET: String @Value("\${spring.kafka.consumer.enable-auto-commit}") private lateinit var AUTO_COMMIT: String @Bean fun consumerFactory(): ConsumerFactory<String, TestDto> { val deserializer: JsonDeserializer<TestDto> = JsonDeserializer(TestDto::class.java) // deserializer.setRemoveTypeHeaders(false); // deserializer.addTrustedPackages("*"); // deserializer.setUseTypeMapperForKey(true); val errorHandlingDeserializer = ErrorHandlingDeserializer(deserializer) val props: MutableMap<String, Any> = HashMap() props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = BOOTSTRAP_ADDRESS props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = AUTO_OFFSET_RESET props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = AUTO_COMMIT return DefaultKafkaConsumerFactory( props, StringDeserializer(), deserializer // errorHandlingDeserializer ) } @Bean fun myContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, TestDto> { val factory = ConcurrentKafkaListenerContainerFactory<String, TestDto>() factory.consumerFactory = consumerFactory() return factory } }
deserializer와 errorHandlingDeserializer는 주석처리 해두었습니다.
우선 주석을 하여 다음과 같이 설정한 후 deserializer의 옵션과 ErrorHandlingDeserializer를 알아보겠습니다.
KafkaConsumer
@Component class KafkaConsumer { val logger = KotlinLogging.logger {} @KafkaListener( topics = ["testTopic"], groupId = "#{ T(java.util.UUID).randomUUID().toString() }", containerFactory = "myContainerFactory" ) fun consume( @Payload payload: TestDto ) { logger.info("Consumer start: $payload") } }
ConsumerConfig에서 정의한 myContainerFactory Bean 이름을 넣어줍니다.
Producer에 메시지를 발행하여 Consumer 부분 관찰
Consume는 계속적으로 메시지를 consume 하면서 실패합니다.
이때 실패하기 때문에 무한적으로 consume을 수행합니다.
에러 원인은 다음과 같습니다.
Caused by: java.lang.IllegalArgumentException: The class 'com.example.study.producer.TestDto' is not in the trusted packages: [java.util, java.lang, com.example.study.request, com.example.study.request.*]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
원인을 해결하기 전 ErrorHandlingDeserializer적용
return DefaultKafkaConsumerFactory( props, StringDeserializer(), errorHandlingDeserializer )
이제 예외가 단건으로 발생하고 끝납니다.
Caused by: org.springframework.kafka.support.serializer.DeserializationException: failed to deserialize
DeserializationException 해결
다음 옵션을 활성화하여 해결할 수 있습니다.
deserializer.addTrustedPackages("*");
TestDto의 내부 property를 동일하게 사용할지라도 패키지가 다른 경우에는 직렬화/역직렬화 시에 예외가 발생합니다.
package이름까지 직렬화과정에 포함되기 때문에 주의해야 합니다.
다시 예외 발생
Caused by: org.springframework.messaging.converter.MessageConversionException: failed to resolve class name. Class not found [com.example.study.producer.TestDto]
Class Name Not Fond 예외가 발생합니다.
다음 옵션을 활성화시킵니다.
deserializer.setUseTypeMapperForKey(true);
kafka는 header에서 값을 꺼내서 Class를 찾습니다.
이때 해당 Class가 없으면 null을 할당하게 되고 해당 Class가 없으면 ClassNotFoundException을 발생시킵니다.
해당 옵션을 활성화하면 Consumer에서 객체를 받을 때 Publisher에서 보낸 오브젝트의 키 값을 기준으로 값을 받습니다.
메서드를 타고 가 보면 다음과 같이 true일 경우에 여러 필드네임을 세팅합니다.
public void setUseForKey(boolean isKey) { if (isKey) { setClassIdFieldName(AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME); setContentClassIdFieldName(AbstractJavaTypeMapper.KEY_DEFAULT_CONTENT_CLASSID_FIELD_NAME); setKeyClassIdFieldName(AbstractJavaTypeMapper.KEY_DEFAULT_KEY_CLASSID_FIELD_NAME); } }
이제 Consumer는 잘 동작합니다.
2023-07-07T14:18:23.830+09:00 INFO 58242 --- [ntainer#0-0-C-1] c.example.study.consumer.KafkaConsumer : Consumer start: TestDto(col1=1, col2=2)
AckMode 옵션이 yml에서 잘 안 먹는다면..
@Bean fun myContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, TestDto> { val factory = ConcurrentKafkaListenerContainerFactory<String, TestDto>() factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL_IMMEDIATE factory.consumerFactory = consumerFactory() return factory }
'프로젝트 > kafka' 카테고리의 다른 글
kafka Dead Letter Queue(DLQ, DLT) (0) 2023.08.07 kafka 메시지 삭제 정책 (0) 2023.08.06 kafka Consumer group id 랜덤으로 부여 (0) 2023.08.04 kafka consumer offset option (latest vs earliest) (0) 2023.08.03 kafka는 왜 속도가 빠를까? (1) 2023.07.29