프로젝트/kafka

우리는 왜 spring kafka batch listener를 활용하지 않나요?

Junuuu 2024. 12. 1. 23:38
반응형

개요

신규입사자분께서 우리는 왜 kafka message를 consume 할 때 batch listener를 활용하지 않는지, 그 선택에서 어떤 히스토리와 결정이 있었는지 물으셨습니다.

 

단건씩 메시지를 받지 않고 batch listener를 활용하여 한번에 여러 건의 메시지를 받으면 네트워크 레이턴시를 아낄 수 있을 것 같다는 의견을 주셨습니다.

 

질문 주신 내용이 합리적인 생각이라는 들었지만 batch listener에 대한 지식이 없었기 때문에 why에 답변을 할 수 없었습니다.

 

그에 대한 답을 찾기 위한 여정을 기록해보고자 합니다.

 

 

Batch Listener란?

@Bean
fun myContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, TestDto> {
    val factory = ConcurrentKafkaListenerContainerFactory<String, TestDto>()
    factory.consumerFactory = consumerFactory()
    factory.containerProperties.ackMode = ContainerProperties.AckMode.BATCH
    factory.isBatchListener = true // <<<< true 세팅 필요
    factory.containerProperties.idleBetweenPolls = 3000L // << 각각의 poll을 얼마나 대기할지
    return factory
}

@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {
    ...
}

 

batch는 일괄이라는 뜻을 가지고 있으며 batch listener는 단건씩 메시지를 처리하지 않고 일괄적(여러건)으로 메시지를 처리함을 의미합니다.

 

batch listener와 대조되는 개념이 바로 메시지를 건건이 처리하는 record listeners입니다.

 

spring kafka에서는 위와 같이 ContainerFactory의 몇 가지 설정으로 batch listener를 활용할 수 있습니다.

 

일괄적으로 메시지를 받아오고 처리하기 때문에 consumer에서 메시지를 받아오는 poll()과 consumer에 메시지를 읽었다고 commit()을 수행할 때 네트워크 레이턴시를 효과적으로 줄일 수 있는 방법일 것 같습니다.

 

batch listener 설정 후 consume 하는 메시지를 디버깅해 보면 아래와 같이 10개의 메시지를 한 번에 가져와서 처리하게 됩니다.

 

 

Batch listener 동작원리 파악해 보기

옵션을 하나 넣어주면 동작하는 batch listener는 어떻게 동작하는지도 잠깐 살펴보겠습니다.

 

메시지를 처리하는 부분에 디버깅 포인트를 찍고  stack trace를 확인해 보면 클래스 혹은 함수의 이름에 Batch라고 명명된 부분들이 보입니다.

 

그리고 batch라는 키워드는 KafkaMessageListenerContainer의 invokeListener 이후부터 시작됨을 확인해 볼 수 있습니다.

 

 

실제로 코드상에서 isBatchListener인 경우에는 invokeBatchListener가 활용되고 그렇지 않은 경우에는 invokeRecordListener가 호출됩니다.

 

batch listener가 무엇인지 어떤 옵션으로 활성화 할 수 있는지 어떻게 구현되어 있는지 확인해보았습니다.

 

그러면 batch listener를 활용하면 메시지를 주고 받는 과정에서 네트워크 레이턴시를 아낄 수 있을까요?

 

Record Listeners와 max-poll-records 옵션

batch listener를 활용하면 일괄적으로 메시지를 처리할 수 있다고 합니다.

 

그렇다면 Record Listeners를 활용하게 되었을 때 poll()이 한번 호출될 때 가져오는 메시지의 개수를 설정하는 옵션을 10, 50 등으로 설정할 수 있는데, 이 설정은 일괄 메시지 처리와 어떤 연관이 있을까요?

 

max-poll-records 옵션을 10으로 설정하게 되면 poll()이 한번 호출될 때 kafka consumer에서 10개의 메시지를 읽어옵니다.

 

이때 record listeners를 활용한다면 10개의 메시지를 for 문을 돌면서 메시지를 단건씩 처리하게 됩니다.

이후 commit 전략에 따라 메시지가 모두 처리될 때 commit을 호출하거나 단건씩 commit을 하기도 합니다.

 

따라서 record listener를 활용하더라도 설정에 따라 1번의 poll() 호출로 10개의 메시지를 가져올 수 있습니다.

 

즉, max-poll-records 옵션은 batch listener, record listener 와는 연관이 없는 독립적인 옵션입니다.

 

여기서 max-poll-records를 늘리면 네트워크 레이턴시를 아낄 수 있겠다는 생각이 들 수 있습니다.

 

하지만 실제로는 max-poll-records 옵션은 네트워크 레이턴시와 연관이 없습니다.

 

kafka는 Fetcher 라는 클래스가 브로커로부터 데이터를 가져오는 역할을 수행합니다.

 

이때 Fetcher는 내부적으로 캐싱을 활용하여 이미 가져온 데이터가 있는 경우에는 max.poll.records 설정 값만큼 반환하며 데이터가 없는 경우에는 브로커로부터 데이터를 가져오기 위해 대기합니다.

 

kafka 공식문서에서도 해당 값과 fetching은 영향이 없다고 설명합니다.

 

max-poll-records 옵션보다는 Fetcher 클래스의 옵션을 조정하게 된다면 브로커에서 메시지를 가져올 때 네트워크 레이턴시를 효율적으로 관리할 수 있을 것 같습니다.

 

Commit 관점에서 Batch Listener의 네트워크 레이턴시

Consumer에서는 kafka broker에게 메시지를 읽었다고 알려주기 위해 commit()을 호출합니다.

 

1건씩 commit()을 수행하지 않고 batch listener를 활용하여 일괄적으로 commit()을 수행한다면 네트워크 레이턴시를 줄일 수 있지 않을까요?

 

여기어때 기술블로그에서 참고한 단순 로그만 찍는 consumer에서의 성능테스트는 아래와 같습니다.

https://techblog.gccompany.co.kr/%EC%B9%B4%ED%94%84%EC%B9%B4-%EC%BB%A8%EC%8A%88%EB%A8%B8-%EA%B7%B8%EB%A3%B9-%EB%A6%AC%EB%B0%B8%EB%9F%B0%EC%8B%B1-kafka-consumer-group-rebalancing-5d3e3b916c9e

 

위에서 알아본것 처럼 max.poll.records 옵션은 성능향상에 영향을 주지 않으며, AckMode.BATCH 이므로 commit()의 묶음에 따른 네트워크 레이턴시 효율성을 보여줄 수 있을 것 같습니다.

(다만 max.poll.records 옵션이 커지면서 Fetcher에 캐싱된 데이터가 없어 브로커로부터 데이터를 가져온다면 추가시간이 소요되어 무시하긴 어려울 것 같습니다.)

 

max.poll.records=1일 때 메시지 하나당 처리시간은 약 25ms
max.poll.records=500일 때 메시지 하나당 처리시간은 약 0.1ms

 

위 결과를 통하여 여러건 묶음처리를 하면 성능향상이 있음을 확인할 수 있습니다.

 

하지만 실제 서비스의 컨슈머 로직에서는 IO 작업이 포함되어 있어 일반적으로 더 오랜 시간이 필요할 수 있습니다.

 

만약 관리하는 서비스의 컨슈머 로직이 기존이 1000ms가 걸리는 경우라면 max.poll.records=1로 설정하는 것이 더 장점이 많을 수 있습니다.

 

예를 들어 max.poll.records가 큰 경우 poll() 한번 당 메시지를 처리하는데 시간이 오래 걸려 리밸런싱이 일어나지 않도록 유의하거나, 배포나 장애시 리밸런싱에 소요되는 시간이 길어질 수 있습니다.

 

하지만 기존 컨슈머 로직이 10ms가 걸린다면 max.poll.records=100으로 설정하여 성능을 높이는 게 더 좋은 선택일 수 있습니다.

 

그러므로 max.poll.records 값에 따른 트레이드오프를 고려해서 서비스의 특성에 맞게 조정이 필요합니다.

 

 

그렇다면 우리는 왜 spring kafka batch listener를 활용하지 않나요?

이제는 위에 학습한 개념들을 기반으로 대답해 볼 수 있을 것 같습니다.

 

현재 서비스에서는 record listner를 활용하며 commit 전략으로는 auto-commit을 활용하고 max.poll.records는 1로 활용하고 있습니다.

 

max.poll.records 를 1로 세팅한 이유는 문제 발생 시 중복실행 되는 메시지를 최소화하기 위한 세팅이며, 중복처리가 잘되어 있다면 늘려도 무방합니다. 

 

max.poll.records와 poll() 성능은 관계없기 때문에 poll()의 네트워크 레이턴시를 줄이기 위해서는 Fetcher의 옵션을 조절해야 할 것 같습니다.

 

commit 시 발생할 수 있는 네트워크 레이턴시에 대해서도 이야기해 보면 spring-kafka 에서 auto-commit을 활용하면 특정 interval이 지났는가를 기준으로 commit을 묶어서 수행하기 때문에 1건의 메시지를 처리하는 시간이 commit interval을 초과하지 않는다면 여러 건의 commit을 묶어서 수행할 수 있어 메시지의 처리속도가 빠르다면 네트워크 레이턴시도 자연스럽게 감소될 수 있습니다.

 

현재 설정상 poll() 당 메시지 1건씩 처리하기 때문에 메시지가 빨리 처리될수록 묶어서 처리하는 commit 수가 늘어날 수 있습니다.

 

또한 현재 구조에서 파티션을 증설하지 않고 컨슈머의 성능향상을 위해 내부적으로 순서보장이 가능한 스레드풀을 할당하여 위임한 후 commit을 수행하기 때문에 commit은 매우 빠른주기로 수행됩니다.

 

이로인해 실제 메시지의 처리속도가 느리더라도 commit interval 동안 여러건을 모아서 commit 할 수 있습니다.

 

따라서 만약 commit에 대한 네트워크 레이턴시를 줄이고 싶다면 commit interval을 늘려볼 수 있을 것 같습니다.

 

 

참고자료

https://docs.spring.io/spring-kafka/reference/kafka/receiving-messages/listener-annotation.html#batch-listeners

https://docs.spring.io/spring-kafka/api/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.html

https://helloworld.kurly.com/blog/search-system-with-kafka-1/

https://developer-pi.tistory.com/388

https://d2.naver.com/helloworld/0974525

https://techblog.gccompany.co.kr/%EC%B9%B4%ED%94%84%EC%B9%B4-%EC%BB%A8%EC%8A%88%EB%A8%B8-%EA%B7%B8%EB%A3%B9-%EB%A6%AC%EB%B0%B8%EB%9F%B0%EC%8B%B1-kafka-consumer-group-rebalancing-5d3e3b916c9e