WaitGroup is used for thread synchronization. In many scenarios, multiple coroutines need to be executed in order to improve concurrency, but the results of multiple coroutines need to be returned before subsequent logical processing is performed. In this case, the main thread can be blocked through the method provided by WaitGroup until all goroutine executions are completed.
Directory structure of this article:
WaitGroup cannot be copied by value
Add needs to be called before Wait
Use channel to implement the functions of WaitGroup
Add and Done quantity issues
WaitGroup and channel control concurrency
WaitGroup and channel implement early exit
WaitGroup and channel return error
Use ErrGroup to return an error
Use ErrGroup to achieve early exit
Improved version of Errgroup
WaitGroup cannot be copied by value
When wg is passed as a parameter, when we operate in the function, we are still a copy of the variable, and will not change the original wg.
This can be seen from the struct defined by the source code implemented by WaitGroup. The struct of WaitGroup only has two fields, the first field is noCopy, indicating that this structure does not want to be copied directly. The implementation of noCopy is an empty struct{}. Its main function is to embed it into the structure as an auxiliary vet tool to check whether this WaitGroup instance is assigned through copy. If there is a copy of the value, it will be detected, and our general lint tools can also detect it.
In some cases, if WaitGroup needs to be passed as a parameter to other methods, it is necessary to use pointer type for passing.
type WaitGroup struct { noCopy noCopy // 64-bit value: high 32 bits are counter, low 32 bits are waiter count. // 64-bit atomic operations require 64-bit alignment, but 32-bit // compilers do not ensure it. So we allocate 12 bytes and then use // the aligned 8 bytes in them as state, and the other 4 as storage // for the sema. state1 [3]uint32 }
This can be explained by using the following example:
// Incorrect usage, function pass wg is a value copyfunc main() { wg := {} (10) for i := 0; i < 10; i++ { go func(i int) { do(i, wg) }(i) } () ("success") } func do(i int, wg ) { // Copying wg value will cause the program (i) () } // Correct usage, waitgroup parameter passing using pointer formfunc main() { wg := {} (10) for i := 0; i < 10; i++ { go func(i int) { do(i, &wg) }(i) } () ("success") } func do(i int, wg *) { (i) () }
Add needs to be called before Wait
The WaitGroup structure provides three methods: Add, Done, Wait. The function of Add is to set the count value of WaitGroup (number of sub-goroutines); Done is to subtract the count value of WaitGroup by 1, which is actually to call Add(-1); Wait is to detect whether the value of the WaitGroup counter is 0. If it is 0, it means that all goroutines are completed, otherwise it will block and wait for the counter value to be 0 (all groutines are completed) before running the following code.
Therefore, when calling WaitGroup, it is necessary to ensure that the Add function is executed before the Wait function, otherwise the Wait method may be executed without waiting for all the results to be completed. That is, we cannot execute Add and Done in Groundine, so that the current Groundine may not be able to run, and the outer Wait function detects that the condition is met and then exits.
func main() { wg := {} () // Calling the Wait() method directly will not block, because the value of the goroutine counter in wg is 0 ("success") }
// Incorrect writing, perform Add(1) operation in goroutine.// Maybe before these goroutines can add(1) to perform the Wait operationfunc main() { wg := {} for i := 0; i < 10; i++ { go func(i int) { (1) (i) () }(i) } () ("success") } // The printing result is not the 10 elements we expected to print and then print success, but a part of it will be randomly printed.success 1 0 5 2 // Correct writing method 1func main() { wg := {} (10) // Set the number of goroutines to run in the outer layer of groutine first, so as to ensure that it is executed first than the Wait function for i := 0; i < 10; i++ { go func(i int) { (i) () }(i) } () ("success") } // Correct writing method twofunc main() { wg := {} for i := 0; i < 10; i++ { (1) // Enforce the Wait function first go func(i int) { (i) () }(i) } () ("success") }
Use channel to implement the functions of WaitGroup
If you want to implement the main thread waiting for the results of multiple coroutines to return before making subsequent calls, you can also use a channel with a cache area to implement it. The implementation idea is to first know the number of waiting for groutine to run, then initialize a channel with the same number of cache areas, put a value into the channel after the groutine run is finished, and block the listening in the main thread to get all the values in the channel back.
func main() { numGroutine := 10 ch := make(chan struct{}, numGroutine) for i := 0; i < numGroutine; i++ { go func(i int) { (i) ch <- struct{}{} }(i) } for i := 0; i < numGroutine; i++ { <-ch } ("success") } // Print result:7 5 3 1 9 0 4 2 6 8 success
Add and Done quantity issues
It is necessary to ensure that the number of Adds is consistent with the number of Done. If the number of Adds is less than the number of Done, calling the Wait method will detect that the counter value is negative, and the program will report panic; if the number of Adds is greater than the number of Done, it will cause the Wait loop to block the subsequent code and cannot be executed.
The number of Add is less than the number of Done:
func main() { wg := {} (1) // The number of Add is less than the number of Done for i := 0; i < 10; i++ { go func(i int) { (i) () }(i) } () ("success") } // There are two results for the operation resultResults One:Print part of the output and exit,This is because Done Execute one only,Wait Detected that the condition just met and then exited 1 success 9 5 Results Two:implement Wait When the function,The counter value is already a negative number 0 9 3 panic: sync: negative WaitGroup counter
The number of Add is greater than the number of Done:
func main() { wg := {} (20) for i := 0; i < 10; i++ { go func(i int) { (i) () }(i) } () ("success") } // Execution result: deadlock0 9 3 7 8 1 4 2 6 5 fatal error: all goroutines are asleep - deadlock!
WaitGroup and channel control concurrency
Use waitgroup to control a group of groutines to run simultaneously and wait for the result to return before performing subsequent operations. Although groutine consumes less resources, a large amount of groutine concurrency still puts a lot of pressure on the system. Therefore, if you need to control the number of groutine concurrency in waitgroup in this case, you can use the cached channel to control the number of groutines concurrent simultaneously.
func main() { wg := {} (200) ch := make(chan struct{}, 10) // The maximum number of concurrencies is 10 for i := 0; i < 200; i++ { ch <- struct{}{} go func(i int) { (i) () <-ch }(i) } () ("success") }
According to the idea of using channel to implement the functions of WaitGroup, our above code can also be transformed through two channels.
func main() { numGroutine := 200 // Total number of groutines running numParallel := 10 // Number of concurrent groutines chTotal := make(chan struct{}, numGroutine) chParallel := make(chan struct{}, numParallel) for i := 0; i < 200; i++ { chTotal <- struct{}{} go func(i int) { (i) <-chTotal chParallel <- struct{}{} }(i) } for i := 0; i < numGroutine; i++ { <-chParallel } ("success") }
WaitGroup and channel implement early exit
It is common to use WaitGroup to coordinate a set of concurrent goroutines, but WaitGroup itself has its shortcomings:
WaitGroup must wait for all the control goroutines to return the results before running downwards. However, in some cases, we hope to fail quickly. That is, as long as one of the goroutines fails, we should not wait until all goroutines end before ending the task, but end in advance to avoid wasting resources. At this time, we can use channel to cooperate with WaitGroup to achieve the effect of early exit.
func main() { wg := {} (10) ch := make(chan struct{}) // Use a channel to pass the exit signal for i := 0; i < 10; i++ { go func(i int) { ((i) * ) (i) if i == 2 { // If i==2 is detected, exit early ch <- struct{}{} } () }(i) } go func() { () // After execution, it means that all groutines have been executed, and no groutine sends an exit signal to ch ch <- struct{}{} // A signal needs to be passed, otherwise the main thread will keep blocking }() <-ch // Blocking and waiting for the exit signal to be received and then execution is carried out ("success") } // Print the result0 1 2 success
WaitGroup and channel return error
In addition to failing quickly, WaitGroup has another problem that cannot obtain the error returned when groutine error occurs in the main thread. In this case, the channel can be used for error transmission and the error is obtained in the main thread.
// Case 1: As long as one of the groutines fails, it returns err and returns to the main coroutine to run subsequent codefunc main() { wg := {} (10) ch := make(chan error) // Use a channel to pass the exit signal for i := 0; i < 10; i++ { go func(i int) { ((i) * ) if i == 2 { // If i==2 is detected, exit early ch <- ("i can't be 2") close(ch) return } (i) () }(i) } go func() { () // After execution, it means that all groutines have been executed, and no groutine sends an exit signal to ch ch <- nil // You need to pass a nil error, otherwise the main thread will keep blocking close(ch) }() err := <-ch (()) } // Running results:/* 0 1 i can't be 2 */ // Case 2: Wait for all groutines to complete before returning to the main thread and capturing all errorsfunc main() { wg := {} (10) ch := make(chan error, 10) // The settings are the same as groutine, and can buffer up to 10 errors for i := 0; i < 10; i++ { go func(i int) { defer func() { () }() ((i) * ) if i == 2 { ch <- ("i can't be 2") return } if i == 3 { ch <- ("i can't be 3") return } (i) }(i) } () // After execution, it means that all groutines have been executed. close(ch) // Close channel is required, otherwise the main thread will block for err := range ch { (()) } } // Print result:0 1 4 5 6 7 8 9 i can't be 2 i can't be 3
Use ErrGroup to return an error
It is precisely because WaitGroup has some of the shortcomings mentioned above that the Go team has added functions in the experimental repository (/x). Compared with WaitGroup, it has added functions such as error delivery, fast failure, timeout cancellation, etc., which is more convenient and more recommended than implementing these functions through the combination of channel and WaitGroup.
The structure is also relatively simple. On the basis of , an error and a cancel method are wrapped. The function of err is to return when a goroutine error occurs, and the function of cancel method is to fail quickly when an error occurs.
Three methods have been exposed to the outside world, WithContext, Go, Wait, and there are no Add and Done methods. In fact, Add and Done are wrapped in the Go method, and we don’t need to care about it when executing it.
// A Group is a collection of goroutines working on subtasks that are part of // the same overall task. // // A zero Group is valid and does not cancel on error. type Group struct { cancel func() wg errOnce err error } func WithContext(ctx ) (*Group, ) { ctx, cancel := (ctx) return &Group{cancel: cancel}, ctx } // Wait blocks until all function calls from the Go method have returned, then // returns the first non-nil error (if any) from them. func (g *Group) Wait() error { () if != nil { () } return } // Go calls the given function in a new goroutine. // // The first call to return a non-nil error cancels the group; its error will be // returned by Wait. func (g *Group) Go(f func() error) { (1) go func() { defer () if err := f(); err != nil { (func() { = err if != nil { () } }) } }() }
Here is an example of using : to implement a goroutine error:
func main() { eg := {} for i := 0; i < 10; i++ { i := i // Assignment operation is required here, otherwise there will be closure problems. The executed groutine will refer to the i of the for loop (func() error { if i == 2 { return ("i can't be 2") } (i) return nil }) } if err := (); err != nil { (()) } } // Print the result9 6 7 8 3 4 1 5 0 i can't be 2
One thing to note is that returning err by err will only return one of the groutines, and it is the first error to return err's groutine. This is achieved through errOnce of err.
Use ErrGroup to achieve early exit
It is also relatively simple to use to implement early exit. Call the method to obtain the object and a context of WithCancel that can be cancelled, and pass this context method into all groutines, and use select to listen to the Done() event of this context in groutine. If you listen, it means that you have received a cancel signal, then exit groutine. It should be noted that an err must be returned to trigger the execution of the cancel method.
// Case 1: Return err to trigger the underlying cancel method through groutine displayfunc main() { ctx := () eg, ctx := (ctx) for i := 0; i < 10; i++ { i := i // Assignment operation is required here, otherwise there will be closure problems. The executed groutine will refer to the i of the for loop (func() error { select { case <-(): return () case <-((i) * ): } if i == 2 { return ("i can't be 2") // The cancel method of err needs to be returned to eg } (i) return nil }) } if err := (); err != nil { (()) } } // Print result:0 1 i can't be 2 // Case 2: Notify each groutine to exit by calling the cancel method by displayingfunc main() { ctx, cancel := (()) eg, ctx := (ctx) for i := 0; i < 10; i++ { i := i // Assignment operation is required here, otherwise there will be closure problems. The executed groutine will refer to the i of the for loop (func() error { select { case <-(): return () case <-((i) * ): } if i == 2 { cancel() return nil // You can do not need to return err, because the cancel method is manually triggered //return ("i can't be 2") } (i) return nil }) } if err := (); err != nil { (()) } } // Print result:0 1 context canceled // Case 3:// Based on errgroup, it realizes the startup and shutdown of an http server, as well as the registration and processing of linux signal signals. It is necessary to ensure that one exit is possible and all log out is cancelled.// /post/ func main() { g, ctx := (()) mux := () ("/ping", func(w , r *) { ([]byte("pong")) }) // Simulate a single service error exit serverOut := make(chan struct{}) ("/shutdown", func(w , r *) { serverOut <- struct{}{} }) server := { Handler: mux, Addr: ":8080", } // g1 // Can all coroutines be exited if they exit g1? // After g1 exits, context will no longer block, g2 and g3 will exit accordingly // Then () in the main function exits, and all coroutines will exit (func() error { return () }) // g2 // Can all coroutines be exited if they exit g2? // When g2 exits, shutdown is called, g1 will exit // After g2 exits, context will no longer block, g3 will exit // Then () in the main function exits, and all coroutines will exit (func() error { select { case <-(): ("errgroup exit...") case <-serverOut: ("server will out...") } timeoutCtx, cancel := ((), 3*) // It is not necessary here, but if you use _, the static scanning tool will report an error, and it will not hurt. defer cancel() ("shutting down server...") return (timeoutCtx) }) // g3 // g3 captures the os exit signal and will exit // Can all coroutines be exited if they exit g3? // After g3 exits, context will no longer block, g2 will exit // When g2 exits, shutdown is called, g1 will exit // Then () in the main function exits, and all coroutines will exit (func() error { quit := make(chan , 0) (quit, , ) select { case <-(): return () case sig := <-quit: return ("get os signal: %v", sig) } }) ("errgroup exiting: %+v\n", ()) }
Improved version of Errgroup
Using WithContext We noticed that while returning the eg object, another context object that can be cancelled will be returned. The function of this context object is to pass it to a groutine that cancels the entire synchronization when an error occurs in the groutine that eg needs to be synchronized. However, many students may inadvertently pass this context to other non-eg synchronized business code groutines, which will cause the non-associated business code to receive cancel information inexplicably, similar to the following writing method:
func main() { ctx := () eg, ctx := (ctx) for i := 0; i < 10; i++ { i := i // Assignment operation is required here, otherwise there will be closure problems. The executed groutine will refer to the i of the for loop (func() error { select { case <-(): return () case <-((i) * ): } if i == 2 { return ("i can't be 2") // The cancel method of err needs to be returned to eg } (i) return nil }) } if err := (); err != nil { (()) } OtherLogic(ctx) } func OtherLogic(ctx ) { // The context here uses the context returned by eg, which may be passed to more funcs later // If you cancel the model for context listening in this method or subsequent func, these contexts will be cancelled}
In addition, neither WaitGroup nor the function of controlling the maximum concurrency limit and panic recovery is supported, because we cannot guarantee that we will not experience exceptions through the groutine created. If the exception is not caught in the created coroutine, it will directly cause the entire program to exit, which is very dangerous.
Here I recommend that go-kratos/kratos, the open source microservice framework of bilbil, implement an improved version itself. The idea is to use channel to control concurrency, and to create an errgroup, context will not be returned to avoid context passing into non-associated business methods.
This is the end of this article about the detailed explanation of the Golang standard library tips: waitgroup. For more related Golang waitgroup content, please search for my previous articles or continue browsing the related articles below. I hope everyone will support me in the future!