Updated on 2025-03-05

Go gRPC server streaming RPC tutorial example

Preface

The previous article introduced simple (unary) RPC. When the data volume is large or data must be transmitted continuously, streaming RPC is a better fit, because it lets us process data while it is still being transferred. This article covers server-side streaming RPC first.

Server-side streaming RPC: the client sends one request to the server and gets back a stream from which it reads a sequence of messages. The client keeps reading from the stream until there are no more messages.
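
The read-until-empty loop can be sketched without gRPC at all. The snippet below is only an illustration: fakeStream and readAll are hypothetical names, not part of grpc-go. It assumes, as the generated client stream does, that Recv reports the end of the stream with io.EOF.

```go
package main

import (
	"fmt"
	"io"
)

// fakeStream is a hypothetical stand-in for a generated gRPC client stream.
type fakeStream struct {
	msgs []string
}

// Recv returns the next message, or io.EOF once the stream is exhausted.
func (s *fakeStream) Recv() (string, error) {
	if len(s.msgs) == 0 {
		return "", io.EOF
	}
	m := s.msgs[0]
	s.msgs = s.msgs[1:]
	return m, nil
}

// readAll drains the stream the way a client drains a server-side stream.
func readAll(s *fakeStream) ([]string, error) {
	var out []string
	for {
		m, err := s.Recv()
		if err == io.EOF {
			return out, nil // normal end of stream, not a failure
		}
		if err != nil {
			return out, err // any other error aborts the read
		}
		out = append(out, m)
	}
}

func main() {
	got, err := readAll(&fakeStream{msgs: []string{"a", "b", "c"}})
	fmt.Println(got, err)
}
```

Treating io.EOF separately from other errors is the important part: it is the normal way for the stream to say "no more messages".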

Scenario Simulation: Get stock trends in real time

1. The client wants to obtain the real-time trend of a certain crude oil stock, and the client sends a request

2. The server returns the trend of the stock in real time

Create a new proto file

Create a new server_stream.proto file

1. Define the request message

// Define the request message
message SimpleRequest{
    // Field names use lowercase words separated by underscores, such as: student_name
    // Request parameter
    string data = 1;
}

2. Define the response message

// Define the streaming response message
message StreamResponse{
    // Streaming response data
    string stream_value = 1;
}

3. Define service method ListValue

For a server-side streaming RPC, simply add the stream keyword before the response type.

// Define our service (multiple services can be defined, and each service can define multiple methods)
service StreamServer{
    // Server-side streaming RPC: stream is placed before the response type
    rpc ListValue(SimpleRequest)returns(stream StreamResponse){};
}

4. Compile the proto file

Enter the directory where server_stream.proto is located and run the command:

protoc --go_out=plugins=grpc:./ ./server_stream.proto

Create a Server side

1. Define our service and implement the ListValue method

// Imports needed: "strconv" and the generated package, imported as pb

// StreamService defines our service
type StreamService struct{}

// ListValue implements the ListValue method
func (s *StreamService) ListValue(req *pb.SimpleRequest, srv pb.StreamServer_ListValueServer) error {
	for n := 0; n < 5; n++ {
		// Send a message to the stream. By default, the maximum size of each sent message is math.MaxInt32 bytes
		err := srv.Send(&pb.StreamResponse{
			StreamValue: req.Data + strconv.Itoa(n),
		})
		if err != nil {
			return err
		}
	}
	return nil
}

Beginners may be confused about how the parameters and return value of ListValue are determined. They are all defined in the Go file generated when the proto file is compiled; we only need to implement the method.
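
To make the shape of that method concrete, here is a minimal sketch that does not use gRPC. The valueSender interface is a hypothetical, simplified stand-in for the generated pb.StreamServer_ListValueServer (the real one sends *pb.StreamResponse values and also embeds grpc.ServerStream); recorder and listValueSketch are likewise names made up for this illustration.

```go
package main

import (
	"fmt"
	"strconv"
)

// valueSender is a hypothetical, simplified stand-in for the generated
// server stream interface; only Send is modelled here.
type valueSender interface {
	Send(value string) error
}

// recorder collects every value sent, which makes the handler easy to test.
type recorder struct {
	sent []string
}

// Send stores the value instead of writing it to a network stream.
func (r *recorder) Send(v string) error {
	r.sent = append(r.sent, v)
	return nil
}

// listValueSketch mirrors the handler's shape: one request in, many Sends out.
func listValueSketch(data string, srv valueSender) error {
	for n := 0; n < 5; n++ {
		if err := srv.Send(data + strconv.Itoa(n)); err != nil {
			return err // stop as soon as the stream reports a failure
		}
	}
	return nil
}

func main() {
	r := &recorder{}
	if err := listValueSketch("stream server grpc ", r); err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, v := range r.sent {
		fmt.Println(v)
	}
}
```

Because the handler only depends on an interface, a fake such as recorder is enough to exercise it without starting a server.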

2. Start the gRPC server

// Imports needed: "log", "net", "google.golang.org/grpc" and the generated package, imported as pb

const (
	// Address is the listening address
	Address string = ":8000"
	// Network is the network protocol
	Network string = "tcp"
)

func main() {
	// Listen on the local port
	listener, err := net.Listen(Network, Address)
	if err != nil {
		log.Fatalf("net.Listen err: %v", err)
	}
	log.Println(Address + " ...")
	// Create a new gRPC server instance
	// By default, the maximum size of a single received message is 1024*1024*4 bytes (4M), and the maximum size of a single sent message is math.MaxInt32 bytes
	// grpcServer := grpc.NewServer(grpc.MaxRecvMsgSize(1024*1024*4), grpc.MaxSendMsgSize(math.MaxInt32))
	grpcServer := grpc.NewServer()
	// Register our service on the gRPC server
	pb.RegisterStreamServerServer(grpcServer, &StreamService{})

	// Serve() accepts connections on the listener and blocks until the process is killed or Stop() is called
	err = grpcServer.Serve(listener)
	if err != nil {
		log.Fatalf("grpcServer.Serve err: %v", err)
	}
}

Run the server

go run 
:8000 ...

Create Client

1. Call the server's ListValue method

// Imports needed: "context", "io", "log" and the generated package, imported as pb

// listValue calls the ListValue method on the server
func listValue() {
	// Create the request
	req := pb.SimpleRequest{
		Data: "stream server grpc ",
	}
	// Call our service (ListValue method)
	stream, err := grpcClient.ListValue(context.Background(), &req)
	if err != nil {
		log.Fatalf("Call ListStr err: %v", err)
	}
	for {
		// Recv() receives a message from the server. By default, the maximum size of each received message is 1024*1024*4 bytes (4M)
		res, err := stream.Recv()
		// Check whether the message stream has ended
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatalf("ListStr get stream err: %v", err)
		}
		// Print the returned value
		log.Println(res.StreamValue)
	}
}

2. Start the gRPC client

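// Imports needed: "log", "google.golang.org/grpc" and the generated package, imported as pb

// Address is the server address to connect to
const Address string = ":8000"

var grpcClient pb.StreamServerClient

func main() {
	// Connect to the server (no TLS)
	conn, err := grpc.Dial(Address, grpc.WithInsecure())
	if err != nil {
		log.Fatalf("grpc.Dial err: %v", err)
	}
	defer conn.Close()

	// Create the gRPC client on top of the connection
	grpcClient = pb.NewStreamServerClient(conn)
	listValue()
}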

Run the client

go run
stream server grpc 0
stream server grpc 1
stream server grpc 2
stream server grpc 3
stream server grpc 4

The client continuously obtains data from the server

Further thought

If the server keeps sending data, similar to obtaining real-time data on stock trends, can the client stop obtaining data by itself?

Answer: Yes

1. We slightly modify the ListValue method on the server

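// Imports needed in addition to the earlier ones: "log" and "time"

// ListValue implements the ListValue method
func (s *StreamService) ListValue(req *pb.SimpleRequest, srv pb.StreamServer_ListValueServer) error {
	for n := 0; n < 15; n++ {
		// Send a message to the stream. By default, the maximum size of each sent message is math.MaxInt32 bytes
		err := srv.Send(&pb.StreamResponse{
			StreamValue: req.Data + strconv.Itoa(n),
		})
		if err != nil {
			return err
		}
		// Log the counter and wait one second before sending the next message
		log.Println(n)
		time.Sleep(1 * time.Second)
	}
	return nil
}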

2. Then modify the implementation of the client calling ListValue method to get the result

// listValue calls the ListValue method on the server
func listValue() {
	// Create the request
	req := pb.SimpleRequest{
		Data: "stream server grpc ",
	}
	// Call our service (ListValue method)
	// A context.Context is passed as well, which lets us change the behavior of the RPC when needed, such as setting a timeout or cancelling a running RPC
	stream, err := grpcClient.ListValue(context.Background(), &req)
	if err != nil {
		log.Fatalf("Call ListStr err: %v", err)
	}
	for {
		// Recv() receives a message from the server. By default, the maximum size of each received message is 1024*1024*4 bytes (4M)
		res, err := stream.Recv()
		// Check whether the message stream has ended
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatalf("ListStr get stream err: %v", err)
		}
		// Print the returned value
		log.Println(res.StreamValue)
		// Stop reading after the first message
		break
	}
	// CloseSend() closes only the client's send direction of the stream.
	// The receive direction stays open, so a later Recv() still returns the next unread message.
	// To make the server stop producing messages, cancel the context passed to ListValue instead.
	stream.CloseSend()
}

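Note that CloseSend() closes only the client's sending direction of the stream. In a server-streaming call, the generated client code already calls it right after sending the request, so calling it again does not make the server stop sending. This is also why Recv() still returns messages after CloseSend(): the receiving direction remains open and the stream simply continues with the next unread message; nothing is reactivated. To really stop the server, cancel the context passed to ListValue (for example, one created with context.WithCancel). The server then sees its stream context cancelled, Send() fails, and the handler returns.

Because the stream stays open until it ends or is cancelled, the client can pause reading and later continue with further Recv() calls.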
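
In grpc-go, cancelling the context passed to the call is the usual way for a client to abort an in-flight RPC, and a handler can watch the context returned by its stream's Context() method to notice this. The sketch below models only that idea with the standard library: produce and consume are hypothetical names, and a channel stands in for the gRPC stream.

```go
package main

import (
	"context"
	"fmt"
)

// produce is a hypothetical stand-in for a streaming handler: it emits values
// until it has sent max of them or the context is cancelled.
func produce(ctx context.Context, max int, out chan<- int) {
	defer close(out)
	for n := 0; n < max; n++ {
		select {
		case <-ctx.Done():
			return // the consumer gave up; stop producing
		case out <- n:
		}
	}
}

// consume reads up to want values and then cancels, as a client would.
func consume(want int) []int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	out := make(chan int)
	go produce(ctx, 15, out)

	var got []int
	for v := range out {
		got = append(got, v)
		if len(got) == want {
			cancel() // tell the producer to stop
			break
		}
	}
	return got
}

func main() {
	fmt.Println(consume(3))
}
```

The producer checks ctx.Done() on every iteration, so it stops promptly after the consumer cancels instead of running through all 15 values.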

Summary

This article introduced the basic use of server-side streaming RPC. The client initiates a request and the server keeps returning data until the server stops sending or the client actively stops receiving. The next article will introduce client-side streaming RPC.

Tutorial source code address:/Bingjian-Zhu/go-grpc-example

Reference: gRPC official documentation (Chinese version)
