ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • kafka 모듈 구성하기
    프로젝트/선착순 쿠폰 발급 시스템 2023. 4. 20. 00:01
    728x90

    git repo에 모든 코드가 존재합니다.

    https://github.com/Junuu/coupon-system

     

    GitHub - Junuu/coupon-system: 선착순 쿠폰 발급 프로젝트

    선착순 쿠폰 발급 프로젝트. Contribute to Junuu/coupon-system development by creating an account on GitHub.

    github.com

     

    Kafka 디렉터리 만들기

    kafka 디렉터리를 만들고 settings.gradle.kts에 include를 추가합니다.

    include("kafka")

    이후 gradle refresh

     

    Kafka 모듈에 의존성 추가

    kafka 모듈의 build.gradle.kts

    import org.springframework.boot.gradle.tasks.bundling.BootJar
    
    // Task handles resolved by name from the project's task container.
    val bootJar: BootJar by tasks
    val jar: Jar by tasks

    // This module is packaged as a plain jar; the Spring Boot executable jar is switched off.
    bootJar.apply { enabled = false }
    jar.apply { enabled = true }

    dependencies {
        // Kafka integration and its test support.
        implementation("org.springframework.kafka:spring-kafka")
        testImplementation("org.springframework.kafka:spring-kafka-test")

        // Logging.
        implementation("io.github.microutils:kotlin-logging-jvm:2.0.10")
        implementation("org.slf4j:slf4j-api:1.7.30")
    }

    위의 의존성은 kafka를 위한 의존성이고 아래 의존성은 logging을 위한 의존성입니다.

    추후 logging 모듈은 따로 common 모듈이나 logging 모듈로 빠질 수도 있습니다.

     

    Kafka 모듈 application-kafka.yml 설정

    # Kafka client settings for the local environment.
    spring:
      profiles:
        # NOTE(review): Spring Boot 2.4+ rejects `spring.profiles.active` inside a
        # profile-specific file such as application-kafka.yml — confirm how this file is loaded.
        active: local
    
      kafka:
        producer:
          # Broker address the producer bootstraps from.
          bootstrap-servers: localhost:9092
        consumer:
          # Read from the earliest offset.
          auto-offset-reset: earliest
          # Broker address the consumer bootstraps from.
          bootstrap-servers: localhost:9092
          # Offsets are not committed automatically.
          enable-auto-commit: false
        listener:
          # Offsets are committed only after the listener calls Acknowledgment.acknowledge().
          ack-mode: manual

    local 환경용으로 설정을 합니다.

     

    카프카 클러스터의 특징으로 각 브로커들이 클러스터 전체 데이터의 일부를 가지고 있습니다.

     

    bootstrap-servers 설정에는 카프카 클러스터를 구성하는 전체 브로커를 모두 적어 주지 않아도 됩니다. 다만 클라이언트는 각 브로커 내부의 자원에 접근하기 위해, 접근하고자 하는 자원의 위치를 알아야 합니다.

     

    이런 자원들의 메타데이터를 공유하기 위해 bootstrap.servers 설정을 요구합니다.

     

    auto.offset.reset에서 earliest는 커밋된 offset이 없을 때 가장 처음 offset부터 읽는 것을 의미합니다.

    https://blog.voidmainvoid.net/305

     

    enable-auto-commit의 경우 false로 설정하면 메시지 처리가 완료될 때까지 메시지를 가져온 것으로 처리하지 않습니다.

    명시적으로 commitSync 메서드를 호출하여 메시지 처리 완료 및 메시지를 가져온 것으로 설정해주어야 합니다.

     

    ack-mode를 manual로 설정하는 경우 Acknowledgment.acknowledge() 메서드가 호출되면 다음번 poll() 때 커밋을 진행합니다.

     

    Consumer Config

    package com.demo.kafka.config
    
    import com.fasterxml.jackson.databind.deser.std.StringDeserializer
    import org.apache.kafka.clients.consumer.ConsumerConfig
    import org.springframework.beans.factory.annotation.Value
    import org.springframework.context.annotation.Bean
    import org.springframework.context.annotation.Configuration
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
    import org.springframework.kafka.core.ConsumerFactory
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory
    import java.util.*
    
    
    /**
     * Spring configuration for the consumer side of the Kafka module.
     *
     * Reads the consumer settings from `spring.kafka.consumer.*` properties and exposes
     * a [ConsumerFactory] and a [ConcurrentKafkaListenerContainerFactory] for
     * String keys and String values.
     */
    @Configuration
    @EnableKafka
    class KafkaConsumerConfig {
        @Value("\${spring.kafka.consumer.bootstrap-servers}")
        private lateinit var bootstrapServers: String

        @Value("\${spring.kafka.consumer.auto-offset-reset}")
        private lateinit var autoOffsetReset: String

        @Value("\${spring.kafka.consumer.enable-auto-commit}")
        private lateinit var enableAutoCommit: String

        /**
         * Builds the consumer factory from the injected properties.
         *
         * @return a factory producing consumers that deserialize keys and values as strings
         */
        @Bean
        fun consumerFactory(): ConsumerFactory<String, String> {
            // The `StringDeserializer` imported at the top of this file is Jackson's JSON
            // deserializer, not a Kafka one, so Kafka's class is referenced by its full name.
            // `.java` is required: Kafka accepts a java.lang.Class (or a class name), not a KClass.
            val stringDeserializer = org.apache.kafka.common.serialization.StringDeserializer::class.java

            val props = mapOf<String, Any>(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to autoOffsetReset,
                ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to enableAutoCommit,
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to stringDeserializer,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to stringDeserializer,
            )
            return DefaultKafkaConsumerFactory(props)
        }

        /**
         * Builds a listener container factory backed by [consumerFactory].
         *
         * NOTE(review): no ack mode is set on this factory, and the bean is named
         * `containerFactory` — confirm which container factory the listeners actually use.
         */
        @Bean
        fun containerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> =
            ConcurrentKafkaListenerContainerFactory<String, String>().also {
                it.consumerFactory = consumerFactory()
            }
    }

     

    Producer Config

    package com.demo.kafka.config
    
    import org.apache.kafka.clients.producer.ProducerConfig
    import org.springframework.beans.factory.annotation.Value
    import org.springframework.context.annotation.Bean
    import org.springframework.context.annotation.Configuration
    import org.springframework.kafka.core.DefaultKafkaProducerFactory
    import org.springframework.kafka.core.KafkaTemplate
    import org.springframework.kafka.core.ProducerFactory
    import java.util.*
    
    
    /**
     * Spring configuration for the producer side of the Kafka module.
     *
     * Reads the broker address from `spring.kafka.producer.bootstrap-servers` and exposes
     * a [ProducerFactory] and a [KafkaTemplate] for String keys and String values.
     */
    @Configuration
    class KafkaProducerConfig {
        @Value("\${spring.kafka.producer.bootstrap-servers}")
        private lateinit var bootstrapServers: String

        /**
         * Builds the producer factory from the injected broker address.
         *
         * @return a factory producing producers that serialize keys and values as strings
         */
        @Bean
        fun factory(): ProducerFactory<String, String> {
            // A compile-time class reference replaces Class.forName("..."), so a typo in the
            // class name fails the build instead of failing at startup.
            val stringSerializer = org.apache.kafka.common.serialization.StringSerializer::class.java

            val props = mapOf<String, Any>(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to stringSerializer,
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to stringSerializer,
            )
            return DefaultKafkaProducerFactory(props)
        }

        /** Creates the [KafkaTemplate] on top of [factory]. */
        @Bean
        fun kafkaTemplate(): KafkaTemplate<String, String> = KafkaTemplate(factory())
    }

     

    Consumer

    package com.demo.kafka.consumer
    
    import mu.KotlinLogging
    import org.springframework.kafka.annotation.KafkaListener
    import org.springframework.kafka.support.Acknowledgment
    import org.springframework.messaging.handler.annotation.Payload
    import org.springframework.stereotype.Component
    import java.util.concurrent.CountDownLatch
    
    
    /**
     * Listener component for `testTopic` (group `testGroup`).
     *
     * Each received payload is logged, remembered in [saveLastPayload], signalled through
     * [latch], and then acknowledged manually.
     */
    @Component
    class KafkaConsumer {
        val logger = KotlinLogging.logger {}

        /** Counted down once for every consumed record; starts at 1. */
        val latch: CountDownLatch = CountDownLatch(1)

        /** Payload of the most recently consumed record; `"init"` before the first one. */
        var saveLastPayload = "init"

        /**
         * Handles one record from the topic.
         *
         * @param payload the record value
         * @param acknowledgment handle used to acknowledge the record after processing
         */
        @KafkaListener(
            topics = ["testTopic"],
            groupId = "testGroup",
        )
        fun consume(
            @Payload payload: String,
            acknowledgment: Acknowledgment,
        ) {
            logger.info(payload)
            // Store the payload before releasing the latch so a waiter sees the new value.
            saveLastPayload = payload
            latch.countDown()
            // Any real processing belongs before this acknowledgement.
            acknowledgment.acknowledge()
        }
    }

     

    Producer

    package com.demo.kafka.producer
    
    import org.springframework.kafka.core.KafkaTemplate
    import org.springframework.stereotype.Component
    
    
    /**
     * Thin wrapper around [KafkaTemplate] for publishing String payloads.
     */
    @Component
    class KafkaProducer(
        private val kafkaTemplate: KafkaTemplate<String, String>
    ) {
    
        /**
         * Sends [payload] to [topic] through the template.
         *
         * The value returned by `kafkaTemplate.send` is discarded, so the caller gets
         * no signal about whether the send succeeded.
         *
         * @param topic name of the destination topic
         * @param payload record value to publish
         */
        fun send(topic: String, payload: String) {
            kafkaTemplate.send(topic, payload)
        }
    }

     

    EnableKafka Annotation

    /**
     * Class-level annotation that pulls the Kafka module into a Spring context by importing
     * [KafkaConsumerConfig], [KafkaProducerConfig], [KafkaConsumer] and [KafkaProducer].
     */
    @MustBeDocumented
    @Retention(AnnotationRetention.RUNTIME)
    @Target(AnnotationTarget.CLASS)
    @Import(
        value = [
            KafkaConsumerConfig::class,
            KafkaProducerConfig::class,
            KafkaConsumer::class,
            KafkaProducer::class,
        ],
    )
    annotation class EnableKafka

    4개의 클래스를 Import하여 Spring에서 사용할 수 있도록 합니다.

     

    test 디렉터리에서 SpringApplication

    import com.demo.kafka.config.EnableKafka
    import org.springframework.boot.autoconfigure.SpringBootApplication
    
    /**
     * Test-only application class: enables the Kafka module and scans `com.demo.kafka`.
     *
     * Note that this class shares its simple name with the Spring Boot annotation applied to it.
     */
    @SpringBootApplication(scanBasePackages = ["com.demo.kafka"])
    @EnableKafka
    class SpringBootApplication

     

    EmbeddedKafkaTest

    import com.demo.kafka.SpringBootApplication
    import com.demo.kafka.consumer.KafkaConsumer
    import com.demo.kafka.producer.KafkaProducer
    import org.assertj.core.api.Assertions.assertThat
    import org.junit.jupiter.api.Assertions.assertTrue
    import org.junit.jupiter.api.Test
    import org.springframework.beans.factory.annotation.Autowired
    import org.springframework.boot.test.context.SpringBootTest
    import org.springframework.kafka.test.context.EmbeddedKafka
    import org.springframework.test.context.ActiveProfiles
    import org.springframework.test.context.ContextConfiguration
    import java.util.concurrent.TimeUnit
    
    
    /**
     * Round-trip test against an embedded broker on port 9092: a message published by
     * [KafkaProducer] must reach [KafkaConsumer] within ten seconds.
     */
    @SpringBootTest
    @ContextConfiguration(classes = [SpringBootApplication::class])
    @ActiveProfiles("local")
    @EmbeddedKafka(
        partitions = 3,
        brokerProperties = ["listeners=PLAINTEXT://localhost:9092"],
        ports = [9092],
    )
    class EmbeddedKafkaIntegrationTest(
        @Autowired private val producer: KafkaProducer,
        @Autowired private val consumer: KafkaConsumer,
    ) {
        @Test
        fun `임베디드 카프카 테스트`() {
            // given: the message to publish
            val message = "embedded Kafka Test"

            // when: it is sent to the topic the consumer listens on
            producer.send("testTopic", message)

            // then: the consumer's latch is released in time and it holds the same message
            val consumedInTime = consumer.latch.await(10L, TimeUnit.SECONDS)
            assertTrue(consumedInTime)
            assertThat(consumer.saveLastPayload).isEqualTo(message)
        }
    }

     

    test application.yml

    # Test configuration: same Kafka settings as the module, pointing at localhost:9092.
    spring:
      profiles:
        active: local
    
      kafka:
        producer:
          # Broker address the producer bootstraps from.
          bootstrap-servers: localhost:9092
        consumer:
          # Read from the earliest offset.
          auto-offset-reset: earliest
          # Broker address the consumer bootstraps from.
          bootstrap-servers: localhost:9092
          # Offsets are not committed automatically.
          enable-auto-commit: false
        listener:
          # Offsets are committed only after the listener calls Acknowledgment.acknowledge().
          ack-mode: manual

     

     

     

     

    참고자료

    https://www.baeldung.com/spring-boot-kafka-testing

    https://kafka.apache.org/documentation/

    https://velog.io/@wodyd202/Embedded-Kafka%EB%A5%BC-%ED%86%B5%ED%95%9C-Kafka-%ED%85%8C%EC%8A%A4%ED%8A%B8

    https://velog.io/@pha2616/Apache-Kafka-bootstrap.server

    https://always-kimkim.tistory.com/entry/kafka101-configuration-bootstrap-servers

    https://hanseom.tistory.com/174

    댓글

Designed by Tistory.