Preface
Previous articlegRPC, likeIt exploded directly, and the content mainly includes: simple gRPC service, stream processing mode, validator, Token authentication and certificate authentication.
The reading volume on multiple platforms has hit new highs, and I have received homepage recommendations on oschina, and the reading volume has reached 1w+, which is the peak of my single reading.
It seems that as long as you write carefully, you will still gain something.
In this article, we will start from actual combat and mainly introduce the publish and subscription mode of gRPC, REST interface and timeout control.
I will upload all the relevant codes to GitHub. Interested friends can view or download them.
Publish and Subscription Mode
Publishing subscription is a common design pattern, and many implementations of this pattern already exist in the open source community. The docker project provides a minimalist implementation of pubsub. The following is the local publish subscription code based on the pubsub package implementation:
package main import ( "fmt" "strings" "time" "/moby/moby/pkg/pubsub" ) func main() { p := (100*, 10) golang := (func(v interface{}) bool { if key, ok := v.(string); ok { if (key, "golang:") { return true } } return false }) docker := (func(v interface{}) bool { if key, ok := v.(string); ok { if (key, "docker:") { return true } } return false }) go ("hi") go ("golang: ") go ("docker: /") (1) go func() { ("golang topic:", <-golang) }() go func() { ("docker topic:", <-docker) }() <-make(chan bool) }
This code first creates an object and then publishes a message by implementing a subscription.
The execution effect is as follows:
docker topic: docker: /
golang topic: golang:
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan receive]:
()
/Users/zhangyongxin/src/go-example/grpc-example/pubsub/server/:43 +0x1e7
exit status 2
Subscription messages can be printed normally.
But there is a deadlock error, which is caused by this statement <-make(chan bool). However, without this statement, the subscription message cannot be printed normally.
I don’t understand it very much here. Is there any big guy who knows it? Welcome to leave a message and ask for guidance.
Next, we use gRPC and pubsub packages to implement the publish subscription model.
Four parts need to be implemented:
- proto file;
- Server: Used to receive subscription requests, and also receive publication requests, and forward the publication requests to subscribers;
- Subscription client: used to subscribe to messages from the server and process messages;
- Publish client: Used to send messages to the server.
proto file
First define the proto file:
syntax = "proto3"; package proto; message String { string value = 1; } service PubsubService { rpc Publish (String) returns (String); rpc SubscribeTopic (String) returns (stream String); rpc Subscribe (String) returns (stream String); }
Define three methods, namely one publish Publish and two subscriptions Subscribe and SubscribeTopic.
The Subscribe method receives all messages, and SubscribeTopic receives messages based on a specific Topic.
Server side
package main import ( "context" "fmt" "log" "net" "server/proto" "strings" "time" "/moby/moby/pkg/pubsub" "/grpc" "/grpc/reflection" ) type PubsubService struct { pub * } func (p *PubsubService) Publish(ctx , arg *) (*, error) { (()) return &amp;{}, nil } func (p *PubsubService) SubscribeTopic(arg *, stream proto.PubsubService_SubscribeTopicServer) error { ch := (func(v interface{}) bool { if key, ok := v.(string); ok { if (key, ()) { return true } } return false }) for v := range ch { if err := (&amp;{Value: v.(string)}); nil != err { return err } } return nil } func (p *PubsubService) Subscribe(arg *, stream proto.PubsubService_SubscribeServer) error { ch := () for v := range ch { if err := (&amp;{Value: v.(string)}); nil != err { return err } } return nil } func NewPubsubService() *PubsubService { return &amp;PubsubService{pub: (100*, 10)} } func main() { lis, err := ("tcp", ":50051") if err != nil { ("failed to listen: %v", err) } // Simple call server := () //Register grcurl’s reflection service required (server) // Register business services (server, NewPubsubService()) ("grpc server start ...") if err := (lis); err != nil { ("failed to serve: %v", err) } }
Compared with the previous publishing and subscription programs, here we actually use * as a member of the structure PubsubService of gRPC.
Then, according to the development process of gRPC, implement three methods corresponding to the structure.
Finally, when registering the service, inject the NewPubsubService() service to implement the local publishing and subscription function.
Subscribe to the client
package main import ( "client/proto" "context" "fmt" "io" "log" "/grpc" ) func main() { conn, err := ("localhost:50051", ()) if err != nil { (err) } defer () client := (conn) stream, err := ( (), &{Value: "golang:"}, ) if nil != err { (err) } go func() { for { reply, err := () if nil != err { if == err { break } (err) } ("sub1: ", ()) } }() streamTopic, err := ( (), &{Value: "golang:"}, ) if nil != err { (err) } go func() { for { reply, err := () if nil != err { if == err { break } (err) } ("subTopic: ", ()) } }() <-make(chan bool) }
Create a new NewPubsubServiceClient object, then implement and method respectively, and then receive messages continuously through goroutine.
Publish client
package main import ( "client/proto" "context" "log" "/grpc" ) func main() { conn, err := ("localhost:50051", ()) if err != nil { (err) } defer () client := (conn) _, err = ( (), &{Value: "golang: hello Go"}, ) if err != nil { (err) } _, err = ( (), &{Value: "docker: hello Docker"}, ) if nil != err { (err) } }
Create a new NewPubsubServiceClient object and then publish the message through the method.
After all the code is written, we open three terminals to test it:
Terminal 1Start the server on:
go run
Terminal 2Start the subscription client on:
go run sub_client.go
Terminal 3Execute the publishing client on:
go run pub_client.go
In this way, inTerminal 2There is a corresponding output on it:
subTopic: golang: hello Go
sub1: golang: hello Go
sub1: docker: hello Docker
You can also open a few more subscription terminals, and each subscription terminal will have the same content output.
Source code address:
/yongxinz/go-example/tree/main/grpc-example/pubsub
REST interface
gRPC is generally used for internal communication in the cluster. If you need to provide external services, most of them are through the REST interface. The open source project grpc-gateway provides the ability to convert gRPC services into REST services. In this way, you can directly access the gRPC API.
But I think that in fact, it should be relatively few of them used in this way. If you provide a REST interface, it will be much more convenient to write an HTTP service directly.
proto file
The first step is to create a proto file:
syntax = "proto3"; package proto; import "google/api/"; message StringMessage { string value = 1; } service RestService { rpc Get(StringMessage) returns (StringMessage) { option () = { get: "/get/{value}" }; } rpc Post(StringMessage) returns (StringMessage) { option () = { post: "/post" body: "*" }; } }
Define a REST service RestService to implement GET and POST methods respectively.
Install plug-in:
go get -u /grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway
Generate corresponding code:
protoc -I/usr/local/include -I. \ -I$GOPATH/pkg/mod \ -I$GOPATH/pkg/mod//grpc-ecosystem/[email protected]/third_party/googleapis \ --grpc-gateway_out=. --go_out=plugins=grpc:.\ --swagger_out=. \
The --grpc-gateway_out parameter can generate the corresponding gw file, and the --swagger_out parameter can generate the corresponding API document.
The two files I generated here are as follows:
REST Services
package main import ( "context" "log" "net/http" "rest/proto" "/grpc-ecosystem/grpc-gateway/runtime" "/grpc" ) func main() { ctx := () ctx, cancel := (ctx) defer cancel() mux := () err := ( ctx, mux, "localhost:50051", []{()}, ) if err != nil { (err) } (":8080", mux) }
Here we mainly connect to the gRPC service by implementing the RegisterRestServiceHandlerFromEndpoint method in the gw file.
gRPC Service
package main import ( "context" "net" "rest/proto" "/grpc" ) type RestServiceImpl struct{} func (r *RestServiceImpl) Get(ctx , message *) (*, error) { return &{Value: "Get hi:" + + "#"}, nil } func (r *RestServiceImpl) Post(ctx , message *) (*, error) { return &{Value: "Post hi:" + + "@"}, nil } func main() { grpcServer := () (grpcServer, new(RestServiceImpl)) lis, _ := ("tcp", ":50051") (lis) }
The implementation of gRPC service is still the same as before.
The above is all the code, let's test it now:
Start three terminals:
Terminal 1Start the gRPC service:
go run grpc_service.go
Terminal 2Start the REST service:
go run rest_service.go
Terminal 3Come to request REST service:
$ curl localhost:8080/get/gopher {"value":"Get hi:gopher"} $ curl localhost:8080/post -X POST --data '{"value":"grpc"}' {"value":"Post hi:grpc"}
Source code address:
/yongxinz/go-example/tree/main/grpc-example/rest
Timeout control
The last part introduces timeout control, which is very important.
General WEB service APIs, or Nginx, will set a timeout time. If no data has been returned, the server may directly return a timeout error, or the client may end the connection.
Without this timeout, it would be quite dangerous. All requests are blocked on the server and consume a lot of resources, such as memory. If resources are exhausted, it may even cause the entire service to crash.
So, how to set the timeout time in gRPC? It mainly uses context parameters, specifically functions.
proto file
I won't say much about creating the simplest proto file.
syntax = "proto3"; package proto; // The greeting service definition. service Greeter { // Sends a greeting rpc SayHello (HelloRequest) returns (HelloReply) {} } // The request message containing the user's name. message HelloRequest { string name = 1; } // The response message containing the greetings message HelloReply { string message = 1; }
Client
package main import ( "client/proto" "context" "fmt" "log" "time" "/grpc" "/grpc/codes" "/grpc/status" ) func main() { // Simple call conn, err := ("localhost:50051", ()) defer () ctx, cancel := ((), ().Add((3*))) defer cancel() client := (conn) // Simple call reply, err := (ctx, &amp;{Name: "zzz"}) if err != nil { statusErr, ok := (err) if ok { if () == { (" err: deadline") } } (" err: %v", err) } () }
Set a 3s timeout time through the following function:
ctx, cancel := ((), ().Add((3*))) defer cancel()
The timeout error is then detected in the response error.
Server side
package main import ( "context" "fmt" "log" "net" "runtime" "server/proto" "time" "/grpc" "/grpc/codes" "/grpc/reflection" "/grpc/status" ) type greeter struct { } func (*greeter) SayHello(ctx , req *) (*, error) { data := make(chan *, 1) go handle(ctx, req, data) select { case res := &lt;-data: return res, nil case &lt;-(): return nil, (, "Client cancelled, abandoning.") } } func handle(ctx , req *, data chan&lt;- *) { select { case &lt;-(): (()) () //Exit the Go coroutine after timeout case &lt;-(4 * ): //Simulation time-consuming operation res := { Message: "hello " + , } //// Make timeout judgment before modifying the database // if () == { // ... // // If the timeout has been completed, exit // } data &lt;- &amp;res } } func main() { lis, err := ("tcp", ":50051") if err != nil { ("failed to listen: %v", err) } // Simple call server := () //Register grcurl’s reflection service required (server) // Register business services (server, &amp;greeter{}) ("grpc server start ...") if err := (lis); err != nil { ("failed to serve: %v", err) } }
The server adds a handle function, where case <-(4 * ) means that its corresponding code will be executed only after 4s to simulate timeout requests.
If the client timeout time exceeds 4s, a timeout error will occur.
Let’s simulate it below:
Server:
$ go run grpc server start ... 2021/10/24 22:57:40 context deadline exceeded
Client:
$ go run 2021/10/24 22:57:40 err: deadline exit status 1
Source code address
/yongxinz/go-example/tree/main/grpc-example/deadline
Summarize
This article mainly introduces three practical contents of gRPC, namely:
- Publish subscription model
- REST interface
- Timeout control
Personally, timeout control is still the most important thing, and you need to pay more attention during the development process.
Based on the previous article, the actual content of gRPC has been written, all the codes can be executed, and all uploaded to GitHub.
If you have any questions, please leave me a message. If you feel good, you are also welcome to follow and forward it.
Title picture:This image was posted by Reytschl on Pixabay
Source code address:
/yongxinz/go-example
/yongxinz/gopher
Recommended reading
I heard that 99% of Go programmers have been cheated by defer
The test lady asked me how to use gRPC, and I threw this article to her directly
gRPC, like
Use grpcurl to access gRPC services from the command line
Recommend three practical Go development tools
Reference link
/advanced-go-programming-book/ch4-rpc/
https:///article/
Go gRPC timeout control Deadlines
gRPC timeout interceptor implementation example
For more information about gRPC publishing and subscription REST interface, please follow my other related articles!