Preface
The previous article introduced the simple mode RPC. When the data volume is large or data needs to be continuously transmitted, we should use streaming RPC, which allows us to transfer data while processing. This article first introduces server-side streaming RPC.
Server-side streaming RPC: The client sends a request to the server and gets a stream to read the returned message sequence. The client reads the returned stream until there is no message inside.
Scenario Simulation: Get stock trends in real time
1. The client wants to obtain the real-time trend of a certain crude oil stock, and the client sends a request
2. The server returns the trend of the stock in real time
Create a new proto file
Create a new server_stream.proto file
1. Define the sending information
// Define the send request informationmessage SimpleRequest{ // Define the sent parameters, use camel naming method, and underline it in lowercase, such as: student_name // Request parameters string data = 1; }
2. Define the received information
// Define streaming response informationmessage StreamResponse{ // Streaming response data string stream_value = 1; }
3. Define service method ListValue
Server-side streaming rpc, just add stream before responding to data
// Define our service (multiple services can be defined, each service can define multiple interfaces)service StreamServer{ // Server-side streaming rpc, add stream before responding to data rpc ListValue(SimpleRequest)returns(stream StreamResponse){}; }
4. Compile the proto file
Enter the directory where server_stream.proto is located and run the command:
protoc --go_out=plugins=grpc:./ ./server_stream.proto
Create a Server side
1. Define our service and implement the ListValue method
// SimpleService defines our servicestype StreamService struct{} // ListValue Implements ListValue methodfunc (s *StreamService) ListValue(req *, srv pb.StreamServer_ListValueServer) error { for n := 0; n < 5; n++ { // Send a message to the stream. The default maximum length of each send message is `math.MaxInt32`bytes err := (&{ StreamValue: + (n), }) if err != nil { return err } } return nil }
Beginners may find it confused about how the parameters and return values of ListValue are determined. In fact, these are all defined in the file generated when compiling proto, and we only need to implement it.
2. Start the gRPC server
const ( // Address Listening Address Address string = ":8000" // Network Network Communication Protocol Network string = "tcp" ) func main() { // Listen to local ports listener, err := (Network, Address) if err != nil { (" err: %v", err) } (Address + " ...") // Create a new gRPC server instance // The default maximum message length for a single received is `1024*1024*4`bytes(4M), and the maximum message length for a single sent is `math.MaxInt32`bytes // grpcServer := ((1024*1024*4), (math.MaxInt32)) grpcServer := () // Register our service on the gRPC server (grpcServer, &StreamService{}) //Use the server Serve() method and our port information area to block and wait until the process is killed or Stop() is called err = (listener) if err != nil { (" err: %v", err) } }
Run the server
go run :8000 ...
Create Client
1. Create and call the server ListValue method
// listValue calls the ListValue method on the serverfunc listValue() { // Create a send structure req := { Data: "stream server grpc ", } // Call our service (ListValue method) stream, err := ((), &req) if err != nil { ("Call ListStr err: %v", err) } for { //Recv() method receives server messages. The default maximum message length for each Recv() is `1024*1024*4`bytes(4M) res, err := () // Determine whether the message flow has ended if err == { break } if err != nil { ("ListStr get stream err: %v", err) } // Print the return value () } }
2. Start the gRPC client
// Address connection addressconst Address string = ":8000" var grpcClient func main() { // Connect to the server conn, err := (Address, ()) if err != nil { (" err: %v", err) } defer () // Establish a gRPC connection grpcClient = (conn) route() listValue() }
Run the client
go run
stream server grpc 0
stream server grpc 1
stream server grpc 2
stream server grpc 3
stream server grpc 4
The client continuously obtains data from the server
think
If the server keeps sending data, similar to obtaining real-time data on stock trends, can the client stop obtaining data by itself?
Answer: Yes
1. We slightly modify the ListValue method on the server
// ListValue Implements ListValue methodfunc (s *StreamService) ListValue(req *, srv pb.StreamServer_ListValueServer) error { for n := 0; n < 15; n++ { // Send a message to the stream. The default maximum length of each send message is `math.MaxInt32`bytes err := (&{ StreamValue: + (n), }) if err != nil { return err } (n) (1 * ) } return nil }
2. Then modify the implementation of the client calling ListValue method to get the result
// listValue calls the ListValue method on the serverfunc listValue() { // Create a send structure req := { Data: "stream server grpc ", } // Call our service (Route method) // At the same time, a , which allows us to change the behavior of RPC when needed, such as timeout/cancel a running RPC stream, err := ((), &req) if err != nil { ("Call ListStr err: %v", err) } for { //Recv() method receives server messages. The default maximum message length for each Recv() is `1024*1024*4`bytes(4M) res, err := () // Determine whether the message flow has ended if err == { break } if err != nil { ("ListStr get stream err: %v", err) } // Print the return value () break } //You can use CloseSend() to close the stream, so that the server will not continue to generate stream messages //After calling CloseSend(), if you continue to call Recv(), the stream will be reactivated, and the previous result will be retrieved. () }
Just call the CloseSend() method to close the server stream and stop sending data. It is worth noting that after calling CloseSend(), if you continue to call Recv(), the stream will be reactivated, and the current result will continue to get the message.
This can perfectly solve the operation of client pause->continue to get data.
Summarize
This article introduces the simple use of streaming RPC on the server. The client initiates a request and the server constantly returns data until the server stops sending data or the client actively stops receiving data. The next article will introduce client streaming RPC.
Tutorial source code address:/Bingjian-Zhu/go-grpc-example
refer to:gRPC official document Chinese version
For more information about Go gRPC server streaming RPC, please follow my other related articles!