Recently, the popularity of domestic large models such as DeepSeek has continued to rise, and they have drawn even more attention than OpenAI (jokingly known as CloseAI). In newer environments you can integrate them easily with the official Spring AI, but for enterprise applications still running JDK 8 and Spring Boot 2.7.3, a custom implementation is often required, especially when the data returned by the large-model team does not follow the standard SSE specification and has to be handled flexibly. This article shares our practical solution.
📦 Introducing Gradle dependencies
Core dependency description:
- `spring-boot-starter-web`: basic Web support
- `spring-boot-starter-webflux`: reactive programming support (the module that provides WebClient)
```groovy
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.boot:spring-boot-starter-webflux'
```
🌐 WebClient configuration points
Pay special attention to the Header configuration when initializing:
```java
@Bean
public WebClient init() {
    return WebClient.builder()
            .baseUrl(baseUrl)
            .defaultHeader(HttpHeaders.AUTHORIZATION, "Bearer " + openAiApiKey)
            // ⚠️ Must be set to JSON format
            .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
            .build();
}
```
🚨 Key pitfall: setting the Content-Type to `MediaType.TEXT_EVENT_STREAM_VALUE` here will cause the request to fail; it must be `MediaType.APPLICATION_JSON_VALUE`.
🧠 Core processing logic
Streaming request entry point

```java
@GetMapping(value = "/stream/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamChatEnhanced(@RequestParam("prompt") String prompt) {
    // Build the request body (String.format keeps this JDK 8 compatible)
    String requestBody = String.format(
            "{\"model\": \"%s\", \"messages\": [{\"role\": \"user\", \"content\": \"%s\"}], \"stream\": true}",
            model, prompt);

    return webClient.post()
            // Request configuration
            .uri("/v1/chat/completions")
            .bodyValue(requestBody)
            .accept(MediaType.TEXT_EVENT_STREAM)
            .retrieve()
            .bodyToFlux(DataBuffer.class)   // 🔑 Key configuration point
            .transform(this::processStream)
            // Retry and timeout configuration
            .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))
            .timeout(Duration.ofSeconds(180))
            // Error handling
            .doOnError(e -> log.error("Stream error", e))
            .doFinally(signal -> log.info("Stream completed: {}", signal));
}
```
Technical principles

When using `bodyToFlux(DataBuffer.class)`:

- ✅ You get control over the raw byte stream
- ❌ Automatic SSE parsing is avoided (necessary for non-standard responses; see the contrast sketch below)
- 📡 The data is handled as a dynamic stream: similar to a Java Stream, except that elements keep arriving
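As a quick contrast, here is a minimal sketch, not part of the original service, of the two decoding modes; it assumes `webClient` and `requestBody` are built exactly as in the snippets above:

```java
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;

public class BodyToFluxContrast {

    // 1) Let Spring decode the SSE stream: each element is the "data:" payload of one event.
    //    Convenient, but it relies on the upstream following the SSE specification.
    static Flux<String> decodedEvents(WebClient webClient, String requestBody) {
        return webClient.post()
                .uri("/v1/chat/completions")
                .bodyValue(requestBody)
                .retrieve()
                .bodyToFlux(String.class);
    }

    // 2) Take the raw bytes and parse them ourselves: this is the approach used in this
    //    article, because the upstream response is not standard SSE.
    static Flux<DataBuffer> rawBytes(WebClient webClient, String requestBody) {
        return webClient.post()
                .uri("/v1/chat/completions")
                .bodyValue(requestBody)
                .retrieve()
                .bodyToFlux(DataBuffer.class);
    }
}
```

For a standards-compliant upstream the first form is simpler; the second form is what the rest of this article builds on.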
🔧 Non-standard SSE data processing
Core processing flow
```java
private Flux<String> processStream(Flux<DataBuffer> dataBufferFlux) {
    return dataBufferFlux
            .transform(DataBufferUtils::join)      // merge the byte fragments
            .map(buffer -> {
                // Bytes to string
                String content = buffer.toString(StandardCharsets.UTF_8);
                DataBufferUtils.release(buffer);
                return content;
            })
            // Split on blank lines to handle sticky packets
            .flatMap(content -> Flux.fromArray(content.split("\\r?\\n\\r?\\n")))
            .filter(event -> !event.trim().isEmpty())   // filter empty events
            .map(event -> {
                // Normalize the format
                String trimmed = event.trim();
                if (trimmed.startsWith("data:")) {
                    String substring = trimmed.substring(5);
                    return substring.startsWith(" ") ? substring.substring(1) : substring;
                }
                return trimmed;
            })
            .filter(event -> !event.startsWith("data:"));   // secondary filtering
}
```
Three key technical points

- Sticky-packet handling: `split("\\r?\\n\\r?\\n")` solves the message-boundary problem of network transmission; example raw data: `data:{response1}\n\ndata:{response2}\n\n` (a runnable demonstration follows below)
- Format compatibility: the `data:` prefix that the server may return is removed automatically, while Spring's ability to add the SSE prefix itself is preserved
- Dual filtering: ensures the final output contains no residual SSE format identifiers
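To make the boundary handling concrete, here is a small self-contained sketch that reproduces the join / split / strip steps of `processStream` on simulated chunks. The `DefaultDataBufferFactory`, the sample payloads, and the class name are illustrative assumptions, not part of the original code:

```java
import java.nio.charset.StandardCharsets;

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;

import reactor.core.publisher.Flux;

public class SseParsingDemo {

    public static void main(String[] args) {
        DataBufferFactory factory = new DefaultDataBufferFactory();

        // Two SSE events arriving as arbitrarily fragmented network chunks ("sticky packets")
        Flux<DataBuffer> raw = Flux.just(
                wrap(factory, "data:{\"answer\":\"Hel\"}\n\nda"),
                wrap(factory, "ta:{\"answer\":\"lo\"}\n\n"));

        raw.transform(DataBufferUtils::join)                 // merge the fragments
           .map(buffer -> {
               String content = buffer.toString(StandardCharsets.UTF_8);
               DataBufferUtils.release(buffer);
               return content;
           })
           .flatMap(content -> Flux.fromArray(content.split("\\r?\\n\\r?\\n"))) // event boundaries
           .filter(event -> !event.trim().isEmpty())
           .map(event -> event.trim().startsWith("data:")
                   ? event.trim().substring(5).trim()
                   : event.trim())
           .subscribe(System.out::println);                  // prints {"answer":"Hel"} then {"answer":"lo"}
    }

    private static DataBuffer wrap(DataBufferFactory factory, String s) {
        return factory.wrap(s.getBytes(StandardCharsets.UTF_8));
    }
}
```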
⚠️ Pay special attention

When the endpoint declares `produces = MediaType.TEXT_EVENT_STREAM_VALUE`:

- Spring WebFlux automatically adds the `data:` prefix, so the format received by the front end is: `data: {actual content}`
- If you also add the `data:` prefix manually, it is duplicated: `data: data: {wrong content}` ❌ (wrong format; a consumer-side check is sketched below)
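To verify the frames from the consumer side, here is a hedged sketch of a Java client; the base URL `http://localhost:8080` is an assumption, while the `/stream/chat` path matches the mapping declared above:

```java
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.WebClient;

public class StreamChatClientDemo {

    public static void main(String[] args) {
        WebClient client = WebClient.create("http://localhost:8080");

        client.get()
              .uri(uri -> uri.path("/stream/chat").queryParam("prompt", "hello").build())
              .accept(MediaType.TEXT_EVENT_STREAM)
              .retrieve()
              .bodyToFlux(String.class)        // WebClient strips the single "data:" prefix per event
              .doOnNext(System.out::println)   // one clean payload per SSE event
              .blockLast();
    }
}
```

Because `bodyToFlux(String.class)` decodes the SSE events, each printed element should be a clean payload; seeing a leftover `data:` here would indicate the duplication problem described above.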
🛠️ Complete implementation code
```java
// Package declaration and imports...

@RestController
@Slf4j
public class OpenAiService {

    // Configuration items and initialization
    private String openAiApiKey = "sk-xxxxxx";
    private String baseUrl = "/xxxx";
    private String model = "gpt-4o";

    private WebClient webClient;

    @PostConstruct
    public void init() {
        webClient = WebClient.builder()
                .baseUrl(baseUrl)
                .defaultHeader(HttpHeaders.AUTHORIZATION, "Bearer " + openAiApiKey)
                .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                .build();
    }

    @GetMapping(value = "/stream/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> streamChatEnhanced(@RequestParam("prompt") String prompt) {
        // Build the request body (String.format keeps this JDK 8 compatible)
        String requestBody = String.format(
                "{\"model\": \"%s\", \"messages\": [{\"role\": \"user\", \"content\": \"%s\"}], \"stream\": true}",
                model, prompt);

        // Send the streaming request
        return webClient.post()
                .uri("/v1/chat/completions")
                .bodyValue(requestBody)
                .accept(MediaType.TEXT_EVENT_STREAM)
                .retrieve()
                .onStatus(HttpStatus::isError, response -> response.bodyToMono(String.class)
                        .flatMap(error -> Mono.error(new RuntimeException("API Error: " + error)))
                )
                .bodyToFlux(DataBuffer.class)
                .transform(this::processStream)
                .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))
                .timeout(Duration.ofSeconds(180))
                .doOnError(e -> log.error("Stream error", e))
                .doFinally(signal -> log.info("Stream completed: {}", signal));
    }

    private Flux<String> processStream(Flux<DataBuffer> dataBufferFlux) {
        return dataBufferFlux
                // Work on the raw byte stream
                .transform(DataBufferUtils::join)
                .map(buffer -> {
                    String content = buffer.toString(StandardCharsets.UTF_8);
                    DataBufferUtils.release(buffer);
                    return content;
                })
                // Split on the SSE event boundary to handle sticky packets
                .flatMap(content -> Flux.fromArray(content.split("\\r?\\n\\r?\\n")))
                // Filter empty events
                .filter(event -> !event.trim().isEmpty())
                // Normalize the SSE event format
                .map(event -> {
                    String trimmed = event.trim();
                    // Because the endpoint declares "produces = MediaType.TEXT_EVENT_STREAM_VALUE",
                    // WebFlux adds "data:" automatically when writing the response, so any "data:"
                    // already present in the upstream data has to be removed manually here
                    if (trimmed.startsWith("data:")) {
                        trimmed = trimmed.replace("data:", "").trim();
                    }
                    return trimmed;
                })
                .filter(event -> !event.startsWith("data:"));
    }
}
```
This concludes this article on handling streaming responses from large models in Spring Boot. For more on integrating large models with Spring Boot, please search my earlier articles or continue browsing the related articles below. I hope you will keep supporting me!