Setting Up a Kafka Module
Project / First-Come-First-Served Coupon Issuing System, 2023. 4. 20. 00:01
All of the code is available in the git repo.
https://github.com/Junuu/coupon-system
Creating the kafka directory
Create a kafka directory and add an include for it to settings.gradle.kts.
include("kafka")
Then refresh Gradle.
Adding dependencies to the kafka module
build.gradle.kts of the kafka module
import org.springframework.boot.gradle.tasks.bundling.BootJar

val jar: Jar by tasks
val bootJar: BootJar by tasks

// Build this module as a plain library jar instead of an executable Boot jar.
bootJar.enabled = false
jar.enabled = true

dependencies {
    implementation("org.springframework.kafka:spring-kafka")
    testImplementation("org.springframework.kafka:spring-kafka-test")

    implementation("io.github.microutils:kotlin-logging-jvm:2.0.10")
    implementation("org.slf4j:slf4j-api:1.7.30")
}
The first two dependencies are for Kafka, and the last two are for logging.
Later on, logging may be split out into a separate common module or logging module.
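As a rough sketch of how another module in the same build might consume this one: the module name `api` below is purely an assumption for illustration and does not come from the repository. Because bootJar is disabled and jar is enabled above, the kafka module is packaged as an ordinary library jar that a sibling module can depend on.

```kotlin
// api/build.gradle.kts (hypothetical module name, used only for illustration)
dependencies {
    // Depend on the sibling kafka module declared via include("kafka").
    implementation(project(":kafka"))
}
```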
Configuring application-kafka.yml in the kafka module
spring:
  profiles:
    active: local
  kafka:
    producer:
      bootstrap-servers: localhost:9092
    consumer:
      auto-offset-reset: earliest
      bootstrap-servers: localhost:9092
      enable-auto-commit: false
    listener:
      ack-mode: manual
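This configuration targets the local environment.
In a Kafka cluster, each broker holds only part of the cluster's overall data.
The bootstrap-servers setting does not have to list every broker in the cluster. To access resources that live on a particular broker, however, the client must know where the resource it wants is located.
The bootstrap.servers setting exists so that the client can obtain this metadata: it connects to one of the listed brokers, fetches the cluster metadata, and from then on talks to the brokers that hold the data it needs.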
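The discovery step can be pictured with a small toy model. This is only a sketch: `Broker`, `Cluster`, and `discover` are hypothetical names invented here, not part of the Kafka client API, and the model assumes every reachable broker can describe the whole cluster.

```kotlin
// Toy model only: these types are hypothetical and are not Kafka client classes.
data class Broker(val address: String, val reachable: Boolean = true)

// Any reachable broker can report which broker leads which partition.
class Cluster(private val brokers: List<Broker>, private val leaders: Map<Int, String>) {
    // Returns partition -> leader address, or null if the address is unknown or down.
    fun metadataFrom(address: String): Map<Int, String>? =
        brokers.firstOrNull { it.address == address && it.reachable }?.let { leaders }
}

// Tries each bootstrap address in order and returns metadata from the first one that answers.
fun discover(cluster: Cluster, bootstrapServers: String): Map<Int, String> =
    bootstrapServers.split(",")
        .map { it.trim() }
        .firstNotNullOfOrNull { cluster.metadataFrom(it) }
        ?: error("no bootstrap server reachable: $bootstrapServers")
```

In this model a client configured with only `b2:9092` still learns that partition 2 is led by `b3:9092`, which is why listing a subset of brokers is enough. Listing more than one address only helps when the first one happens to be down.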
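For auto.offset.reset, earliest means the consumer starts from the earliest available offset. The setting only applies when the consumer group has no committed offset for the partition, or when the committed offset no longer exists on the broker.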
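The decision can be sketched as a single function. `OffsetReset` and `resolveStartOffset` are hypothetical names made up for this illustration, not Kafka APIs, and the sketch deliberately ignores the case of a committed offset that has fallen out of range.

```kotlin
// Hypothetical helper that mirrors the decision; it is not a Kafka API.
enum class OffsetReset { EARLIEST, LATEST }

// committed: the group's committed offset for the partition, or null if none exists.
// logStart / logEnd: the first available offset and the offset after the last record.
fun resolveStartOffset(committed: Long?, logStart: Long, logEnd: Long, reset: OffsetReset): Long =
    committed ?: when (reset) {
        OffsetReset.EARLIEST -> logStart
        OffsetReset.LATEST -> logEnd
    }
```

With no committed offset, `EARLIEST` returns `logStart` and `LATEST` returns `logEnd`. Once a committed offset exists, the reset policy is not consulted at all.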
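With enable-auto-commit set to false, the consumer does not commit offsets in the background, so a fetched message is not recorded as consumed until its offset is committed explicitly.
With the plain consumer API this means calling commitSync yourself once processing has finished; with Spring Kafka, the listener container performs the commit according to the ack-mode.
With ack-mode set to manual, the listener calls Acknowledgment.acknowledge(), and the container commits the acknowledged offsets after the records returned by the current poll() have been processed, around the time of the next poll().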
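Why this matters can be shown with a toy in-memory model of a consumer that restarts. `ToyConsumer` and its members are hypothetical and are not Spring Kafka or Kafka client types; the sketch only assumes that a restarted consumer resumes from the committed offset.

```kotlin
// Toy in-memory model; the class and its members are hypothetical.
class ToyConsumer(private val log: List<String>) {
    // Offset the group resumes from after a restart (the next record to read).
    var committed = 0
        private set

    // In-memory read position; it is lost when the consumer restarts.
    private var position = 0

    // Hands out the next record, if any, and advances only the in-memory position.
    fun poll(): Pair<Int, String>? {
        val offset = position
        val value = log.getOrNull(offset) ?: return null
        position = offset + 1
        return offset to value
    }

    // Marks the record at `offset` as processed by committing the following offset.
    fun commit(offset: Int) {
        committed = offset + 1
    }

    // Simulates a crash and restart: reading resumes from the committed offset.
    fun restart() {
        position = committed
    }
}
```

If the consumer polls the record at offset 0 and restarts before committing, the next poll returns offset 0 again. Only after `commit(0)` does a restart move on to offset 1, so processing must tolerate the same message arriving more than once.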
Consumer Config
package com.demo.kafka.config

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory

// As written, @EnableKafka needs no import: it is the custom annotation
// declared in this same package (see the EnableKafka Annotation section).
@Configuration
@EnableKafka
class KafkaConsumerConfig {
    @Value("\${spring.kafka.consumer.bootstrap-servers}")
    private lateinit var BOOTSTRAP_ADDRESS: String

    @Value("\${spring.kafka.consumer.auto-offset-reset}")
    private lateinit var AUTO_OFFSET_RESET: String

    @Value("\${spring.kafka.consumer.enable-auto-commit}")
    private lateinit var AUTO_COMMIT: String

    @Bean
    fun consumerFactory(): ConsumerFactory<String, String> {
        val props: MutableMap<String, Any> = HashMap()
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_ADDRESS)
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET)
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, AUTO_COMMIT)
        // Kafka expects a java.lang.Class (or class name) here, not a KClass.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java)
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java)
        return DefaultKafkaConsumerFactory(props)
    }

    @Bean
    fun containerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
        factory.consumerFactory = consumerFactory()
        return factory
    }
}
Producer Config
package com.demo.kafka.config

import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.core.ProducerFactory

@Configuration
class KafkaProducerConfig {
    @Value("\${spring.kafka.producer.bootstrap-servers}")
    private lateinit var BOOTSTRAP_SERVERS: String

    @Bean
    fun factory(): ProducerFactory<String, String> {
        val props: MutableMap<String, Any> = HashMap()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS)
        // Keys and values are both sent as plain strings.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
        return DefaultKafkaProducerFactory(props)
    }

    @Bean
    fun kafkaTemplate(): KafkaTemplate<String, String> {
        return KafkaTemplate(factory())
    }
}
Consumer
package com.demo.kafka.consumer

import mu.KotlinLogging
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.support.Acknowledgment
import org.springframework.messaging.handler.annotation.Payload
import org.springframework.stereotype.Component
import java.util.concurrent.CountDownLatch

@Component
class KafkaConsumer {
    val logger = KotlinLogging.logger {}

    // Exposed so that a test can wait for the first message and inspect it.
    val latch = CountDownLatch(1)
    var saveLastPayload: String = "init"

    @KafkaListener(topics = ["testTopic"], groupId = "testGroup")
    fun consume(@Payload payload: String, acknowledgment: Acknowledgment) {
        logger.info(payload)
        saveLastPayload = payload
        latch.countDown()
        // Process
        acknowledgment.acknowledge()
    }
}
Producer
package com.demo.kafka.producer

import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Component

@Component
class KafkaProducer(
    private val kafkaTemplate: KafkaTemplate<String, String>
) {
    fun send(topic: String, payload: String) {
        kafkaTemplate.send(topic, payload)
    }
}
EnableKafka Annotation
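package com.demo.kafka.config

import com.demo.kafka.consumer.KafkaConsumer
import com.demo.kafka.producer.KafkaProducer
import org.springframework.context.annotation.Import

@Target(AnnotationTarget.CLASS)
@Retention(AnnotationRetention.RUNTIME)
@MustBeDocumented
@Import(
    KafkaConsumerConfig::class,
    KafkaProducerConfig::class,
    KafkaConsumer::class,
    KafkaProducer::class,
)
annotation class EnableKafka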
Importing these four classes makes them available to Spring as beans.
SpringApplication in the test directory
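package com.demo.kafka

import com.demo.kafka.config.EnableKafka
import org.springframework.boot.autoconfigure.SpringBootApplication

// Note: this class shares its simple name with the imported annotation;
// the annotation usage below refers to the explicitly imported one.
@EnableKafka
@SpringBootApplication(scanBasePackages = ["com.demo.kafka"])
class SpringBootApplication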
EmbeddedKafkaTest
import com.demo.kafka.SpringBootApplication
import com.demo.kafka.consumer.KafkaConsumer
import com.demo.kafka.producer.KafkaProducer
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.kafka.test.context.EmbeddedKafka
import org.springframework.test.context.ActiveProfiles
import org.springframework.test.context.ContextConfiguration
import java.util.concurrent.TimeUnit

@SpringBootTest
@ContextConfiguration(classes = [SpringBootApplication::class])
@ActiveProfiles("local")
@EmbeddedKafka(
    partitions = 3,
    brokerProperties = ["listeners=PLAINTEXT://localhost:9092"],
    ports = [9092]
)
class EmbeddedKafkaIntegrationTest(
    @Autowired private val producer: KafkaProducer,
    @Autowired private val consumer: KafkaConsumer,
) {
    @Test
    fun `임베디드 카프카 테스트`() {
        // given
        val payload = "embedded Kafka Test"

        // when
        producer.send("testTopic", payload)

        // then: wait up to 10 seconds for the listener to receive the message
        val result = consumer.latch.await(10L, TimeUnit.SECONDS)
        assertTrue(result)
        assertThat(consumer.saveLastPayload).isEqualTo(payload)
    }
}
test application.yml
spring:
  profiles:
    active: local
  kafka:
    producer:
      bootstrap-servers: localhost:9092
    consumer:
      auto-offset-reset: earliest
      bootstrap-servers: localhost:9092
      enable-auto-commit: false
    listener:
      ack-mode: manual
References
https://www.baeldung.com/spring-boot-kafka-testing
https://kafka.apache.org/documentation/
https://velog.io/@pha2616/Apache-Kafka-bootstrap.server
https://always-kimkim.tistory.com/entry/kafka101-configuration-bootstrap-servers