Spring Boot Kafka RebalanceListener Example
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

@Slf4j
public class RebalanceListener<T> implements ConsumerRebalanceListener {

    private final KafkaConsumer<String, T> consumer;
    // Offsets to commit, keyed by partition, updated as records are processed
    private final Map<TopicPartition, OffsetAndMetadata> currentOffset = new HashMap<>();

    public RebalanceListener(KafkaConsumer<String, T> consumer) {
        this.consumer = consumer;
    }

    public void addOffset(String topic, int partition, long offset) {
        // The offset passed to commitSync must be lastProcessedMessageOffset + 1,
        // i.e. the offset of the next record to read
        long nextOffset = offset + 1L;
        this.currentOffset.put(new TopicPartition(topic, partition), new OffsetAndMetadata(nextOffset));
    }

    public Map<TopicPartition, OffsetAndMetadata> getCurrentOffset() {
        return this.currentOffset;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        for (TopicPartition partition : partitions) {
            log.debug("partition Revoked:{}", partition);
        }
        // Commit the offsets processed so far before the partitions are handed over
        this.consumer.commitSync(this.currentOffset);
        currentOffset.forEach((key, value)
                -> log.debug("onPartitionsRevoked. TopicPartition:{}, OffsetAndMetadata:{}", key, value));
        this.currentOffset.clear();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for (TopicPartition partition : partitions) {
            log.debug("partition Assigned:{}", partition);
        }
    }
}
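The `+ 1` in `addOffset` matters because a committed offset names the next record the consumer should read, not the last one it processed. Below is a minimal, Kafka-free sketch of that bookkeeping. It assumes a hypothetical `OffsetTrackerSketch` class that keys offsets by a plain `topic-partition` string instead of `TopicPartition`. It only illustrates the logic and is not the listener itself.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the listener's offset map; uses no Kafka classes.
public class OffsetTrackerSketch {
    // Key: "topic-partition" (an assumption made for this sketch), value: next offset to read.
    private final Map<String, Long> nextOffsets = new HashMap<>();

    // Record that `offset` was processed; the offset to commit is offset + 1.
    public void addOffset(String topic, int partition, long offset) {
        nextOffsets.put(topic + "-" + partition, offset + 1L);
    }

    // Copy of what would be handed to a commit call.
    public Map<String, Long> snapshot() {
        return new HashMap<>(nextOffsets);
    }

    // Mirrors onPartitionsRevoked: return the offsets to commit, then forget them.
    public Map<String, Long> drainOnRevoke() {
        Map<String, Long> toCommit = snapshot();
        nextOffsets.clear();
        return toCommit;
    }

    public static void main(String[] args) {
        OffsetTrackerSketch tracker = new OffsetTrackerSketch();
        tracker.addOffset("orders", 0, 41L);
        tracker.addOffset("orders", 0, 42L); // a later record overwrites the earlier entry
        tracker.addOffset("orders", 1, 7L);
        // Contains orders-0 -> 43 and orders-1 -> 8 (map iteration order is unspecified)
        System.out.println(tracker.drainOnRevoke());
        // After a revoke, nothing is left to commit
        System.out.println(tracker.snapshot().isEmpty());
    }
}
```

Only the latest processed offset per partition is kept, so one commit on revoke covers everything processed for that partition so far.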
Example of registering the RebalanceListener
// props, keyDeserializer, valueDeserializer, and topicList are assumed to be defined elsewhere
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props, keyDeserializer, valueDeserializer);
RebalanceListener<String> rebalanceListener = new RebalanceListener<>(consumer);
consumer.subscribe(topicList, rebalanceListener);

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            try {
                log.debug("listen:{}", record);
                // Process the consumed message here
                rebalanceListener.addOffset(record.topic(), record.partition(), record.offset());
            } catch (Exception e) {
                log.error("Consumer.listen() consumerRecord={}", record, e);
            }
        }
        if (!records.isEmpty()) {
            // Commits the offsets returned by the last poll()
            consumer.commitSync();
            log.debug("commit count:{}, partitions:{}", records.count(), records.partitions());
        }
    }
} catch (Exception e) {
    log.error("Consumer Exception", e);
} finally {
    try {
        // Commit the tracked offsets one last time before closing
        consumer.commitSync(rebalanceListener.getCurrentOffset());
        rebalanceListener.getCurrentOffset().forEach((key, value)
                -> log.debug("finally commitSync. TopicPartition:{}, OffsetAndMetadata:{}", key, value));
    } finally {
        consumer.close();
    }
}