SoFunction
Updated on 2025-03-05

Detailed explanation of the implementation process of Go creating a Grpc link pool

General Usage

gRPCFour basic uses

  • Request response mode
  • Client data flow mode
  • Server side data flow mode
  • Bidirectional flow mode

CommongRPCCall writing method

func main(){
	//... some code
	// Link to grpc service	conn , err := (":8000",)
	if err != nil {
		//...log
	}
	defer ()
	//...some code

Existing problems: When faced with high concurrency, performance problems are likely to occur. For example, when we are doing performance tests, we will find that when the client calls for the server, an error will be reported:

rpc error: code = Unavailable desc = all SubConns are in TransientFailure, latest connection error: connection error: desc = "transport: Error while dialing dial tcp xxx:xxx: connect: connection refused

When I actually checked the problem, it was obvious that the number of gRPC connections was full, and many connections were not fully released. [#Source of this article: #]

gRPCCommunication is essentiallyTCPFor connections, then one connection requires three handshakes and four handshakes. Each time we establish and release the connection, we need to go through this process. If we frequently establish and release the connection, this is actually a big waste of resources and performance.

On the server side,gRPCWe don't have to worry about the link management on the server, butgRPCIt is very necessary to care about the link management of the client, and to realize the connection of the reused client.

Create a link pool

Issues to consider when creating a link pool:

  • Does the connection pool support expansion?
  • Whether an idle connection supports timeout and self-closing
  • What is the strategy for handling when the pool is full

Create a link pool interface

type Pool interface {
	// Get a new connection. When closing the connection, the connection will be placed in the pool   Get() (Conn, error)
	// Close the connection pool, the connections in the natural connection pool are no longer available   Close() error
	//[#Source of this article: #]   Status() string
}

Implement link pool interface

Create a link pool code

func New(address string, option Options) (Pool, error) {
   if address == "" {
      return nil, ("invalid address settings")
   }
   if  == nil {
      return nil, ("invalid dial settings")
   }
   if  <= 0 ||  <= 0 ||  >  {
      return nil, ("invalid maximum settings")
   }
   if  <= 0 {
      return nil, ("invalid maximun settings")
   }
   p := &pool{
      index:   0,
      current: int32(),
      ref:     0,
      opt:     option,
      conns:   make([]*conn, ),
      address: address,
      closed:  0,
   }
   for i := 0; i < ; i++ {
      c, err := (address)
      if err != nil {
         ()
         return nil, ("dial is not able to fill the pool: %s", err)
      }
      [i] = (c, false)
   }
   ("new pool success: %v\n", ())
   return p, nil
}

Regarding the above code, it is necessary to pay special attention to the establishment of each connection.NewCompleted in it, [#Source of this article:#] As long as there is1If a connection is not established successfully, then even if our connection pool fails, we will call it()Release all the previously established connections.

Close the link pool code

// Close the connection poolfunc (p *pool) Close() error {
   atomic.StoreInt32(&, 1)
   atomic.StoreUint32(&, 0)
   atomic.StoreInt32(&, 0)
   atomic.StoreInt32(&, 0)
   (0)
   ("[]close pool success: %v\n", ())
   return nil
}

Delete the link pool code from a specific location

// Clear connections from the specified location to MaxActivefunc (p *pool) deleteFrom(begin int) {
   for i := begin; i < ; i++ {
      (i)
   }
}

Destroy specific link code

// Clear specific connectionfunc (p *pool) reset(index int) {
   conn := [index]
   if conn == nil {
      return
   }
   ()
   [index] = nil
}

Close the link

Code

func (c *conn) reset() error {
   cc := 
    = nil
    = false
   // Source of this blog:   if cc != nil {
      return ()
   }
   return nil
}
func (c *conn) Close() error {
   ()
   if  {
      return ()
   }
   return nil
}

Using the connection pool through()Get the specific connection handleconnAfter that, it will be used()Close the connection, you will actually go to the aboveClose()The implementation location, but not specified, and certainly no permissions are displayed.onceSet asfalse, that is, for the caller, the connection is closed, and for the connection pool, it is actually returning the connection to the connection pool.

Expanding capacity

Key Code

func (p *pool) Get() (Conn, error) {
   // the first selected from the created connections
   nextRef := ()
   ()
   current := atomic.LoadInt32(&)
   ()
   if current == 0 {
      return nil, ErrClosed
   }
   if nextRef <= current*int32() {
      next := atomic.AddUint32(&, 1) % uint32(current)
      return [next], nil
   }
   // Source of this blog:   // the number connection of pool is reach to max active
   if current == int32() {
      // the second if reuse is true, select from pool's connections
      if  {
         next := atomic.AddUint32(&, 1) % uint32(current)
         return [next], nil
      }
      // the third create one-time connection
      c, err := ()
      return (c, true), err
   }
   // the fourth create new connections given back to pool
   ()
   current = atomic.LoadInt32(&)
   if current < int32() && nextRef > current*int32() {
      // 2 times the incremental or the remain incremental  ##
      increment := current
      if current+increment > int32() {
         increment = int32() - current
      }
      var i int32
      var err error
      for i = 0; i < increment; i++ {
         c, er := ()
         if er != nil {
            err = er
            break
         }
         (int(current + i))
         [current+i] = (c, false)
      }
	  // Source of this blog:      current += i
      ("##grow pool: %d ---> %d, increment: %d, maxActive: %d\n",
         , current, increment, )
      atomic.StoreInt32(&, current)
      if err != nil {
         ()
         return nil, err
      }
   }
   ()
   next := atomic.AddUint32(&, 1) % uint32(current)
   return [next], nil
}

GetCode logic

  • First increase the reference count of the connection, if setcurrent*int32()Within the range, you can just use the connection directly.
  • If the current number of connections reaches the maximum active number of connections, it depends on what we pass when we create a new pool.optionIn-housereuseIs the parameter truetrue, If it is multiplexed, any connection in the connection pool will be randomly removed for use. If it is not multiplexed, a new connection will be created.
  • We need to do the rest of the situation2Double or1The connection pool is expanded by multiple times.

Can also be inGetThe specific scaling strategy can be determined according to actual conditions, such as when the reference countnextRefOnly the number of currently active connections10%When (this is just an example), you can consider shrinking.

Performance Testing

Link pool creation and performance testing

/post/poolin…

The above is the detailed explanation of the implementation process of Go creating a Grpc link pool. For more information about Go creating a Grpc link pool, please pay attention to my other related articles!