When using Dify (an open-source platform for building LLM applications) together with Spring Boot and WebClient to implement streaming output, we need to make sure the versions in the technology stack are compatible and understand the core concept of streaming output. The details are as follows:
1. Technology stack version requirements
Spring Boot Version Requirements
Minimum recommended version: 2.x or 3.x
If you need HTTP/2 support or stronger asynchronous processing capabilities, Spring Boot 3.x is recommended.
Spring Boot 3.x is based on Spring Framework 6 and Java 17+, and provides better reactive programming support.
JDK version requirements
Minimum recommended version: Java 11
Spring Boot 2.x supports Java 8 and above, but Java 11 or higher is recommended.
If you are using Spring Boot 3.x, you must use Java 17 or later, because Spring Boot 3 raised its minimum supported Java version to 17.
2. Core concept: Streaming output
Streaming output means that the server sends data to the client incrementally, in chunks, rather than returning the complete result at once. This approach is particularly suitable for large file transfers, real-time data streams, and the word-by-word output of generative models.
In Spring Boot, streaming output can be implemented in the following ways:
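- Return a Flux (optionally wrapped in ResponseEntity<Flux<?>>) from a controller method, or use ResponseBodyEmitter/SseEmitter in a servlet-based Spring MVC application.
- Use WebClient's reactive programming model to send the request and consume the streaming response.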
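Flux and WebClient follow the Reactive Streams model: a publisher pushes elements to a subscriber, and the subscriber signals how many elements it is ready to accept. The JDK ships equivalent interfaces as java.util.concurrent.Flow, so the chunk-by-chunk idea can be sketched without any Spring dependency. This is only an illustration of the concept, not how Spring implements streaming; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class ChunkStreamingSketch {

    /**
     * Pushes each chunk as a separate element and collects what the subscriber
     * receives, requesting one element at a time (backpressure).
     */
    public static List<String> streamChunks(List<String> chunks) {
        List<String> received = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                private Flow.Subscription subscription;

                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    this.subscription = subscription;
                    subscription.request(1); // ask for the first chunk
                }

                @Override
                public void onNext(String chunk) {
                    received.add(chunk);     // a real client would render the chunk right away
                    subscription.request(1); // then ask for the next one
                }

                @Override
                public void onError(Throwable throwable) {
                    done.countDown();
                }

                @Override
                public void onComplete() {
                    done.countDown();        // no more chunks will arrive
                }
            });
            chunks.forEach(publisher::submit); // producer side: emit chunks as they become available
        } // close() signals completion once the buffered chunks have been delivered

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> chunks = List.of("Stream", "ing ", "output");
        System.out.println(String.join("", streamChunks(chunks)));
    }
}
```

Running main prints "Streaming output": the full result is simply all chunks in arrival order. In the real application, WebClient plays the subscriber role toward Dify, and Spring plays it toward the browser.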
3. Implementation steps
3.1 Add dependencies
Make sure to add the following dependency to your Maven pom.xml:
```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
```
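spring-boot-starter-webflux provides support for reactive web programming, including WebClient and Reactor's Flux. The configuration and controller below also use Spring MVC types (WebMvcConfigurer, HttpServletRequest), so the project is assumed to include spring-boot-starter-web as well; Spring MVC can return a Flux from a controller method.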
3.2 Configuring cross-origin access (CORS)
Configure cross-origin resource sharing (CORS). This solves the cross-origin problem that occurs when the front end and back end run under different domain names or ports: the browser does not let the front-end page read the back end's responses unless the back end explicitly allows the front end's origin.
```java
import java.util.Arrays;
import java.util.List;

import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

@Configuration
public class WebConfig implements WebMvcConfigurer {

    static final List<String> ORIGIN_LIST = Arrays.asList(
            // Local
            "http://localhost:8080", "http://127.0.0.1:8080",
            "http://localhost:8888", "http://127.0.0.1:8888",
            "http://localhost:8803", "http://127.0.0.1:8803"
    );

    @Override
    public void addCorsMappings(CorsRegistry registry) {
        // Configure global cross-origin rules
        registry.addMapping("/**")                                         // Apply to all paths
                .allowedOrigins(ORIGIN_LIST.toArray(new String[0]))        // Allowed origins
                .allowedMethods("GET", "POST", "PUT", "DELETE", "OPTIONS") // Allowed HTTP methods
                .allowedHeaders("Content-Type", "Authorization")           // Allowed request headers
                .allowCredentials(true);                                   // Allow cookies and other credentials
    }
}
```
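Conceptually, the browser sends an Origin header and only lets the page read the response if the server echoes an allowed origin back in Access-Control-Allow-Origin. The hypothetical helper below is a simplified sketch of that allowlist check (exact match on scheme, host and port), not Spring's actual CorsConfiguration logic. Because the configuration sets allowCredentials(true), the concrete origin has to be echoed instead of the "*" wildcard; Spring rejects "*" in allowedOrigins in that case.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class CorsOriginCheckSketch {

    // Same allowlist as ORIGIN_LIST in WebConfig.
    static final List<String> ORIGIN_LIST = Arrays.asList(
            "http://localhost:8080", "http://127.0.0.1:8080",
            "http://localhost:8888", "http://127.0.0.1:8888",
            "http://localhost:8803", "http://127.0.0.1:8803");

    /**
     * Hypothetical helper: returns the Access-Control-Allow-Origin value to send back,
     * or empty if the browser should not expose the response to the page.
     */
    public static Optional<String> allowOriginHeader(String requestOrigin) {
        if (requestOrigin == null) {
            return Optional.empty(); // no Origin header: not a cross-origin browser request
        }
        // Exact match: "http://localhost:3000" is NOT covered by "http://localhost:8080".
        // With credentials allowed, the concrete origin is echoed, never "*".
        return ORIGIN_LIST.contains(requestOrigin) ? Optional.of(requestOrigin) : Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(allowOriginHeader("http://localhost:8080")); // Optional[http://localhost:8080]
        System.out.println(allowOriginHeader("http://localhost:3000")); // Optional.empty
    }
}
```

A front end served from a port that is not in the list (for example a dev server on 3000) will see its requests fail in the browser even though the back end processed them, which is why the list must match the actual front-end origins.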
3.3 Implementing the streaming output controller
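```java
import javax.servlet.http.HttpServletRequest; // jakarta.servlet.http.HttpServletRequest on Spring Boot 3.x

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

@Slf4j
@RestController
@RequestMapping("/api")
@RequiredArgsConstructor
public class DifyController {

    @Value("${}") // fill in your own property key
    private String chatMessages;

    private final DifyService difyService;

    @GetMapping(value = "/chatMessagesStreaming", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<StreamResponse> chatMessagesStreaming(HttpServletRequest request,
            @RequestParam(value = "query", required = true) String query,
            @RequestParam(value = "userName", required = true) String userName,
            @RequestParam(value = "conversationId", required = false) String conversationId) {
        return difyService.streamingMessage(query, conversationId, userName)
                .doOnNext(response -> {
                    log.info("Streaming result: {}", response);
                    // The workflow_finished event carries the complete answer; add your own processing here
                    if ("workflow_finished".equals(response.getEvent())) {
                        log.info("Entered workflow_finished stage");
                        String answer = response.getData().getOutputs().getAnswer(); // Complete answer
                    }
                    // message_end marks the end of the message; add your own processing here
                    if ("message_end".equals(response.getEvent())) {
                        log.info("Entered message_end stage");
                    }
                });
    }
}
```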
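Each Dify message event carries only one chunk of text, so code that needs the full answer when message_end arrives has to accumulate the chunks itself. A minimal sketch, assuming events arrive in order and using a hypothetical Event class in place of StreamResponse; in the controller, the same logic would run inside doOnNext, with one accumulator per request so that concurrent requests do not share state.

```java
import java.util.List;

public class AnswerAccumulatorSketch {

    /** Hypothetical stand-in for StreamResponse with only the fields used here. */
    public static final class Event {
        final String event;
        final String answer;

        public Event(String event, String answer) {
            this.event = event;
            this.answer = answer;
        }
    }

    private final StringBuilder buffer = new StringBuilder();
    private String completeAnswer; // stays null until message_end arrives

    /** Call once per event, in arrival order (as doOnNext does). */
    public void onEvent(Event e) {
        if ("message".equals(e.event) && e.answer != null) {
            buffer.append(e.answer);            // each message event carries one text chunk
        } else if ("message_end".equals(e.event)) {
            completeAnswer = buffer.toString(); // no more chunks for this message
        }
    }

    public String completeAnswer() {
        return completeAnswer;
    }

    public static void main(String[] args) {
        AnswerAccumulatorSketch acc = new AnswerAccumulatorSketch(); // one instance per request
        List<Event> events = List.of(
                new Event("message", "Hello"),
                new Event("message", ", "),
                new Event("message", "Dify"),
                new Event("message_end", null));
        events.forEach(acc::onEvent);
        System.out.println(acc.completeAnswer()); // Hello, Dify
    }
}
```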
3.4 Implementing the streaming output service layer
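```java
import java.util.HashMap;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;

@Slf4j
@Service
@RequiredArgsConstructor
public class DifyService {

    @Value("${}") // fill in your property key for the Dify API URL
    private String url;

    @Value("${}") // fill in your property key for the Dify API key
    private String apiKey;

    /**
     * Streaming call to Dify.
     *
     * @param query          query text
     * @param conversationId conversation ID (optional)
     * @param userName       user name
     * @return Flux response stream
     */
    public Flux<StreamResponse> streamingMessage(String query, String conversationId, String userName) {
        // 1. Build the request body (DifyRequestBody is not shown here; setter names are assumed)
        DifyRequestBody body = new DifyRequestBody();
        body.setInputs(new HashMap<>());
        body.setQuery(query);
        body.setResponseMode("streaming");
        body.setConversationId("");
        body.setUser(userName);
        if (StringUtils.hasText(conversationId)) {
            body.setConversationId(conversationId);
        }
        // Custom input parameters, if any, can be added to a map:
        // Map<String, Object> commonInputs = new HashMap<>();
        // commonInputs.put("search_type", searchType);
        // body.setInputs(commonInputs);

        // 2. Send the POST request with WebClient (WebClient.create() is assumed here)
        return WebClient.create()
                .post()
                .uri(url)
                .headers(httpHeaders -> {
                    httpHeaders.setContentType(MediaType.APPLICATION_JSON);
                    httpHeaders.setBearerAuth(apiKey); // Authorization: Bearer <API key>
                })
                .bodyValue(body)
                .retrieve()
                .bodyToFlux(StreamResponse.class) // Entity conversion
                .filter(this::shouldInclude)      // Filter out unneeded events [adjust as required]
                //.map(this::convertToCustomResponseAsync) // Custom response format [if needed]
                //.concatWith(Flux.just(createCustomFinalMessage())) // Custom final message [if needed]
                .onErrorResume(throwable -> {
                    log.error("Exception output: {}", throwable.getMessage());
                    return Flux.empty(); // end the stream instead of propagating the error
                });
    }

    private boolean shouldInclude(StreamResponse streamResponse) {
        // Example: keep only message and message_end events.
        // Note: with this filter, workflow_finished events never reach the controller.
        String event = streamResponse.getEvent();
        return "message".equals(event) || "message_end".equals(event);
    }
}
```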
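With response_mode set to "streaming", Dify's API documentation describes the response as Server-Sent Events: each chunk is a data: line holding a JSON object, followed by a blank line. bodyToFlux(StreamResponse.class) lets Spring's SSE decoder turn each data payload into a StreamResponse. The sketch below only illustrates what that decoding step has to do with the raw text; it is a simplified, hypothetical parser (no event:, id: or retry: handling), not Spring's decoder.

```java
import java.util.ArrayList;
import java.util.List;

public class SseDataParserSketch {

    /**
     * Simplified text/event-stream parser: returns the data payload of each event.
     * Only "data:" lines and blank-line boundaries are handled; other fields
     * (event:, id:, retry:) and comment lines starting with ':' are skipped.
     */
    public static List<String> dataPayloads(String body) {
        List<String> payloads = new ArrayList<>();
        StringBuilder data = null;                   // data of the event being assembled
        for (String line : body.split("\r\n|\r|\n", -1)) {
            if (line.isEmpty()) {                    // blank line ends the current event
                if (data != null) {
                    payloads.add(data.toString());
                }
                data = null;
            } else if (line.startsWith("data:")) {
                String value = line.substring("data:".length());
                if (value.startsWith(" ")) {
                    value = value.substring(1);      // drop one optional leading space
                }
                if (data == null) {
                    data = new StringBuilder(value);
                } else {
                    data.append('\n').append(value); // multiple data lines are joined with '\n'
                }
            }
        }
        return payloads; // an unterminated trailing event is discarded
    }

    public static void main(String[] args) {
        String body = "data: {\"event\": \"message\", \"answer\": \"Hi\"}\n\n"
                + ": comment line\n\n"
                + "data: {\"event\": \"message_end\"}\n\n";
        dataPayloads(body).forEach(System.out::println);
    }
}
```

Each returned string is one JSON object; in the real service, Spring's JSON codec then maps it onto StreamResponse.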
3.5 Defining the streaming response model
These classes mirror the format of the streaming events returned by Dify.
```java
import java.io.Serializable;

import lombok.Data;

// Each public class goes in its own file.

@Data
public class StreamResponse implements Serializable {
    /** Event type (differs by mode). */
    private String event;
    /** agent_thought ID. */
    private String id;
    /** Task ID. */
    private String task_id;
    /** Unique message ID. */
    private String message_id;
    /** Text chunk returned by the LLM. */
    private String answer;
    /** Creation timestamp. */
    private Long created_at;
    /** Conversation ID. */
    private String conversation_id;
    /** Workflow data (e.g. the outputs of workflow_finished). */
    private StreamResponseData data;
}

@Data
public class StreamResponseData implements Serializable {
    private String id;
    private String workflow_id;
    private String status;
    private Long created_at;
    private Long finished_at;
    private OutputsData outputs;
}

@Data
public class OutputsData implements Serializable {
    private String answer;
}
```
4. Key points
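MediaType.TEXT_EVENT_STREAM_VALUE
Declares that the response is streamed with the Server-Sent Events (SSE) protocol, using the text/event-stream content type.
Clients can receive the streaming data in a browser (for example through the EventSource API) or with SSE-capable tools such as Postman.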
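On the outgoing side, Spring handles the SSE framing: with text/event-stream, each element of the returned Flux is written as one event whose data field holds the element serialized as JSON. The hypothetical helper below sketches that wire format as defined by the Server-Sent Events specification; it is not Spring's code, and the exact spacing Spring emits may differ, since the space after "data:" is optional.

```java
public class SseEventWriterSketch {

    /**
     * Formats one Server-Sent Event. A null eventName omits the "event:" field;
     * multi-line data is written as several "data:" lines, and a blank line ends the event.
     */
    public static String format(String eventName, String data) {
        StringBuilder sb = new StringBuilder();
        if (eventName != null) {
            sb.append("event: ").append(eventName).append('\n');
        }
        for (String line : data.split("\r\n|\r|\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        return sb.append('\n').toString(); // the blank line tells the client the event is complete
    }

    public static void main(String[] args) {
        System.out.print(format(null, "{\"event\":\"message\",\"answer\":\"Hi\"}"));
        System.out.print(format("message_end", "{}"));
    }
}
```

A browser EventSource fires one message per blank-line-terminated block, which is why each chunk can be rendered as soon as it arrives.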
Flux is a core type in the Reactor library, representing an asynchronous sequence of zero to N elements that ends with a completion or error signal.
Here, the Flux<StreamResponse> represents the stream of incremental chunks (word by word or sentence by sentence) received from Dify.
Reactive Characteristics
WebClient is a reactive HTTP client provided by Spring that can efficiently process streaming data.
Instead of blocking a thread while waiting for the response, it processes data step by step as it arrives, in an event-driven way.
Summary
Through the above steps, we can use Spring Boot and WebClient to implement streaming output. The key is to use the reactive programming model (Reactor's Flux and WebClient) and to configure the streaming protocol (SSE) correctly. Choosing Spring Boot and JDK versions that fit your needs helps ensure the performance and stability of your project.