Kafka Consumer 성능 올리기
개요
Kafka와 같은 Message Queue의 장점 중 하나로는 중간다리에 트래픽을 저장해 놓을 수 있기 때문에 서버가 필요한 만큼만 처리할 수 있습니다.
만약 Producer가 메시지를 생산해 내는 양에 비해 Consumer가 메시지를 처리하는 양이 너무 느린 경우는 어떻게 처리할 수 있을까요?
간단하게 Consumer의 개수를 그냥 늘리면 안 될까요?
Producer는 10개의 메시지 발생, Consumer는 5초에 한 개씩 처리하는 구조 만들기
producer는 메시지를 한번 보낼 때 10번 보내기
@Component
class KafkaProducer(
private val kafkaTemplate: KafkaTemplate<String, TestDto>
) {
fun send(topic: String) {
repeat(10){
logger.info {"kafka Producer start"}
val message = TestDto(col1 = it.toString(), col2 = it.toString())
logger.info {"topic & payload: $topic , $message"}
kafkaTemplate.send("test", message)
}
}
}
repeat을 통해 kafka에 10번의 데이터를 보냅니다.
consumer는 요청을 5초에 하나씩 처리한다.
@Component
class KafkaConsumer {
val logger = KotlinLogging.logger {}
@KafkaListener(
topics = ["test"],
groupId = "#{ T(java.util.UUID).randomUUID().toString() }",
containerFactory = "myContainerFactory"
)
fun consume(
@Payload payload: TestDto,
acknowledgment: Acknowledgment,
) {
logger.info("KafkaConsumer start: $payload")
sleep(5000)
logger.info("KafkaConsumer end: $payload")
acknowledgment.acknowledge()
}
}
sleep을 통해 5초의 처리시간을 가지고, 이후에 acknowledgement를 통해 다음 메시지를 가져옵니다.
28초 -> 33초 -> 38초 -> 43초 -> 48초 간격으로 로그가 남으며 5초에 하나씩 메시지를 consume 하는 모습을 볼 수 있습니다.
단순히 Consumer를 2대로 늘리면 해결될까?
kafka에서는 topic내부에 Partition이란 개념이 존재합니다.
Partition은 하나의 Consumer만 접근이 가능합니다.
예를 들어 위의 그림을 예시로 보면 topic에 partition은 2개인데 consumer를 3개로 늘려봤자 consumer3은 놀고 있게 됩니다.
Partition개수를 늘리고 Consumer개수를 같이 맞춰주자
하지만 Partition은 개수는 늘릴 수만 있고 줄일 수 없습니다.
따라서 partition의 개수를 늘리는 것은 신중하게 고려해야 합니다.
Partition 개수를 늘리지 않고 Consumer의 성능을 높이는 방법
관련된 내용으로 다음과 같은 서치들을 해보았습니다.
- ConcurrentKafkaListenerContainerFactory의 setConcurrency 높이기
- 이 옵션은 Partition 개수를 늘려야 의미가 있음
- Confluent의 parallel-consumer
- Spring에 적용하는 예제들이 없고 관리하기 힘들어 보임
- cpu 부하 때문이 아닌 blocking이 많이 걸려서 느려지는 상황이라면 consume은 빠르게 처리하고 후 처리 수행
- 예를 들어 우선 수신받아 event를 DB에 저장하고 처리가 완료되면 상태 update
- BatchListener 활용하는 방안
- N개의 레코드를 동시에 가져오는 방안 + Worker Thread를 활용하여 처리
Spring Event Listener를 활용하여 Consumer의 성능을 높여보자
@Component
class KafkaConsumeEventListener {
@EventListener
@Async
fun processKafkaConsumeEvent(event: KafkaConsumeEvent){
logger.info { "processKafkaConsumeEvent start: $event" }
sleep(5000)
logger.info { "processKafkaConsumeEvent end: $event" }
}
}
data class KafkaConsumeEvent(
val id: String,
)
@Async와, @SpringBootApplication부분에 @EnableAsync를 달지 않으면 기존과 동일하게 동기식으로 동작합니다.
물론 중간에 예외가 발생했을 때 내부적으로 재처리를 통해 메시지가 처리되는 것이 보장되어야 합니다.
이제 다시 메시지를 발행해 보면 26초에 첫 메시지를 읽기 시작하고 31초에 모든 메시지를 처리하여 processKafkaConsumeEvent end로그를 확인할 수 있습니다.
참고자료
https://www.confluent.io/blog/introducing-confluent-parallel-message-processing-client/
https://bebong.tistory.com/entry/Kafka-Consumer-Design
https://velog.io/@courage331/Kafka-Partition%EA%B3%BC-Consumer%EC%9D%98-%EA%B4%80%EA%B3%84