ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Spring Boot WebSocket with Kafka
    프로젝트/WebSocket 2023. 7. 28. 00:01

    개요

    지난번 포스팅에서는 Spring WebSocket External Broker 적용을 통해 In-memory Broker인 SimpleBroker를 사용하지 않고 External Broker인 STOMP Message Broker인 ActiveMQ를 적용해 보았습니다.

    하지만 저를 포함한 팀원들이 ActiveMQ에 대해 자세히 알지 못했으며 추후 트러블슈팅이나 운영 등을 생각해서 팀원들이 알고 있는 메시지 브로커인 Kafka로 WebSocket을 활용해보고자 합니다.

     

     

    다시 보는 External Broker를 활용한 STOMP 구조도

    external broker 동작과정

    하지만 Kafka는 STOMP 프로토콜을 지원하지 않습니다..

     

     

    Kafka 어떻게 적용하지? - 다시 SimpleBroker로

    simple broker 동작과정

    다시 SimpleBroker로 돌아갑니다.

    External Broker를 사용해야만 했던 이유는 무엇일까요?

     

    Server Instance가 여러 대 존재할 때 상태에 대한 동기화가 제대로 이루어지지 않았기 때문입니다.

    예를 들어 다음과 같은 일이 일어날 수 있습니다.

    1. A유저가 Server1에 웹소켓을 연동하고 /topic/A를 구독

    2. B유저가 Server2에 웹소켓을 연동하고 /topic/B를 구독

    3. Server2에 서버에 요청이와서 /topic/A를 구독하는 사람에게 메시지 전송!

    4. Server2의 in-memory message broker는 A유저가 /topic/A를 구독한다는 정보를 전혀 알지 못해서 A에게 메시지를 전달할 수 없음.

     

     

    이 과정을 Kafka를 통해 동기화하여 해결하면 될 것 같습니다.

    1. A유저가 Server1에 웹소켓을 연동하고 /topic/A를 구독

    2. B유저가 Server2에 웹소켓을 연동하고 /topic/B를 구독

    3. 서버에 요청이 오면 Kafka로 해당 메시지 Produce

    4. Server1, Server2는 해당 메시지를 Consume 대기

    5. kafka Consumer가 토픽을 구독하여 In-memory Message Broker를 통해 메시지 발송

    6. 2대의 서버에서 /topic/A를 구독하는 상대에게 모두 메시지가 발송되기 때문에 동기화 문제가 해결됩니다.

     

    Kafka 환경을 구성하는 방법에 대해서는 다음을 참고해주세요

    https://junuuu.tistory.com/685

     

    kafka 모듈 구성하기

    git repo에 모든 코드가 존재합니다. https://github.com/Junuu/coupon-system GitHub - Junuu/coupon-system: 선착순 쿠폰 발급 프로젝트 선착순 쿠폰 발급 프로젝트. Contribute to Junuu/coupon-system development by creating an accou

    junuuu.tistory.com

     

    서버에 요청이 오면 Kafka로 해당 메시지 Produce

    @Component
    class KafkaProducer(
        private val kafkaTemplate: KafkaTemplate<String, String>
    ) {
    
        fun send(topic: String, payload: String) {
            logger.info {"kafka Producer start"}
            kafkaTemplate.send(topic, payload)
        }
    }

     

     

    kafka Consumer

    @Component
    class KafkaConsumer(
        private val template: SimpMessagingTemplate
    ){
        val logger =  KotlinLogging.logger{}
        var saveLastPayload: String = "init"
    
        @KafkaListener(topics = ["qr-login"], groupId = "#{ T(java.util.UUID).randomUUID().toString() }")
        fun consume(@Payload payload: String, acknowledgment: Acknowledgment) {
            logger.info("Consumer start: $payload")
            template.convertAndSend("/queue/sessionId/$sessionId", "login success payload")
            acknowledgment.acknowledge()
        }
    }

    SimpMessageTemplate을 활용하여 해당 sessionId를 가진 유저에게 모든 서버가 메시지를 전송합니다.

    이제 동기화를 신경 쓸 필요가 사라지게 되었습니다.

    이때 consumerGroupId를 다르게 해주어야 여러대의 instance가 있을때 동기화가 가능합니다.

    '프로젝트 > WebSocket' 카테고리의 다른 글

    Spring WebSocket External Broker 적용(ActiveMQ)  (0) 2023.07.21
    RSocket이란?  (0) 2023.07.20
    WebSocket Scale Out - 이론편  (0) 2023.06.24
    TCP Socket vs WebSocket  (0) 2023.06.23
    Spring WebSocket 활용  (0) 2023.06.12

    댓글

Designed by Tistory.