ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Kafka Consumer를 On Off 할 수 있을까?
    프로젝트/kafka 2023. 11. 26. 00:01
    728x90

    개요

    Kafka를 활용하다가 잠시 DB의 장애가 발생하거나 Applicaion에 문제가 발생하는 경우 DLQ에 수십만 건의 데이터가 쌓여있을 수 있습니다.

    이때 DLQ를 off 해두었다가 장애가 복구하면 on을 하여 활용하면 괜찮은 전략이 될 수 있을 것 같아 on, off 방법을 구성해보려 합니다.

     

     

    Kafka Listener autoStartUp

    @KafkaListener(id="assigned_listener_id", autoStartup = "false", topics = "topic-to-listen-to")
    public void listen(Message message){
        // interesting message processing logic
    }

    consumer를 구성할때 autoSartUp을 false로 주면 listener가 시작되지 않습니다.

     

    KafkaListenerEndpointRegistry

    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    Container의 stop, start 등의 생명 주기를 관리하는 클래스입니다.

     

    특정 리스너를 start 시키는 방법

    /**
     * invoke this method to start the listener
     */
    public void startListener(){
        kafkaListenerEndpointRegistry.getListenerContainer("assigned_listener_id").start();
    }

    assigned_listener_id에 해당하는 리스너를 시작시킵니다.

     

    특정 리스너를 stop 시키는 방법

    /**
     * invoke this method to stop the listener
     */
    public void stopListener(){
        kafkaListenerEndpointRegistry.getListenerContainer("assigned_listener_id").stop(()->{
            log.info("Listener Stopped.");
        });
    }

     

    assigned_listener_id에 해당하는 리스너를 멈춥니다.

     

     

    Controller로 요청받을 수 있도록 구성

    @Slf4j
    @RestController
    @RequiredArgsConstructor
    public class KafkaDLQManagementController {
    
        private final KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
    
        @PostMapping("/developer/kafka/consumer/{consumer-id}")
        public ResponseEntity<String> validate(
                @PathVariable(name = "consumer-id") String consumerId,
                @RequestParam Boolean isOn
        ) {
            MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(consumerId);
            if(listenerContainer == null){
                throw new IllegalArgumentException("consumer-id 가 존재하지 않습니다");
            }
            if (isOn) {
                listenerContainer.start();
            } else {
                listenerContainer.stop();
            }
            return ResponseEntity.ok("ok");
        }
    
    }

     

     

     

     

    참고자료

    https://ananthasharma.medium.com/how-to-dynamically-start-stop-kafka-listener-in-springboot-apps-1e3da2140be1

     

    댓글

Designed by Tistory.