SoFunction
Updated on 2025-03-05

gRPC's publish-subscribe pattern, REST interface, and timeout control

Preface

The previous article, "gRPC, like", took off right away. It mainly covered a simple gRPC service, streaming modes, validators, token authentication, and certificate authentication.

Its read counts hit new highs on several platforms. It was featured on the oschina homepage and passed 10,000 reads, my record for a single article.

It seems that careful writing does pay off.

This article takes a hands-on approach and covers gRPC's publish-subscribe pattern, REST interface, and timeout control.

All the related code is uploaded to GitHub. Feel free to browse or download it.

Publish-subscribe pattern

Publish-subscribe is a common design pattern, and the open source community already has many implementations of it. The docker project provides a minimal pubsub implementation. The following local publish-subscribe program is built on its pubsub package:

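package main
import (
    "fmt"
    "strings"
    "time"
    "github.com/moby/moby/pkg/pubsub"
)
func main() {
    // Publisher with a 100ms publish timeout and a buffer of 10 messages per subscriber.
    p := pubsub.NewPublisher(100*time.Millisecond, 10)
    // Subscribe to messages that start with "golang:".
    golang := p.SubscribeTopic(func(v interface{}) bool {
        if key, ok := v.(string); ok {
            if strings.HasPrefix(key, "golang:") {
                return true
            }
        }
        return false
    })
    // Subscribe to messages that start with "docker:".
    docker := p.SubscribeTopic(func(v interface{}) bool {
        if key, ok := v.(string); ok {
            if strings.HasPrefix(key, "docker:") {
                return true
            }
        }
        return false
    })
    go p.Publish("hi")
    go p.Publish("golang: ")
    go p.Publish("docker: /")
    // Brief pause (the argument is in nanoseconds) before reading.
    time.Sleep(1)
    go func() {
        fmt.Println("golang topic:", <-golang)
    }()
    go func() {
        fmt.Println("docker topic:", <-docker)
    }()
    // Block main so the goroutines above get a chance to print.
    <-make(chan bool)
}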

This code first creates a publisher object with pubsub.NewPublisher, registers two topic subscriptions with SubscribeTopic, and then publishes messages with Publish.
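
To make the mechanics concrete, here is a minimal standard-library sketch of the same idea: a publisher keeps one filter function per subscriber and delivers a message only where the filter matches. The publisher type, its methods, and the hasPrefix helper are hypothetical names for illustration. This is not how the moby pubsub package is implemented, which also deals with publish timeouts and concurrent delivery.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// topicFunc reports whether a subscriber wants a given message.
type topicFunc func(v interface{}) bool

// publisher is a hypothetical, simplified stand-in for a pubsub publisher.
type publisher struct {
	mu   sync.Mutex
	subs map[chan interface{}]topicFunc
	buf  int
}

func newPublisher(buffer int) *publisher {
	return &publisher{subs: make(map[chan interface{}]topicFunc), buf: buffer}
}

// subscribeTopic registers a subscriber; a nil filter accepts every message.
func (p *publisher) subscribeTopic(filter topicFunc) chan interface{} {
	ch := make(chan interface{}, p.buf)
	p.mu.Lock()
	p.subs[ch] = filter
	p.mu.Unlock()
	return ch
}

// publish delivers v to every subscriber whose filter matches.
// If a subscriber's buffer is full, the message is dropped instead of blocking.
func (p *publisher) publish(v interface{}) {
	p.mu.Lock()
	defer p.mu.Unlock()
	for ch, filter := range p.subs {
		if filter != nil && !filter(v) {
			continue
		}
		select {
		case ch <- v:
		default:
		}
	}
}

// hasPrefix builds a filter that matches string messages with the given prefix.
func hasPrefix(prefix string) topicFunc {
	return func(v interface{}) bool {
		s, ok := v.(string)
		return ok && strings.HasPrefix(s, prefix)
	}
}

func main() {
	p := newPublisher(10)
	golang := p.subscribeTopic(hasPrefix("golang:"))
	all := p.subscribeTopic(nil)

	p.publish("hi")
	p.publish("golang: hello")

	fmt.Println("golang topic:", <-golang)
	fmt.Println("all:", <-all, "|", <-all)
}
```

The topic subscriber only sees the message with the matching prefix, while the unfiltered subscriber sees both.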

The execution effect is as follows:

docker topic: docker: /
golang topic: golang: 
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan receive]:
main.main()
    /Users/zhangyongxin/src/go-example/grpc-example/pubsub/server/:43 +0x1e7
exit status 2

The subscribed messages are printed correctly.

But there is also a deadlock error, caused by the statement <-make(chan bool). Without this statement, however, the subscribed messages are not printed at all.

I don't fully understand this part. If anyone knows the reason, please leave a comment. Guidance is welcome.
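
A likely explanation, offered as a reading of the output rather than something confirmed by the pubsub package: <-make(chan bool) blocks main on a channel that nobody will ever send to. Once the two printing goroutines finish, every remaining goroutine is blocked, so the Go runtime reports a deadlock. Without the statement, main returns before the goroutines get a chance to print. One common way to wait exactly as long as needed is sync.WaitGroup. The sketch below uses plain buffered channels in place of pubsub subscriptions (an assumption, to stay within the standard library), and printFirst is a hypothetical helper name.

```go
package main

import (
	"fmt"
	"sync"
)

// printFirst waits for one message on each channel and prints it with its label.
// It returns the number of messages printed, after all goroutines have finished.
func printFirst(topics map[string]<-chan string) int {
	var wg sync.WaitGroup
	var mu sync.Mutex
	printed := 0
	for label, ch := range topics {
		wg.Add(1)
		go func(label string, ch <-chan string) {
			defer wg.Done()
			msg := <-ch
			fmt.Println(label, msg)
			mu.Lock()
			printed++
			mu.Unlock()
		}(label, ch)
	}
	// Wait returns as soon as every goroutine called Done, so main can exit
	// normally instead of blocking forever.
	wg.Wait()
	return printed
}

func main() {
	golang := make(chan string, 1)
	docker := make(chan string, 1)
	golang <- "golang: hello"
	docker <- "docker: hello"

	printFirst(map[string]<-chan string{
		"golang topic:": golang,
		"docker topic:": docker,
	})
}
```

With this structure the program prints both messages and exits without the deadlock report.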

Next, we use gRPC together with the pubsub package to implement the publish-subscribe pattern.

Four parts need to be implemented:

  • proto file;
  • Server: receives subscription requests and publish requests, and forwards published messages to subscribers;
  • Subscription client: subscribes to messages from the server and processes them;
  • Publish client: sends messages to the server.

proto file

First define the proto file:

syntax = "proto3";
package proto;
message String {
    string value = 1;
}
service PubsubService {
    rpc Publish (String) returns (String);
    rpc SubscribeTopic (String) returns (stream String);
    rpc Subscribe (String) returns (stream String);
}

This defines three methods: Publish for publishing, and Subscribe and SubscribeTopic for subscribing.

The Subscribe method receives all messages, while SubscribeTopic receives only the messages that match a specific topic.

Server side

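package main
import (
    "context"
    "fmt"
    "log"
    "net"
    "server/proto"
    "strings"
    "time"
    "github.com/moby/moby/pkg/pubsub"
    "google.golang.org/grpc"
    "google.golang.org/grpc/reflection"
)
type PubsubService struct {
    pub *pubsub.Publisher
}
// Publish forwards the message to all matching subscribers.
func (p *PubsubService) Publish(ctx context.Context, arg *proto.String) (*proto.String, error) {
    p.pub.Publish(arg.GetValue())
    return &proto.String{}, nil
}
// SubscribeTopic streams only the messages whose prefix matches the requested topic.
func (p *PubsubService) SubscribeTopic(arg *proto.String, stream proto.PubsubService_SubscribeTopicServer) error {
    ch := p.pub.SubscribeTopic(func(v interface{}) bool {
        if key, ok := v.(string); ok {
            if strings.HasPrefix(key, arg.GetValue()) {
                return true
            }
        }
        return false
    })
    for v := range ch {
        if err := stream.Send(&proto.String{Value: v.(string)}); nil != err {
            return err
        }
    }
    return nil
}
// Subscribe streams every published message.
func (p *PubsubService) Subscribe(arg *proto.String, stream proto.PubsubService_SubscribeServer) error {
    ch := p.pub.Subscribe()
    for v := range ch {
        if err := stream.Send(&proto.String{Value: v.(string)}); nil != err {
            return err
        }
    }
    return nil
}
func NewPubsubService() *PubsubService {
    return &PubsubService{pub: pubsub.NewPublisher(100*time.Millisecond, 10)}
}
func main() {
    lis, err := net.Listen("tcp", ":50051")
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }
    // Create the gRPC server.
    server := grpc.NewServer()
    // Register the reflection service required by grpcurl.
    reflection.Register(server)
    // Register the business service.
    proto.RegisterPubsubServiceServer(server, NewPubsubService())
    fmt.Println("grpc server start ...")
    if err := server.Serve(lis); err != nil {
        log.Fatalf("failed to serve: %v", err)
    }
}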

Compared with the earlier local publish-subscribe program, here *pubsub.Publisher becomes a member of the gRPC service struct PubsubService.

Then, following the usual gRPC development flow, the struct implements the three methods defined in the proto file.

Finally, when registering the service, pass in the object returned by NewPubsubService() so that the gRPC service reuses the local publish-subscribe functionality.
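
Both streaming handlers share the same forwarding loop: read from the subscription channel and send each message to the client until sending fails. Here is a minimal sketch of that loop in isolation. The sender interface is a hypothetical stand-in for the generated stream type, and forward and recorder are illustrative names, not part of the original code.

```go
package main

import (
	"errors"
	"fmt"
)

// sender is a hypothetical stand-in for the Send method of a gRPC server stream.
type sender interface {
	Send(value string) error
}

// forward relays messages from ch to s until ch is closed or Send fails.
// Non-string values are skipped instead of panicking on a type assertion.
func forward(ch <-chan interface{}, s sender) error {
	for v := range ch {
		msg, ok := v.(string)
		if !ok {
			continue
		}
		if err := s.Send(msg); err != nil {
			return err
		}
	}
	return nil
}

// recorder collects sent messages and starts failing once limit is reached.
type recorder struct {
	got   []string
	limit int
}

func (r *recorder) Send(value string) error {
	if len(r.got) >= r.limit {
		return errors.New("client went away")
	}
	r.got = append(r.got, value)
	return nil
}

func main() {
	ch := make(chan interface{}, 3)
	ch <- "golang: one"
	ch <- 42
	ch <- "golang: two"
	close(ch)

	r := &recorder{limit: 10}
	err := forward(ch, r)
	fmt.Println(err, r.got)
}
```

Returning the Send error ends the handler, which is how the server stops forwarding to a client that has disconnected.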

Subscription client

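package main
import (
    "client/proto"
    "context"
    "fmt"
    "io"
    "log"
    "google.golang.org/grpc"
)
func main() {
    conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()
    client := proto.NewPubsubServiceClient(conn)
    // Subscribe receives every message; the request value is not used as a filter.
    stream, err := client.Subscribe(
        context.Background(), &proto.String{Value: "golang:"},
    )
    if nil != err {
        log.Fatal(err)
    }
    go func() {
        for {
            reply, err := stream.Recv()
            if nil != err {
                if io.EOF == err {
                    break
                }
                log.Fatal(err)
            }
            fmt.Println("sub1: ", reply.GetValue())
        }
    }()
    // SubscribeTopic receives only messages prefixed with "golang:".
    streamTopic, err := client.SubscribeTopic(
        context.Background(), &proto.String{Value: "golang:"},
    )
    if nil != err {
        log.Fatal(err)
    }
    go func() {
        for {
            reply, err := streamTopic.Recv()
            if nil != err {
                if io.EOF == err {
                    break
                }
                log.Fatal(err)
            }
            fmt.Println("subTopic: ", reply.GetValue())
        }
    }()
    // Keep main alive while the goroutines receive messages.
    <-make(chan bool)
}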

Create a client object with NewPubsubServiceClient, call the Subscribe and SubscribeTopic methods on it, and then receive messages continuously in goroutines.

Publish client

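package main
import (
    "client/proto"
    "context"
    "log"
    "google.golang.org/grpc"
)
func main() {
    conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()
    client := proto.NewPubsubServiceClient(conn)
    // Publish one message per topic.
    _, err = client.Publish(
        context.Background(), &proto.String{Value: "golang: hello Go"},
    )
    if err != nil {
        log.Fatal(err)
    }
    _, err = client.Publish(
        context.Background(), &proto.String{Value: "docker: hello Docker"},
    )
    if nil != err {
        log.Fatal(err)
    }
}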

Create a client object with NewPubsubServiceClient and then publish messages with the Publish method.

With all the code written, open three terminals to test it:

In terminal 1, start the server:

go run 

In terminal 2, start the subscription client:

go run sub_client.go

In terminal 3, run the publish client:

go run pub_client.go

Terminal 2 then shows the corresponding output:

subTopic:  golang: hello Go
sub1:  golang: hello Go
sub1:  docker: hello Docker

You can also open several more subscription terminals, and each of them prints the same content.

Source code address:

/yongxinz/go-example/tree/main/grpc-example/pubsub

REST interface

gRPC is generally used for internal communication within a cluster. Services exposed to the outside mostly go through a REST interface. The open source project grpc-gateway can expose a gRPC service as a REST service, so the gRPC API can be called directly over HTTP.

That said, I suspect relatively few people actually use it this way. If you need a REST interface, writing an HTTP service directly is much more convenient.

proto file

The first step is to create the proto file:

syntax = "proto3";
package proto;
import "google/api/annotations.proto";
message StringMessage {
    string value = 1;
}
service RestService {
    rpc Get(StringMessage) returns (StringMessage) {
        option (google.api.http) = {
            get: "/get/{value}"
        };
    }
    rpc Post(StringMessage) returns (StringMessage) {
        option (google.api.http) = {
            post: "/post"
            body: "*"
        };
    }
}

This defines a REST service, RestService, with one GET method and one POST method.
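
To picture what the two HTTP mappings mean, here is a hand-written net/http sketch of the same routes: GET /get/{value} takes the value from the URL path, and POST /post takes it from the JSON body. This only illustrates the routing and is not what grpc-gateway generates. newMux, call, and stringMessage are hypothetical names, and the handlers simply echo the value instead of calling a gRPC backend.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// stringMessage mirrors the StringMessage proto message as JSON.
type stringMessage struct {
	Value string `json:"value"`
}

// newMux wires up the two routes described in the proto file.
func newMux() *http.ServeMux {
	mux := http.NewServeMux()
	// get: "/get/{value}" -> the value comes from the URL path.
	mux.HandleFunc("/get/", func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodGet {
			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
			return
		}
		value := strings.TrimPrefix(r.URL.Path, "/get/")
		json.NewEncoder(w).Encode(stringMessage{Value: value})
	})
	// post: "/post" with body: "*" -> the whole JSON body maps to the message.
	mux.HandleFunc("/post", func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
			return
		}
		var msg stringMessage
		if err := json.NewDecoder(r.Body).Decode(&msg); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		json.NewEncoder(w).Encode(msg)
	})
	return mux
}

// call runs one in-memory request against the handler and returns the body.
func call(h http.Handler, method, path, body string) string {
	req := httptest.NewRequest(method, path, strings.NewReader(body))
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, req)
	return strings.TrimSpace(rec.Body.String())
}

func main() {
	mux := newMux()
	fmt.Println(call(mux, http.MethodGet, "/get/gopher", ""))
	fmt.Println(call(mux, http.MethodPost, "/post", `{"value":"grpc"}`))
}
```

Using httptest keeps the example runnable without opening a port.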

Install the plugin:

go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway

Generate the corresponding code:

protoc -I/usr/local/include -I. \
    -I$GOPATH/pkg/mod \
    -I$GOPATH/pkg/mod/github.com/grpc-ecosystem/grpc-gateway@<version>/third_party/googleapis \
    --grpc-gateway_out=. --go_out=plugins=grpc:. \
    --swagger_out=. \
    

The --grpc-gateway_out parameter generates the corresponding gw file, and the --swagger_out parameter generates the corresponding API documentation.

The two files I generated here are as follows:


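REST service

package main
import (
    "context"
    "log"
    "net/http"
    "rest/proto"
    "github.com/grpc-ecosystem/grpc-gateway/runtime"
    "google.golang.org/grpc"
)
func main() {
    ctx := context.Background()
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    // The gateway mux translates HTTP requests into gRPC calls.
    mux := runtime.NewServeMux()
    err := proto.RegisterRestServiceHandlerFromEndpoint(
        ctx, mux, "localhost:50051",
        []grpc.DialOption{grpc.WithInsecure()},
    )
    if err != nil {
        log.Fatal(err)
    }
    http.ListenAndServe(":8080", mux)
}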

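Here the connection to the gRPC service is made by calling the RegisterRestServiceHandlerFromEndpoint function from the generated gw file.

gRPC service

package main
import (
    "context"
    "net"
    "rest/proto"
    "google.golang.org/grpc"
)
type RestServiceImpl struct{}
// Get echoes the value taken from the URL path.
func (r *RestServiceImpl) Get(ctx context.Context, message *proto.StringMessage) (*proto.StringMessage, error) {
    return &proto.StringMessage{Value: "Get hi:" + message.Value + "#"}, nil
}
// Post echoes the value taken from the request body.
func (r *RestServiceImpl) Post(ctx context.Context, message *proto.StringMessage) (*proto.StringMessage, error) {
    return &proto.StringMessage{Value: "Post hi:" + message.Value + "@"}, nil
}
func main() {
    grpcServer := grpc.NewServer()
    proto.RegisterRestServiceServer(grpcServer, new(RestServiceImpl))
    lis, _ := net.Listen("tcp", ":50051")
    grpcServer.Serve(lis)
}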

The gRPC service is implemented the same way as before.

That is all the code. Now let's test it.

Open three terminals:

In terminal 1, start the gRPC service:

go run grpc_service.go

In terminal 2, start the REST service:

go run rest_service.go

In terminal 3, send requests to the REST service:

$ curl localhost:8080/get/gopher
{"value":"Get hi:gopher#"}
$ curl localhost:8080/post -X POST --data '{"value":"grpc"}'
{"value":"Post hi:grpc@"}

Source code address:

/yongxinz/go-example/tree/main/grpc-example/rest

Timeout control

The last part covers timeout control, which is very important.

Typical web service APIs, and Nginx too, set a timeout. If no data has been returned within that time, the server may return a timeout error directly, or the client may close the connection.

Without such a timeout, things get quite dangerous. Requests pile up blocked on the server and consume a lot of resources, such as memory. If resources are exhausted, the entire service may even crash.

So how do you set a timeout in gRPC? It is mainly done through the context parameter, specifically with the context.WithDeadline function.

proto file

The proto file is the simplest possible one, so I won't go into detail.

syntax = "proto3";
package proto;
// The greeting service definition.
service Greeter {
    // Sends a greeting
    rpc SayHello (HelloRequest) returns (HelloReply) {}
}
// The request message containing the user's name.
message HelloRequest {
    string name = 1;
}
// The response message containing the greetings
message HelloReply {
    string message = 1;
}

Client

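package main
import (
    "client/proto"
    "context"
    "fmt"
    "log"
    "time"
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)
func main() {
    // Connect to the server.
    conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()
    // Give the call a deadline 3 seconds from now.
    ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Duration(3*time.Second)))
    defer cancel()
    client := proto.NewGreeterClient(conn)
    // Make the call.
    reply, err := client.SayHello(ctx, &proto.HelloRequest{Name: "zzz"})
    if err != nil {
        // Check whether the error is a deadline error.
        statusErr, ok := status.FromError(err)
        if ok {
            if statusErr.Code() == codes.DeadlineExceeded {
                log.Fatalln(" err: deadline")
            }
        }
        log.Fatalf(" err: %v", err)
    }
    fmt.Println(reply.Message)
}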

The 3s timeout is set with the following call:

ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Duration(3*time.Second)))
defer cancel()

The timeout is then detected by checking the error returned by the call.

Server side

package main
import (
    "context"
    "fmt"
    "log"
    "net"
    "runtime"
    "server/proto"
    "time"
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/reflection"
    "google.golang.org/grpc/status"
)
type greeter struct {
}
func (*greeter) SayHello(ctx context.Context, req *proto.HelloRequest) (*proto.HelloReply, error) {
    // Buffered so the worker can send even if nobody is receiving anymore.
    data := make(chan *proto.HelloReply, 1)
    go handle(ctx, req, data)
    select {
    case res := <-data:
        return res, nil
    case <-ctx.Done():
        return nil, status.Errorf(codes.Canceled, "Client cancelled, abandoning.")
    }
}
func handle(ctx context.Context, req *proto.HelloRequest, data chan<- *proto.HelloReply) {
    select {
    case <-ctx.Done():
        log.Println(ctx.Err())
        runtime.Goexit() // Exit the goroutine after the timeout
    case <-time.After(4 * time.Second): // Simulate a time-consuming operation
        res := proto.HelloReply{
            Message: "hello " + req.Name,
        }
        // // Check for a timeout before modifying the database
        // if ctx.Err() == context.Canceled {
        //  ...
        //  // If the request has already timed out, exit
        // }
        data <- &res
    }
}
func main() {
    lis, err := net.Listen("tcp", ":50051")
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }
    // Create the gRPC server.
    server := grpc.NewServer()
    // Register the reflection service required by grpcurl.
    reflection.Register(server)
    // Register the business service.
    proto.RegisterGreeterServer(server, &greeter{})
    fmt.Println("grpc server start ...")
    if err := server.Serve(lis); err != nil {
        log.Fatalf("failed to serve: %v", err)
    }
}

The server adds a handle function in which case <-time.After(4 * time.Second) runs its code only after 4s, simulating a slow request.

Because the client's 3s timeout is shorter than the 4s the server needs, the request ends with a timeout error.

Let's simulate it:

Server:

$ go run 
grpc server start ...
2021/10/24 22:57:40 context deadline exceeded

Client:

$ go run 
2021/10/24 22:57:40  err: deadline
exit status 1

Source code address:

/yongxinz/go-example/tree/main/grpc-example/deadline

Summary

This article covered three practical gRPC topics:

  • Publish-subscribe pattern
  • REST interface
  • Timeout control

In my view, timeout control is the most important of the three and deserves extra attention during development.

Together with the previous article, this completes the hands-on gRPC content. All the code runs and has been uploaded to GitHub.

If you have any questions, please leave me a message. If you found this useful, you are welcome to follow and share.

Title image: posted by Reytschl on Pixabay.

Source code address:

/yongxinz/go-example

/yongxinz/gopher

