errgroup is a practical package in the official Go extension library golang.org/x. It is used to run multiple goroutines concurrently and to make error handling convenient.
The Go standard library already provides sync.WaitGroup for running multiple goroutines concurrently. errgroup.Group is built on top of sync.WaitGroup, but the two differ in functionality, even though both are used to synchronize goroutines.
errgroup Advantages
Compared with sync.WaitGroup, errgroup.Group was designed for the following reasons and offers these advantages:
Error handling:
- sync.WaitGroup only waits for goroutines to complete. It does not handle a goroutine's return value or error.
- errgroup.Group cannot handle a goroutine's return value directly either, but when a goroutine returns an error it can cancel the other running goroutines (if the group was created with WithContext), and Wait returns the first non-nil error.
Context cancellation:
errgroup can be used together with context.Context. When one goroutine returns an error, the shared context is cancelled automatically so the other goroutines can stop, which keeps resource usage under control and avoids unnecessary work.
Simplified concurrent programming:
Using errgroup reduces error-handling boilerplate. Developers do not need to manage error state and synchronization logic by hand, which makes concurrent code simpler and easier to maintain.
Limiting concurrency:
errgroup provides a convenient interface for limiting the number of concurrently running goroutines to avoid overload. sync.WaitGroup has no such feature.
In short, errgroup provides more powerful error management and control for concurrent tasks, so it is the better choice in many concurrent scenarios.
The rest of this article walks through each of the advantages listed above in detail.
sync.WaitGroup usage example
Before introducing errgroup.Group, let's review how sync.WaitGroup is used.
An example:
package main

import (
	"fmt"
	"net/http"
	"sync"
)

func main() {
	var urls = []string{
		"/",
		"/",
		"/", // This is a wrong URL that causes the task to fail
	}
	var err error

	var wg sync.WaitGroup // The zero value is usable; no explicit initialization is required

	for _, url := range urls {
		wg.Add(1) // Increment the WaitGroup counter

		// Start a goroutine to fetch the URL
		// (Go 1.22+ gives each loop iteration its own url variable)
		go func() {
			defer wg.Done() // Decrement the WaitGroup counter when the goroutine completes

			resp, e := http.Get(url)
			if e != nil { // An error occurred; record it
				// Unsynchronized write: tolerable here because only one URL fails,
				// but it would be a data race if several goroutines failed.
				err = e
				return
			}
			defer resp.Body.Close()
			fmt.Printf("fetch url %s status %s\n", url, resp.Status)
		}()
	}

	// Wait for all goroutines to complete
	wg.Wait()
	if err != nil { // err holds the last recorded error
		fmt.Printf("Error: %s\n", err)
	}
}
In the example, we use sync.WaitGroup to start 3 goroutines that concurrently access 3 different URLs, printing the response status on success and recording the error on failure.
Execute the sample code and get the following output:
$ go run waitgroup/
fetch url / status 200 OK
fetch url / status 200 OK
Error: Get "/": dial tcp: lookup : no such host
We got two successful responses and printed an error message.
Based on this example, we can abstract the most typical sync.WaitGroup idiom:
var wg sync.WaitGroup

for ... {
	wg.Add(1)

	go func() {
		defer wg.Done()
		// do something
	}()
}

wg.Wait()
errgroup.Group usage example
In fact, errgroup.Group is used in much the same way as sync.WaitGroup.
Basic use
The basic usage pattern for errgroup is as follows:
- Import the errgroup package.
- Create an errgroup.Group instance.
- Use the Group.Go method to start multiple concurrent tasks.
- Use the Group.Wait method to wait for all goroutines to complete and to obtain the first returned error, if any.
Rewriting the earlier sync.WaitGroup example with errgroup gives the following:
package main

import (
	"fmt"
	"net/http"

	"golang.org/x/sync/errgroup"
)

func main() {
	var urls = []string{
		"/",
		"/",
		"/", // This is a wrong URL that causes the task to fail
	}

	// Use errgroup to create a new goroutine group
	var g errgroup.Group // The zero value is usable; no explicit initialization is required

	for _, url := range urls {
		// Use errgroup to start a goroutine to fetch the URL
		g.Go(func() error {
			resp, err := http.Get(url)
			if err != nil {
				return err // An error occurred; return it
			}
			defer resp.Body.Close()
			fmt.Printf("fetch url %s status %s\n", url, resp.Status)
			return nil // Returning nil means success
		})
	}

	// Wait for all goroutines to complete and return the first error (if any)
	if err := g.Wait(); err != nil {
		fmt.Printf("Error: %s\n", err)
	}
}
This program is very similar to the sync.WaitGroup example, and the comments in the code make it easy to follow.
Execute the sample code and get the following output:
$ go run examples/
fetch url / status 200 OK
fetch url / status 200 OK
Error: Get "/": dial tcp: lookup : no such host
The output is essentially unchanged.
Context Cancel
errgroup provides errgroup.WithContext, which attaches cancellation to the group. When any goroutine returns an error, the derived context is cancelled so that the other running goroutines can stop promptly, and Wait returns the first non-nil error.
Examples are as follows:
package main

import (
	"context"
	"fmt"
	"net/http"
	"sync"

	"golang.org/x/sync/errgroup"
)

func main() {
	var urls = []string{
		"/",
		"/",
		"/", // This is a wrong URL that causes the task to fail
	}

	// Create an errgroup with a context.
	// The context is cancelled when any goroutine returns a non-nil error or when Wait() returns.
	g, ctx := errgroup.WithContext(context.Background())

	// Create a concurrency-safe map to save the results
	var result sync.Map

	for _, url := range urls {
		// Use errgroup to start a goroutine to fetch the URL
		g.Go(func() error {
			req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
			if err != nil {
				return err // An error occurred; return it
			}

			// Make the request
			resp, err := http.DefaultClient.Do(req)
			if err != nil {
				return err // An error occurred; return it
			}
			defer resp.Body.Close()

			// Save the response status of each URL
			result.Store(url, resp.Status)
			return nil // Returning nil means success
		})
	}

	// Wait for all goroutines to complete and return the first error (if any)
	if err := g.Wait(); err != nil {
		fmt.Println("Error: ", err)
	}

	// All goroutines have finished; iterate over and print the successful results
	result.Range(func(key, value any) bool {
		fmt.Printf("fetch url %s status %s\n", key, value)
		return true
	})
}
Execute the sample code and get the following output:
$ go run examples/withcontext/
Error: Get "/": dial tcp: lookup : no such host
fetch url / status 200 OK
Judging from the output, the request to one of the valid URLs received a successful response. Because the request to the invalid URL returned an error, the request to the remaining URL was cancelled before the program could receive its response.
We can roughly guess that cancellation is implemented by deriving a cancellable context. We do not need to dig into it here; we can verify this guess when we explore the source code later.
Limit the number of concurrent goroutines
errgroup provides the Group.SetLimit method, which limits the number of goroutines that execute concurrently.
Examples are as follows:
package main

import (
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	// Create an errgroup.Group
	var g errgroup.Group

	// Set the maximum concurrency limit to 3
	g.SetLimit(3)

	// Start 10 goroutines
	for i := 1; i <= 10; i++ {
		g.Go(func() error {
			// Print the running goroutine
			fmt.Printf("Goroutine %d is starting\n", i)
			time.Sleep(2 * time.Second) // Simulate a time-consuming task
			fmt.Printf("Goroutine %d is done\n", i)
			return nil
		})
	}

	// Wait for all goroutines to complete
	if err := g.Wait(); err != nil {
		fmt.Printf("Encountered an error: %v\n", err)
	}

	fmt.Println("All goroutines complete.")
}
Calling g.SetLimit(3) limits the maximum concurrency to 3 goroutines.
Execute the sample code and get the following output:
$ go run examples/setlimit/
Goroutine 3 is starting
Goroutine 1 is starting
Goroutine 2 is starting
Goroutine 2 is done
Goroutine 1 is done
Goroutine 5 is starting
Goroutine 3 is done
Goroutine 6 is starting
Goroutine 4 is starting
Goroutine 6 is done
Goroutine 5 is done
Goroutine 8 is starting
Goroutine 4 is done
Goroutine 7 is starting
Goroutine 9 is starting
Goroutine 9 is done
Goroutine 8 is done
Goroutine 10 is starting
Goroutine 7 is done
Goroutine 10 is done
All goroutines complete.
According to the output, although the for loop starts 10 goroutines, at most 3 goroutines run at the same time. A new goroutine starts only after one of the three running goroutines finishes and exits.
Try starting
errgroup also provides the Group.TryGo method, which tries to start a task and returns a bool indicating whether the task was started: true means success, false means failure.
TryGo needs to be used together with SetLimit. If concurrency is not limited, TryGo always returns true; once the maximum concurrency limit is reached, TryGo returns false.
Examples are as follows:
package main

import (
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	// Create an errgroup.Group
	var g errgroup.Group

	// Set the maximum concurrency limit to 3
	g.SetLimit(3)

	// Try to start 10 goroutines
	for i := 1; i <= 10; i++ {
		if g.TryGo(func() error {
			// Print the running goroutine
			fmt.Printf("Goroutine %d is starting\n", i)
			time.Sleep(2 * time.Second) // Simulate work
			fmt.Printf("Goroutine %d is done\n", i)
			return nil
		}) {
			// Started successfully; print a message
			fmt.Printf("Goroutine %d started successfully\n", i)
		} else {
			// The concurrency limit was reached; print a message
			fmt.Printf("Goroutine %d could not start (limit reached)\n", i)
		}
	}

	// Wait for all goroutines to complete
	if err := g.Wait(); err != nil {
		fmt.Printf("Encountered an error: %v\n", err)
	}

	fmt.Println("All goroutines complete.")
}
Calling g.SetLimit(3) limits the maximum concurrency to 3 goroutines. If g.TryGo starts the task successfully, the program prints Goroutine {i} started successfully; if the task cannot be started, it prints Goroutine {i} could not start (limit reached).
Execute the sample code and get the following output:
$ go run examples/trygo/
Goroutine 1 started successfully
Goroutine 1 is starting
Goroutine 2 is starting
Goroutine 2 started successfully
Goroutine 3 started successfully
Goroutine 4 could not start (limit reached)
Goroutine 5 could not start (limit reached)
Goroutine 6 could not start (limit reached)
Goroutine 7 could not start (limit reached)
Goroutine 8 could not start (limit reached)
Goroutine 9 could not start (limit reached)
Goroutine 10 could not start (limit reached)
Goroutine 3 is starting
Goroutine 2 is done
Goroutine 3 is done
Goroutine 1 is done
All goroutines complete.
Because the maximum concurrency limit is 3, the first 3 goroutines start successfully and run to completion, while all the other goroutines fail to start. The loop finishes long before any 2-second task completes, so no slot is freed in time for goroutines 4 through 10.
That covers all the usages of errgroup. You can explore more usage scenarios in practice.
Source code interpretation
Next, let's read the errgroup source code together to deepen our understanding of errgroup.
The errgroup source is very small: only 3 files. Their contents are as follows:
Main logic code:
/golang/sync/blob/v0.8.0/errgroup/
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package errgroup provides synchronization, error propagation, and Context
// cancelation for groups of goroutines working on subtasks of a common task.
//
// [errgroup.Group] is related to [sync.WaitGroup] but adds handling of tasks
// returning errors.
package errgroup

import (
	"context"
	"fmt"
	"sync"
)

type token struct{}

// A Group is a collection of goroutines working on subtasks that are part of
// the same overall task.
//
// A zero Group is valid, has no limit on the number of active goroutines,
// and does not cancel on error.
type Group struct {
	cancel func(error)

	wg sync.WaitGroup

	sem chan token

	errOnce sync.Once
	err     error
}

func (g *Group) done() {
	if g.sem != nil {
		<-g.sem
	}
	g.wg.Done()
}

// WithContext returns a new Group and an associated Context derived from ctx.
//
// The derived Context is canceled the first time a function passed to Go
// returns a non-nil error or the first time Wait returns, whichever occurs
// first.
func WithContext(ctx context.Context) (*Group, context.Context) {
	ctx, cancel := withCancelCause(ctx)
	return &Group{cancel: cancel}, ctx
}

// Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them.
func (g *Group) Wait() error {
	g.wg.Wait()
	if g.cancel != nil {
		g.cancel(g.err)
	}
	return g.err
}

// Go calls the given function in a new goroutine.
// It blocks until the new goroutine can be added without the number of
// active goroutines in the group exceeding the configured limit.
//
// The first call to return a non-nil error cancels the group's context, if the
// group was created by calling WithContext. The error will be returned by Wait.
func (g *Group) Go(f func() error) {
	if g.sem != nil {
		g.sem <- token{}
	}

	g.wg.Add(1)
	go func() {
		defer g.done()

		if err := f(); err != nil {
			g.errOnce.Do(func() {
				g.err = err
				if g.cancel != nil {
					g.cancel(g.err)
				}
			})
		}
	}()
}

// TryGo calls the given function in a new goroutine only if the number of
// active goroutines in the group is currently below the configured limit.
//
// The return value reports whether the goroutine was started.
func (g *Group) TryGo(f func() error) bool {
	if g.sem != nil {
		select {
		case g.sem <- token{}:
			// Note: this allows barging iff channels in general allow barging.
		default:
			return false
		}
	}

	g.wg.Add(1)
	go func() {
		defer g.done()

		if err := f(); err != nil {
			g.errOnce.Do(func() {
				g.err = err
				if g.cancel != nil {
					g.cancel(g.err)
				}
			})
		}
	}()
	return true
}

// SetLimit limits the number of active goroutines in this group to at most n.
// A negative value indicates no limit.
//
// Any subsequent call to the Go method will block until it can add an active
// goroutine without exceeding the configured limit.
//
// The limit must not be modified while any goroutines in the group are active.
func (g *Group) SetLimit(n int) {
	if n < 0 {
		g.sem = nil
		return
	}
	if len(g.sem) != 0 {
		panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem)))
	}
	g.sem = make(chan token, n)
}
The withCancelCause implementation provided for Go 1.20 and later:
/golang/sync/blob/v0.8.0/errgroup/
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

//go:build go1.20

package errgroup

import "context"

func withCancelCause(parent context.Context) (context.Context, func(error)) {
	return context.WithCancelCause(parent)
}
The withCancelCause implementation provided for Go versions below 1.20:
/golang/sync/blob/v0.8.0/errgroup/pre_go120.go
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

//go:build !go1.20

package errgroup

import "context"

func withCancelCause(parent context.Context) (context.Context, func(error)) {
	ctx, cancel := context.WithCancel(parent)
	return ctx, func(error) { cancel() }
}
As you can see, the errgroup source code totals less than 100 lines, which is short and concise.
Now let's analyze the errgroup source code.
From the package comment we know that the errgroup package provides synchronization, error propagation, and context cancellation for groups of goroutines working on subtasks of a common task. errgroup.Group is related to sync.WaitGroup but adds handling of tasks that return errors.
To provide these features, errgroup first defines two struct types, token and Group:
// token is an empty struct type that is passed as a signal to control the number of concurrent goroutines
type token struct{}

// Group is a collection of goroutines working on subtasks of the same overall task.
//
// A zero Group is valid, has no limit on the number of active goroutines,
// and does not cancel on error.
type Group struct {
	cancel func(error) // Cancel function, i.e. of type context.CancelCauseFunc

	wg sync.WaitGroup // Uses sync.WaitGroup internally

	sem chan token // Signal channel that controls the number of concurrent goroutines

	errOnce sync.Once // Ensures the error is handled only once
	err     error     // Records the first error returned by the goroutines in the group
}
token is defined as an empty struct used to pass signals, which is an idiomatic use of empty structs in Go.
NOTE:
You can see more uses of empty structs in my other post, "Go empty struct idioms, I'll summarize them for you!".
Group is the only public struct provided by the errgroup package, and its methods carry all of the functionality.
The cancel field is a function that is called to cancel the context. It is of type context.CancelCauseFunc and is assigned when errgroup.WithContext is called.
The wg field is a sync.WaitGroup, which carries the main concurrency-control logic. Group.Go and Group.Wait delegate their internal concurrency control to sync.WaitGroup.
The sem field is a channel of token, used to limit the number of concurrent goroutines. It is assigned when Group.SetLimit is called.
The err field records the first error that occurs among all goroutines. errOnce ensures that the error is handled only once, so any later errors are ignored.
Next, let's look at the definition of the SetLimit method:
// SetLimit limits the number of active goroutines in this group to at most n.
// A negative value indicates no limit.
//
// Any subsequent call to the Go method will block until it can add an active
// goroutine without exceeding the configured limit.
//
// The limit must not be modified while any goroutines in the group are active.
func (g *Group) SetLimit(n int) { // n becomes the channel capacity, which limits the number of concurrent goroutines
	if n < 0 { // A negative value means no concurrency limit. Also, do not set it to 0, as that causes a deadlock
		g.sem = nil
		return
	}
	if len(g.sem) != 0 { // If active goroutines exist, calling this method panics
		panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem)))
	}
	g.sem = make(chan token, n)
}
The SetLimit method limits concurrency. Its internal logic is very simple, but be sure to call SetLimit before calling Go or TryGo; otherwise the program may panic.
Then let's look at the implementation of the main logic, the Go method:
// Go calls the given function in a new goroutine.
// It blocks until the new goroutine can be added without the number of
// active goroutines in the group exceeding the configured limit.
//
// The first call to return a non-nil error cancels the group's context, if the
// group was created by calling WithContext. The error will be returned by Wait.
func (g *Group) Go(f func() error) {
	if g.sem != nil { // sem is the signal channel that limits concurrency
		g.sem <- token{} // If the configured limit of active goroutines is reached, sending a token to the channel blocks
	}

	g.wg.Add(1) // Delegate to sync.WaitGroup.Add(1), incrementing the number of active goroutines by one
	go func() {
		defer g.done() // Called when the goroutine completes; it delegates to sync.WaitGroup.Done()

		if err := f(); err != nil { // f() is the task we want to run
			g.errOnce.Do(func() { // Runs only once, so only the first non-nil error is recorded, regardless of the order in which goroutines were started
				g.err = err // Record the error
				if g.cancel != nil { // If cancel is not nil, call the cancel function and set the cause
					g.cancel(g.err)
				}
			})
		}
	}()
}
First, it checks whether a concurrency limit was set with the SetLimit method. If there is a limit, the channel is used to control the number of concurrent goroutines.
After that, the main logic is essentially the familiar sync.WaitGroup idiom.
The deferred call is g.done(). The done method is defined as follows:
// done is called when a goroutine completes
func (g *Group) done() {
	// If a maximum concurrency was set, sem is not nil. Receiving a token from the channel indicates that one goroutine has completed.
	if g.sem != nil {
		<-g.sem
	}
	g.wg.Done() // Delegate to sync.WaitGroup.Done(), decrementing the number of active goroutines by one
}
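The acquire step in Go and the release step in done together form a counting semaphore built from a buffered channel. The following standard-library sketch illustrates that technique in isolation; it is not the errgroup implementation itself. The helper maxConcurrent, the task count, and the sleep duration are assumptions chosen for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type token struct{}

// maxConcurrent is a hypothetical helper: it runs n tasks with at most limit
// of them active at once and returns the highest concurrency it observed.
func maxConcurrent(n, limit int) int64 {
	var (
		wg      sync.WaitGroup
		running int64 // number of tasks currently executing
		peak    int64 // highest value of running seen so far
	)
	sem := make(chan token, limit) // channel capacity is the concurrency limit

	for i := 0; i < n; i++ {
		sem <- token{} // acquire a slot; blocks while limit tasks are active
		wg.Add(1)
		go func() {
			defer func() {
				<-sem // release the slot
				wg.Done()
			}()

			cur := atomic.AddInt64(&running, 1)
			for { // raise peak to cur if cur is larger
				old := atomic.LoadInt64(&peak)
				if cur <= old || atomic.CompareAndSwapInt64(&peak, old, cur) {
					break
				}
			}
			time.Sleep(5 * time.Millisecond) // simulate work
			atomic.AddInt64(&running, -1)
		}()
	}

	wg.Wait()
	return atomic.LoadInt64(&peak)
}

func main() {
	fmt.Println(maxConcurrent(10, 3) <= 3) // prints true
}
```

Because a slot is taken before the goroutine is launched and given back only after the work is finished, the number of running tasks can never exceed the channel capacity.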
In addition, if a task returns an error, errOnce ensures that the error is handled only once: the error is recorded first, and then the cancel function is called.
cancel is actually assigned in the WithContext function:
// WithContext returns a new Group and an associated Context derived from ctx.
//
// The derived Context is canceled the first time a function passed to Go
// returns a non-nil error or the first time Wait returns, whichever occurs first.
func WithContext(ctx context.Context) (*Group, context.Context) {
	ctx, cancel := withCancelCause(ctx)
	return &Group{cancel: cancel}, ctx
}
withCancelCause has two implementations.
If the Go version is 1.20 or later, withCancelCause is implemented as follows:
// The build constraint means this file is compiled only with Go 1.20 or later
//go:build go1.20

package errgroup

import "context"

// Delegates to context.WithCancelCause
func withCancelCause(parent context.Context) (context.Context, func(error)) {
	return context.WithCancelCause(parent)
}
If the Go version is lower than 1.20, withCancelCause is implemented as follows:
//go:build !go1.20

package errgroup

import "context"

func withCancelCause(parent context.Context) (context.Context, func(error)) {
	ctx, cancel := context.WithCancel(parent)
	return ctx, func(error) { cancel() }
}
This split exists because the context.WithCancelCause function was added in Go 1.20, as noted in the Go 1.20 Release Notes. You can also see the change history of the withCancelCause function in Commit: 93782cc.
After starting tasks with the Go method, we call Wait to wait for all tasks to complete. It is implemented as follows:
// Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them.
func (g *Group) Wait() error {
	g.wg.Wait() // Delegate to sync.WaitGroup.Wait(), waiting for all goroutines to complete
	if g.cancel != nil { // If cancel is not nil, call the cancel function and set the cause
		g.cancel(g.err)
	}
	return g.err // Return the error
}
So the error that Wait finally returns is the first error recorded in the Go method.
Now only one method, TryGo, is left to analyze. Its source code is shown below with detailed comments:
// TryGo calls the given function in a new goroutine only if the number of
// active goroutines in the group is currently below the configured limit.
//
// The return value reports whether the goroutine was started.
func (g *Group) TryGo(f func() error) bool {
	if g.sem != nil { // If a maximum concurrency was set
		select {
		case g.sem <- token{}: // A token can be written to the channel, so the limit has not been reached and the goroutine can start
			// Note: this allows barging iff channels in general allow barging.
		default: // The configured limit of active goroutines has been reached
			return false
		}
	}

	// The following code is the same as in Go
	g.wg.Add(1)
	go func() {
		defer g.done()

		if err := f(); err != nil {
			g.errOnce.Do(func() {
				g.err = err
				if g.cancel != nil {
					g.cancel(g.err)
				}
			})
		}
	}()
	return true
}
The main logic is the same as in the Go method. The difference is that Go blocks when the concurrency limit is reached, whereas TryGo returns false immediately.
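The non-blocking behaviour comes from a select statement with a default case. The sketch below isolates that technique using only the standard library; the helper name tryAcquire and the capacity of 2 are assumptions for illustration.

```go
package main

import "fmt"

type token struct{}

// tryAcquire is a hypothetical helper: it takes a slot in sem if one is free
// and reports whether it succeeded, without ever blocking.
func tryAcquire(sem chan token) bool {
	select {
	case sem <- token{}: // a free slot exists, so the send succeeds immediately
		return true
	default: // the channel buffer is full, so give up instead of waiting
		return false
	}
}

func main() {
	sem := make(chan token, 2)   // at most 2 slots
	fmt.Println(tryAcquire(sem)) // prints true
	fmt.Println(tryAcquire(sem)) // prints true
	fmt.Println(tryAcquire(sem)) // prints false: both slots are taken
	<-sem                        // release one slot
	fmt.Println(tryAcquire(sem)) // prints true
}
```

A plain channel send, as used in the Go method, would wait at the third call until a slot is released.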
In fact, the TryGo and SetLimit methods were added later. You can see the discussion in issues/27837.
At this point, the errgroup source code has been fully interpreted.
Summary
errgroup is an official extension package that builds on sync.WaitGroup and adds handling of tasks that return errors. It provides synchronization, error propagation, and context cancellation for groups of goroutines working on subtasks of a common task.
The WithContext function attaches cancellation. When any goroutine returns an error, the derived context is cancelled so that other running goroutines can stop, and Wait returns the first non-nil error.
The SetLimit method limits the number of goroutines that execute concurrently.
The TryGo method tries to start a task, and its return value indicates whether the task was started.
The errgroup source code is elegantly designed and worth learning from.