Study/Kafka

Spring Boot Kafka RebalanceListener 예시

hongeeii 2023. 12. 7.
728x90
반응형

 

@Slf4j
public class RebalanceListener<T> implements ConsumerRebalanceListener {

    private KafkaConsumer<String, T> consumer;
    private Map<TopicPartition, OffsetAndMetadata> currentOffset = new HashMap<>();

    public RebalanceListener(KafkaConsumer<String, T> consumer) {
        this.consumer = consumer;
    }

    public void addOffset(String topic, int partition, long offset) {
        // commitSync 실행 시 lastProcessedMessageOffset + 1로 설정
        long nextOffset = offset + 1L;
        this.currentOffset.put(new TopicPartition(topic, partition),  new OffsetAndMetadata(nextOffset));
    }

    public Map<TopicPartition, OffsetAndMetadata> getCurrentOffset(){
        return this.currentOffset;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        for(TopicPartition partition : partitions) {
            log.debug("partition Revoked:{}", partition);
        }
        this.consumer.commitSync(this.currentOffset);
        currentOffset.forEach( (key, value)
                -> log.debug("onPartitionsRevoked. TopicPartition:{}, OffsetAndMetadata:{}", key, value) );
        this.currentOffset.clear();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for(TopicPartition partition : partitions) {
            log.debug("partition Assigned:{}", partition);
        }
    }
}

 

Rebalence Listener 등록하는 예시

	KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props, keyDeserializer, valueDeserializer);

        RebalanceListener<String> rebalanceListener = new RebalanceListener<>(consumer);
        consumer.subscribe(topicList, rebalanceListener);

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

                for (ConsumerRecord<String, String> record : records) {
                    try {
                        log.debug("listen:{}", record.toString());
                        
                        // consume 된 message 처리

                        rebalanceListener.addOffset(record.topic(), record.partition(), record.offset());
                    } catch (Exception e) {
                        log.error("Consumer.listen() consumerRecord=" + record.toString(), e);
                    }
                }

                if (! records.isEmpty()){
                    consumer.commitSync();
                    log.debug("commit count:{}, partitions:{}", records.count(), records.partitions());
                }
            }
        } catch (Exception e) {
            log.error("Consumer Exception", e);
        } finally {
            try {
                consumer.commitSync(rebalanceListener.getCurrentOffset());
                rebalanceListener.getCurrentOffset().forEach( (key, value)
                        -> log.debug("finally commitSync. TopicPartition:{}, OffsetAndMetadata:{}", key, value) );
            } finally {
                consumer.close();
            }
        }
728x90
반응형

'Study > Kafka' 카테고리의 다른 글

[Kafka] Transactional OUTBOX Pattern을 활용한 이벤트 처리  (0) 2025.03.15
Spring Boot Kafka Consumer 전략  (0) 2023.12.07
Spring Boot Kafka Producer 설정  (0) 2023.12.07
springboot에서 kafka 연동  (2) 2023.12.07
Kafka Topic  (0) 2023.12.07

추천 글