Consuming messages in batch in Kafka using @KafkaListener – Java

by
Ali Hasan
apache-kafka cucumber-java spring spring-boot spring-kafka

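Quick Fix: Enable batch listening, either through the `batch` attribute of @KafkaListener (available since Spring Kafka 2.8) or through the listener container factory, and declare the listener parameter as a `List` or as `ConsumerRecords`. The listener then receives every record returned by a poll in a single call. See https://docs.spring.io/spring-kafka/docs/2.8.4/api/ for details.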

The Problem:

While consuming messages from a Kafka topic, the current approach processes them one by one, leading to inefficiencies. The goal is to enhance the consumer to handle messages in bulk, such as 500 messages at a time, using Spring Boot’s @KafkaListener annotation. Potential issues and considerations related to bulk processing need to be addressed.

The Solutions:

Solution 2: Configuring Batch Processing using @KafkaListener

To consume messages in batches with `@KafkaListener`, configure the `ConcurrentKafkaListenerContainerFactory` bean as follows:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, MyPojo> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, MyPojo> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);
    return factory;
}

Setting `batchListener` to `true` enables batch processing. By default, batch processing is turned off.

Once batch processing is enabled, the `consumeEvents` method will be invoked with a list of messages instead of a single message:

@KafkaListener(id = "groupId", topics = "topic-name")
public void consumeEvents(List<MyPojo> myPojoItems) {
  // Process the messages in bulk
}

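When using batch processing, a single batch can contain a mix of valid and invalid messages, and one bad message should not silently fail or discard the rest. To handle this, you can point the `errorHandler` attribute of `@KafkaListener` at a `KafkaListenerErrorHandler` bean that holds your custom error handling logic. Alternatively, the listener can throw a `BatchListenerFailedException` that identifies the failed record, which lets the container's `DefaultErrorHandler` commit the offsets of the records before it and redeliver the remainder.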
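
One way to keep a single bad message from failing the whole batch is to separate valid and invalid items before processing. The following is a minimal plain-Java sketch of that idea, independent of Spring. `BatchSplitter` and the validator passed to it are hypothetical names used for illustration, not part of Spring Kafka.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical helper: splits a consumed batch into valid and invalid items. */
final class BatchSplitter<T> {

    private final List<T> valid = new ArrayList<>();
    private final List<T> invalid = new ArrayList<>();

    private BatchSplitter() {
    }

    /** Applies the validator to every item; a validator that throws marks the item invalid. */
    static <T> BatchSplitter<T> split(List<T> batch, Predicate<T> validator) {
        BatchSplitter<T> result = new BatchSplitter<>();
        for (T item : batch) {
            boolean ok;
            try {
                ok = validator.test(item);
            } catch (RuntimeException e) {
                ok = false; // a failing validator counts as an invalid item
            }
            if (ok) {
                result.valid.add(item);
            } else {
                result.invalid.add(item);
            }
        }
        return result;
    }

    List<T> valid() {
        return valid;
    }

    List<T> invalid() {
        return invalid;
    }
}
```

Inside a batch listener, the valid items could then be processed in bulk while the invalid ones are logged or forwarded elsewhere, for example to a dead-letter topic. Which of those is appropriate depends on the application.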

Solution 3: Batch Processing in Kafka Using @KafkaListener

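To control how many messages a batch can contain, set the `max.poll.records` consumer property and switch the listener type to `batch` in your application configuration. `max.poll.records` is only an upper bound on the records returned by one poll, so a batch may contain fewer messages.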

Configuration through application.yaml:

spring:
  kafka:
    consumer:
      max-poll-records: 500  # upper bound on the records returned by one poll
    listener:
      type: batch
    topic: your-topic-name  # application-defined key, not a Spring Boot property

Configuration through Code:

@SpringBootApplication
public class KafkaBatchProcessingApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaBatchProcessingApplication.class, args);
    }

    @KafkaListener(id = "groupId", topics = "topic-name")
    public void consumeBatch(List<MyPojo> myPojoItems) {
        // Process messages in bulk
    }

}
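
The consumer settings can also be assembled in code instead of YAML. The sketch below builds a plain property map using the standard Kafka consumer property names. A map of this shape is what Spring Kafka's `DefaultKafkaConsumerFactory` constructor accepts. The class name `BatchConsumerProps` and the String deserializers are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: builds consumer properties for batch consumption. */
final class BatchConsumerProps {

    private BatchConsumerProps() {
    }

    /** Returns consumer properties that cap each poll at maxPollRecords records. */
    static Map<String, Object> build(String bootstrapServers, String groupId, int maxPollRecords) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // Upper bound on the batch size; a poll may return fewer records.
        props.put("max.poll.records", maxPollRecords);
        // Assumes String keys and values; swap in other deserializers as needed.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```

For example, `BatchConsumerProps.build("localhost:9092", "groupId", 500)` would describe a consumer that receives at most 500 records per poll. The broker address here is a placeholder.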

Note: When consuming messages in batch, it’s important to handle cases where the processing of a message fails. You should implement error handling mechanisms to avoid message loss or inconsistent state.
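
A common pattern for this is to process the batch in order and record where the first failure occurs, so that everything before that position can be treated as done and only the rest needs to be retried. Spring Kafka's `BatchListenerFailedException` carries such an index for the container's error handler. The plain-Java sketch below only models the idea; `BatchFailureLocator` is a hypothetical name and not part of the library.

```java
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper: finds the position of the first failing item in a batch. */
final class BatchFailureLocator {

    private BatchFailureLocator() {
    }

    /**
     * Processes items in order and returns the index of the first item whose
     * handler throws, or -1 if the whole batch succeeded.
     */
    static <T> int processUntilFailure(List<T> batch, Consumer<T> handler) {
        for (int i = 0; i < batch.size(); i++) {
            try {
                handler.accept(batch.get(i));
            } catch (RuntimeException e) {
                return i; // items before i were processed; i and later were not
            }
        }
        return -1;
    }
}
```

In a real listener, the returned index could be passed to `new BatchListenerFailedException("processing failed", index)` so that the error handler knows which record to retry or skip.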

Solution 4: Using a Kafka Listener Container Factory

To consume messages in batch in Kafka using `@KafkaListener`, you can use a Kafka Listener Container Factory. A Kafka Listener Container Factory is responsible for creating and configuring the Kafka listener containers that will actually consume the messages. By using a Kafka Listener Container Factory, you can configure the batching behavior of the Kafka listener containers.

Here is an example of how to use a Kafka Listener Container Factory to consume messages in batch:

```java
@SpringBootApplication
public class KafkaStreamsApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaStreamsApplication.class, args);
    }

    // Assumes a ConsumerFactory bean is available, e.g. the one Spring Boot auto-configures.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setBatchListener(true);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    @KafkaListener(topics = "batch-test", containerFactory = "kafkaListenerContainerFactory")
    public void listen(List<ConsumerRecord<String, String>> records, Acknowledgment acknowledgment) {
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.value());
        }
        // Commit the offsets only after the whole batch has been processed.
        acknowledgment.acknowledge();
    }

}
```

In this example, the `ConcurrentKafkaListenerContainerFactory` is configured with the following settings:

- `setConsumerFactory(consumerFactory)`: supplies the consumer factory that the listener containers use to create their Kafka consumers.
- `setBatchListener(true)`: enables batch listening. The listener method then receives all records returned by a poll in one call instead of one record per call.
- `getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE)`: makes the listener responsible for acknowledging messages. The listener method must call `acknowledgment.acknowledge()` once it has processed the batch, and the offsets are committed at that point.

The factory has no settings for batch headers or a batch timeout. The size of a batch and how long the consumer waits for it are governed by consumer properties such as `max.poll.records`, `fetch.min.bytes`, and `fetch.max.wait.ms`.

The listener method `listen()` is annotated with `@KafkaListener`. The `topics` attribute names the topic to consume from, and the `containerFactory` attribute names the container factory bean used to create the listener container.

In the listener method, the `records` parameter contains the messages consumed in the batch. The `acknowledgment` parameter is used to acknowledge the messages once they have been processed.

Q&A

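Can you help me consume messages in batches in Kafka using @KafkaListener?

Yes. Enable batch listening, either with `factory.setBatchListener(true)` on the listener container factory or by setting the listener type to `batch` in the application configuration, and declare the listener parameter as a `List`.

Can you give me an example of using batch in @KafkaListener?

A method such as `consumeEvents(List<MyPojo> myPojoItems)` annotated with `@KafkaListener` receives the whole batch in one call once batch listening is enabled.

Are there any potential issues or considerations when consuming messages in bulk like this?

Yes. A batch can contain a mix of valid and invalid messages, and a failure while processing one message can lead to message loss or inconsistent state unless the listener handles errors explicitly. With manual acknowledgment, offsets are committed only when the listener calls `acknowledgment.acknowledge()`.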
