1. Add dependencies
Add the spring-kafka dependencies to pom.xml:
<dependencies>
    <!-- Spring Boot Web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Spring Kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <!-- Spring Boot Starter for Logging (optional but useful for debugging) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-logging</artifactId>
    </dependency>
    <!-- Spring Boot Starter for Testing -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
2. Add Kafka configuration
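Configure the Kafka connection properties in the application.yml or application.properties file.
application.yml:
spring:
  kafka:
    bootstrap-servers: localhost:9092  # Kafka server address
    consumer:
      group-id: my-consumer-group  # Consumer group ID
      auto-offset-reset: earliest  # Read from the beginning when there is no committed offset
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer  # Deserializer for the key
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer  # Deserialize the value as a string
    listener:
      missing-topics-fatal: false  # Do not throw a fatal error if the topic does not exist
application.properties:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.missing-topics-fatal=false
# Deserializer for the key
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Deserialize the value as a string
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
Note: spring.kafka.consumer.key-deserializer sets the deserializer for the message key, and spring.kafka.consumer.value-deserializer sets the deserializer for the message value (a string here). In a .properties file, put comments on their own lines, because text after a value on the same line becomes part of the value.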
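To make the effect of auto-offset-reset more concrete, the following is a simplified pure-Java model of how a consumer's starting position is chosen. It is only a sketch, not Kafka's actual implementation: the class OffsetResetSketch and the method startingOffset are hypothetical names, and the model ignores edge cases such as a committed offset that is out of range.

```java
import java.util.OptionalLong;

// Hypothetical sketch: models the auto-offset-reset decision, not real Kafka code.
class OffsetResetSketch {

    /**
     * @param committed       offset previously committed by the consumer group, if any
     * @param autoOffsetReset "earliest", "latest" or "none"
     * @param logStart        first offset still available in the partition
     * @param logEnd          offset that the next produced record would receive
     */
    static long startingOffset(OptionalLong committed, String autoOffsetReset,
                               long logStart, long logEnd) {
        // A committed offset always wins: the group resumes where it left off.
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        // Without a committed offset, the reset policy decides.
        switch (autoOffsetReset) {
            case "earliest":
                return logStart; // read everything still stored in the partition
            case "latest":
                return logEnd;   // read only records produced from now on
            default:
                // With "none", a real consumer fails instead of picking a position.
                throw new IllegalStateException("No committed offset and policy is " + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        System.out.println(startingOffset(OptionalLong.empty(), "earliest", 0L, 100L)); // 0
        System.out.println(startingOffset(OptionalLong.empty(), "latest", 0L, 100L));   // 100
        System.out.println(startingOffset(OptionalLong.of(57L), "earliest", 0L, 100L)); // 57
    }
}
```

In other words, earliest only matters the first time a consumer group reads a partition; once the group has committed an offset, it resumes from there.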
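With the configuration above, messages are exchanged as JSON strings, so the consumer also receives each message as a JSON string by default. To receive the message as an object instead, configure JSON serializers and deserializers, for example:
spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer  # StringSerializer for the key
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer  # Serialize the value as JSON
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        # Default JSON deserialization target type is Order.
        # com.example is an assumed package; use the real package of your Order class.
        spring.json.value.default.type: com.example.Order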
3. Create Kafka Consumers
Create a Kafka consumer class to process messages. Use the @KafkaListener annotation to listen for messages on a Kafka topic.
(I) The message produced by Kafka is a JSON string
1. Method 1
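If the message is a JSON string, you can use StringDeserializer to receive it and then use ObjectMapper to convert it to a Java object (such as Order).
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

// Order is assumed to be in the same package as this class.
@Service
@EnableKafka // Enable Kafka consumers (Spring Boot's Kafka auto-configuration normally does this already)
public class KafkaConsumer {

    private final ObjectMapper objectMapper = new ObjectMapper();

    // Listen to the order-topic topic in Kafka
    @KafkaListener(topics = "order-topic", groupId = "order-consumer-group")
    public void consumeOrder(String message) {
        try {
            // Deserialize the JSON string into an Order object
            Order order = objectMapper.readValue(message, Order.class);
            System.out.println("Received order: " + order);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}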
@KafkaListener(topics = "order-topic", groupId = "order-consumer-group"):
topics is the Kafka topic to listen to, and groupId is the consumer group that this consumer belongs to. A groupId set on the annotation overrides the group-id from the configuration file for this listener.
consumeOrder(String message): This method is called for each message received. In this example, it converts the message to an Order and prints it.
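The role of groupId can be illustrated with a small pure-Java model: within one consumer group, each partition of a topic is read by exactly one consumer, so the group members share the work. The sketch below uses a simple round-robin split for illustration; it is not Kafka's actual assignor code, and ConsumerGroupSketch, assign and the consumer names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a simplified round-robin model of partition assignment in one group.
class ConsumerGroupSketch {

    /** Spreads partitions 0..partitionCount-1 over the consumers of a single group. */
    static Map<String, List<Integer>> assign(List<String> consumers, int partitionCount) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("A group needs at least one consumer");
        }
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        // Each partition gets exactly one owner within the group.
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Two consumers share three partitions.
        System.out.println(assign(Arrays.asList("consumer-a", "consumer-b"), 3));
        // With more consumers than partitions, one consumer stays idle.
        System.out.println(assign(Arrays.asList("consumer-a", "consumer-b", "consumer-c"), 2));
    }
}
```

Consumers with different groupId values are independent of each other: each group receives every message of the topic.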
2. Method 2: Access the message metadata directly
Kafka messages can also be received as a ConsumerRecord. This approach suits scenarios that need direct access to message metadata (e.g. topic, partition, offset), as well as scenarios that manage message consumption and offset commits manually.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

    // Listen to the order-topic topic in Kafka
    @KafkaListener(topics = "order-topic", groupId = "order-consumer-group")
    public void consumeOrder(ConsumerRecord<String, String> record) {
        // Get detailed information about the message
        String key = record.key();             // Key of the message
        String value = record.value();         // Value of the message
        String topic = record.topic();         // Topic of the message
        int partition = record.partition();    // Partition of the message
        long offset = record.offset();         // Offset of the message
        long timestamp = record.timestamp();   // Timestamp of the message

        // Process the message (here we just print it)
        System.out.println("Consumed record: ");
        System.out.println("Key: " + key);
        System.out.println("Value: " + value);
        System.out.println("Topic: " + topic);
        System.out.println("Partition: " + partition);
        System.out.println("Offset: " + offset);
        System.out.println("Timestamp: " + timestamp);
    }
}
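When offsets are managed manually, it helps to remember Kafka's convention that the committed offset is the offset of the next record to read, that is, the last processed offset plus one. The following pure-Java sketch only models that bookkeeping; it does not talk to Kafka, and OffsetTrackerSketch, markProcessed and offsetToCommit are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: tracks which offset should be committed per topic partition.
class OffsetTrackerSketch {

    // Next offset to read, keyed by "topic-partition".
    private final Map<String, Long> nextOffsets = new HashMap<>();

    /** Records that the message at the given topic, partition and offset is fully processed. */
    void markProcessed(String topic, int partition, long offset) {
        // Commit position = last processed offset + 1; keep the highest value seen.
        nextOffsets.merge(topic + "-" + partition, offset + 1, Long::max);
    }

    /** Returns the offset to commit for the partition, or -1 if nothing was processed yet. */
    long offsetToCommit(String topic, int partition) {
        return nextOffsets.getOrDefault(topic + "-" + partition, -1L);
    }

    public static void main(String[] args) {
        OffsetTrackerSketch tracker = new OffsetTrackerSketch();
        // The values below stand in for record.topic(), record.partition() and record.offset().
        tracker.markProcessed("order-topic", 0, 41L);
        tracker.markProcessed("order-topic", 0, 42L);
        tracker.markProcessed("order-topic", 1, 7L);
        System.out.println(tracker.offsetToCommit("order-topic", 0)); // 43
        System.out.println(tracker.offsetToCommit("order-topic", 1)); // 8
    }
}
```

Committing the processed offset itself, rather than the offset plus one, would cause the last message to be delivered again after a restart.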
(II) The message produced by Kafka is an Order object
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

    // Listen to the order-topic topic in Kafka.
    // The value arrives as an Order only if the consumer's value deserializer
    // is configured to produce Order objects.
    @KafkaListener(topics = "order-topic", groupId = "order-consumer-group")
    public void consumeOrder(ConsumerRecord<String, Order> record) {
        // Get detailed information about the message
        String key = record.key();             // Key of the message
        Order value = record.value();          // Value of the message
        String topic = record.topic();         // Topic of the message
        int partition = record.partition();    // Partition of the message
        long offset = record.offset();         // Offset of the message
        long timestamp = record.timestamp();   // Timestamp of the message

        // Process the message (here we just print it)
        System.out.println("Consumed record: ");
        System.out.println("Key: " + key);
        System.out.println("Value: " + value);
        System.out.println("Topic: " + topic);
        System.out.println("Partition: " + partition);
        System.out.println("Offset: " + offset);
        System.out.println("Timestamp: " + timestamp);
    }
}
4. Create Startup Class
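Make sure your Spring Boot startup class is annotated with @SpringBootApplication and launches the application:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumerApplication.class, args);
    }
}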
5. Configure Kafka Producer (optional)
(I) The message type is a JSON string
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@EnableKafka
public class KafkaProducer {

    // Sends String messages
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Jackson ObjectMapper is used for serialization
    private final ObjectMapper objectMapper = new ObjectMapper();

    // Send an order to Kafka
    public void sendOrder(String topic, Order order) {
        try {
            // Convert the Order object to a JSON string
            String orderJson = objectMapper.writeValueAsString(order);
            // Send the JSON string to Kafka
            kafkaTemplate.send(topic, orderJson);
            System.out.println("Order JSON sent to Kafka: " + orderJson);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
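(II) The message type is an Order object
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@EnableKafka
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, Order> kafkaTemplate;

    // Send an order to Kafka
    public void sendOrder(String topic, Order order) {
        // Send the Order object; it is converted to JSON when a JSON
        // value serializer is configured for the producer
        kafkaTemplate.send(topic, order);
    }
}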
6. Start Kafka Service
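Start the Kafka broker from the Kafka installation directory, using the server start script and its configuration file: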
bin/ config/
7. Test Kafka Consumers
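You can test whether the consumer works by sending a message to Kafka. Suppose you have created a topic in Kafka called order-topic; you can then use KafkaProducer to send messages:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaController {

    @Autowired
    private KafkaProducer kafkaProducer;

    @GetMapping("/sendOrder")
    public String sendOrder() {
        Order order = new Order();
        // The setter names below are assumed, because the Order class is not
        // shown in this article; adapt them to your own Order fields.
        order.setId(1L);
        order.setUserId(123L);
        order.setProductName("Laptop");
        order.setQuantity(2);
        order.setStatus("Created");
        kafkaProducer.sendOrder("order-topic", order);
        return "Order sent!";
    }
}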
When you access the /sendOrder endpoint, KafkaProducer will send the message to Kafka, and KafkaConsumer will receive the message and print it out.
8. Testing and debugging
Check the consumer's log output to confirm that messages have been consumed successfully. You can also send messages with KafkaTemplate to verify that the connection between the Kafka producer and consumer works.
9. Conclusion
At this point, you have successfully configured and implemented Kafka consumers and producers in Spring Boot. You can extend features as needed, such as handling more complex message types, batch consumption, etc.