Setting Up the kafka Module
All of the code is available in the Git repo:
https://github.com/Junuu/coupon-system
Creating the kafka Directory
Create a kafka directory and add an include for it to settings.gradle.kts.
include("kafka")
Then refresh Gradle.
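For reference, the resulting settings.gradle.kts might look roughly like this (the root project name follows the repository; any other include lines are assumptions about the project's remaining modules):
// settings.gradle.kts - minimal sketch
rootProject.name = "coupon-system"

include("kafka")
// ...plus include(...) lines for the project's other modules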
Adding Dependencies to the Kafka Module
The kafka module's build.gradle.kts:
import org.springframework.boot.gradle.tasks.bundling.BootJar

// This module is a library used by other modules, so build a plain jar
// instead of an executable Boot jar.
val jar: Jar by tasks
val bootJar: BootJar by tasks
bootJar.enabled = false
jar.enabled = true

dependencies {
    // Kafka
    implementation("org.springframework.kafka:spring-kafka")
    testImplementation("org.springframework.kafka:spring-kafka-test")

    // Logging
    implementation("io.github.microutils:kotlin-logging-jvm:2.0.10")
    implementation("org.slf4j:slf4j-api:1.7.30")
}
The first two dependencies are for Kafka; the last two are for logging. The logging dependencies may later be moved out into a separate common or logging module.
Configuring application-kafka.yml in the Kafka Module
spring:
  profiles:
    active: local
  kafka:
    producer:
      bootstrap-servers: localhost:9092
    consumer:
      auto-offset-reset: earliest
      bootstrap-servers: localhost:9092
      enable-auto-commit: false
    listener:
      ack-mode: manual
This configuration targets the local environment.
A characteristic of a Kafka cluster is that each broker holds only part of the cluster's overall data. You do not need to list every broker in the cluster under bootstrap-servers, but a client still needs to know where the resources it wants to access live on each broker. The bootstrap.servers setting exists to share exactly this metadata: the client contacts one of the listed brokers first and discovers the rest of the cluster from its response.
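For illustration (the hostnames here are hypothetical), a multi-broker cluster would typically list two or three brokers so the client can still bootstrap when one of them is down:
spring:
  kafka:
    consumer:
      # any one reachable broker is enough; the client discovers the rest via metadata
      bootstrap-servers: broker1:9092,broker2:9092,broker3:9092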
For auto.offset.reset, earliest means that when no committed offset exists, consumption starts from the earliest available offset.
When enable-auto-commit is set to false, a fetched message is not treated as consumed until its processing is complete. You must explicitly call a commit method such as commitSync to mark the message as processed and its offset as consumed.
When ack-mode is set to manual, calling Acknowledgment.acknowledge() marks the record, and the actual commit happens on the next poll().
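To see the explicit commit flow without Spring, here is a minimal sketch using the plain Kafka client (not code from the repo; it assumes the local broker plus the topic and group names used later in this post):
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import java.time.Duration
import java.util.Properties

fun main() {
    val props = Properties().apply {
        put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup")
        put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") // auto commit off
        put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
        put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
    }
    KafkaConsumer<String, String>(props).use { consumer ->
        consumer.subscribe(listOf("testTopic"))
        val records = consumer.poll(Duration.ofSeconds(1))
        records.forEach { record -> println(record.value()) } // "process" each record
        consumer.commitSync() // only now are the fetched offsets marked as consumed
    }
}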
Consumer Config
package com.demo.kafka.config

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafka
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.listener.ContainerProperties

@Configuration
@EnableKafka
class KafkaConsumerConfig {

    @Value("\${spring.kafka.consumer.bootstrap-servers}")
    private lateinit var BOOTSTRAP_ADDRESS: String

    @Value("\${spring.kafka.consumer.auto-offset-reset}")
    private lateinit var AUTO_OFFSET_RESET: String

    @Value("\${spring.kafka.consumer.enable-auto-commit}")
    private lateinit var AUTO_COMMIT: String

    @Bean
    fun consumerFactory(): ConsumerFactory<String, String> {
        val props: MutableMap<String, Any> = HashMap()
        props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = BOOTSTRAP_ADDRESS
        props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = AUTO_OFFSET_RESET
        props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = AUTO_COMMIT
        // Kafka expects Class (or class-name) values here, so use ::class.java;
        // note this must be Kafka's StringDeserializer, not Jackson's class of the same name
        props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        return DefaultKafkaConsumerFactory(props)
    }

    @Bean
    fun containerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
        factory.consumerFactory = consumerFactory()
        // Mirror the yml setting so listeners built from this factory use manual acks
        factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL
        return factory
    }
}
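One thing to watch out for: @KafkaListener looks for a container factory bean named kafkaListenerContainerFactory by default, so a bean named containerFactory is only used if it is referenced explicitly via @KafkaListener(containerFactory = "containerFactory"). Otherwise Spring Boot's auto-configured factory, built from the yml above (including ack-mode: manual), is the one in effect.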
Producer Config
package com.demo.kafka.config

import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.core.ProducerFactory

@Configuration
class KafkaProducerConfig {

    @Value("\${spring.kafka.producer.bootstrap-servers}")
    private lateinit var BOOTSTRAP_SERVERS: String

    @Bean
    fun factory(): ProducerFactory<String, String> {
        val props: MutableMap<String, Any> = HashMap()
        props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = BOOTSTRAP_SERVERS
        // StringSerializer::class.java is simpler and safer than Class.forName(...)
        props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        return DefaultKafkaProducerFactory(props)
    }

    @Bean
    fun kafkaTemplate(): KafkaTemplate<String, String> {
        return KafkaTemplate(factory())
    }
}
Consumer
package com.demo.kafka.consumer

import mu.KotlinLogging
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.support.Acknowledgment
import org.springframework.messaging.handler.annotation.Payload
import org.springframework.stereotype.Component
import java.util.concurrent.CountDownLatch

@Component
class KafkaConsumer {
    private val logger = KotlinLogging.logger {}

    // The latch and saveLastPayload exist so the integration test below can
    // wait for, and assert on, the consumed message.
    val latch = CountDownLatch(1)
    var saveLastPayload: String = "init"

    @KafkaListener(topics = ["testTopic"], groupId = "testGroup")
    fun consume(@Payload payload: String, acknowledgment: Acknowledgment) {
        logger.info(payload)
        saveLastPayload = payload
        latch.countDown()
        // process the message, then acknowledge so the offset is committed on the next poll
        acknowledgment.acknowledge()
    }
}
Producer
package com.demo.kafka.producer

import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Component

@Component
class KafkaProducer(
    private val kafkaTemplate: KafkaTemplate<String, String>,
) {
    fun send(topic: String, payload: String) {
        kafkaTemplate.send(topic, payload)
    }
}
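As a usage sketch (hypothetical; this service does not exist in the repo, and the class, method, and topic names are illustrative), another module could hand coupon-issue requests off to Kafka through this wrapper:
import com.demo.kafka.producer.KafkaProducer
import org.springframework.stereotype.Service

// Hypothetical caller: publishes the request instead of processing it synchronously
@Service
class CouponIssueService(
    private val kafkaProducer: KafkaProducer,
) {
    fun issue(userId: Long) {
        kafkaProducer.send("testTopic", userId.toString())
    }
}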
EnableKafka Annotation
package com.demo.kafka.config

import com.demo.kafka.consumer.KafkaConsumer
import com.demo.kafka.producer.KafkaProducer
import org.springframework.context.annotation.Import

@Target(AnnotationTarget.CLASS)
@Retention(AnnotationRetention.RUNTIME)
@MustBeDocumented
@Import(
    KafkaConsumerConfig::class,
    KafkaProducerConfig::class,
    KafkaConsumer::class,
    KafkaProducer::class,
)
annotation class EnableKafka
The @Import registers these four classes with Spring wherever @EnableKafka is applied. Note that this custom annotation is distinct from Spring Kafka's own org.springframework.kafka.annotation.EnableKafka used in the consumer config above.
A SpringApplication in the test directory
package com.demo.kafka

import com.demo.kafka.config.EnableKafka
import org.springframework.boot.autoconfigure.SpringBootApplication

@EnableKafka
@SpringBootApplication(scanBasePackages = ["com.demo.kafka"])
class SpringBootApplication
EmbeddedKafkaTest
import com.demo.kafka.SpringBootApplication
import com.demo.kafka.consumer.KafkaConsumer
import com.demo.kafka.producer.KafkaProducer
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.kafka.test.context.EmbeddedKafka
import org.springframework.test.context.ActiveProfiles
import org.springframework.test.context.ContextConfiguration
import java.util.concurrent.TimeUnit

@SpringBootTest
@ContextConfiguration(classes = [SpringBootApplication::class])
@ActiveProfiles("local")
@EmbeddedKafka(partitions = 3, brokerProperties = ["listeners=PLAINTEXT://localhost:9092"], ports = [9092])
class EmbeddedKafkaIntegrationTest(
    @Autowired private val producer: KafkaProducer,
    @Autowired private val consumer: KafkaConsumer,
) {
    @Test
    fun `임베디드 카프카 테스트`() {
        // given
        val payload = "embedded Kafka Test"

        // when
        producer.send("testTopic", payload)

        // then
        val result = consumer.latch.await(10L, TimeUnit.SECONDS)
        assertTrue(result)
        assertThat(consumer.saveLastPayload).isEqualTo(payload)
    }
}
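With everything wired up, the test can be run with ./gradlew :kafka:test (using the module name configured in settings.gradle.kts); the embedded broker listens on port 9092, matching the bootstrap-servers in the test yml below.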
test application.yml
spring:
  profiles:
    active: local
  kafka:
    producer:
      bootstrap-servers: localhost:9092
    consumer:
      auto-offset-reset: earliest
      bootstrap-servers: localhost:9092
      enable-auto-commit: false
    listener:
      ack-mode: manual
References
https://www.baeldung.com/spring-boot-kafka-testing
https://kafka.apache.org/documentation/
https://velog.io/@pha2616/Apache-Kafka-bootstrap.server
https://always-kimkim.tistory.com/entry/kafka101-configuration-bootstrap-servers