kafka의 graceful shutdown
개요
Spring Kafka의 Consumer를 활용하다가 graceful shtudown을 어떻게 지원하는지 궁금해졌습니다.
만약 offset을 commit 하기 전에 SIGTERM, SIGINT와 같은 종료 요청이 오면 어떻게 될까요?
Shutdown Signal
- SIGKILL (9): SIGKILL signal 은 프로세스를 즉각 종료할 때 사용한다.
- SIGINT(2): 사용자의 요청으로 중단이 될 때 사용한다. (e.g., Ctrl+C).
- SIGTERM(15): SIGTERM signal 종료 요청을 보낸다, 요청을 보낼 뿐 SIGKILL은 아니다.
kafka의 immediate-stop 옵션
Container가 stop 되었을 때 현재 record를 처리하고 종료할지, 이전에 poll 한 모든 메시지를 처리하고 종료할지를 결정하는 옵션입니다.
기본값이 false입니다.
간단한 사전 지식
- max-poll-records : 토픽에서 한번에 당겨오는 레코드의 개수
- record를 하나 처리하는데 5초가 걸린다고 가정
- Spring Application을 종료할 때 timeout 시간은 spring.lifecycle.timeout-per-shutdown-phase의 기본값은 30초로 세팅되어 있다.
- SIGTERM을 호출하기 위해 kill -15 {PID} terminal 명령어를 날린다.
- docker exec -it local-kafka /bin/bash로 kafka container에 접속 후 /opt/kafka/bin으로 이동 ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test6 --describe (현재 토픽 상황을 모니터링 하기 위해)
테스트 1
spring.kafka.consumer.max-poll-records=5
spring.kafka.listener.immediate-stop=false # 기본값
SIGTERM 호출 시 5개의 레코드를 모두 처리하고 오프셋을 커밋후 종료합니다.
테스트 2
spring.kafka.consumer.max-poll-records=100
spring.kafka.listener.immediate-stop=false # 기본값
SIGTERM 호출을 컨슈머는 poll()으로 가져온 모든 레코드를 처리합니다.
이때 기본값인 30초가 지나면 SpringApplicationShutdownHook 스레드에서 Failed to shut down 1 bean with phase value 2147483547 within timeout of 30000ms와 같이 timeout으로 인한 실패 로그를 확인할 수 있습니다.
timeout 이내에 처리하지 못한다면 메시지를 중복으로 처리하는 상황이 발생할 수 있으니 조심해야 합니다.
테스트 3
spring.kafka.consumer.max-poll-records=5
spring.kafka.listener.immediate-stop=true
immediate-stop 옵션을 true로 설정하는 경우, 현재 레코드가 처리된 이후에 종료 처리를 진행합니다.
예를 들어 N번째 레코드 처리 도중 SIGTERM 요청이 발생하면 N번째 레코드가 완전히 처리된 이후 종료됩니다.
graceful shutdown 원리
kafka가 종료될 때 AbstractMessageListenerContainer에서 stop 메서드가 호출됩니다.
public void stop(Runnable callback) {
synchronized (this.lifecycleMonitor) {
if (isRunning()) {
doStop(callback);
}
else {
callback.run();
}
}
}
stop메서드를 내부적으로 타고 들어가다 보면 consumer.wakeup 메서드가 호출됩니다.
void wakeIfNecessaryForStop() {
if (this.polling.getAndSet(false)) {
this.consumer.wakeup();
}
}
Kafka가 종료될 때 KafkaMessageListenerContainer클래스의 consumer.wakeup 메서드가 호출됩니다.
wakeup 메서드란?
long polling을 중단시켜서 다음번 poll() 메서드가 실행될 때 WakeupException을 반환하도록 합니다.
이후에는 KafkaMessageListenerContainer의 run 메서드의 while(isRunning()) 구문이 false로 탈출되기 때문에 wrapUp 메서드가 호출됩니다.
이때 커밋을 수행하고 consumer를 unsubscribe합니다.
containeProperties의 shutdownTimeout 옵션
public final void stop(boolean wait) {
synchronized(this.lifecycleMonitor) {
if (this.isRunning()) {
if (wait) {
CountDownLatch latch = new CountDownLatch(1);
Objects.requireNonNull(latch);
this.doStop(latch::countDown);
try {
latch.await(this.containerProperties.getShutdownTimeout(), TimeUnit.MILLISECONDS);
this.publishContainerStoppedEvent();
} catch (InterruptedException var6) {
Thread.currentThread().interrupt();
}
} else {
this.doStop(this::publishContainerStoppedEvent);
}
}
}
}
isRunning()이라면 stop을 호출하고 latch를 활용하여 10초간 기다리는것을 의미합니다.
즉, stop 메서드 호출에 대한 10초 타임아웃을 의미합니다.
graceful shutdown과는 관련이 없는 옵션이지만 네이밍때문에 헷갈려서 까먹지 않기 위해 정리해두려 합니다.
참고자료
https://dgle.dev/kafkashutodwn/
https://velog.io/@jaymin_e/Kafka-Java-Kafka-Consumer-%ED%8A%9C%ED%86%A0%EB%A6%AC%EC%96%BC
https://stackoverflow.com/questions/35566011/shutting-down-kafka-consumer