SoFunction
Updated on 2025-03-08

Spring integrates the configuration process of kafka monitoring consumption

Preface

Recently, there is a demand in the project, which requires consumption of data in kafka. I have also written code manually to consume kafka data before. But then I thought about it. Since spring provides a way to consume kafka. There is no need to repeat the wheel. So I tried using spring's API.

Project technical background, using springMVC, XML configuration and annotations to use each other. The configuration of kafka uses XML.

Integration process

1. Introduce spring-kafka dependency package

 <dependency>
      <groupId></groupId>
      <artifactId>spring-kafka</artifactId>
      <version>2.2.</version>
    </dependency>

2. Add configuration items to the spring xml file, or create a file separately.

&lt;!-- consumer configuration This configuration item can be added or deleted according to the actual needs of your business--&gt;
  &lt;bean  class=""&gt;
    &lt;constructor-arg&gt;
      &lt;map&gt;
        &lt;entry key="" value="${}" /&gt;
        &lt;entry key="" value="group" /&gt;
        &lt;entry key="" value="true" /&gt;
        &lt;entry key="" value="3000" /&gt;
        &lt;entry key="" value="10000" /&gt;
        &lt;entry key=""
            value="" /&gt;
        &lt;entry key=""
            value="" /&gt;
      &lt;/map&gt;
    &lt;/constructor-arg&gt;
  &lt;/bean&gt;

  &lt;!-- create factory This category isspring jarProvided in the bag,Just configure it--&gt;
  &lt;bean  class=""&gt;
    &lt;constructor-arg&gt;
      &lt;ref bean="consumerProperties" /&gt;
    &lt;/constructor-arg&gt;
  &lt;/bean&gt;

  &lt;!-- Customized consumption categories,Need to be implementedspringInterface --&gt;
  &lt;bean 
     class="" /&gt;

  &lt;!-- This category is alsojarProvided in the bag的,The injected listening class is defined by itself,topicThe name is introduced by the configuration file--&gt;
  &lt;bean  class=""&gt;
    &lt;constructor-arg name="topics" value="${}"/&gt;
    &lt;property name="messageListener" ref="payPalConsumer" /&gt;
  &lt;/bean&gt;

  &lt;!-- Change classes are the samejarProvided in,Put thiscontainerPropertiesandconsumerfactory injection --&gt;
  &lt;bean  class=""
     init-method="doStart"&gt;
    &lt;constructor-arg ref="consumerFactory" /&gt;
    &lt;constructor-arg ref="containerProperties" /&gt;
  &lt;/bean&gt;

2. Customize the consumer class, and the consumer class can still use annotations.

/**
 * get msg from kafka
 */
@Component 
public class PayPalConsumer implements MessageListener<String, String> {

  private static Logger logger = ();
  @Autowired
  private XXService XXService;
  @Override
  public void onMessage(ConsumerRecord<String, String> authorizeRecord) {
    String value = ();
    if ((value)){
      ("receive message from kafka is null");
      return;
    }
    ("receive message from kafka is {}",value);
  }
}

Use this step to configure it and go through it at once. Very smooth.

This is the article about the configuration process of spring integrating kafka monitoring consumption. For more related spring integrating kafka content, please search for my previous articles or continue browsing the related articles below. I hope everyone will support me in the future!