SoFunction
Updated on 2025-04-09

SpringBoot: A Practical Solution for Consuming Non-Standard SSE Streaming Responses

Recently, domestic (Chinese) large models such as DeepSeek have kept gaining popularity, drawing even more attention than OpenAI (jokingly nicknamed "CloseAI"). In a JDK 17 + Spring Boot 3 environment you can integrate them easily with the official Spring AI, but enterprise applications still running JDK 8 and Spring Boot 2.7.3 usually need a custom implementation. This is especially true when the data returned by the model team does not follow the standard SSE specification and has to be handled flexibly. This article shares our practical solution.

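📦 Adding the Gradle dependencies

Core dependencies:

  • spring-boot-starter-web: basic web support
  • spring-boot-starter-webflux: reactive programming support (the module that provides WebClient); with both starters present, Spring Boot still auto-configures Spring MVC, so the controller runs on MVC and WebFlux mainly contributes WebClient
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.boot:spring-boot-starter-webflux'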

🌐 WebClient configuration points

Pay special attention to the header configuration during initialization:

@Bean
public WebClient init() {
    return WebClient.builder()
            .baseUrl(baseUrl)
            .defaultHeader(HttpHeaders.AUTHORIZATION, "Bearer " + openAiApiKey)
            // ⚠️ Must be JSON: Content-Type describes the request body we send
            .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
            .build();
}

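🚨 Key pitfall: setting the default Content-Type to MediaType.TEXT_EVENT_STREAM_VALUE makes the request fail; it must be APPLICATION_JSON_VALUE. Content-Type describes the JSON request body, whereas the event-stream response we want back is requested through the Accept header.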
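The same split between "what I send" and "what I want back" can be seen outside WebClient. The JDK-only sketch below prepares (but never sends) a request with HttpURLConnection; it is an illustration, not the article's WebClient code, and the helper name and URL are placeholder assumptions.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.URI;

final class HeaderSketch {
    // Hypothetical helper: prepares a chat-completions request without connecting
    static HttpURLConnection prepare(String url, String apiKey) {
        try {
            // openConnection() only creates the object; nothing goes over the network yet
            HttpURLConnection conn = (HttpURLConnection) URI.create(url).toURL().openConnection();
            conn.setRequestMethod("POST");
            conn.setDoOutput(true);
            // Content-Type describes the JSON body we are going to send
            conn.setRequestProperty("Content-Type", "application/json");
            // Accept tells the server we want the answer as an event stream
            conn.setRequestProperty("Accept", "text/event-stream");
            conn.setRequestProperty("Authorization", "Bearer " + apiKey);
            return conn;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Swapping the two values (event-stream as Content-Type) would tell the server the JSON body is an SSE stream, which is the misconfiguration described above.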

🧠 Core processing logic

Streaming request entry point

@GetMapping(value = "/stream/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamChatEnhanced(@RequestParam("prompt") String prompt) {
    // Build the request body (JDK 8 has no text blocks, so plain concatenation is used)
    String requestBody = String.format(
            "{\"model\": \"%s\", "
            + "\"messages\": [{\"role\": \"user\", \"content\": \"%s\"}], "
            + "\"stream\": true}",
            model, prompt);
    return webClient.post()
            // Request configuration
            .uri("/v1/chat/completions")
            .bodyValue(requestBody)
            .accept(MediaType.TEXT_EVENT_STREAM)
            .retrieve()
            // 🔑 Key point: read raw bytes instead of letting Spring parse SSE
            .bodyToFlux(DataBuffer.class)
            .transform(this::processStream)
            // Retry and timeout configuration
            .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))
            .timeout(Duration.ofSeconds(180))
            // Error handling
            .doOnError(e -> log.error("Stream error", e))
            .doFinally(signal -> log.info("Stream completed: {}", signal));
}
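Formatting the prompt straight into a JSON template inserts it verbatim, so a prompt containing a double quote, a backslash, or a newline produces invalid JSON. A minimal JDK-only escaping sketch follows; ChatRequestBody and its methods are hypothetical names, and in a Spring application serializing a Map or POJO with Jackson's ObjectMapper (already on the classpath through spring-boot-starter-web) is the more robust option.

```java
final class ChatRequestBody {
    // Escapes a string for use inside a JSON string literal
    static String escapeJson(String s) {
        StringBuilder sb = new StringBuilder(s.length() + 16);
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters need a six-character unicode escape
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    // Same shape as the request body above, with both values escaped
    static String build(String model, String prompt) {
        return "{\"model\": \"" + escapeJson(model) + "\", "
                + "\"messages\": [{\"role\": \"user\", \"content\": \"" + escapeJson(prompt) + "\"}], "
                + "\"stream\": true}";
    }
}
```

In the controller, the String.format call would then become `String requestBody = ChatRequestBody.build(model, prompt);`.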

How it works

When using bodyToFlux(DataBuffer.class):

  • ✅ You get direct control over the raw byte stream
  • ❌ Spring's automatic SSE parsing is bypassed (which is what we want for non-standard responses)
  • 📡 The data is processed as a dynamic stream: similar to a Java Stream, except that elements keep arriving over time
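Handing over raw bytes also means chunk boundaries fall wherever the network puts them, possibly in the middle of a multi-byte UTF-8 character (common with Chinese model output). Decoding each chunk separately then corrupts that character, which is one reason to decode only after joining buffers, as the processing code in this article does. The JDK-only sketch below contrasts per-chunk decoding with a stateful CharsetDecoder that carries unfinished bytes over to the next chunk; the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.util.List;

final class Utf8ChunkDecoding {
    // Naive approach: decode each chunk on its own (a split character turns into U+FFFD)
    static String decodeEachChunk(List<byte[]> chunks) {
        StringBuilder sb = new StringBuilder();
        for (byte[] chunk : chunks) {
            sb.append(new String(chunk, StandardCharsets.UTF_8));
        }
        return sb.toString();
    }

    // Stateful approach: bytes of an unfinished character wait for the next chunk
    static String decodeStreaming(List<byte[]> chunks) {
        CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder()
                .onMalformedInput(CodingErrorAction.REPLACE)
                .onUnmappableCharacter(CodingErrorAction.REPLACE);
        StringBuilder sb = new StringBuilder();
        ByteBuffer pending = ByteBuffer.allocate(0);
        for (byte[] chunk : chunks) {
            ByteBuffer in = ByteBuffer.allocate(pending.remaining() + chunk.length);
            in.put(pending).put(chunk);
            in.flip();
            // UTF-8 never yields more chars than input bytes, so this buffer cannot overflow
            CharBuffer out = CharBuffer.allocate(in.remaining() + 1);
            // endOfInput = false: an incomplete trailing sequence stays unread in `in`
            decoder.decode(in, out, false);
            out.flip();
            sb.append(out);
            pending = in;
        }
        // End of stream: decode what is left (an incomplete sequence becomes U+FFFD)
        CharBuffer tail = CharBuffer.allocate(pending.remaining() + 1);
        decoder.decode(pending, tail, true);
        decoder.flush(tail);
        tail.flip();
        sb.append(tail);
        return sb.toString();
    }
}
```

Splitting the six UTF-8 bytes of "你好" after the fourth byte is enough to make the naive version produce replacement characters, while the stateful version returns the original text.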

🔧 Non-standard SSE data processing

Core processing flow

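private Flux<String> processStream(Flux<DataBuffer> dataBufferFlux) {
    return dataBufferFlux
            // Merge the byte chunks into one buffer (emits once the upstream completes)
            .transform(DataBufferUtils::join)
            // Bytes to string, then release the pooled buffer
            .map(buffer -> {
                String content = buffer.toString(StandardCharsets.UTF_8);
                DataBufferUtils.release(buffer);
                return content;
            })
            // Handle sticky packets: split on blank lines (SSE event boundaries)
            .flatMap(content -> Flux.fromArray(content.split("\\r?\\n\\r?\\n")))
            // Filter out empty events
            .filter(event -> !event.trim().isEmpty())
            // Normalize the format: strip a "data:" prefix sent by the server
            .map(event -> {
                String trimmed = event.trim();
                if (trimmed.startsWith("data:")) {
                    String substring = trimmed.substring(5);
                    return substring.startsWith(" ") ? substring.substring(1) : substring;
                }
                return trimmed;
            })
            // Secondary filtering: drop anything still carrying an SSE prefix
            .filter(event -> !event.startsWith("data:"));
}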
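One caveat about this pipeline: DataBufferUtils.join returns a Mono that emits only after the upstream Flux completes, so the whole model response is buffered and every event reaches the client at the end. If tokens should reach the browser as they arrive, the split has to happen incrementally, keeping any incomplete trailing event until the next chunk. A JDK-only sketch of that buffering step follows; SseEventSplitter is a hypothetical name, and it assumes the chunks have already been decoded to text without splitting multi-byte UTF-8 characters.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class SseEventSplitter {
    // Same boundary as the article: a blank line, with an optional \r before each \n
    private static final Pattern BOUNDARY = Pattern.compile("\\r?\\n\\r?\\n");
    private final StringBuilder pending = new StringBuilder();

    // Feed one decoded chunk; returns only the events that are complete so far
    List<String> feed(String chunk) {
        pending.append(chunk);
        List<String> events = new ArrayList<>();
        Matcher m = BOUNDARY.matcher(pending);
        int start = 0;
        while (m.find()) {
            events.add(pending.substring(start, m.start()));
            start = m.end();
        }
        // Keep the unfinished tail (possibly empty) for the next chunk
        pending.delete(0, start);
        return events;
    }

    // Call when the stream completes: whatever is left is the last event
    String drain() {
        String rest = pending.toString();
        pending.setLength(0);
        return rest;
    }
}
```

One possible wiring (an untested assumption, not part of the original solution) is to create the splitter inside Flux.defer, so each subscription and each retry gets a fresh instance, and to apply concatMapIterable(splitter::feed) to the decoded chunks before the existing filter and map steps.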

Three key technical points

  • Sticky-packet handling: split("\\r?\\n\\r?\\n") solves the message-boundary problem of network transmission, where several events can arrive glued together. Example raw data:

    data:{response1}\n\ndata:{response2}\n\n
  • Format compatibility: strip the data: prefix the server may send, while keeping Spring's ability to add the SSE prefix automatically

  • Dual filtering: ensure the final output contains no leftover SSE format markers
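Applied to the sample raw data above, the three points reduce to plain string operations. The JDK-only sketch below approximates the pipeline's split, empty-event filter, prefix stripping, and secondary filter, with java.util.stream standing in for Flux; the class name is hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class SseNormalizeDemo {
    // Approximates the processStream steps with java.util.stream in place of Flux
    static List<String> normalize(String raw) {
        return Arrays.stream(raw.split("\\r?\\n\\r?\\n"))   // sticky packets: split on blank lines
                .filter(event -> !event.trim().isEmpty())    // drop empty events
                .map(event -> {
                    String trimmed = event.trim();
                    if (trimmed.startsWith("data:")) {         // format compatibility: strip the prefix
                        String rest = trimmed.substring(5);
                        return rest.startsWith(" ") ? rest.substring(1) : rest;
                    }
                    return trimmed;
                })
                .filter(event -> !event.startsWith("data:")) // secondary filter
                .collect(Collectors.toList());
    }
}
```

For the sample `data:{response1}\n\ndata:{response2}\n\n` this yields the two payloads `{response1}` and `{response2}`; an event such as `data: data: x` still carries a prefix after one strip and is dropped by the secondary filter.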

⚠️ Pay special attention

When the endpoint declares produces = MediaType.TEXT_EVENT_STREAM_VALUE:

  • Spring automatically adds the data: prefix to every element the Flux emits

  • Example of the format the front end receives:

    data: {Actual content}
  • If the data: prefix is also added manually, it appears twice:

    data: data: {Error content}  // ❌ Error format
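A simplified model of the framing step shows where the duplicate comes from. Assumption: the hypothetical frame helper below only approximates what Spring writes for each Flux<String> element under text/event-stream (every line prefixed with data:, the event terminated by a blank line); it is not Spring's own code.

```java
final class SseFramingModel {
    // Approximation of SSE framing: prefix every line with "data:", end the event with a blank line
    static String frame(String element) {
        return "data:" + element.replace("\n", "\ndata:") + "\n\n";
    }
}
```

Framing an element that already starts with data: produces `data:data: ...`; a browser EventSource then delivers `data: {...}` as the message body, so JSON.parse on the front end fails.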

🛠️ Complete implementation code

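// Package declaration omitted
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import javax.annotation.PostConstruct;

import lombok.extern.slf4j.Slf4j;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

// @RestController rather than @Service: Spring only maps @GetMapping methods on controllers
@RestController
@Slf4j
public class OpenAiService {
    // Configuration items (placeholders; load them from configuration in practice)
    private String openAiApiKey = "sk-xxxxxx";
    private String baseUrl = "/xxxx";
    private String model = "gpt-4o";
    private WebClient webClient;

    @PostConstruct
    public void init() {
        webClient = WebClient.builder()
                .baseUrl(baseUrl)
                .defaultHeader(HttpHeaders.AUTHORIZATION, "Bearer " + openAiApiKey)
                .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                .build();
    }

    @GetMapping(value = "/stream/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> streamChatEnhanced(@RequestParam("prompt") String prompt) {
        // Build the request body (JDK 8 has no text blocks, so plain concatenation is used)
        String requestBody = String.format(
                "{\"model\": \"%s\", "
                + "\"messages\": [{\"role\": \"user\", \"content\": \"%s\"}], "
                + "\"stream\": true}",
                model, prompt);
        // Send the streaming request
        return webClient.post()
                .uri("/v1/chat/completions")
                .bodyValue(requestBody)
                .accept(MediaType.TEXT_EVENT_STREAM)
                .retrieve()
                // Spring Boot 2.7 (Spring 5.3) takes HttpStatus here; HttpStatusCode exists only in Spring 6
                .onStatus(HttpStatus::isError, response ->
                        response.bodyToMono(String.class)
                                .flatMap(error -> Mono.error(new RuntimeException("API Error: " + error))))
                .bodyToFlux(DataBuffer.class)
                .transform(this::processStream)
                .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))
                .timeout(Duration.ofSeconds(180))
                .doOnError(e -> log.error("Stream error", e))
                .doFinally(signal -> log.info("Stream completed: {}", signal));
    }

    private Flux<String> processStream(Flux<DataBuffer> dataBufferFlux) {
        return dataBufferFlux
                // Merge the byte chunks (emits once the whole response has arrived)
                .transform(DataBufferUtils::join)
                .map(buffer -> {
                    String content = buffer.toString(StandardCharsets.UTF_8);
                    DataBufferUtils.release(buffer);
                    return content;
                })
                // Split on SSE event boundaries to deal with sticky packets
                .flatMap(content -> Flux.fromArray(content.split("\\r?\\n\\r?\\n")))
                // Filter out empty events
                .filter(event -> !event.trim().isEmpty())
                // Normalize the SSE event format
                .map(event -> {
                    String trimmed = event.trim();
                    // produces = TEXT_EVENT_STREAM_VALUE makes Spring prepend "data:" on output,
                    // so any "data:" already present in the upstream data is removed here
                    if (trimmed.startsWith("data:")) {
                        trimmed = trimmed.replace("data:", "").trim();
                    }
                    return trimmed;
                })
                .filter(event -> !event.startsWith("data:"));
    }
}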
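To try the endpoint without a browser, a JDK-only client can read the response line by line and collect the data: payloads. A minimal sketch, assuming the application listens on localhost:8080 (an assumption; the article does not state the port). The parsing lives in a method that accepts any InputStream, so it can also be checked offline; SseClientSketch is a hypothetical name.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class SseClientSketch {
    // Collects the payload of every "data:" line (one leading space removed, as EventSource does)
    static List<String> readData(InputStream in) {
        List<String> payloads = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.startsWith("data:")) {
                    String value = line.substring(5);
                    payloads.add(value.startsWith(" ") ? value.substring(1) : value);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return payloads;
    }

    // Hypothetical call against a locally running instance (port 8080 is an assumption)
    static List<String> callLocal(String prompt) throws IOException {
        String url = "http://localhost:8080/stream/chat?prompt="
                + URLEncoder.encode(prompt, StandardCharsets.UTF_8.name());
        HttpURLConnection conn = (HttpURLConnection) URI.create(url).toURL().openConnection();
        conn.setRequestProperty("Accept", "text/event-stream");
        return readData(conn.getInputStream());
    }
}
```

If the prefix handling works, each collected payload is plain model output; a payload that still begins with data: points to the duplication problem described earlier.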
