Spring Boot Kafka RebalanceListener Example

hongeeii 2023. 12. 7.

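import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

@Slf4j
public class RebalanceListener<T> implements ConsumerRebalanceListener {

    private final KafkaConsumer<String, T> consumer;
    // Next offset to commit for each partition, updated as records are processed
    private final Map<TopicPartition, OffsetAndMetadata> currentOffset = new HashMap<>();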

    public RebalanceListener(KafkaConsumer<String, T> consumer) {
        this.consumer = consumer;
    }

    public void addOffset(String topic, int partition, long offset) {
        // The offset passed to commitSync must be the last processed offset + 1,
        // i.e. the position of the next record to read
        long nextOffset = offset + 1L;
        this.currentOffset.put(new TopicPartition(topic, partition), new OffsetAndMetadata(nextOffset));
    }

    public Map<TopicPartition, OffsetAndMetadata> getCurrentOffset(){
        return this.currentOffset;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        for(TopicPartition partition : partitions) {
            log.debug("partition Revoked:{}", partition);
        }
        this.consumer.commitSync(this.currentOffset);
        currentOffset.forEach( (key, value)
                -> log.debug("onPartitionsRevoked. TopicPartition:{}, OffsetAndMetadata:{}", key, value) );
        this.currentOffset.clear();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for(TopicPartition partition : partitions) {
            log.debug("partition Assigned:{}", partition);
        }
    }
}
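
The listener above clears the whole offset map on every revocation. With a cooperative assignor, `onPartitionsRevoked` may receive only a subset of the owned partitions, so one variation is to commit and drop only the entries for the revoked partitions. Below is a minimal pure-Java sketch of that bookkeeping with no Kafka dependency. `Tp` is a hypothetical stand-in for `TopicPartition`, and `OffsetTracker` and `drain` are illustrative names that do not come from the original post.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition
final class Tp {
    final String topic;
    final int partition;

    Tp(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof Tp)) return false;
        Tp other = (Tp) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }

    @Override
    public String toString() {
        return topic + "-" + partition;
    }
}

// Illustrative tracker: stores the next offset to commit per partition
class OffsetTracker {
    private final Map<Tp, Long> nextOffsets = new HashMap<>();

    // Same rule as addOffset in the listener: commit position = processed offset + 1
    void addOffset(String topic, int partition, long processedOffset) {
        nextOffsets.put(new Tp(topic, partition), processedOffset + 1L);
    }

    // Removes and returns the entries for the revoked partitions only
    Map<Tp, Long> drain(Collection<Tp> revoked) {
        Map<Tp, Long> toCommit = new HashMap<>();
        for (Tp tp : revoked) {
            Long next = nextOffsets.remove(tp);
            if (next != null) {
                toCommit.put(tp, next);
            }
        }
        return toCommit;
    }

    // Read-only copy of what is still tracked
    Map<Tp, Long> snapshot() {
        return new HashMap<>(nextOffsets);
    }
}

class OffsetTrackerDemo {
    public static void main(String[] args) {
        OffsetTracker tracker = new OffsetTracker();
        tracker.addOffset("orders", 0, 41L);
        tracker.addOffset("orders", 1, 7L);

        // Only partition 0 is revoked, so only its entry is handed over for commit
        System.out.println(tracker.drain(List.of(new Tp("orders", 0)))); // {orders-0=42}
        System.out.println(tracker.snapshot());                          // {orders-1=8}
    }
}
```

In a real listener, the map returned by `drain` would be converted to `OffsetAndMetadata` values and passed to `commitSync`. Whether this is worth the extra code depends on the assignor in use; with the default eager protocol all partitions are revoked together, and clearing the whole map as the original does is equivalent.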

Example of registering the rebalance listener

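        // Needs java.time.Duration plus ConsumerRecord, ConsumerRecords, and KafkaConsumer
        // from org.apache.kafka.clients.consumer. props, keyDeserializer, valueDeserializer,
        // and topicList are assumed to be defined elsewhere.
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props, keyDeserializer, valueDeserializer);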

        RebalanceListener<String> rebalanceListener = new RebalanceListener<>(consumer);
        consumer.subscribe(topicList, rebalanceListener);

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

                for (ConsumerRecord<String, String> record : records) {
                    try {
                        log.debug("listen:{}", record);

                        // Process the consumed message here

                        // Track the offset only after the record was processed successfully
                        rebalanceListener.addOffset(record.topic(), record.partition(), record.offset());
                    } catch (Exception e) {
                        log.error("Consumer.listen() consumerRecord={}", record, e);
                    }
                }

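                if (!records.isEmpty()) {
                    // commitSync() with no arguments commits the positions returned by the last poll(),
                    // so records whose processing failed above are committed as well
                    consumer.commitSync();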
                    log.debug("commit count:{}, partitions:{}", records.count(), records.partitions());
                }
            }
        } catch (Exception e) {
            log.error("Consumer Exception", e);
        } finally {
            try {
                consumer.commitSync(rebalanceListener.getCurrentOffset());
                rebalanceListener.getCurrentOffset().forEach( (key, value)
                        -> log.debug("finally commitSync. TopicPartition:{}, OffsetAndMetadata:{}", key, value) );
            } finally {
                consumer.close();
            }
        }
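
In the loop above, a record whose processing throws is logged and skipped, and the no-argument `commitSync()` then commits past it. If failed records should be retried instead, one option is to stop a partition's batch at the first failure and commit only up to that point. The following is a minimal pure-Java sketch of that decision, not the original post's approach. `Rec`, `CommitPoint`, and the handler are hypothetical names, and the batch is assumed to come from a single partition.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a ConsumerRecord from a single partition
final class Rec {
    final long offset;
    final String value;

    Rec(long offset, String value) {
        this.offset = offset;
        this.value = value;
    }
}

class CommitPoint {
    // Returns the offset to commit for the batch: one past the last record
    // processed before the first failure, or -1 if nothing was processed.
    static long process(List<Rec> batch, Consumer<Rec> handler) {
        long next = -1L;
        for (Rec r : batch) {
            try {
                handler.accept(r);
            } catch (RuntimeException e) {
                // Stop here so the failed record and everything after it stay uncommitted
                break;
            }
            next = r.offset + 1L;
        }
        return next;
    }
}

class CommitPointDemo {
    public static void main(String[] args) {
        List<Rec> batch = List.of(new Rec(10, "a"), new Rec(11, "bad"), new Rec(12, "c"));

        long next = CommitPoint.process(batch, r -> {
            if (r.value.equals("bad")) {
                throw new IllegalStateException("cannot process offset " + r.offset);
            }
        });

        System.out.println(next); // prints 11: offset 10 succeeded, 11 failed
    }
}
```

With a real consumer, the returned value would be committed through `commitSync(Map)`, and the consumer would also need to be rewound, for example with `consumer.seek(partition, next)`, so that the next `poll()` returns the failed record again. Without the rewind, the in-memory position keeps advancing even though the committed offset does not.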