-
Kafka Consumer를 On Off 할 수 있을까?프로젝트/kafka 2023. 11. 26. 00:01728x90
개요
Kafka를 활용하다가 잠시 DB의 장애가 발생하거나 Applicaion에 문제가 발생하는 경우 DLQ에 수십만 건의 데이터가 쌓여있을 수 있습니다.
이때 DLQ를 off 해두었다가 장애가 복구하면 on을 하여 활용하면 괜찮은 전략이 될 수 있을 것 같아 on, off 방법을 구성해보려 합니다.
Kafka Listener autoStartUp
@KafkaListener(id="assigned_listener_id", autoStartup = "false", topics = "topic-to-listen-to") public void listen(Message message){ // interesting message processing logic }
consumer를 구성할때 autoSartUp을 false로 주면 listener가 시작되지 않습니다.
KafkaListenerEndpointRegistry
@Autowired private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
Container의 stop, start 등의 생명 주기를 관리하는 클래스입니다.
특정 리스너를 start 시키는 방법
/** * invoke this method to start the listener */ public void startListener(){ kafkaListenerEndpointRegistry.getListenerContainer("assigned_listener_id").start(); }
assigned_listener_id에 해당하는 리스너를 시작시킵니다.
특정 리스너를 stop 시키는 방법
/** * invoke this method to stop the listener */ public void stopListener(){ kafkaListenerEndpointRegistry.getListenerContainer("assigned_listener_id").stop(()->{ log.info("Listener Stopped."); }); }
assigned_listener_id에 해당하는 리스너를 멈춥니다.
Controller로 요청받을 수 있도록 구성
@Slf4j @RestController @RequiredArgsConstructor public class KafkaDLQManagementController { private final KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry; @PostMapping("/developer/kafka/consumer/{consumer-id}") public ResponseEntity<String> validate( @PathVariable(name = "consumer-id") String consumerId, @RequestParam Boolean isOn ) { MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(consumerId); if(listenerContainer == null){ throw new IllegalArgumentException("consumer-id 가 존재하지 않습니다"); } if (isOn) { listenerContainer.start(); } else { listenerContainer.stop(); } return ResponseEntity.ok("ok"); } }
참고자료
'프로젝트 > kafka' 카테고리의 다른 글
우리는 왜 spring kafka batch listener를 활용하지 않나요? (0) 2024.12.01 spring kafka auto-commit 동작원리 : 데이터 유실과 중복처리 (6) 2024.10.13 Kafka Binder란 무엇인가? (0) 2023.11.21 Dead Letter Queue란 무엇인가? (0) 2023.11.16 kafka Consumer Lag 모니터링하기 (0) 2023.11.15