SoFunction
Updated on 2025-03-05

Specific use of Go concurrent programming

Introduction

Go Standard library providesCond The purpose of the primitive is to provide support for concurrency issues in the waiting/notify scenario.CondUsually applied to a set of waiting for a certain conditiongoroutine, wait for the conditions to becometrueWhen one of themgoroutineOr allgoroutineThey will be awakened to execute.

CondIt is related to a certain condition, and this condition requires a set ofgoroutineCooperation is completed together, and when the conditions are not met, all those waiting for this conditiongoroutineThey will be blocked, only this groupgoroutineOnly by collaborating to achieve this condition can the waiting goroutine continue.

This condition can be customized by ustrue/falseLogical expression.

butCondIt is used less because it can be used in most scenariosChannelandWaitGroupTo replace it.

Detailed introduction

Here isConddata structure and externally provided methods,CondAn waiting queue and lock instance is maintained internally.

type Cond struct {
   noCopy noCopy

   // Lock   L Locker

   // Wait for queue   notify  notifyList
   checker copyChecker
}

func NeWCond(l Locker) *Cond
func (c *Cond) Broadcast()
func (c *Cond) Signal()
func (c *Cond) Wait()
  • NeWCondNeWCondThe method needs to be passed in by the callerLockerInterface, this interface isLock/UnLockmethod, so we can pass in oneObject

  • Signal: Allow the caller to wake up a waiting timeCondofgoroutine. ifCondThere are one or more waiting queuesgoroutine, then remove the first one from the waiting queuegoroutineAnd wake it up

  • Broadcast: Allow the caller to wake up all waiting for the currentCondofgoroutine. If Cond waits for one or more waiting queuesgoroutine, then clear all waitinggoroutine, and wake up all

  • Wait: Will put the caller in Condwait queue and block until it isSignalorBroadcastMethods to remove and wake up from the waiting queue

Case: Redis connection pool

You can look at the following code, use itCondImplement aRedisThe most critical code for the connection pool is to call it when the linked list is emptyCondofWaitMethod,gorutinePerform blocking. ThengoruntineAfter using the connection, after returning the connection to the pool, you need to notify other blockinggoruntineto get the connection.

package main

import (
   "container/list"
   "fmt"
   "math/rand"
   "sync"
   "time"
)

// Connection pooltype Pool struct {
   lock     // Lock   clients   // connect   cond    * // cond instance   close   bool       // Whether to close}

// Redis Client
type Client struct {
   id int32
}

// Create Redis Clientfunc NewClient() *Client {
   return &Client{
      id: rand.Int31n(100000),
   }
}

// Close Redis Clientfunc (this *Client) Close() {
   ("Client:%d is closing", )
}

// Create a connection poolfunc NewPool(maxConnNum int) *Pool {
   pool := new(Pool)
    = (&)

   // Create a connection   for i := 0; i < maxConnNum; i++ {
      client := NewClient()
      (client)
   }

   return pool
}

// Get the connection from the poolfunc (this *Pool) Pull() *Client {
   ()
   defer ()

   // Closed   if  {
      ("Pool is closed")
      return nil
   }

   // If the connection pool is not connected, it needs to be blocked   for () <= 0 {
      ()
   }

   // Take out the head node from the linked list, delete and return   ele := (())
   return ele.(*Client)
}

// Put the connection back to the poolfunc (this *Pool) Push(client *Client) {
   ()
   defer ()

   if  {
      ("Pool is closed")
      return
   }

   // Insert a connection to the end of the linked list   (client)

   // Wake up a waiting goruntine   ()
}

// Close the poolfunc (this *Pool) Close() {
   ()
   defer ()

   // Close the connection   for e := (); e != nil; e = () {
      client := .(*Client)
      ()
   }

   // Reset data    = true
   ()
}

func main() {

   var wg 

   pool := NewPool(3)
   for i := 1; i <= 10; i++ {
      (1)
      go func(index int) {

         defer ()

         // Get a connection         client := ()

         ("Time:%s | 【goruntine#%d] Get client[%d]\n", ().Format("15:04:05"), index, )         ( * 5)
         ("Time:%s | 【goruntine#%d]After using it, put client[%d] back into the pool\n", ().Format("15:04:05"), index, )
         // Put the connection back to the pool         (client)
      }(i)
   }

   ()
}

Running results:

Time:15:10:25 | [goruntine#7] Obtain client[31847]
Time:15:10:25 | [goruntine#5] Obtain client[27887]
Time:15:10:25 | [goruntine#10] Get client[98081]
Time:15:10:30 | [goruntine#5] After using it, put client[27887] back into the pool
Time:15:10:30 | [goruntine#6] Get client[27887]
Time:15:10:30 | [goruntine#10] After using it, put client[98081] back into the pool
Time:15:10:30 | [goruntine#7] After using it, put client[31847] back into the pool.
Time:15:10:30 | [goruntine#1] Get client[31847]
Time:15:10:30 | [goruntine#9] Get client[98081]
Time:15:10:35 | [goruntine#6] After using it, put client[27887] back into the pool
Time:15:10:35 | [goruntine#3] Get client[27887]
Time:15:10:35 | [goruntine#1] After using it, put client[31847] back into the pool
Time:15:10:35 | [goruntine#4] Get client[31847]
Time:15:10:35 | [goruntine#9] After using it, put client[98081] back into the pool
Time:15:10:35 | [goruntine#2] Get client[98081]
Time:15:10:40 | [goruntine#3] After using it, put client[27887] back into the pool
Time:15:10:40 | [goruntine#8] Get client[27887]
Time:15:10:40 | [goruntine#2] After using it, put client[98081] back into the pool
Time:15:10:40 | [goruntine#4] After using it, put client[31847] back into the pool
Time:15:10:45 | [goruntine#8] After using it, put client[27887] back into the pool

Note

  • CallingWaitBefore the method, you need to add a lock, just like in my example abovePullThe method is to add lock first

Just look at the source code, becauseWaitThe execution logic of the method is to firstgoruntineAdd to the waiting queue, then release the lock, and then block. After wake up, the lock will continue to be added. If callingWaitThere is no lock in front, but it will be unlocked inside, and an error will be reported when executing.

//
//    ()
//    for !condition() {
//        ()
//    }
//    ... make use of condition ...
//    ()
//
func (c *Cond) Wait() {
   ()
   
   // Add to wait queue   t := runtime_notifyListAdd(&)
   ()
   
   // Blocking   runtime_notifyListWait(&, t)
   ()
}
  • stillWaitMethod, you need to continue to check after wake-upCondcondition

Take the aboveredisLet's explain the case, I'm using it hereforloop to perform detection. IfforChange to useif, that is, if you only judge once, what will happen? You can stop and think about it first

The caller can also use itBroadcastWake upgoruntine, if theBroadcastMethods, allgoruntineThey will be awakened, and then everyone will go to the linked list to obtain itredisIf connected, some will appeargoruntineIf you can't get the connection, there are actually not so many connections to get, because only one connection will be put back to the pool at a time.

// If the connection pool is not connected, it needs to be blockedfor () <= 0 {
  ()
}

// Get the connectionele := (())
return ele.(*Client)

This is the end of this article about the specific use of Go concurrent programming. For more related Go content, please search for my previous articles or continue browsing the related articles below. I hope everyone will support me in the future!