1. Preface
The previous article introduced Go concurrency control with channels.
Controlling child goroutines with channels has the advantage of being simple to implement. The disadvantage is that a large number of goroutines requires an equally large number of channels, and it is awkward to control goroutines that are in turn spawned by child goroutines.
2. Controlling goroutines with WaitGroup
WaitGroup can be read as Wait-Goroutine-Group: it waits for a group of goroutines to finish. For example, when one goroutine must wait for several other goroutines to complete, WaitGroup makes this easy to implement.
2.1 Usage scenario
The following program shows an example of a goroutine waiting for the end of two other goroutines:
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var wg sync.WaitGroup

	wg.Add(2) // Set the counter to the number of goroutines

	go func() {
		// Do some work
		time.Sleep(1 * time.Second)

		fmt.Println("Goroutine 1 finished!")
		wg.Done() // Decrement the counter by 1 when the goroutine finishes
	}()

	go func() {
		// Do some work
		time.Sleep(2 * time.Second)

		fmt.Println("Goroutine 2 finished!")
		wg.Done() // Decrement the counter by 1 when the goroutine finishes
	}()

	wg.Wait() // The main goroutine blocks until the counter reaches 0
	fmt.Println("All Goroutine finished!")
}
Simply put, wg maintains a counter in the program above:
- Before the goroutines are started, Add(2) sets the counter to the number of goroutines to be started.
- After the goroutines are started, the main goroutine calls Wait() to block itself until the counter reaches 0.
- Each goroutine calls Done() when it finishes, which decrements the counter by 1.
- Once the counter reaches 0, the blocked goroutine is woken up.
In fact, WaitGroup can also be used to make one group of goroutines wait for another group of goroutines. This is a bit like an acrobatic trick and is very error-prone, especially if you do not understand how WaitGroup is implemented. Fortunately, the WaitGroup source code is quite simple.
2.2 Semaphore
A semaphore is a mechanism provided by Unix systems to protect shared resources; it prevents multiple threads from accessing a resource at the same time.
A semaphore can be thought of simply as a numeric value:
- When the semaphore is > 0, the resource is available. When the semaphore is acquired, the system automatically decrements it by 1.
- When the semaphore == 0, the resource is temporarily unavailable. A thread that tries to acquire the semaphore goes to sleep and is woken up once the semaphore becomes positive.
2.3 WaitGroup Data Structure
In the source code package, src/sync/:WaitGroup defines its data structure:
type WaitGroup struct {
	state1 [3]uint32
}
state1 is an array of length 3 that holds the state and a semaphore. The state itself consists of two counters, so there are three values in total:
- counter: the number of goroutines that have not yet finished
- waiter count: the number of goroutines waiting for the goroutine group to finish, that is, how many waiters there are
- semaphore: the semaphore that the waiters block on
Depending on memory alignment, the three values occupy different positions within the array. For simplicity, they can be thought of as three consecutive 32-bit values in memory.
WaitGroup provides three methods:
- Add(delta int): adds delta to counter
- Wait(): increments the waiter count by 1 and blocks waiting on the semaphore
- Done(): decrements counter by 1 and, once counter reaches 0, releases the semaphore once per waiter
The implementation details of these three methods are described below.
2.3.1 Add()
Add() does two things. First, it adds delta to counter; because delta can be negative, counter may become 0 or even negative. Second, when counter becomes 0, it releases the semaphore as many times as there are waiters, waking up every waiting goroutine. If counter becomes negative, it panics.
The Add() pseudocode is as follows:
func (wg *WaitGroup) Add(delta int) {
	statep, semap := wg.state() // Get pointers to the state and the semaphore

	// Shift delta left by 32 bits and add it to state, i.e. add it to counter
	state := atomic.AddUint64(statep, uint64(delta)<<32)
	v := int32(state >> 32) // counter value
	w := uint32(state)      // waiter value

	// After the addition counter is negative: panic
	if v < 0 {
		panic("sync: negative WaitGroup counter")
	}

	// From here on, counter >= 0.
	// If counter is positive, there is nothing to release yet: return.
	// If waiter is 0, nobody is waiting, so there is nothing to release: return.
	if v > 0 || w == 0 {
		return
	}

	// Now counter must be 0 and waiter must be greater than 0
	// (waiter is maintained internally and never goes below 0).
	// Reset the whole state (counter and waiter) to 0,
	// then release the semaphore once per waiter.
	*statep = 0
	for ; w != 0; w-- {
		runtime_Semrelease(semap, false) // Each release wakes up one waiter
	}
}
2.3.2 Wait()
Wait() also does two things: it increments the waiter count, and then it blocks waiting on the semaphore. The pseudocode is as follows:
func (wg *WaitGroup) Wait() {
	statep, semap := wg.state() // Get pointers to the state and the semaphore
	for {
		state := atomic.LoadUint64(statep) // Read the state value
		v := int32(state >> 32)            // counter value
		w := uint32(state)                 // waiter value

		// If counter is 0, all goroutines have already finished,
		// so there is no need to wait: return directly.
		if v == 0 {
			return
		}

		// Use CAS (compare-and-swap) to increment waiter. The CAS may fail,
		// in which case the for loop retries on the next iteration.
		if atomic.CompareAndSwapUint64(statep, state, state+1) {
			runtime_Semacquire(semap) // After a successful increment, sleep until woken via the semaphore
			return
		}
	}
}
CAS is used here so that the waiter count is incremented correctly when multiple goroutines call Wait() at the same time.
2.3.3 Done()
Done() does only one thing: it decrements counter by 1. Since Add() accepts negative values, Done() simply calls Add(-1).
The source code is as follows:
func (wg *WaitGroup) Done() {
	wg.Add(-1)
}
Because Done() delegates to Add(), it is actually the last goroutine to finish that wakes up the waiters.
2.4 Summary
Simply put, WaitGroup is typically used to wait for a group of worker goroutines to finish. Internally it maintains two counters, referred to here as the "worker goroutine" counter and the "waiting goroutine" counter.
The methods that WaitGroup provides have a clear division of labor:
- Add(delta int) increases the "worker goroutine" count and is usually called before a new worker goroutine is started;
- Done() decreases the "worker goroutine" count by 1 per call and is usually called inside the worker goroutine, just before it returns;
- Wait() increases the "waiting goroutine" count and is usually called after all worker goroutines have been started.
Besides decrementing the "worker goroutine" count, Done() also checks the "waiting goroutine" counter when the "worker goroutine" count reaches 0 and wakes up the waiting goroutines.
Points to note:
- If the "worker goroutine" count becomes negative after Done() decrements it, a panic is triggered, so Add() must be called before Done().
- In other words, if Done is called more times than Add has accounted for, the code panics.
- When the "worker goroutine" count is greater than the number of worker goroutines that actually need to be waited for, the waiting goroutine may never be woken up, which results in a deadlock. If every goroutine ends up blocked, the Go runtime detects the deadlock and terminates the program with a fatal error.
- In other words, if Add adds more to the count than Done is ever called, the waiter blocks forever.
- When the "worker goroutine" count is less than the number of worker goroutines that actually need to be waited for, Done() triggers a panic once the count becomes negative.
- In other words, if Add adds less to the count than the number of Done calls, the code panics.
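The negative-counter case is easy to reproduce. The sketch below calls Done() on a WaitGroup whose counter is still 0 and recovers the panic so that its message can be inspected; doneWithoutAdd is a hypothetical helper name.

```go
package main

import (
	"fmt"
	"sync"
)

// doneWithoutAdd calls Done() on a fresh WaitGroup and returns the text
// of the panic that results from the counter going negative.
func doneWithoutAdd() (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()

	var wg sync.WaitGroup
	wg.Done() // The counter goes from 0 to -1, which panics
	return "no panic"
}

func main() {
	fmt.Println(doneWithoutAdd()) // prints sync: negative WaitGroup counter
}
```

The opposite mistake cannot be handled this way: a deadlock reported by the runtime is a fatal error, not a panic, so recover() does not catch it.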
3. Summary
WaitGroup makes controlling child goroutines very simple and has a clear purpose: wait for a group of child goroutines to finish before the main goroutine continues. However, when child goroutines spawn their own child goroutines, which in turn spawn more, it is hard to use WaitGroup without knowing how many goroutines there will be. That is where Context comes into play.