SoFunction
Updated on 2025-03-08

Spring Boot integrates Kafka+SSE to achieve real-time data display

Why use Kafka?

RabbitMQ and RocketMQ were not chosen because Kafka is widely deployed alongside Hadoop clusters and fits well with big-data development. The current business scenario also does not need dead-letter queues. Note, however, that Kafka consumers pull data in batches, which can delay updates, so a different MQ may be a better choice when real-time requirements are strict.

A message queue is used because this kind of middleware delivers messages in near real time and can broadcast them to multiple consumers.

Why use SSE?

WebSocket transmits messages as protocol frames, so data is encoded and decoded on each side, which costs some time. SSE sends plain text over an ordinary HTTP response, so this cost does not apply.
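To make the "plain text" point concrete, the sketch below renders one event the way the `text/event-stream` format lays it out: optional `id:`, `event:` and `retry:` fields, one `data:` line per line of payload, and a blank line that ends the event. `SseFrame` and `format` are hypothetical names used only for illustration; in the Spring code later in this article, `SseEmitter` does this formatting itself.

```java
/** Hypothetical helper that renders one Server-Sent Event as plain text. */
final class SseFrame {

    private SseFrame() {}

    /**
     * Builds the text for a single event.
     * id, event and retryMillis may each be null to omit that field.
     */
    static String format(String id, String event, Long retryMillis, String data) {
        StringBuilder sb = new StringBuilder();
        if (id != null) {
            sb.append("id:").append(id).append('\n');
        }
        if (event != null) {
            sb.append("event:").append(event).append('\n');
        }
        if (retryMillis != null) {
            sb.append("retry:").append(retryMillis).append('\n');
        }
        // A multi-line payload is sent as one "data:" line per line of text.
        for (String line : data.split("\n", -1)) {
            sb.append("data:").append(line).append('\n');
        }
        // A blank line marks the end of the event.
        sb.append('\n');
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(format("6666", "finish", 1000L, "Ha ha"));
    }
}
```

Everything on the wire is readable text, which is also why an SSE stream can be inspected directly with tools such as curl.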

Because WebSocket is bidirectional, anyone who connects to a WebSocket log endpoint while logs are being read can send large amounts of abnormal data back, which causes trouble for the server side and the logs. SSE is one-way (server to client), so this concern does not arise, which improves security.
In addition, browsers reconnect a dropped SSE connection automatically. The browser WebSocket API has no automatic reconnection and exposes no heartbeat, and a connection that carries no data for a long time may be closed, so a hand-written heartbeat mechanism is usually needed.
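When a browser reconnects an SSE stream, it sends a `Last-Event-ID` header carrying the ID of the last event it received, and a server can use that to replay what was missed. The sketch below shows one possible replay buffer. `ReplayBuffer`, its capacity and its numeric IDs are assumptions for illustration and are not part of this article's controller.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical bounded buffer of recent events, used to replay after a reconnect. */
final class ReplayBuffer {

    /** One buffered event: an increasing ID plus its payload. */
    static final class Event {
        final long id;
        final String data;

        Event(long id, String data) {
            this.id = id;
            this.data = data;
        }
    }

    private final int capacity;
    private final Deque<Event> events = new ArrayDeque<>();
    private long nextId = 1;

    ReplayBuffer(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    /** Stores a new event, evicting the oldest one when the buffer is full. */
    synchronized Event append(String data) {
        if (events.size() == capacity) {
            events.removeFirst();
        }
        Event e = new Event(nextId++, data);
        events.addLast(e);
        return e;
    }

    /** Returns buffered events whose ID is greater than lastEventId, oldest first. */
    synchronized List<Event> since(long lastEventId) {
        List<Event> missed = new ArrayList<>();
        for (Event e : events) {
            if (e.id > lastEventId) {
                missed.add(e);
            }
        }
        return missed;
    }
}
```

On reconnect, a handler could parse the header value, call `since(...)`, and send the returned events before resuming live delivery. Events older than the buffer's capacity are lost, which is a trade-off of this sketch.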

In addition, because SSE is a long-lived connection, it can also replace WebSocket for QR-code login. For example, the SSE timeout can double as the expiry time of the QR code.
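One way to picture the QR-code idea is a small state object whose lifetime equals the SSE timeout. This is only a sketch under assumptions: `QrLoginSession`, its `Status` values and the explicit `now` parameter are hypothetical and do not come from the article's code.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical model of one QR-code login attempt that expires with the SSE timeout. */
final class QrLoginSession {

    enum Status { PENDING, CONFIRMED, EXPIRED }

    private final Instant expiresAt;
    private boolean confirmed;

    QrLoginSession(Instant createdAt, Duration timeout) {
        this.expiresAt = createdAt.plus(timeout);
    }

    /**
     * Called when the phone scans and approves the code.
     * A confirmation that arrives after expiry is ignored.
     * Returns whether the session is confirmed afterwards.
     */
    synchronized boolean confirm(Instant now) {
        if (now.isBefore(expiresAt)) {
            confirmed = true;
        }
        return confirmed;
    }

    /** Reports the state of the login attempt at the given moment. */
    synchronized Status status(Instant now) {
        if (confirmed) {
            return Status.CONFIRMED;
        }
        return now.isBefore(expiresAt) ? Status.PENDING : Status.EXPIRED;
    }
}
```

In a Spring application, the page showing the QR code would hold an SSE connection created with the same timeout. The server would push an event when `confirm` succeeds, and the emitter's timeout callback would be the natural place to tell the page that the code has expired.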

Finally, an ordinary project whose real-time requirements are not especially strict does not need WebSocket; SSE is enough.

Code implementation

Add the SSE and Kafka dependencies

<!-- SSE: spring-boot-starter-web is already present in most Spring Boot web applications -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
<!-- Kafka: the first dependency is the important one; the other two are for testing -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.4.0</version>
        </dependency>

Add Kafka configuration information

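# KafkaProperties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=community-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer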

Configure Kafka information

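import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    // Broker address list, read from the properties file
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    // Producer settings: broker addresses plus String serializers for key and value
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}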

Configure the controller so the feature can be exercised over the web

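import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import jakarta.annotation.Resource; // use javax.annotation.Resource on Spring Boot 2.x
import org.springframework.http.MediaType;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@RestController
@RequestMapping(path = "sse")
public class KafkaSSEController {

    // Open SSE connections, keyed by client ID
    private static final Map<String, SseEmitter> sseCache = new ConcurrentHashMap<>();

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;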

    /**
     * Send a message to the Kafka topic.
     *
     * @param message request body, forwarded to Kafka as-is
     */
    @PostMapping("/send")
    public void sendMessage(@RequestBody String message) {
        kafkaTemplate.send("my-topic", message);
    }

    /**
     * Listen to Kafka data.
     *
     * @param message value of the record received from the topic
     */
    @KafkaListener(topics = "my-topic", groupId = "my-group-id")
    public void consume(String message) {
        System.out.println("Received message: " + message);
    }

    /**
     * Connect to the SSE service.
     *
     * @param id client ID, used as the key in the emitter cache
     * @return the emitter that keeps the connection open
     * @throws IOException if one of the initial events cannot be sent
     */
    @GetMapping(path = "subscribe", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
    public SseEmitter push(@RequestParam("id") String id) throws IOException {
        // Timeout is set to 5 minutes to demonstrate the client's automatic reconnection
        SseEmitter sseEmitter = new SseEmitter(5 * 60_000L);
        // Set the front-end retry time to 1s
        // send(): sends data. An argument that is not an SseEventBuilder is wrapped into the data field.
        sseEmitter.send(SseEmitter.event().reconnectTime(1000).data("Connected successfully"));
        sseCache.put(id, sseEmitter);
        System.out.println("add " + id);
        sseEmitter.send("Hello", MediaType.APPLICATION_JSON);
        SseEmitter.SseEventBuilder data = SseEmitter.event().name("finish").id("6666").data("Ha ha");
        sseEmitter.send(data);
        // onTimeout(): callback triggered when the connection times out
        sseEmitter.onTimeout(() -> {
            System.out.println(id + "time out");
            sseCache.remove(id);
        });
        // onCompletion(): callback triggered after the connection has completed
        sseEmitter.onCompletion(() -> System.out.println("Finish!!!"));
        return sseEmitter;
    }
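    /**
     * Push content to one connected client, for example:
     * http://127.0.0.1:8080/sse/push?id=7777&content=%E4%BD%A0%E5%93%88aaaaaa
     *
     * @param id client ID used when subscribing
     * @param content text to send to that client
     * @return "over" once the request has been handled
     * @throws IOException if sending to the client fails
     */
    @ResponseBody
    @GetMapping(path = "push")
    public String push(String id, String content) throws IOException {
        SseEmitter sseEmitter = sseCache.get(id);
        if (sseEmitter != null) {
            sseEmitter.send(content);
        }
        return "over";
    }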

    @ResponseBody
    @GetMapping(path = "over")
    public String over(String id) {
        SseEmitter sseEmitter = sseCache.get(id);
        if (sseEmitter != null) {
            // complete(): finishes the response, which closes the connection
            sseEmitter.complete();
            sseCache.remove(id);
        }
        return "over";
    }

}
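The `consume` listener in the controller only prints each Kafka record. To show data in real time it would also need to forward each record to the connected clients. A minimal sketch of that fan-out follows, written against the JDK only so that it can run on its own. `Broadcaster` and its nested `Sink` interface are hypothetical: `Sink` stands in for `SseEmitter` (whose `send` also throws `IOException`), and the client map plays the role of `sseCache`.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Fans one incoming message out to every registered client. */
final class Broadcaster {

    /** Hypothetical stand-in for SseEmitter: something a message can be sent to. */
    interface Sink {
        void send(String message) throws IOException;
    }

    private final Map<String, Sink> clients = new ConcurrentHashMap<>();

    void register(String id, Sink sink) {
        clients.put(id, sink);
    }

    int clientCount() {
        return clients.size();
    }

    /** Sends the message to all clients and returns the IDs of clients dropped after a failure. */
    List<String> broadcast(String message) {
        List<String> dropped = new ArrayList<>();
        for (Map.Entry<String, Sink> entry : clients.entrySet()) {
            try {
                entry.getValue().send(message);
            } catch (IOException e) {
                // A failed send usually means the client has gone away, so forget it.
                clients.remove(entry.getKey());
                dropped.add(entry.getKey());
            }
        }
        return dropped;
    }
}
```

Applied to the controller, the same loop would live inside the `@KafkaListener` method: iterate over `sseCache`, call `send(message)` on each emitter, and remove entries whose send fails. Removing entries during iteration is safe here because `ConcurrentHashMap` iterators tolerate concurrent modification.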

Front-end method

<html>
  <head>
    <script>
      console.log('start')
      const clientId = "your_client_id_x"; // Set client ID
      // Subscribe to the SSE on the server side.
      // The URL follows the controller's mapping; port 8080 is assumed from the example URL in the controller.
      const eventSource = new EventSource(`http://localhost:8080/sse/subscribe?id=${clientId}`);
      eventSource.onmessage = event => {
        console.log(event)
        const message = event.data; // the payload arrives as text
        console.log(`Received message from server: ${message}`);
      };

      // Send messages to the server. You can call it through postman, so the following sendMessage() call is commented out
      function sendMessage() {
        const message = "hello sse";
        fetch(`http://localhost:8080/sse/send`, {
          method: "POST",
          headers: { "Content-Type": "application/json" },
          body: JSON.stringify(message)
        });
        console.log('dddd'+JSON.stringify(message))
      }
      // sendMessage()
    </script>
  </head>
</html>
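The stream can also be checked without a browser. The sketch below is a deliberately small parser for the event-stream text format: it collects `data:` lines, emits one payload per blank line, and ignores everything else. `SseParser` and `parseData` are hypothetical names, and a complete client would also handle the `id`, `event` and `retry` fields.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical minimal parser that extracts the data payload of each event. */
final class SseParser {

    private SseParser() {}

    /** Returns one string per event, joining multiple data lines with a newline. */
    static List<String> parseData(String stream) {
        List<String> events = new ArrayList<>();
        List<String> dataLines = new ArrayList<>();
        for (String line : stream.split("\r?\n", -1)) {
            if (line.isEmpty()) {
                // Blank line: the current event is complete.
                if (!dataLines.isEmpty()) {
                    events.add(String.join("\n", dataLines));
                    dataLines.clear();
                }
            } else if (line.startsWith("data:")) {
                String value = line.substring("data:".length());
                // The format allows one optional space after the colon.
                if (value.startsWith(" ")) {
                    value = value.substring(1);
                }
                dataLines.add(value);
            }
            // Other fields and ":" comment lines are skipped in this sketch.
        }
        return events;
    }

    public static void main(String[] args) {
        String stream = "retry:1000\ndata:Connected successfully\n\n"
                + "id:6666\nevent:finish\ndata:Ha ha\n\n";
        for (String payload : parseData(stream)) {
            System.out.println(payload);
        }
    }
}
```

Note that the browser's `onmessage` handler only receives events without an `event:` name, so the controller's `finish` event would need `addEventListener("finish", ...)` on the page. This parser returns its payload either way.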
