1. Introduction
This article introduces the WaitGroup concurrency primitive in Go: its basic usage, implementation principles, usage precautions, and common usage scenarios. The goal is to help readers understand and apply WaitGroup to coordinate the execution of multiple goroutines, improving the efficiency and stability of concurrent Go programs.
2. Basic usage
2.1 Definition
WaitGroup is a struct in the sync package of the Go standard library that provides a simple mechanism for synchronizing the execution of multiple goroutines. It suits scenarios where several tasks must run concurrently and the caller must wait for all of them to finish before continuing with subsequent operations.
2.2 How to use
First, the main goroutine creates a WaitGroup instance. Before starting each goroutine, it calls Add(1) to indicate that one more task needs to be executed. When a goroutine finishes its task, it calls Done to indicate that the task has been completed.
In the main goroutine, call Wait() to block until all goroutines have completed their tasks, for example:
package main

import "sync"

func main() {
	// First, create a WaitGroup instance in the main goroutine
	var wg sync.WaitGroup
	// Call Add before starting the goroutine to register one task
	wg.Add(1)
	go func() {
		// Start executing...
		// After completing the task, call Done
		wg.Done()
	}()
	// Call Wait and block until all goroutines have completed their tasks
	wg.Wait()
	// Execute subsequent logic
}
2.3 Usage example
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			fmt.Printf("Task %d starts executing\n", i)
			// Simulate a task that runs for a random period of time
			time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
			// Task execution is completed
			fmt.Printf("Task %d has been executed\n", i)
		}(i)
	}
	fmt.Println("The main goroutine starts waiting for all tasks to be executed...")
	wg.Wait()
	fmt.Println("All goroutines have been executed...")
}
In this example, we use sync.WaitGroup to wait for 5 goroutines to finish. Each time a task is created in the loop, we call wg.Add(1) and then start a goroutine to execute the task. When a goroutine completes its task, it calls wg.Done() to inform the main goroutine that the task has finished. The main goroutine continues past wg.Wait() only after all 5 tasks have completed.
3. Implementation principle
3.1 Design Intention
WaitGroup was designed to wait for a set of operations, usually running in a group of goroutines, to complete before the next operation is performed.
3.2 Basic Principles
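In the WaitGroup struct, the state1 and state2 fields hold the key variables used to implement its functionality. (These are the field names in the Go version this article is based on; since Go 1.20 the fields are named state, of type atomic.Uint64, and sema, of type uint32, with the same roles.)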
type WaitGroup struct {
	noCopy noCopy
	state1 uint64
	state2 uint32
}
Because WaitGroup must wait for a set of operations to complete before execution continues, it needs to track how many operations are still outstanding. For this it uses a task counter, counter, which records how many operations have not yet completed. A counter value of 0 means that all operations have completed.
At the same time, once all tasks are completed, WaitGroup must wake every waiting goroutine, so it also needs to know how many goroutines are waiting. For this it uses a wait counter, waiter, which records how many goroutines are currently waiting for the operations to complete.
WaitGroup stores both counters in a single 64-bit unsigned integer, the state1 field of the struct: the high 32 bits hold the task counter counter, and the low 32 bits hold the wait counter waiter. When a WaitGroup instance is created, both the task counter and the wait counter are initialized to 0.
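The packing scheme can be reproduced in ordinary user code to see why Add shifts its delta left by 32 bits. The helpers pack, counterOf and waitersOf below are hypothetical names for illustration, not part of the sync package; they only mirror the bit layout described above.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// pack is a hypothetical helper: counter goes in the high 32 bits,
// waiter in the low 32 bits, mirroring the layout of state1.
func pack(counter int32, waiter uint32) uint64 {
	return uint64(uint32(counter))<<32 | uint64(waiter)
}

// counterOf extracts the task counter from the high 32 bits.
func counterOf(state uint64) int32 {
	return int32(state >> 32)
}

// waitersOf extracts the wait counter from the low 32 bits.
func waitersOf(state uint64) uint32 {
	return uint32(state)
}

func main() {
	var state uint64 // both counters start at 0, like a fresh WaitGroup

	// Add(3): the delta is shifted into the high 32 bits.
	atomic.AddUint64(&state, uint64(3)<<32)
	// One goroutine starts waiting: the low 32 bits are incremented.
	atomic.AddUint64(&state, 1)
	fmt.Println(counterOf(state), waitersOf(state)) // 3 1

	// Done(), i.e. Add(-1): converting a negative int to uint64 wraps
	// around, so shifting it left by 32 and adding it subtracts 1 from
	// the high half while leaving the low half untouched.
	delta := -1
	atomic.AddUint64(&state, uint64(delta)<<32)
	fmt.Println(counterOf(state), waitersOf(state)) // 2 1
}
```

Keeping both counters in one word is what lets Add and Wait read and update them together with a single atomic operation.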
In addition, a waiting goroutine must not continue until all tasks are complete, so it blocks while tasks are outstanding and is woken automatically once they have all finished. WaitGroup uses state2 as a semaphore for this purpose: through runtime_Semacquire() and runtime_Semrelease(), goroutines can wait and be notified without blocking an operating system thread, because the runtime parks only the waiting goroutine.
3.3 Code implementation
3.3.1 Add method
Calling Add() increases or decreases counter by delta, which can be positive or negative. The following is the source code of the Add method:
func (wg *WaitGroup) Add(delta int) {
	// delta can be negative; Done is implemented as Add(-1)
	// statep: address of state1; semap: address of state2
	statep, semap := wg.state()
	// Shift delta into the high 32 bits to update the task counter
	state := atomic.AddUint64(statep, uint64(delta)<<32)
	// v: the high 32 bits, i.e. the number of tasks still to be completed
	v := int32(state >> 32)
	// w: the low 32 bits, i.e. the number of waiting goroutines
	w := uint32(state)
	// v > 0: tasks remain, so waiting goroutines must not be woken yet
	// w == 0: no goroutine is waiting, so return directly
	if v > 0 || w == 0 {
		return
	}
	// Here v == 0 and w > 0: all tasks are done, so reset the state
	// and wake every waiting goroutine
	*statep = 0
	for ; w != 0; w-- {
		runtime_Semrelease(semap, false, 0)
	}
}
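Because delta can be any integer, a caller that knows the number of tasks up front can register all of them with a single Add call instead of calling Add(1) inside the loop. A minimal sketch; sumSquares is a hypothetical workload chosen only for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// sumSquares is a hypothetical workload: each goroutine squares one
// input and writes into its own slot, so no locking is needed.
func sumSquares(nums []int) int {
	results := make([]int, len(nums))

	var wg sync.WaitGroup
	// Register every task at once: delta = len(nums).
	wg.Add(len(nums))
	for i, n := range nums {
		go func(i, n int) {
			defer wg.Done() // each Done is Add(-1)
			results[i] = n * n
		}(i, n)
	}
	wg.Wait() // counter is back to 0 here

	total := 0
	for _, r := range results {
		total += r
	}
	return total
}

func main() {
	fmt.Println(sumSquares([]int{1, 2, 3, 4})) // 30
}
```

With an empty slice, Add(0) leaves the counter at 0 and Wait returns immediately.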
3.3.2 Done method implementation
Calling Done() indicates that a task has been completed. It simply calls Add with a delta of -1, decreasing the task counter counter; when the counter drops to 0, all waiting goroutines are woken automatically.
// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {
	wg.Add(-1)
}
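Since Add releases the semaphore once per registered waiter, more than one goroutine may call Wait on the same WaitGroup, and a single Done that brings the counter to 0 releases all of them. The sketch below, with the hypothetical function wakeAllWaiters, shows this: a goroutine that was already blocked is woken, and one that reaches Wait after the counter hit 0 returns immediately, so every waiter gets through either way.

```go
package main

import (
	"fmt"
	"sync"
)

// wakeAllWaiters starts n goroutines that all call Wait on the same
// WaitGroup, then calls Done once; it returns how many waiters got through.
func wakeAllWaiters(n int) int {
	var task sync.WaitGroup
	task.Add(1) // one outstanding task keeps the waiters blocked

	var woke sync.WaitGroup // tracks the waiter goroutines themselves
	var mu sync.Mutex
	count := 0
	for i := 0; i < n; i++ {
		woke.Add(1)
		go func() {
			defer woke.Done()
			task.Wait() // returns once the counter is 0
			mu.Lock()
			count++
			mu.Unlock()
		}()
	}

	task.Done() // counter 1 -> 0: every waiter is released
	woke.Wait()
	return count
}

func main() {
	fmt.Println(wakeAllWaiters(3)) // 3
}
```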
3.3.3 Wait method implementation
Calling Wait blocks until all tasks have been executed; before blocking, it increments the wait counter waiter:
func (wg *WaitGroup) Wait() {
	// statep: address of state1; semap: address of state2
	statep, semap := wg.state()
	for {
		// Atomically load the current value of state1
		state := atomic.LoadUint64(statep)
		// v: the high 32 bits, i.e. the number of tasks still to be completed
		v := int32(state >> 32)
		// No outstanding tasks: everything is already done
		if v == 0 {
			return
		}
		// Increment the waiter count (low 32 bits) with CAS; if another
		// goroutine changed state in the meantime, retry the loop
		if atomic.CompareAndSwapUint64(statep, state, state+1) {
			// Block until Add wakes this goroutine
			runtime_Semacquire(semap)
			return
		}
	}
}
3.4 Implementation notes
In the implementations of Add, Done and Wait above, I removed the validation logic for some abnormal scenarios. When such a scenario occurs, the way WaitGroup is being used contradicts its design intent, and WaitGroup panics immediately.
The usage precautions below introduce this validation logic indirectly.
4. Precautions for use
4.1 The Add method and Done method need to appear in pairs
The following example shows Add and Done calls that are not paired. Add is called too many times, so the counter never drops to 0 and Wait blocks forever.
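package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		fmt.Println("Goroutine 1")
	}()
	go func() {
		// Missing wg.Done(): the counter never reaches 0
		fmt.Println("Goroutine 2")
	}()
	wg.Wait()
	fmt.Println("All goroutines finished")
}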
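In the code above, we call wg.Add(2) but call wg.Done() only once. The counter therefore stays greater than 0, and wg.Wait() blocks permanently, so execution never continues past it. In this tiny program the Go runtime notices that every goroutine is blocked and aborts with "fatal error: all goroutines are asleep - deadlock!"; in a larger program where other goroutines are still running, Wait simply hangs.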
The opposite case is calling Done too many times, which drives the task counter counter negative. Under WaitGroup's semantics this would mean a negative number of tasks left to complete, which makes no sense, so WaitGroup panics immediately:
package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		fmt.Println("Goroutine 1 started")
		wg.Done() // First call to Done: counter 1 -> 0
		wg.Done() // Second call to Done: counter would become -1, so it panics
		fmt.Println("Goroutine 1 completed")
	}()
	wg.Wait()
	fmt.Println("All goroutines completed")
}
In the example above, we call Add before starting one goroutine, which sets counter to 1. The first call to Done decreases counter to 0, which wakes the waiting goroutine. The second call to Done then tries to decrease counter again; because the counter is already 0, it would become negative, which is not allowed, so WaitGroup panics. (In this small program the main goroutine may be woken by the first Done and exit before the second call runs, so the panic is not strictly guaranteed to appear; the misuse is a bug either way.)
Therefore, every call to Done must correspond one-to-one with a unit added through Add; otherwise the program will misbehave.
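The panic from an extra Done can be observed deterministically when both calls happen in the same goroutine. The sync package panics with the message "sync: negative WaitGroup counter"; the sketch below, using the hypothetical function tryNegative, triggers it on purpose and recovers it only to display the value. In real code, recovering from this panic would just hide the bug.

```go
package main

import (
	"fmt"
	"sync"
)

// tryNegative calls Done more times than Add and returns the recovered
// panic value (nil if no panic occurred).
func tryNegative() (recovered interface{}) {
	defer func() { recovered = recover() }()
	var wg sync.WaitGroup
	wg.Add(1)
	wg.Done() // counter 1 -> 0: fine
	wg.Done() // counter 0 -> -1: Add detects v < 0 and panics
	return nil
}

func main() {
	fmt.Println(tryNegative()) // sync: negative WaitGroup counter
}
```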
4.2 Call Wait only after all tasks have been added
WaitGroup is designed to wait for a set of operations to complete before performing the next operation. If Wait is called before all tasks have been added, the waiting goroutine may be woken early and move on to the next operation, while tasks that have not yet been added are not waited for at all. This violates WaitGroup's design intent and does not meet expectations. Here is a simple example:
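package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var wg sync.WaitGroup
	for i := 1; i <= 3; i++ {
		go func(id int) {
			// Wrong: Add runs inside the goroutine, possibly after Wait
			wg.Add(1)
			defer wg.Done()
			fmt.Printf("Goroutine %d started\n", id)
			time.Sleep(time.Duration(id) * time.Second)
			fmt.Printf("Goroutine %d finished\n", id)
		}(i)
	}
	// Start waiting without ensuring that all tasks have been added
	wg.Wait()
	fmt.Println("All goroutines finished")
	time.Sleep(10 * time.Second)
}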
One possible execution result is shown below (the exact order can vary from run to run). The waiting goroutine returns from Wait before any task has been added and moves on, and the subtasks only start executing after that:
All goroutines finished
Goroutine 1 started
Goroutine 3 started
Goroutine 2 started
Goroutine 1 finished
Goroutine 2 finished
Goroutine 3 finished
In this example, we create three goroutines that print a message when they start and when they finish. However, Add is not called before each goroutine is started; it is called inside the goroutine, after it has begun running.
As a result, some tasks may not yet have been added to the WaitGroup when the waiting goroutine calls Wait, so Wait does not wait for them to finish. When this happens, "All goroutines finished" is printed even though some goroutines have not actually completed.
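Therefore, Wait should be called only after all tasks have been added. In practice this means calling Add in the launching goroutine before the go statement that starts each task, so that Wait really waits for all of them.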
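A corrected version of the example above moves wg.Add(1) in front of the go statement. To make the result checkable, the hypothetical wrapper runTasks also counts how many goroutines had finished by the time Wait returned, and shortens the sleeps to milliseconds.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runTasks starts n goroutines and waits for all of them; it returns how
// many had finished by the time Wait returned.
func runTasks(n int) int32 {
	var wg sync.WaitGroup
	var finished int32
	for i := 1; i <= n; i++ {
		wg.Add(1) // register the task before the goroutine starts
		go func(id int) {
			defer wg.Done()
			time.Sleep(time.Duration(id) * 10 * time.Millisecond)
			atomic.AddInt32(&finished, 1)
			fmt.Printf("Goroutine %d finished\n", id)
		}(i)
	}
	wg.Wait() // the counter already covers all n tasks here
	return atomic.LoadInt32(&finished)
}

func main() {
	fmt.Println("finished before Wait returned:", runTasks(3)) // always 3
}
```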
5. Common usage scenarios of WaitGroup
Inside a function or method, if a large task can be split into several independent subtasks, it can be split and the subtasks executed concurrently in multiple goroutines to improve execution efficiency, while WaitGroup waits for all subtasks to complete and synchronizes the goroutines.
Let's look at how the ForEachMaster method of the ClusterClient struct in go-redis uses WaitGroup. ForEachMaster is typically used to perform an operation on every master node in a Redis cluster, such as adding or deleting keys in the cluster or running global diagnostics; the specific operation is specified by the fn parameter.
ForEachMaster performs an operation on all master nodes by splitting this large task into independent subtasks. Each subtask performs the specified operation on one master node and runs in its own goroutine. The main goroutine uses WaitGroup to wait for all of these goroutines to finish, at which point ForEachMaster has completed the operation on every master node. The implementation is as follows:
func (c *ClusterClient) ForEachMaster(
	ctx context.Context,
	fn func(ctx context.Context, client *Client) error,
) error {
	// Reload the cluster state to ensure that the state information is up to date
	state, err := c.state.ReloadOrGet(ctx)
	if err != nil {
		return err
	}

	var wg sync.WaitGroup
	// Used for communication between goroutines; keeps at most one error
	errCh := make(chan error, 1)

	// Iterate over all master nodes in the Redis cluster
	for _, master := range state.Masters {
		// Register one task, then start a goroutine to perform it
		wg.Add(1)
		go func(node *clusterNode) {
			// When the task is completed, call Done to inform WaitGroup
			defer wg.Done()
			err := fn(ctx, node.Client)
			if err != nil {
				// Non-blocking send: only the first error is kept
				select {
				case errCh <- err:
				default:
				}
			}
		}(master)
	}

	// The main goroutine waits for all tasks to complete
	wg.Wait()

	// Return the first error reported by any node, if there was one
	select {
	case err := <-errCh:
		return err
	default:
		return nil
	}
}
Summary
This article introduced the WaitGroup concurrency primitive in Go, which provides a simple and powerful mechanism for coordinating the execution of multiple goroutines. We first covered the basic usage of WaitGroup: how to create one, how to add to its counter, how to wait for all goroutines to complete, and how a goroutine signals completion with Done.
Next, we looked at how WaitGroup is implemented, including its task counter and wait counter. Understanding these internals makes it easier to see how WaitGroup behaves and how to use it correctly.
Finally, we covered the precautions to observe when using WaitGroup and a common usage scenario, which completes this introduction to WaitGroup.