프로젝트/kafka
Kafka Consumer를 On Off 할 수 있을까?
Junuuu
2023. 11. 26. 00:01
728x90
개요
Kafka를 활용하다가 잠시 DB의 장애가 발생하거나 Applicaion에 문제가 발생하는 경우 DLQ에 수십만 건의 데이터가 쌓여있을 수 있습니다.
이때 DLQ를 off 해두었다가 장애가 복구하면 on을 하여 활용하면 괜찮은 전략이 될 수 있을 것 같아 on, off 방법을 구성해보려 합니다.
Kafka Listener autoStartUp
@KafkaListener(id="assigned_listener_id", autoStartup = "false", topics = "topic-to-listen-to")
public void listen(Message message){
// interesting message processing logic
}
consumer를 구성할때 autoSartUp을 false로 주면 listener가 시작되지 않습니다.
KafkaListenerEndpointRegistry
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
Container의 stop, start 등의 생명 주기를 관리하는 클래스입니다.
특정 리스너를 start 시키는 방법
/**
* invoke this method to start the listener
*/
public void startListener(){
kafkaListenerEndpointRegistry.getListenerContainer("assigned_listener_id").start();
}
assigned_listener_id에 해당하는 리스너를 시작시킵니다.
특정 리스너를 stop 시키는 방법
/**
* invoke this method to stop the listener
*/
public void stopListener(){
kafkaListenerEndpointRegistry.getListenerContainer("assigned_listener_id").stop(()->{
log.info("Listener Stopped.");
});
}
assigned_listener_id에 해당하는 리스너를 멈춥니다.
Controller로 요청받을 수 있도록 구성
@Slf4j
@RestController
@RequiredArgsConstructor
public class KafkaDLQManagementController {
private final KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@PostMapping("/developer/kafka/consumer/{consumer-id}")
public ResponseEntity<String> validate(
@PathVariable(name = "consumer-id") String consumerId,
@RequestParam Boolean isOn
) {
MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(consumerId);
if(listenerContainer == null){
throw new IllegalArgumentException("consumer-id 가 존재하지 않습니다");
}
if (isOn) {
listenerContainer.start();
} else {
listenerContainer.stop();
}
return ResponseEntity.ok("ok");
}
}
참고자료