SoFunction
Updated on 2025-03-05

Go gRPC service client streaming RPC tutorial

Preface

The previous article introduced server-side streaming RPC, in which the client sends a single request and then reads a sequence of messages from the returned stream. This article introduces client streaming RPC.

Client streaming RPC: the reverse of server streaming RPC. The client sends a sequence of messages to the server over a stream, and once it has finished sending, the server returns a single response.
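To make the "many messages in, one response out" shape concrete, here is a minimal sketch that imitates it with plain Go channels instead of gRPC. The names response, serve, and upload are made up for this illustration and are not part of the tutorial's code or of the gRPC API.

```go
package main

import "fmt"

// response stands in for the single reply of a client-streaming call.
type response struct {
	Count int
	Value string
}

// serve drains the request channel (the "stream") and replies once,
// only after the sender has closed it.
func serve(requests <-chan string) response {
	count := 0
	for range requests {
		count++
	}
	return response{Count: count, Value: "ok"}
}

// upload plays the client: it sends every item, closes the stream,
// and then waits for the one response.
func upload(items []string) response {
	requests := make(chan string)
	done := make(chan response)
	go func() { done <- serve(requests) }()
	for _, item := range items {
		requests <- item
	}
	close(requests) // plays the role of "the client has finished sending"
	return <-done
}

func main() {
	res := upload([]string{"a", "b", "c"})
	fmt.Printf("%+v\n", res) // prints {Count:3 Value:ok}
}
```

In real gRPC the channel is replaced by the generated stream object, but the order of events is the same: send repeatedly, signal the end, then read one reply.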

Scenario: the client uploads a large amount of data to the server.

Create a new proto file

Create a new client_stream.proto file

1. Define the request message

// Define the streaming request message
message StreamRequest{
    // Stream request parameter
    string stream_data = 1;
}

2. Define the response message

// Define the response message
message SimpleResponse{
    // Response code
    int32 code = 1;
    // Response value
    string value = 2;
}

3. Define service method RouteList

For a client streaming RPC, just add the stream keyword before the request parameter.

service StreamClient{
    // Client streaming RPC: add stream before the request parameter
    rpc RouteList (stream StreamRequest) returns (SimpleResponse){};
}

4. Compile the proto file

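Enter the directory where client_stream.proto is located and run the command:

protoc --go_out=plugins=grpc:./ ./client_stream.proto

The plugins=grpc option is only supported by the older github.com/golang/protobuf version of protoc-gen-go. Newer releases generate the gRPC code with the separate protoc-gen-go-grpc plugin (--go-grpc_out) instead.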

Create the server

1. Define our service and implement the RouteList method

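// Imports used by the server file: io, log, net, google.golang.org/grpc,
// and pb, the package generated from client_stream.proto.

// SimpleService defines our service
type SimpleService struct{}

// RouteList implements the RouteList method
func (s *SimpleService) RouteList(srv pb.StreamClient_RouteListServer) error {
	for {
		// Get the next message from the stream
		res, err := srv.Recv()
		if err == io.EOF {
			// The client has finished sending: send the result and close the stream
			return srv.SendAndClose(&pb.SimpleResponse{Value: "ok"})
		}
		if err != nil {
			return err
		}
		log.Println(res.StreamData)
	}
}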
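The receive loop above can be exercised without a network by running it against an in-memory stream. The sketch below assumes hand-written stand-ins for the generated types (StreamRequest, SimpleResponse) and a hypothetical routeListStream interface and fakeStream type. None of these come from the tutorial's generated code; they only mirror the two stream methods the handler uses, Recv and SendAndClose.

```go
package main

import (
	"fmt"
	"io"
)

// Hypothetical stand-ins for the generated pb message types.
type StreamRequest struct{ StreamData string }

type SimpleResponse struct {
	Code  int32
	Value string
}

// routeListStream lists the two methods the handler relies on.
type routeListStream interface {
	Recv() (*StreamRequest, error)
	SendAndClose(*SimpleResponse) error
}

// fakeStream replays queued requests and records the response.
type fakeStream struct {
	queued []string
	sent   *SimpleResponse
}

// Recv returns the next queued message, or io.EOF once the queue is empty.
func (f *fakeStream) Recv() (*StreamRequest, error) {
	if len(f.queued) == 0 {
		return nil, io.EOF
	}
	msg := f.queued[0]
	f.queued = f.queued[1:]
	return &StreamRequest{StreamData: msg}, nil
}

// SendAndClose records the single response.
func (f *fakeStream) SendAndClose(r *SimpleResponse) error {
	f.sent = r
	return nil
}

// routeList mirrors the handler loop and also returns what it received.
func routeList(srv routeListStream) ([]string, error) {
	var received []string
	for {
		req, err := srv.Recv()
		if err == io.EOF {
			return received, srv.SendAndClose(&SimpleResponse{Value: "ok"})
		}
		if err != nil {
			return received, err
		}
		received = append(received, req.StreamData)
	}
}

func main() {
	stream := &fakeStream{queued: []string{"stream client rpc 0", "stream client rpc 1"}}
	received, err := routeList(stream)
	fmt.Println(received, err, stream.sent.Value)
}
```

The point of the sketch is the control flow: io.EOF from Recv is not a failure here but the signal that the client has finished, and it is the only place where the response is sent.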

2. Start the gRPC server

const (
	// Address is the listening address
	Address string = ":8000"
	// Network is the network communication protocol
	Network string = "tcp"
)

func main() {
	// Listen on the local port
	listener, err := net.Listen(Network, Address)
	if err != nil {
		log.Fatalf("net.Listen err: %v", err)
	}
	log.Println(Address + " ...")
	// Create a new gRPC server instance
	grpcServer := grpc.NewServer()
	// Register our service on the gRPC server
	pb.RegisterStreamClientServer(grpcServer, &SimpleService{})

	// Serve blocks on the listener until the process is killed or Stop() is called
	err = grpcServer.Serve(listener)
	if err != nil {
		log.Fatalf("grpcServer.Serve err: %v", err)
	}
}

Run the server

go run 
:8000 ...

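Create the client

1. Call the server's RouteList method

// Imports used by the client file: context, log, strconv, google.golang.org/grpc,
// and pb, the package generated from client_stream.proto.

// routeList calls the server RouteList method
func routeList() {
	// Call the server RouteList method to obtain the stream
	stream, err := streamClient.RouteList(context.Background())
	if err != nil {
		log.Fatalf("Upload list err: %v", err)
	}
	for n := 0; n < 5; n++ {
		// Send a message to the stream
		err := stream.Send(&pb.StreamRequest{StreamData: "stream client rpc " + strconv.Itoa(n)})
		if err != nil {
			log.Fatalf("stream request err: %v", err)
		}
	}
	// Close the stream and get the returned message
	res, err := stream.CloseAndRecv()
	if err != nil {
		log.Fatalf("RouteList get response err: %v", err)
	}
	log.Println(res)
}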
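For the "upload a large amount of data" scenario, the payload usually has to be cut into pieces so that each Send carries a message of bounded size (gRPC limits received messages to 4 MB by default). The following is a minimal sketch of that splitting step only. splitChunks is a hypothetical helper that is not part of the tutorial's source, and the chunk size of 4 bytes is chosen purely to keep the output readable.

```go
package main

import "fmt"

// splitChunks cuts data into consecutive pieces of at most size bytes.
// It returns nil when size is not positive or data is empty.
func splitChunks(data []byte, size int) [][]byte {
	if size <= 0 {
		return nil
	}
	var chunks [][]byte
	for len(data) > 0 {
		n := size
		if len(data) < n {
			n = len(data)
		}
		chunks = append(chunks, data[:n])
		data = data[n:]
	}
	return chunks
}

func main() {
	payload := []byte("0123456789")
	for i, c := range splitChunks(payload, 4) {
		// In the real client, each chunk would go into one Send call,
		// for example as string(c) in the StreamData field.
		fmt.Printf("chunk %d: %q\n", i, c)
	}
}
```

Because StreamData is declared as a string in the proto file, binary payloads would be better served by a bytes field. That would be a change to the message definition and is outside what this tutorial defines.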

2. Start the gRPC client

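// Address is the connection address
const Address string = ":8000"

var streamClient pb.StreamClientClient

func main() {
	// Connect to the server.
	// Note: grpc.WithInsecure is deprecated in newer grpc-go releases in favor of
	// grpc.WithTransportCredentials(insecure.NewCredentials()).
	conn, err := grpc.Dial(Address, grpc.WithInsecure())
	if err != nil {
		log.Fatalf("grpc.Dial err: %v", err)
	}
	defer conn.Close()

	// Create the gRPC client stub on top of the connection
	streamClient = pb.NewStreamClientClient(conn)
	routeList()
}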

Run the client

go run 
value:"ok"

The server prints each message as it arrives from the client:

stream client rpc 0
stream client rpc 1
stream client rpc 2
stream client rpc 3
stream client rpc 4

Think about it

Can the server stop receiving before the client has sent all of its messages (a rare case)?

Answer: Yes, but the client code must check for io.EOF when sending.

1. Slightly modify the server's RouteList method so that it calls SendAndClose() to close the stream immediately after receiving one message.

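// RouteList implements the RouteList method
func (s *SimpleService) RouteList(srv pb.StreamClient_RouteListServer) error {
	for {
		// Get the next message from the stream
		res, err := srv.Recv()
		if err == io.EOF {
			// The client has finished sending: send the result and close the stream
			return srv.SendAndClose(&pb.SimpleResponse{Value: "ok"})
		}
		if err != nil {
			return err
		}
		log.Println(res.StreamData)
		// Close the stream right after the first message
		return srv.SendAndClose(&pb.SimpleResponse{Value: "ok"})
	}
}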

2. Then slightly modify the client's routeList function

// routeList calls the server RouteList method (this version also needs the io import)
func routeList() {
	// Call the server RouteList method to obtain the stream
	stream, err := streamClient.RouteList(context.Background())
	if err != nil {
		log.Fatalf("Upload list err: %v", err)
	}
	for n := 0; n < 5; n++ {
		// Send a message to the stream
		err := stream.Send(&pb.StreamRequest{StreamData: "stream client rpc " + strconv.Itoa(n)})
		// io.EOF must be checked when sending. If the server calls SendAndClose()
		// before the client has finished, a later Send() returns io.EOF,
		// so stop sending and go on to read the response.
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatalf("stream request err: %v", err)
		}
	}
	// Close the stream and get the returned message
	res, err := stream.CloseAndRecv()
	if err != nil {
		log.Fatalf("RouteList get response err: %v", err)
	}
	log.Println(res)
}

The client must check whether the error returned by Send() is io.EOF. If the server calls SendAndClose() before the client has finished sending, a later Send() on the client returns io.EOF. The client should then stop sending and call CloseAndRecv() to obtain the server's response (or the error status of the RPC).
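The early-close case can be tried out without a server by using a fake stream whose Send starts returning io.EOF after a fixed number of messages. This is a sketch under stated assumptions: earlyCloseStream and sendAll are hypothetical names for this illustration, and the fake accepts exactly limit messages, whereas with a real gRPC stream the number of Send calls that succeed before io.EOF appears depends on timing.

```go
package main

import (
	"fmt"
	"io"
)

// earlyCloseStream is a hypothetical stand-in for the client stream.
// It accepts limit messages; after that, Send reports io.EOF,
// imitating a server that has already closed the stream.
type earlyCloseStream struct {
	limit    int
	accepted int
}

func (s *earlyCloseStream) Send(msg string) error {
	if s.accepted >= s.limit {
		return io.EOF
	}
	s.accepted++
	return nil
}

// CloseAndRecv returns the reply the imitated server already sent.
func (s *earlyCloseStream) CloseAndRecv() (string, error) {
	return "ok", nil
}

// sendAll tries to send total messages, stops at io.EOF,
// and then reads the single response.
func sendAll(s *earlyCloseStream, total int) (int, string, error) {
	sent := 0
	for n := 0; n < total; n++ {
		err := s.Send(fmt.Sprintf("stream client rpc %d", n))
		if err == io.EOF {
			break // the server is done; do not treat this as a failure
		}
		if err != nil {
			return sent, "", err
		}
		sent++
	}
	reply, err := s.CloseAndRecv()
	return sent, reply, err
}

func main() {
	sent, reply, err := sendAll(&earlyCloseStream{limit: 1}, 5)
	fmt.Println(sent, reply, err) // prints 1 ok <nil>
}
```

Without the io.EOF check, the loop would treat the server's early close as a fatal send error and never read the response.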

Summary

This article introduced the basic use of client streaming RPC. The next article covers bidirectional streaming RPC.

Tutorial source code: /Bingjian-Zhu/go-grpc-example

Reference: gRPC official documentation (Chinese version)
