In the previous chapter, we analyzed Spring AI's blocking request and response mechanism in depth and explored how to add memory to it. Today we focus on the concept and implementation of streaming responses. Streaming answers are closely tied to the interactive experience of an AI application, because users see output as it is generated, and they are an important part of improving user satisfaction.
Basic usage
Basic usage is very simple: adding a single stream method call is enough. The following code shows how this looks in a real application:
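    @GetMapping(value = "/ai-stream", produces = MediaType.APPLICATION_OCTET_STREAM_VALUE + ";charset=UTF-8")
    Flux<String> generationByStream(@RequestParam("userInput") String userInput) {
        // chatClient is assumed to be an injected ChatClient instance
        Flux<String> output = chatClient.prompt()
                .user(userInput)
                .stream()   // switch from the blocking call() path to streaming
                .content();
        return output;
    }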
Once the stream method is added, the returned object is no longer the blocking CallResponseSpec but the non-blocking StreamResponseSpec. At the same time, the returned data type changes from String to Flux.
Before exploring its application in depth, let me first introduce the concept and characteristics of Flux.
Spring WebFlux handler implementation
First, in WebFlux the request handler is already non-blocking. This means that as long as our code returns a Flux object, a streaming response works without further effort. The application can then process concurrent requests efficiently, without blocking operations dragging down overall performance.
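    @Override
    public Mono<Void> handle(ServerWebExchange exchange) {
        if (this.handlerMappings == null) {
            return createNotFoundError();
        }
        if (CorsUtils.isPreFlightRequest(exchange.getRequest())) {
            return handlePreFlight(exchange);
        }
        return Flux.fromIterable(this.handlerMappings)
                // ask each mapping in order for a handler
                .concatMap(mapping -> mapping.getHandler(exchange))
                // keep only the first handler found
                .next()
                .switchIfEmpty(createNotFoundError())
                .onErrorResume(ex -> handleResultMono(exchange, Mono.error(ex)))
                .flatMap(handler -> handleRequestWith(exchange, handler));
    }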
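The lookup order in that chain (ask each mapping in turn, take the first handler, fall back to a not-found error) can be tried out without Reactor. The following is only a rough, dependency-free sketch using java.util.stream and Optional; SketchMapping, FirstMatchDispatch and the handler names are hypothetical stand-ins, not Spring types.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for a handler mapping: a handler name for a path, or empty.
interface SketchMapping {
    Optional<String> getHandler(String path);
}

class FirstMatchDispatch {

    // Ask each mapping in order; findFirst short-circuits once a handler is found.
    // If no mapping matches, fail with a not-found style error.
    static String resolve(List<SketchMapping> mappings, String path) {
        return mappings.stream()
                .map(mapping -> mapping.getHandler(path))
                .flatMap(Optional::stream)
                .findFirst()
                .orElseThrow(() -> new IllegalStateException("404 NOT_FOUND: " + path));
    }

    public static void main(String[] args) {
        List<SketchMapping> mappings = List.of(
                path -> path.equals("/ai") ? Optional.of("blockingHandler") : Optional.empty(),
                path -> path.startsWith("/ai") ? Optional.of("streamHandler") : Optional.empty());
        System.out.println(resolve(mappings, "/ai-stream"));
        System.out.println(resolve(mappings, "/ai"));
    }
}
```

The real method differs in one important way: every step there returns a Mono or Flux, so nothing blocks while a mapping is resolving its handler.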
Here is a brief introduction to Spring WebFlux. Although it is not our focus, understanding its basic concepts is still helpful. Spring WebFlux is the part of the Spring Framework designed for building reactive applications. It supports an asynchronous, non-blocking programming model, which makes handling highly concurrent requests more efficient. Here are some key features of WebFlux:
- Reactive programming: WebFlux is based on a reactive programming model and uses the Mono and Flux types to process data streams. Mono represents zero or one element, and Flux represents zero or more elements. This model makes asynchronous data streams easy to handle and improves the readability and maintainability of the code.
- Non-blocking I/O: WebFlux achieves efficient resource utilization through non-blocking I/O on servers such as Netty or Servlet 3.1+ containers. Unlike traditional blocking I/O, WebFlux can release the thread while waiting for a response, which significantly improves the concurrency of the application and supports more simultaneous requests without adding thread overhead.
Understanding these features will lay the foundation for subsequent non-blocking response designs and help us better utilize WebFlux's capabilities to improve application performance.
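The difference between "one result" and "many results over time" can be sketched with JDK classes alone. This is a loose analogy rather than Reactor code: CompletableFuture plays the role of a Mono-like single value, and SubmissionPublisher (from java.util.concurrent.Flow) plays the role of a Flux-like stream. The class name, method names and sample tokens are made up for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;

class MonoVsFluxSketch {

    // "Mono-like": a single asynchronous result that arrives all at once.
    static CompletableFuture<String> wholeAnswer() {
        return CompletableFuture.supplyAsync(() -> "Hello, streaming world");
    }

    // "Flux-like": several items pushed to a consumer as they become available.
    static List<String> streamedAnswer() {
        List<String> received = new CopyOnWriteArrayList<>();
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        // consume() subscribes; the returned future completes when the publisher is closed
        // and every submitted item has been delivered.
        CompletableFuture<Void> done = publisher.consume(received::add);
        for (String token : List.of("Hello", ", ", "streaming", " world")) {
            publisher.submit(token);
        }
        publisher.close();
        done.join(); // wait here only so the sketch can return the collected tokens
        return received;
    }

    public static void main(String[] args) {
        System.out.println(wholeAnswer().join());
        System.out.println(String.join("", streamedAnswer()));
    }
}
```

In a streaming endpoint the consumer would be the HTTP response writer, so each token can reach the client before the full answer exists.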
Source code analysis
Now let's look at how the content method works in detail. The following code shows its implementation and helps us understand how the data stream and the response are handled:
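    public Flux<String> content() {
        return doGetFluxChatResponse(this.request).map(r -> {
            // guard against missing parts of the response; map must not return null
            if (r.getResult() == null || r.getResult().getOutput() == null
                    || r.getResult().getOutput().getContent() == null) {
                return "";
            }
            return r.getResult().getOutput().getContent();
        }).filter(StringUtils::hasLength); // drop the empty placeholders
    }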
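The guard-then-filter pattern used by content() can be reproduced on an ordinary list. The sketch below is a simplified illustration under stated assumptions: Output, Result and Response are hypothetical records standing in for the real response types, and java.util.stream replaces Flux.

```java
import java.util.List;
import java.util.stream.Collectors;

class ContentExtractionSketch {

    // Hypothetical, simplified stand-ins for the response -> result -> output chain.
    record Output(String content) {}
    record Result(Output output) {}
    record Response(Result result) {}

    // Same shape as content(): map every response to its text, using "" when
    // any link in the chain is null, then drop the empty strings.
    static List<String> contents(List<Response> responses) {
        return responses.stream()
                .map(r -> {
                    if (r.result() == null || r.result().output() == null
                            || r.result().output().content() == null) {
                        return "";
                    }
                    return r.result().output().content();
                })
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Response> responses = List.of(
                new Response(null),
                new Response(new Result(new Output("Hi"))),
                new Response(new Result(new Output(null))),
                new Response(new Result(new Output(" there"))));
        System.out.println(contents(responses));
    }
}
```

Mapping to an empty string first and filtering afterwards matters in Reactor, where a map function that returns null raises an error instead of skipping the element.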
The implementation of content() is relatively simple: it passes a function to map that extracts the text of each response, then filters out empty strings. Next, we analyze the implementation of doGetFluxChatResponse in depth to better understand its logic and how it works:
    private Flux<ChatResponse> doGetFluxChatResponse2(DefaultChatClientRequestSpec inputRequest) {
        // Duplicated code is omitted here
        var fluxChatResponse = this.chatModel.stream(prompt);
        // Duplicated code is omitted here
        return advisedResponse;
    }
The logic here is basically the same as for the blocking answer; the only difference is that it calls the chatModel.stream(prompt) method. Next, we look at the implementation of stream(prompt) and the design ideas behind it:
    public Flux<ChatResponse> stream(Prompt prompt) {
        return Flux.deferContextual(contextView -> {
            // Duplicated code is omitted here
            Flux<OpenAiApi.ChatCompletionChunk> completionChunks =
                    this.openAiApi.chatCompletionStream(request, getAdditionalHttpHeaders(prompt));
            // Duplicated code is omitted here
            Flux<ChatResponse> chatResponse = completionChunks.map(this::chunkToChatCompletion)
                    .switchMap(chatCompletion -> Mono.just(chatCompletion).map(chatCompletion2 -> {
                        // Duplicated code is omitted here
                        return new ChatResponse(generations, from(chatCompletion2, null));
                    }));
            // Duplicated code is omitted here
            return new MessageAggregator().aggregate(flux, observationContext::setResponse);
        });
    }
We will not repeat the shared logic and instead focus on the differences. This section uses chatCompletionStream and, unlike the blocking path, no longer goes through retryTemplate. Instead it uses webClient, a client that can receive event streams.
    public Flux<ChatCompletionChunk> chatCompletionStream(ChatCompletionRequest chatRequest,
            MultiValueMap<String, String> additionalHttpHeader) {
        Assert.notNull(chatRequest, "The request body can not be null.");
        Assert.isTrue(chatRequest.stream(), "Request must set the stream property to true.");
        AtomicBoolean isInsideTool = new AtomicBoolean(false);
        return this.webClient.post()
                .uri(this.completionsPath)
                .headers(headers -> headers.addAll(additionalHttpHeader))
                .body(Mono.just(chatRequest), ChatCompletionRequest.class)
                .retrieve()
                .bodyToFlux(String.class)
                // cancels the flux stream after the "[DONE]" is received.
                .takeUntil(SSE_DONE_PREDICATE)
                // filters out the "[DONE]" message.
                .filter(SSE_DONE_PREDICATE.negate())
                .map(content -> ModelOptionsUtils.jsonToObject(content, ChatCompletionChunk.class))
                // A lot of code is omitted here
The main purpose of this code is to send a POST request to the specified path through webClient, setting the appropriate request headers and request body. The response is received as an event stream (through the bodyToFlux method), filtered and converted, and finally turned into ChatCompletionChunk objects.
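The two steps around the "[DONE]" marker are easy to misread: takeUntil relays items up to and including the first match, which is why a separate filter is needed to drop the marker itself. The following dependency-free sketch mimics that behaviour on a plain list; SseDoneSketch, its method names and the sample events are hypothetical, and the real pipeline works on a Flux rather than a List.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

class SseDoneSketch {

    static final Predicate<String> SSE_DONE = "[DONE]"::equals;

    // Mimics takeUntil: keep items up to and including the first match, then stop.
    static List<String> takeUntil(List<String> events, Predicate<String> stop) {
        List<String> taken = new ArrayList<>();
        for (String event : events) {
            taken.add(event);
            if (stop.test(event)) {
                break;
            }
        }
        return taken;
    }

    // takeUntil still contains the "[DONE]" marker, so a second step filters it out.
    static List<String> payloads(List<String> events) {
        List<String> result = new ArrayList<>();
        for (String event : takeUntil(events, SSE_DONE)) {
            if (SSE_DONE.negate().test(event)) {
                result.add(event);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> events = List.of("{\"c\":\"Hel\"}", "{\"c\":\"lo\"}", "[DONE]", "ignored");
        System.out.println(takeUntil(events, SSE_DONE));
        System.out.println(payloads(events));
    }
}
```

Only the remaining payloads would then be parsed as JSON, which avoids trying to deserialize the marker string.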
Although the rest of the business logic is similar to before, there is one significant difference: both the return type of the whole chain and the call to the OpenAI API are non-blocking.
Summary
Streaming responses improve system performance and play a key role in the user experience. By returning a Flux, the application builds on the non-blocking design of Spring WebFlux: concurrent requests are processed without blocking, resources are used effectively, and response latency is reduced.
We have now covered the basic operations of Spring AI: blocking answers, streaming answers, and memory enhancement. These lay the foundation for understanding its working mechanism in depth. Next, we will continue exploring the source code, focusing on important features such as callback functions and entity class mapping.
This will help us better understand the internal workings of Spring AI and provide guidance for further optimization and customization.