Categories
Development

Consume a minimum of N messages at once from Kafka with KafkaListener

I need to consume a minimum of N messages at once from Kafka with KafkaListener. Below are the consumer configurations:

    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch");
    // maximum records per poll
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10");
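Kafka's consumer settings cap a poll from above (max.poll.records) but offer no record-count minimum; fetch.min.bytes and fetch.max.wait.ms work on bytes and time, not on message counts. One possible workaround is to buffer records across polls until at least N have arrived. The following is a minimal sketch in plain Java, not Spring Kafka API: the class MinBatchAccumulator and its add method are hypothetical names introduced here for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical helper: buffers records across polls until at least minSize are held. */
class MinBatchAccumulator<T> {
    private final int minSize;
    private final List<T> buffer = new ArrayList<>();

    MinBatchAccumulator(int minSize) {
        if (minSize < 1) {
            throw new IllegalArgumentException("minSize must be >= 1");
        }
        this.minSize = minSize;
    }

    /**
     * Adds one polled batch. Returns everything buffered so far once the
     * buffer holds at least minSize records, otherwise an empty Optional.
     */
    synchronized Optional<List<T>> add(List<T> polled) {
        buffer.addAll(polled);
        if (buffer.size() < minSize) {
            return Optional.empty();
        }
        List<T> ready = new ArrayList<>(buffer);
        buffer.clear();
        return Optional.of(ready);
    }
}
```

In a batch listener with manual acknowledgment, the listener could pass each polled list to add and process and acknowledge only when a batch comes back. Records held in memory have uncommitted offsets, so they are likely to be redelivered after a restart or rebalance; processing should tolerate duplicates.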

With Kafka manual acknowledgment, getting “Kafka Listner Exception : Commit cannot be completed” occasionally

We have a Kafka consumer (concurrency of 5) with manual ack. With the implementation below, we sometimes get the exception: Commit cannot be completed since the group has already rebalanced … In the exception scenario, the message is not acknowledged and is consumed again. Any suggestions on configuration changes without impacting […]
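This exception usually means the gap between two poll() calls exceeded max.poll.interval.ms, so the group rebalanced before the commit arrived. Two commonly suggested adjustments are returning fewer records per poll and allowing more time between polls. The sketch below shows such overrides as a plain Java map using the standard Kafka property names; the class name RebalanceTuning and the numeric values are assumptions for illustration, not recommendations derived from the original setup.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical holder for consumer overrides aimed at avoiding commit failures. */
class RebalanceTuning {

    static Map<String, Object> consumerOverrides() {
        Map<String, Object> props = new HashMap<>();
        // Fewer records per poll, so each batch finishes sooner (illustrative value).
        props.put("max.poll.records", 50);
        // More time allowed between poll() calls before the consumer is
        // considered failed and its partitions are reassigned (illustrative value).
        props.put("max.poll.interval.ms", 600_000);
        // Manual acknowledgment implies auto commit is turned off.
        props.put("enable.auto.commit", false);
        return props;
    }
}
```

These entries could be merged into the existing consumer properties before the consumer factory is built. Suitable values depend on how long one batch actually takes to process, which would need to be measured.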

Spring Kafka SeekToCurrentErrorHandler maxFailures doesn’t work when concurrency level is less than partition number

I am using spring kafka 2.2.7 and my consumer configuration code is as follows:

    @Slf4j
    @Configuration
    @EnableKafka
    public class KafkaConfiguration {

        @Bean
        ConcurrentKafkaListenerContainerFactory<String, Customer> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, Customer> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            // less than number of partition, will do infinite retry
            factory.setConcurrency(1);
            SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler((record, exception) -> {
                LOGGER.info("***in error handler data, […]