SoFunction
Updated on 2025-03-05

How to consume Kafka data in a Spring Boot project

1. Introduce dependencies

Add the spring-kafka dependencies to pom.xml:

<dependencies>
    <!-- Spring Boot Web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Spring Kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <!-- Spring Boot Starter for Logging (optional but useful for debugging) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-logging</artifactId>
    </dependency>
    <!-- Spring Boot Starter for Testing -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

2. Add Kafka configuration

Configure the Kafka connection properties in the application.yml or application.properties file:

application.yml example:

spring:
  kafka:
    bootstrap-servers: localhost:9092  # Kafka server address
    consumer:
      group-id: my-consumer-group   # Consumer group ID
      auto-offset-reset: earliest   # Read from the beginning when there is no committed offset
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer     # Deserializer for the key
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer   # Deserialize the value as a string
    listener:
      missing-topics-fatal: false    # Do not throw a fatal error if the topic does not exist

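application.properties example:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.missing-topics-fatal=false
# Deserializer for the key
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Deserialize the value as a string
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer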

Note: with key-deserializer and value-deserializer both set to StringDeserializer, the consumer receives every message as a string. This matches a producer that sends JSON strings. If you want to receive the message as an object instead, you need to configure a JSON serializer and deserializer, as in the following configuration:

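spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer            # Use StringSerializer for the key
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer     # Serialize the value as JSON
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        # Default JSON deserialization target type is Order.
        # com.example is a placeholder; use the real package of your Order class.
        "[spring.json.value.default.type]": com.example.Order
        "[spring.json.trusted.packages]": com.example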
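The auto-offset-reset setting in the configuration above only matters when the consumer group has no committed offset for a partition. The following is a minimal plain-Java sketch of that decision, not Kafka's actual implementation; the class name OffsetResetSketch and the method startOffset are hypothetical names used only for illustration.

```java
import java.util.OptionalLong;

public class OffsetResetSketch {

    /**
     * Illustrative only: picks the offset a consumer would start reading from.
     *
     * @param committed last committed offset of the group, if any
     * @param policy    value of auto-offset-reset ("earliest", "latest", or "none")
     * @param logStart  first offset still available in the partition
     * @param logEnd    offset that the next produced record will receive
     */
    public static long startOffset(OptionalLong committed, String policy,
                                   long logStart, long logEnd) {
        // A committed offset always wins; the reset policy is not consulted.
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        switch (policy) {
            case "earliest":
                return logStart;   // read the partition from the beginning
            case "latest":
                return logEnd;     // only read records produced from now on
            default:
                // "none": the consumer reports an error instead of guessing
                throw new IllegalStateException(
                        "No committed offset and auto-offset-reset=" + policy);
        }
    }

    public static void main(String[] args) {
        System.out.println(startOffset(OptionalLong.empty(), "earliest", 0L, 42L)); // 0
        System.out.println(startOffset(OptionalLong.empty(), "latest", 0L, 42L));   // 42
        System.out.println(startOffset(OptionalLong.of(17L), "earliest", 0L, 42L)); // 17
    }
}
```

With earliest, a brand-new consumer group replays everything still stored in the topic, which is convenient for testing but can mean a large backlog in production.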

3. Create Kafka Consumers

Create a Kafka consumer class to process messages. You can use the @KafkaListener annotation to listen for messages on a Kafka topic.

(I) The message produced by Kafka is a JSON string

1. Method 1

If the message is a JSON string, you can use StringDeserializer to receive the message and then use ObjectMapper to convert it to a Java object (such as Order).

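import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
@EnableKafka  // Enable Kafka listeners (optional when Spring Boot auto-configuration is active)
public class KafkaConsumer {

    private final ObjectMapper objectMapper = new ObjectMapper();

    // Listen to the order-topic topic in Kafka
    @KafkaListener(topics = "order-topic", groupId = "order-consumer-group")
    public void consumeOrder(String message) {
        try {
            // Deserialize the JSON string into an Order object
            Order order = objectMapper.readValue(message, Order.class);
            System.out.println("Received order: " + order);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}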

Explanation:

@KafkaListener(topics = "order-topic", groupId = "order-consumer-group"):
topics specifies the Kafka topic to listen to, and groupId specifies the consumer group the consumer belongs to.
consumeOrder(String message): this method is called for each message received. In this example, the JSON string is converted to an Order object and printed.
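The Order class used by the consumer is not shown in this article. The following is a minimal sketch of what it could look like; the field names (orderId, userId, productName, quantity, status) are assumptions chosen for illustration, so adapt them to the JSON your producer actually sends.

```java
// Hypothetical Order POJO; field names are assumptions, not taken from the article.
public class Order {

    private Long orderId;
    private Long userId;
    private String productName;
    private int quantity;
    private String status;

    // Jackson needs a no-argument constructor to create the object
    public Order() {
    }

    public Long getOrderId() { return orderId; }
    public void setOrderId(Long orderId) { this.orderId = orderId; }

    public Long getUserId() { return userId; }
    public void setUserId(Long userId) { this.userId = userId; }

    public String getProductName() { return productName; }
    public void setProductName(String productName) { this.productName = productName; }

    public int getQuantity() { return quantity; }
    public void setQuantity(int quantity) { this.quantity = quantity; }

    public String getStatus() { return status; }
    public void setStatus(String status) { this.status = status; }

    // Readable output for the println calls in the consumer
    @Override
    public String toString() {
        return "Order{orderId=" + orderId
                + ", userId=" + userId
                + ", productName='" + productName + "'"
                + ", quantity=" + quantity
                + ", status='" + status + "'}";
    }
}
```

The getters and setters let Jackson map JSON properties of the same names onto the object, and toString keeps the printed log line readable.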

2. Method 2: Direct access to message metadata

Kafka messages can also be received as a ConsumerRecord. This approach suits scenarios that need direct access to message metadata (e.g. topic, partition, offset), and scenarios where you manage message consumption and offset commits manually.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

    // Listen to the order-topic topic in Kafka
    @KafkaListener(topics = "order-topic", groupId = "order-consumer-group")
    public void consumeOrder(ConsumerRecord<String, String> record) {
        // Get detailed information about the message
        String key = record.key();             // Key of the message
        String value = record.value();         // Value of the message
        String topic = record.topic();         // Topic of the message
        int partition = record.partition();    // Partition of the message
        long offset = record.offset();         // Offset of the message
        long timestamp = record.timestamp();   // Timestamp of the message

        // Process the message (here we just print it)
        System.out.println("Consumed record: ");
        System.out.println("Key: " + key);
        System.out.println("Value: " + value);
        System.out.println("Topic: " + topic);
        System.out.println("Partition: " + partition);
        System.out.println("Offset: " + offset);
        System.out.println("Timestamp: " + timestamp);
    }
}

(II) The message produced by Kafka is an object Order

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

    // Listen to the order-topic topic in Kafka.
    // The consumer's value-deserializer must produce Order objects (e.g. a JSON deserializer).
    @KafkaListener(topics = "order-topic", groupId = "order-consumer-group")
    public void consumeOrder(ConsumerRecord<String, Order> record) {
        // Get detailed information about the message
        String key = record.key();             // Key of the message
        Order value = record.value();          // Value of the message, already an Order
        String topic = record.topic();         // Topic of the message
        int partition = record.partition();    // Partition of the message
        long offset = record.offset();         // Offset of the message
        long timestamp = record.timestamp();   // Timestamp of the message

        // Process the message (here we just print it)
        System.out.println("Consumed record: ");
        System.out.println("Key: " + key);
        System.out.println("Value: " + value);
        System.out.println("Topic: " + topic);
        System.out.println("Partition: " + partition);
        System.out.println("Offset: " + offset);
        System.out.println("Timestamp: " + timestamp);
    }
}

4. Create Startup Class

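Make sure your Spring Boot startup class is annotated with @SpringBootApplication so that the application context, including the Kafka listeners, starts correctly.

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumerApplication.class, args);
    }
}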

5. Configure Kafka Producer (optional)

(I) The message type is a JSON string

import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@EnableKafka
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;  // Sends String messages

    private final ObjectMapper objectMapper = new ObjectMapper();  // Jackson ObjectMapper used for serialization

    // Send an order to Kafka
    public void sendOrder(String topic, Order order) {
        try {
            // Convert the Order object to a JSON string
            String orderJson = objectMapper.writeValueAsString(order);
            // Send the JSON string to Kafka
            kafkaTemplate.send(topic, orderJson);
            System.out.println("Order JSON sent to Kafka: " + orderJson);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

(II) The message type is an object Order

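import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@EnableKafka
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, Order> kafkaTemplate;

    // Send an order to Kafka
    public void sendOrder(String topic, Order order) {
        // Send the Order object; it is converted to JSON when the producer's
        // value-serializer is a JSON serializer
        kafkaTemplate.send(topic, order);
    }
}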

6. Start Kafka Service

Start the Kafka broker from the Kafka installation directory:

bin/kafka-server-start.sh config/server.properties

7. Test Kafka Consumers

You can test whether the consumer is working properly by sending a message to Kafka. Suppose you have created a topic in Kafka called order-topic; you can then use KafkaProducer to send messages:

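import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaController {

    @Autowired
    private KafkaProducer kafkaProducer;

    @GetMapping("/sendOrder")
    public String sendOrder() {
        Order order = new Order();
        // The setter names below are assumed; the Order class is not shown
        // in this article, so adjust them to match your own Order fields.
        order.setOrderId(1L);
        order.setUserId(123L);
        order.setProductName("Laptop");
        order.setQuantity(2);
        order.setStatus("Created");
        kafkaProducer.sendOrder("order-topic", order);
        return "Order sent!";
    }
}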

When you access the /sendOrder endpoint, KafkaProducer will send the message to Kafka, and KafkaConsumer will receive the message and print it out.

8. Testing and debugging

You can confirm that messages have been consumed by checking the consumer's log output. You can also send messages with KafkaTemplate to verify that both the producer and the consumer can reach Kafka.

9. Conclusion

At this point, you have successfully configured and implemented Kafka consumers and producers in Spring Boot. You can extend features as needed, such as handling more complex message types, batch consumption, etc.
