kafka consumer offset option (latest vs earliest)
개요
만약 Consumer에서 동일한 토픽을 보고 다른 GroupId를 가진다면 새로운 Consumer가 생기는 경우 처음부터 메시지를 가져올 것이라 생각했고, 현재시점부터 메시지를 가져오기 원했습니다.
Consumer의 auto-offset-reset 옵션
- latest : 현재 시점부터 발생한 메시지부터 구독한다. (commit 관계없이 가장 최신)
- earliest : commit이 마지막으로 된 메세지부터 구독한다. (commit 기준으로 가장 오래된)
- none : 구독하고자 하는 topic의 offset 정보가 없으면 exception을 발생한다.
이미지로 살펴보기
consumer의 offset정보가 존재하지 않을 때 토픽의 가장 마지막 offset부터 읽습니다.
earliest 옵션에 대해 오해한 사항이 있습니다.
바로 Application이 구동될 때 마다 Kafka에 저장된 모든 record를 읽어온다고 착각하였습니다.
하지만 consumer group마다 offset의 위치를 각각 다르게 가지며 이전에 읽지 못한 메시지 중 가장 오래된 메시지부터 읽어오게 됩니다.
예를 들면 1~101번 record는 이미 offset을 읽어왔고 commit 되었다고 가정할 수 있습니다.
ConsumerConfig에 명시된 코드
public static final String AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset";
public static final String AUTO_OFFSET_RESET_DOC = "What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server
(e.g. because that data has been deleted):
<li>earliest: automatically reset the offset to the earliest offset
<li>latest: automatically reset the offset to the latest offset</li>
<li>none: throw exception to the consumer if no previous offset is found for the consumer's group</li>
<li>anything else: throw exception to the consumer.</li></ul>";
auto.offset.reset 옵션으로부터 earliest, latest, none을 설정할 수 있습니다.
Docker-Compose를 활용하여 local에 kafka 띄우고 실행
#docker-compose.yml
version: '2'
services:
zookeeper:
container_name: local-zookeeper
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
container_name: local-kafka
image: wurstmeister/kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ADVERTISED_PORT: 9092
KAFKA_CREATE_TOPICS: "testTopic:1:1" #topic이름:partition개수:replica개수
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
ealirest 옵션 & 서로 다른 consumer groupId
조건
- earliest 설정
- consumer Application 기동 시마다 서로 다른 groupId 설정
Instance1은 2개의 메시지를 publish 하고 Instance2는 Consume을 수행합니다.
가설으로는 Instance2를 새로 띄우면 메시지를 2개 컨슘 할 것이다.
실제로 동작시켜 보면 Instance2가 실행되자마자 메시지를 2개 컨슘해옵니다.
ealirest 옵션 & 서로 같은 consumer groupId
조건
- earliest 설정
- consumer Application 기동 시마다 서로 같은 test라는 groupId로 설정
이전에 Instance1을 통해 2개의 메시지를 publish 했던 상태에서 진행하였습니다.
첫 번째로 test라는 groupId로 consumer를 구동하면 2건의 메시지를 consume 합니다.
하지만 N번이상(2회 이상) 기동을 수행하면 이미 해당 groupId에 commit 이력이 적재되었기 때문에 0건의 메시지가 consume 됩니다.
lastest 옵션
조건
- lastest 설정
lastest 옵션의 경우에는 groupId가 같은지 다른지와 관계없이 메시지를 모두 consume 하지 않습니다.
메시지가 발행된 이후에 consumer Application이 기동 되기 때문에 읽어올 메시지가 없습니다.
Instance1에서 5개 메시지 publish 하고 Instance2는 Consume을 수행합니다.
Instance2 기동 시 아무런 메시지를 consume 하지 않습니다.
Instance2가 구동된 상태에서 Instance1에서 신규로 메시지를 발행 시 Instance2에서 메시지를 consume 합니다.
개인적인 생각
만약 lastest로 옵션을 설정한다면 Application이 장애로 잠시 죽었거나 배포시에 잠깐 비어있는 시간이 발생하는 경우 메시지가 유실될 수 있습니다.
ealirest옵션을 통해 commit을 적절하게 관리해줌으로 메시지 유실을 방지해야 할 것 같습니다.
참고자료
https://blog.voidmainvoid.net/305