프로젝트/kafka

Kafka Consumer를 On Off 할 수 있을까?

Junuuu 2023. 11. 26. 00:01
728x90

개요

Kafka를 활용하다가 잠시 DB의 장애가 발생하거나 Applicaion에 문제가 발생하는 경우 DLQ에 수십만 건의 데이터가 쌓여있을 수 있습니다.

이때 DLQ를 off 해두었다가 장애가 복구하면 on을 하여 활용하면 괜찮은 전략이 될 수 있을 것 같아 on, off 방법을 구성해보려 합니다.

 

 

Kafka Listener autoStartUp

@KafkaListener(id="assigned_listener_id", autoStartup = "false", topics = "topic-to-listen-to")
public void listen(Message message){
    // interesting message processing logic
}

consumer를 구성할때 autoSartUp을 false로 주면 listener가 시작되지 않습니다.

 

KafkaListenerEndpointRegistry

@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

Container의 stop, start 등의 생명 주기를 관리하는 클래스입니다.

 

특정 리스너를 start 시키는 방법

/**
 * invoke this method to start the listener
 */
public void startListener(){
    kafkaListenerEndpointRegistry.getListenerContainer("assigned_listener_id").start();
}

assigned_listener_id에 해당하는 리스너를 시작시킵니다.

 

특정 리스너를 stop 시키는 방법

/**
 * invoke this method to stop the listener
 */
public void stopListener(){
    kafkaListenerEndpointRegistry.getListenerContainer("assigned_listener_id").stop(()->{
        log.info("Listener Stopped.");
    });
}

 

assigned_listener_id에 해당하는 리스너를 멈춥니다.

 

 

Controller로 요청받을 수 있도록 구성

@Slf4j
@RestController
@RequiredArgsConstructor
public class KafkaDLQManagementController {

    private final KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @PostMapping("/developer/kafka/consumer/{consumer-id}")
    public ResponseEntity<String> validate(
            @PathVariable(name = "consumer-id") String consumerId,
            @RequestParam Boolean isOn
    ) {
        MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(consumerId);
        if(listenerContainer == null){
            throw new IllegalArgumentException("consumer-id 가 존재하지 않습니다");
        }
        if (isOn) {
            listenerContainer.start();
        } else {
            listenerContainer.stop();
        }
        return ResponseEntity.ok("ok");
    }

}

 

 

 

 

참고자료

https://ananthasharma.medium.com/how-to-dynamically-start-stop-kafka-listener-in-springboot-apps-1e3da2140be1