Preface
This article introduces interceptors in gRPC, covering both unary interceptors and stream interceptors, and uses them to add JWT authentication. After logging in, the client receives a token, and it must send that token when calling a protected API. Because the code uses grpc-gateway to expose an HTTP service, some gateway dependencies need to be installed as well.
RPC requests are divided into unary RPCs and streaming RPCs. In a unary RPC, the request and the response are each completed in a single message. gRPC is built on HTTP/2, so a unary RPC can be regarded as the client sending one request and the server returning one response. A streaming RPC transmits the request, the response, or both as a sequence of messages, like a stream.
Correspondingly, interceptors are divided by side into server interceptors and client interceptors, and by the kind of RPC they handle into unary interceptors and stream interceptors.
Server Interceptor
1. Unary Interceptor
The source code explains this fairly clearly. UnaryServerInterceptor provides a hook to intercept the execution of a unary RPC on the server. The info parameter contains all the information about the RPC that the interceptor can operate on. The handler wraps the service method implementation, and the interceptor must call it to complete the RPC.
    // UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
    // server. Only one unary interceptor can be installed. The construction of multiple
    // interceptors (e.g., chaining) can be implemented at the caller.
    func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
        return newFuncServerOption(func(o *serverOptions) {
            if o.unaryInt != nil {
                panic("The unary server interceptor was already set and may not be reset.")
            }
            o.unaryInt = i
        })
    }
Now look at the parameters of UnaryServerInterceptor:
    // UnaryServerInterceptor provides a hook to intercept the execution of a unary RPC on the server. info
    // contains all the information of this RPC the interceptor can operate on. And handler
    // is the wrapper of the service method implementation. It is the responsibility of the
    // interceptor to invoke handler to complete the RPC.
    type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)
The parameters are:
- ctx: the request context
- req interface{}: the request parameters of the RPC method
- info *UnaryServerInfo: all the information about the RPC method
- handler UnaryHandler: the actual execution logic of the RPC method
2. Stream Interceptor
StreamServerInterceptor provides a hook to intercept the execution of a streaming RPC on the server. info contains all the information about the RPC that the interceptor can operate on. handler is the service method implementation. The interceptor is responsible for calling handler to complete the RPC.
    // StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
    // server. Only one stream interceptor can be installed.
    func StreamInterceptor(i StreamServerInterceptor) ServerOption {
        return newFuncServerOption(func(o *serverOptions) {
            if o.streamInt != nil {
                panic("The stream server interceptor was already set and may not be reset.")
            }
            o.streamInt = i
        })
    }
StreamServerInterceptor:
    // StreamServerInterceptor provides a hook to intercept the execution of a streaming RPC
    // on the server. info contains all the information of this RPC the interceptor can operate
    // on. And handler is the service method implementation. It is the responsibility of the
    // interceptor to invoke handler to complete the RPC.
    type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error
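A stream interceptor is called once per stream, not once per message, so a common way to observe individual messages is to pass the handler a wrapped stream. The sketch below illustrates that idea under stated assumptions: ServerStream here is a reduced local stand-in for grpc.ServerStream (the real interface has more methods), and fakeStream, drain, and the method name are hypothetical helpers used only to drive the example.

```go
package main

import (
	"context"
	"fmt"
	"io"
)

// Reduced stand-ins mirroring grpc.ServerStream, grpc.StreamServerInfo,
// grpc.StreamHandler and grpc.StreamServerInterceptor.
type ServerStream interface {
	Context() context.Context
	RecvMsg(m interface{}) error
}

type StreamServerInfo struct {
	FullMethod string
}

type StreamHandler func(srv interface{}, stream ServerStream) error

type StreamServerInterceptor func(srv interface{}, ss ServerStream,
	info *StreamServerInfo, handler StreamHandler) error

// countingStream wraps a ServerStream and counts successfully received messages.
type countingStream struct {
	ServerStream
	received int
}

func (c *countingStream) RecvMsg(m interface{}) error {
	err := c.ServerStream.RecvMsg(m)
	if err == nil {
		c.received++
	}
	return err
}

// fakeStream yields a fixed number of messages, then io.EOF (test helper).
type fakeStream struct{ left int }

func (f *fakeStream) Context() context.Context { return context.Background() }

func (f *fakeStream) RecvMsg(m interface{}) error {
	if f.left == 0 {
		return io.EOF
	}
	f.left--
	return nil
}

// countInterceptor runs the handler against the wrapped stream and stores the count.
func countInterceptor(total *int) StreamServerInterceptor {
	return func(srv interface{}, ss ServerStream,
		info *StreamServerInfo, handler StreamHandler) error {
		wrapped := &countingStream{ServerStream: ss}
		err := handler(srv, wrapped)
		*total = wrapped.received
		return err
	}
}

// drain plays the role of a client-streaming service method: it reads until EOF.
func drain(srv interface{}, stream ServerStream) error {
	for {
		if err := stream.RecvMsg(nil); err == io.EOF {
			return nil
		} else if err != nil {
			return err
		}
	}
}

func main() {
	var total int
	err := countInterceptor(&total)(nil, &fakeStream{left: 3},
		&StreamServerInfo{FullMethod: "/demo.Upload/Send"}, drain)
	fmt.Println(total, err) // prints: 3 <nil>
}
```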
3. Implement server interceptor
Let's make a simple implementation:
    // Unary interceptor
    func Unary() grpc.UnaryServerInterceptor {
        return func(
            ctx context.Context,
            req interface{},
            info *grpc.UnaryServerInfo,
            handler grpc.UnaryHandler,
        ) (interface{}, error) {
            log.Println("---------> the unaryServerInterceptor: ", info.FullMethod)
            // authorize is a placeholder name for your own verification logic;
            // implement it yourself. The complete code is in my Gitee repository.
            err := authorize(ctx, info.FullMethod)
            if err != nil {
                return nil, err
            }
            return handler(ctx, req)
        }
    }

    // Stream interceptor
    func Stream() grpc.StreamServerInterceptor {
        return func(
            srv interface{},
            stream grpc.ServerStream,
            info *grpc.StreamServerInfo,
            handler grpc.StreamHandler,
        ) error {
            log.Println("--------->the streamServerInterceptor: ", info.FullMethod)
            // Same placeholder verification logic as in the unary interceptor.
            err := authorize(stream.Context(), info.FullMethod)
            if err != nil {
                return err
            }
            return handler(srv, stream)
        }
    }
Add the interceptor implemented above to the Server:
    serverOptions := []grpc.ServerOption{
        grpc.UnaryInterceptor(Unary()),
        grpc.StreamInterceptor(Stream()),
    }
    grpcServer := grpc.NewServer(serverOptions...)
Client Interceptor
1. Unary Interceptor: WithUnaryInterceptor
    // WithUnaryInterceptor returns a DialOption that specifies the interceptor for
    // unary RPCs.
    func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption {
        return newFuncDialOption(func(o *dialOptions) {
            o.unaryInt = f
        })
    }
UnaryClientInterceptor
    // UnaryClientInterceptor intercepts the execution of a unary RPC on the client.
    // Unary interceptors can be specified as a DialOption, using
    // WithUnaryInterceptor() or WithChainUnaryInterceptor(), when creating a
    // ClientConn. When a unary interceptor(s) is set on a ClientConn, gRPC
    // delegates all unary RPC invocations to the interceptor, and it is the
    // responsibility of the interceptor to call invoker to complete the processing
    // of the RPC.
    //
    // method is the RPC name. req and reply are the corresponding request and
    // response messages. cc is the ClientConn on which the RPC was invoked. invoker
    // is the handler to complete the RPC and it is the responsibility of the
    // interceptor to call it. opts contain all applicable call options, including
    // defaults from the ClientConn as well as per-call options.
    //
    // The returned error must be compatible with the status package.
    type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error
2. Stream Interceptor: WithStreamInterceptor
    // WithStreamInterceptor returns a DialOption that specifies the interceptor for
    // streaming RPCs.
    func WithStreamInterceptor(f StreamClientInterceptor) DialOption {
        return newFuncDialOption(func(o *dialOptions) {
            o.streamInt = f
        })
    }
StreamClientInterceptor intercepts the creation of a ClientStream. A stream interceptor can be specified as a DialOption, using WithStreamInterceptor() or WithChainStreamInterceptor(), when creating a client connection. When a stream interceptor is set on the client connection, gRPC hands all stream creation to the interceptor, and the interceptor is responsible for calling the streamer.
    // StreamClientInterceptor intercepts the creation of a ClientStream. Stream
    // interceptors can be specified as a DialOption, using WithStreamInterceptor()
    // or WithChainStreamInterceptor(), when creating a ClientConn. When a stream
    // interceptor(s) is set on the ClientConn, gRPC delegates all stream creations
    // to the interceptor, and it is the responsibility of the interceptor to call
    // streamer.
    //
    // desc contains a description of the stream. cc is the ClientConn on which the
    // RPC was invoked. streamer is the handler to create a ClientStream and it is
    // the responsibility of the interceptor to call it. opts contain all applicable
    // call options, including defaults from the ClientConn as well as per-call
    // options.
    //
    // StreamClientInterceptor may return a custom ClientStream to intercept all I/O
    // operations. The returned error must be compatible with the status package.
    type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)
Parameter explanation:
- ctx is the request context.
- desc *StreamDesc contains a description of the stream.
- cc *ClientConn is the client connection on which the RPC is invoked.
- method string is the name of the requested method.
- streamer Streamer is the handler that creates the client stream; the stream interceptor must call it.
- opts ...CallOption contains all applicable call options, including the defaults from the client connection as well as per-call options.
3. Implement client interceptor
The following is the specific implementation:
    func Unary() grpc.UnaryClientInterceptor {
        return func(
            ctx context.Context,
            method string,
            req, reply interface{},
            conn *grpc.ClientConn,
            invoker grpc.UnaryInvoker, // callback that performs the actual RPC
            opts ...grpc.CallOption,
        ) error {
            log.Printf("-------> unary interceptor: %s", method)
            if authMethods[method] {
                // This method requires authentication, so attachToken adds the
                // token to the context before the actual RPC is invoked.
                return invoker(attachToken(ctx), method, req, reply, conn, opts...)
            }
            return invoker(ctx, method, req, reply, conn, opts...)
        }
    }

    // Stream returns a client interceptor to authenticate stream RPC
    func Stream() grpc.StreamClientInterceptor {
        return func(
            ctx context.Context,
            desc *grpc.StreamDesc,
            conn *grpc.ClientConn,
            method string,
            streamer grpc.Streamer,
            opts ...grpc.CallOption,
        ) (grpc.ClientStream, error) {
            log.Printf("-------> stream interceptor: %s", method)
            if authMethods[method] {
                return streamer(attachToken(ctx), desc, conn, method, opts...)
            }
            return streamer(ctx, desc, conn, method, opts...)
        }
    }
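The client code above relies on authMethods and attachToken, which are not shown. The following is a minimal sketch of how they might fit together, under several assumptions: UnaryInvoker is a reduced stand-in for grpc.UnaryInvoker (the real type also receives the ClientConn and call options), the token is carried in a plain context value, and the method names and token are made up. In real grpc-go code the token would typically be attached with metadata.AppendToOutgoingContext(ctx, "authorization", token).

```go
package main

import (
	"context"
	"fmt"
)

// tokenKey is a private context key; it stands in for outgoing gRPC metadata.
type tokenKey struct{}

// UnaryInvoker is a reduced stand-in for grpc.UnaryInvoker.
type UnaryInvoker func(ctx context.Context, method string, req, reply interface{}) error

// authMethods lists the methods that need a token (hypothetical names).
var authMethods = map[string]bool{"/demo.Laptop/Create": true}

// accessToken would normally be obtained from the login RPC.
const accessToken = "example-token"

// attachToken returns a context that carries the access token.
func attachToken(ctx context.Context) context.Context {
	return context.WithValue(ctx, tokenKey{}, accessToken)
}

// authInterceptor attaches the token only for methods that require it.
func authInterceptor(ctx context.Context, method string, req, reply interface{},
	invoker UnaryInvoker) error {
	if authMethods[method] {
		ctx = attachToken(ctx)
	}
	return invoker(ctx, method, req, reply)
}

// tokenSeenBy runs the interceptor for method and reports which token
// reached the invoker (empty string if none).
func tokenSeenBy(method string) string {
	var seen string
	invoker := func(ctx context.Context, method string, req, reply interface{}) error {
		seen, _ = ctx.Value(tokenKey{}).(string)
		return nil
	}
	_ = authInterceptor(context.Background(), method, nil, nil, invoker)
	return seen
}

func main() {
	fmt.Printf("%q %q\n",
		tokenSeenBy("/demo.Laptop/Create"), // protected: token attached
		tokenSeenBy("/demo.Auth/Login"))    // public: no token
}
```

Leaving the login method out of authMethods matters: the client has no token yet when it calls login, so that call must pass through untouched.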
They are also easy to use; just pass them to Dial:
    conn2, err := grpc.Dial(
        Address,         // server address
        transportOption, // SSL/TLS credentials option
        grpc.WithUnaryInterceptor(Unary()),
        grpc.WithStreamInterceptor(Stream()),
    )
    if err != nil {
        log.Fatal("cannot dial server: ", err)
    }
This concludes the detailed explanation of how to use interceptors in gRPC.