General gRPC usage
Four basic usage modes
- Request-response (unary) mode
- Client-side streaming mode
- Server-side streaming mode
- Bidirectional streaming mode
A common way to write a gRPC call
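    func main() {
        // ... some code
        // Connect to the gRPC service. The dial options are elided in the
        // original listing; grpc.Dial and conn.Close are reconstructed from context.
        conn, err := grpc.Dial(":8000" /* , dial options */)
        if err != nil {
            // ... log
        }
        defer conn.Close()
        // ... some code
    }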
The problem with this approach: under high concurrency, performance problems are likely. For example, during performance testing, calls from the client to the server report an error:
rpc error: code = Unavailable desc = all SubConns are in TransientFailure, latest connection error: connection error: desc = "transport: Error while dialing dial tcp xxx:xxx: connect: connection refused"
Investigating the problem showed that the gRPC connections were exhausted, and that many connections had not been fully released.
gRPC communication is essentially built on TCP connections, so each connection requires a three-way handshake to establish and a four-way handshake to tear down. Every time we establish and release a connection, we go through this process. Frequently establishing and releasing connections is therefore a significant waste of resources and performance.
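The cost described above can be made visible with a small sketch. It uses plain TCP from the standard library rather than gRPC, and the helper names (startEchoServer, dialPerCall, reuseConn, countConnections) are hypothetical. It only counts how many connections a local server has to accept for the same number of requests under each strategy.

```go
package main

import (
	"fmt"
	"net"
	"sync/atomic"
)

// startEchoServer listens on a free local port, counts every accepted
// connection, and echoes each received byte back to the client.
func startEchoServer() (string, *int64, func(), error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return "", nil, nil, err
	}
	var accepted int64
	go func() {
		for {
			c, err := ln.Accept()
			if err != nil {
				return // listener closed
			}
			atomic.AddInt64(&accepted, 1)
			go func(c net.Conn) {
				defer c.Close()
				buf := make([]byte, 1)
				for {
					if _, err := c.Read(buf); err != nil {
						return
					}
					if _, err := c.Write(buf); err != nil {
						return
					}
				}
			}(c)
		}
	}()
	return ln.Addr().String(), &accepted, func() { ln.Close() }, nil
}

// ping sends one byte and waits for the echo.
func ping(c net.Conn) error {
	if _, err := c.Write([]byte{1}); err != nil {
		return err
	}
	_, err := c.Read(make([]byte, 1))
	return err
}

// dialPerCall opens and closes a new connection for each of n requests.
func dialPerCall(addr string, n int) error {
	for i := 0; i < n; i++ {
		c, err := net.Dial("tcp", addr)
		if err != nil {
			return err
		}
		err = ping(c)
		c.Close()
		if err != nil {
			return err
		}
	}
	return nil
}

// reuseConn sends all n requests over a single connection.
func reuseConn(addr string, n int) error {
	c, err := net.Dial("tcp", addr)
	if err != nil {
		return err
	}
	defer c.Close()
	for i := 0; i < n; i++ {
		if err := ping(c); err != nil {
			return err
		}
	}
	return nil
}

// countConnections runs one strategy against a fresh server and reports
// how many connections that server accepted.
func countConnections(n int, run func(string, int) error) (int64, error) {
	addr, accepted, stop, err := startEchoServer()
	if err != nil {
		return 0, err
	}
	defer stop()
	if err := run(addr, n); err != nil {
		return 0, err
	}
	return atomic.LoadInt64(accepted), nil
}

func main() {
	perCall, err := countConnections(5, dialPerCall)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	reused, err := countConnections(5, reuseConn)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("dial per call: %d connections, reused: %d\n", perCall, reused)
}
```

Every accepted connection stands for one handshake and one teardown, which is the overhead a connection pool is meant to avoid.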
On the gRPC server side, we don't have to worry about connection management. On the gRPC client side, however, it is essential to manage connections and to reuse them.
Creating a connection pool
Issues to consider when creating a connection pool:
- Does the connection pool support expansion?
- Do idle connections support timeouts and closing themselves?
- What is the handling strategy when the pool is full?
Defining the connection pool interface
    type Pool interface {
        // Get returns a connection. When that connection is closed,
        // it is put back into the pool.
        Get() (Conn, error)
        // Close closes the connection pool. The connections in the pool
        // are no longer available afterwards.
        Close() error
        Status() string
    }
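Implementing the connection pool interface
Code for creating the connection pool
    // The selector names in this listing (errors.New, fmt.Errorf, log.Printf,
    // option.Dial, option.MaxIdle, option.MaxActive, option.MaxConcurrentStreams,
    // p.wrapConn) were lost in the original text and are reconstructed here as
    // assumptions.
    func New(address string, option Options) (Pool, error) {
        if address == "" {
            return nil, errors.New("invalid address settings")
        }
        if option.Dial == nil {
            return nil, errors.New("invalid dial settings")
        }
        if option.MaxIdle <= 0 || option.MaxActive <= 0 || option.MaxIdle > option.MaxActive {
            return nil, errors.New("invalid maximum settings")
        }
        if option.MaxConcurrentStreams <= 0 {
            return nil, errors.New("invalid maximum settings")
        }
        p := &pool{
            index:   0,
            current: int32(option.MaxIdle),
            ref:     0,
            opt:     option,
            conns:   make([]*conn, option.MaxActive),
            address: address,
            closed:  0,
        }
        // Eagerly dial the initial connections; give up on the first failure.
        for i := 0; i < p.opt.MaxIdle; i++ {
            c, err := p.opt.Dial(address)
            if err != nil {
                p.Close()
                return nil, fmt.Errorf("dial is not able to fill the pool: %s", err)
            }
            p.conns[i] = p.wrapConn(c, false)
        }
        log.Printf("new pool success: %v\n", p.Status())
        return p, nil
    }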
Regarding the code above, note in particular that every initial connection is established inside New. If even one connection fails to be established, creating the pool is treated as a failure, and Close() is called on the pool to release all the previously established connections.
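The all-or-nothing behaviour described above can be sketched in isolation. This is a simplified illustration, not the article's New: fill, fakeConn, and fakeDialer are hypothetical names, and io.Closer stands in for a real gRPC client connection.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// fakeConn stands in for a real connection and records whether it was closed.
type fakeConn struct{ closed bool }

func (f *fakeConn) Close() error { f.closed = true; return nil }

// fakeDialer hands out fakeConns and fails on call number failAt
// (1-based); failAt == 0 means it never fails.
type fakeDialer struct {
	failAt int
	calls  int
	made   []*fakeConn
}

func (d *fakeDialer) dial() (io.Closer, error) {
	d.calls++
	if d.calls == d.failAt {
		return nil, errors.New("connection refused")
	}
	c := &fakeConn{}
	d.made = append(d.made, c)
	return c, nil
}

// fill dials n connections. If any dial fails, it closes the connections
// opened so far and returns an error, so the caller never sees a
// partially filled pool.
func fill(n int, dial func() (io.Closer, error)) ([]io.Closer, error) {
	conns := make([]io.Closer, 0, n)
	for i := 0; i < n; i++ {
		c, err := dial()
		if err != nil {
			for _, opened := range conns {
				opened.Close()
			}
			return nil, fmt.Errorf("dial is not able to fill the pool: %w", err)
		}
		conns = append(conns, c)
	}
	return conns, nil
}

func main() {
	d := &fakeDialer{failAt: 3}
	_, err := fill(4, d.dial)
	fmt.Println("error:", err)
	for i, c := range d.made {
		fmt.Printf("connection %d closed: %v\n", i, c.closed)
	}
}
```

Rolling back on the first failure keeps the invariant simple: a pool either exists with all of its initial connections or does not exist at all.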
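Code for closing the connection pool
    // Close the connection pool.
    // (Field names are taken from the pool struct literal in New.)
    func (p *pool) Close() error {
        atomic.StoreInt32(&p.closed, 1)
        atomic.StoreUint32(&p.index, 0)
        atomic.StoreInt32(&p.current, 0)
        atomic.StoreInt32(&p.ref, 0)
        p.deleteFrom(0)
        log.Printf("close pool success: %v\n", p.Status())
        return nil
    }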
Code for deleting connections from a given position
    // Clear connections from the specified position up to MaxActive.
    func (p *pool) deleteFrom(begin int) {
        for i := begin; i < p.opt.MaxActive; i++ {
            p.reset(i)
        }
    }
Code for destroying a specific connection
    // Clear a specific connection.
    func (p *pool) reset(index int) {
        conn := p.conns[index]
        if conn == nil {
            return
        }
        conn.reset()
        p.conns[index] = nil
    }
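Code for closing a connection
    // The selector names here (c.cc, c.once, c.pool.decrRef) were lost in the
    // original text and are reconstructed as assumptions.
    func (c *conn) reset() error {
        cc := c.cc
        c.cc = nil
        c.once = false
        if cc != nil {
            return cc.Close()
        }
        return nil
    }

    func (c *conn) Close() error {
        // Release the caller's reference on the pool.
        c.pool.decrRef()
        // Only one-time connections are physically closed.
        if c.once {
            return c.reset()
        }
        return nil
    }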
After obtaining a connection handle conn from the pool through Get(), the caller closes it with Close(), which ends up in the Close() implementation above. The caller does not specify once, and has no access to set it explicitly; for connections that belong to the pool it is false. So for the caller the connection is closed, while for the connection pool the connection is actually being returned to the pool.
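The difference between a pooled connection and a one-time connection on Close can be sketched as follows. This is a deliberately simplified, single-goroutine illustration: rawConn, the plain ref counter, and the get helper are hypothetical, and a real pool would need atomic operations for the reference count.

```go
package main

import "fmt"

// rawConn stands in for the underlying physical connection.
type rawConn struct{ closed bool }

func (r *rawConn) Close() error { r.closed = true; return nil }

// pool only tracks how many handles are currently checked out.
type pool struct{ ref int32 }

// conn is the handle given to callers. once marks a one-time connection
// that does not belong to the pool.
type conn struct {
	raw  *rawConn
	pool *pool
	once bool
}

// get hands out a handle for raw and counts the reference.
func (p *pool) get(raw *rawConn, once bool) *conn {
	p.ref++
	return &conn{raw: raw, pool: p, once: once}
}

// Close always drops the reference, but only closes the physical
// connection when the handle is a one-time connection.
func (c *conn) Close() error {
	c.pool.ref--
	if c.once {
		raw := c.raw
		c.raw = nil
		return raw.Close()
	}
	return nil
}

func main() {
	p := &pool{}
	pooled, oneTime := &rawConn{}, &rawConn{}

	a := p.get(pooled, false)
	b := p.get(oneTime, true)
	a.Close()
	b.Close()

	fmt.Println("pooled physically closed:", pooled.closed)
	fmt.Println("one-time physically closed:", oneTime.closed)
	fmt.Println("references left:", p.ref)
}
```

The caller uses the same Close() call in both cases; only the handle knows whether it must tear down the physical connection.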
Expanding the pool
Key code
    // The selector names here (p.incrRef, p.RLock/p.RUnlock, p.Lock/p.Unlock,
    // p.opt.MaxConcurrentStreams, p.opt.MaxActive, p.opt.Reuse, p.opt.Dial,
    // p.wrapConn) were lost in the original text and are reconstructed as
    // assumptions.
    func (p *pool) Get() (Conn, error) {
        // First: select from the connections already created.
        nextRef := p.incrRef()
        p.RLock()
        current := atomic.LoadInt32(&p.current)
        p.RUnlock()
        if current == 0 {
            return nil, ErrClosed
        }
        if nextRef <= current*int32(p.opt.MaxConcurrentStreams) {
            next := atomic.AddUint32(&p.index, 1) % uint32(current)
            return p.conns[next], nil
        }

        // The number of connections in the pool has reached max active.
        if current == int32(p.opt.MaxActive) {
            // Second: if reuse is true, select from the pool's connections.
            if p.opt.Reuse {
                next := atomic.AddUint32(&p.index, 1) % uint32(current)
                return p.conns[next], nil
            }
            // Third: create a one-time connection.
            c, err := p.opt.Dial(p.address)
            return p.wrapConn(c, true), err
        }

        // Fourth: create new connections that are given back to the pool.
        p.Lock()
        current = atomic.LoadInt32(&p.current)
        if current < int32(p.opt.MaxActive) && nextRef > current*int32(p.opt.MaxConcurrentStreams) {
            // Double the pool, or add whatever room remains.
            increment := current
            if current+increment > int32(p.opt.MaxActive) {
                increment = int32(p.opt.MaxActive) - current
            }
            var i int32
            var err error
            for i = 0; i < increment; i++ {
                c, er := p.opt.Dial(p.address)
                if er != nil {
                    err = er
                    break
                }
                p.reset(int(current + i))
                p.conns[current+i] = p.wrapConn(c, false)
            }
            current += i
            log.Printf("grow pool: %d ---> %d, increment: %d, maxActive: %d\n",
                p.current, current, increment, p.opt.MaxActive)
            atomic.StoreInt32(&p.current, current)
            if err != nil {
                p.Unlock()
                return nil, err
            }
        }
        p.Unlock()
        next := atomic.AddUint32(&p.index, 1) % uint32(current)
        return p.conns[next], nil
    }
Logic of the Get code
- First, the reference count is incremented. If the new count is within the range current*int32(...), that is, the current number of connections multiplied by the per-connection limit configured in the options, an existing connection is used directly.
- If the current number of connections has reached the maximum number of active connections, the behaviour depends on whether the reuse parameter in the option passed when creating the pool is true. If connections are reused, one of the connections in the pool is selected in rotation and shared (it is not removed from the pool). If they are not reused, a new one-time connection is created.
- In all remaining cases the pool is expanded: its size is doubled, or, if doubling would exceed the maximum number of active connections, it grows only by the room that remains.
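The branching described above can be restated as a pure function, which makes the thresholds easier to check. This is a sketch of the decision logic only, without locking or dialing; the names decide, increment, and the action constants are hypothetical, and maxStreams stands for the per-connection limit mentioned in the first bullet.

```go
package main

import "fmt"

// action is the outcome of a Get call in this sketch.
type action int

const (
	usePooled action = iota // share an existing pooled connection
	dialOnce                // dial a one-time connection, closed on return
	grow                    // add connections to the pool
)

var actionNames = [...]string{"usePooled", "dialOnce", "grow"}

// decide mirrors the branches of Get: nextRef is the reference count
// after incrementing, current the number of pooled connections.
func decide(nextRef, current, maxActive, maxStreams int32, reuse bool) action {
	if nextRef <= current*maxStreams {
		return usePooled
	}
	if current == maxActive {
		if reuse {
			return usePooled
		}
		return dialOnce
	}
	return grow
}

// increment returns how many connections to add when growing: double
// the pool, capped by the room left below maxActive.
func increment(current, maxActive int32) int32 {
	inc := current
	if current+inc > maxActive {
		inc = maxActive - current
	}
	return inc
}

func main() {
	// 2 connections, 4 references each, at most 8 connections.
	fmt.Println(actionNames[decide(5, 2, 8, 4, false)])
	fmt.Println(actionNames[decide(9, 2, 8, 4, false)])
	fmt.Println(actionNames[decide(33, 8, 8, 4, true)])
	fmt.Println(actionNames[decide(33, 8, 8, 4, false)])
	fmt.Println(increment(2, 8), increment(6, 8))
}
```

Keeping the decision separate from the side effects also makes it straightforward to unit test the thresholds without a running server.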
Scaling down can also be handled in Get. The specific scaling strategy should be chosen according to actual conditions. For example, when the reference count nextRef is only 10% of the number of currently active connections (this is just an example), shrinking the pool can be considered.
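A shrink rule along these lines might look like the sketch below. It is only one possible interpretation of the 10% example: shrinkTarget is a hypothetical helper, the comparison is made against the pool's total capacity (connections times an assumed per-connection limit maxStreams), and halving down to an assumed lower bound maxIdle is a design choice of this sketch rather than something the article prescribes.

```go
package main

import "fmt"

// shrinkTarget returns the number of connections the pool should keep.
// It halves the pool when the references in use are at most 10% of the
// capacity, and never goes below maxIdle.
func shrinkTarget(ref, current, maxIdle, maxStreams int32) int32 {
	capacity := current * maxStreams
	if current <= maxIdle || ref*10 > capacity {
		return current // already minimal, or still busy enough
	}
	target := current / 2
	if target < maxIdle {
		target = maxIdle
	}
	return target
}

func main() {
	// 8 connections with 10 references each give a capacity of 80.
	fmt.Println(shrinkTarget(3, 8, 2, 10))  // lightly used
	fmt.Println(shrinkTarget(20, 8, 2, 10)) // busy
	fmt.Println(shrinkTarget(0, 3, 2, 10))  // close to the lower bound
}
```

Halving instead of dropping straight to the lower bound leaves the remaining capacity well above the current load, which avoids growing again immediately after a shrink.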
Performance testing
Connection pool creation and performance testing
/post/poolin…