SoFunction
Updated on 2025-03-05

Analysis of Golang errgroup Design and Implementation Principle

Opening

In my last article I studied the design and implementation of semaphore in the extension library. Today we continue with another powerful tool from the /x/sync package that Golang developers use often: errgroup.

In business development, we often run into scenarios that require multiple downstream calls. For example, loading a product details page may require calling the product service, inventory service, coupon service, user service, and so on to get the required information from each data source. After some authentication logic, the data is assembled into the format the front end needs and returned.

Serial calls are of course possible, but they implicitly impose an order on the calls: for example, operation D must be performed before operations A, B, and C can be performed. If we have no strong ordering requirement and the calls are semantically independent, we can let them execute concurrently.

This is where errgroup helps. In a sense, errgroup builds on WaitGroup and improves how errors are propagated: besides the same waiting and control capabilities, it also passes subtask errors upward.
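To make the idea of "WaitGroup plus error propagation" concrete, here is a minimal sketch. It is not the errgroup source: miniGroup, fetchAll and the simulated service calls are hypothetical names, and the real Group examined below also supports context cancellation and a concurrency limit. It shows only the core pattern: a sync.WaitGroup for waiting and a sync.Once that keeps the first non-nil error.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// miniGroup is a hypothetical, stripped-down stand-in for errgroup.Group:
// a WaitGroup for waiting plus a Once that records only the first error.
type miniGroup struct {
	wg      sync.WaitGroup
	errOnce sync.Once
	err     error
}

// Go runs f in a new goroutine and records its error if it is the first one.
func (g *miniGroup) Go(f func() error) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		if err := f(); err != nil {
			g.errOnce.Do(func() { g.err = err })
		}
	}()
}

// Wait blocks until every function passed to Go has returned,
// then reports the first recorded error (or nil).
func (g *miniGroup) Wait() error {
	g.wg.Wait()
	return g.err
}

// fetchAll simulates three independent downstream calls, one of which fails.
func fetchAll() error {
	var g miniGroup
	g.Go(func() error { return nil })                             // e.g. product service
	g.Go(func() error { return errors.New("stock unavailable") }) // e.g. inventory service
	g.Go(func() error { return nil })                             // e.g. coupon service
	return g.Wait()
}

func main() {
	fmt.Println(fetchAll()) // stock unavailable
}
```

Reading g.err after wg.Wait is safe because every Done call happens before Wait returns.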

errgroup source code breakdown

errgroup is defined in /x/sync/errgroup, and the struct that carries its core capabilities is Group.

Group

type token struct{}
// A Group is a collection of goroutines working on subtasks that are part of
// the same overall task.
//
// A zero Group is valid, has no limit on the number of active goroutines,
// and does not cancel on error.
type Group struct {
	cancel func()

	wg sync.WaitGroup

	sem chan token

	errOnce sync.Once
	err     error
}

Group is an abstraction of the group of subtasks described above; each subtask runs in its own goroutine.

The struct also shows that errgroup schedules multiple goroutines underneath, while the waiting itself is still built on sync.WaitGroup.

WithContext

We can call the WithContext function to create a Group.

// WithContext returns a new Group and an associated Context derived from ctx.
//
// The derived Context is canceled the first time a function passed to Go
// returns a non-nil error or the first time Wait returns, whichever occurs
// first.
func WithContext(ctx context.Context) (*Group, context.Context) {
	ctx, cancel := context.WithCancel(ctx)
	return &Group{cancel: cancel}, ctx
}

As you can see, the Group's cancel field exists to support the cancellation capability of context. A newly created Group has only its cancel field set; all other fields keep their zero values. The derived Context is canceled as soon as a subtask returns an error or the Wait call returns, whichever comes first.

Wait

Wait has essentially the same semantics as WaitGroup's Wait. Since this is a group of tasks, we need to wait for all of them to finish, and Wait provides exactly that. If multiple subtasks return errors, only the first error is returned; if all subtasks succeed, it returns nil.

// Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them.
func (g *Group) Wait() error {
	g.wg.Wait()
	if g.cancel != nil {
		g.cancel()
	}
	return g.err
}

The implementation of Wait is very simple. After first calling the WaitGroup's Wait, it does only two things:

  • If the Group holds a cancel function for the derived Context, it calls it, because the work is done;
  • It returns the err stored in the Group to the caller.

Go

The core capability of Group is executing multiple subtasks concurrently. From the caller's point of view, we only need to pass in the function to execute, whose signature is func() error, which is very general. If the task succeeds it returns nil; otherwise it returns an error, and the derived Context (if the Group was created with WithContext) is canceled. The underlying scheduling logic is implemented by the Group's Go method:

// Go calls the given function in a new goroutine.
// It blocks until the new goroutine can be added without the number of
// active goroutines in the group exceeding the configured limit.
//
// The first call to return a non-nil error cancels the group; its error will be
// returned by Wait.
func (g *Group) Go(f func() error) {
	if g.sem != nil {
		g.sem <- token{}
	}

	g.wg.Add(1)
	go func() {
		defer g.done()

		if err := f(); err != nil {
			g.errOnce.Do(func() {
				g.err = err
				if g.cancel != nil {
					g.cancel()
				}
			})
		}
	}()
}

func (g *Group) done() {
	if g.sem != nil {
		<-g.sem
	}
	g.wg.Done()
}

Let's analyze what happens inside Go, setting aside the sem check for now (it is covered in the SetLimit section).

The WaitGroup counter is incremented by 1 to track the new subtask;

A new goroutine is started to execute the f() function passed in by the caller:

  • If err is nil, the execution succeeded;
  • If err is not nil, the execution failed. The returned err is assigned to the Group's err field and the context's cancel function is called. Note that this happens inside errOnce.Do, so only the first error is processed.

The deferred call to the Group's done method relies on WaitGroup's Done underneath to mark the subtask as finished.

This also shows that errgroup does not combine the errors of all subtasks into one; it records only the first error returned by a function passed to Go, relying on sync.Once to do so.

SetLimit

Having read this far, you may feel that errgroup is not very useful: the core work is done by WaitGroup, errgroup merely wraps how goroutines are launched, and it can keep only one err. And what is the sem channel in Group for?

In this section, we look at how Go extended errgroup's capabilities.

So far, errgroup does what people originally expected of it: it executes subtasks concurrently. The problem is that every subtask gets its own goroutine. In a high-concurrency environment, frequently creating large numbers of goroutines can easily strain resources. Developers therefore proposed a way to limit the number of goroutines an errgroup creates; see proposal #27837.

// SetLimit limits the number of active goroutines in this group to at most n.
// A negative value indicates no limit.
//
// Any subsequent call to the Go method will block until it can add an active
// goroutine without exceeding the configured limit.
//
// The limit must not be modified while any goroutines in the group are active.
func (g *Group) SetLimit(n int) {
	if n < 0 {
		g.sem = nil
		return
	}
	if len(g.sem) != 0 {
		panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem)))
	}
	g.sem = make(chan token, n)
}

The parameter n of SetLimit is the maximum number of active goroutines we want in this group. Apart from the validation logic, SetLimit does only one thing: g.sem = make(chan token, n), that is, it creates a buffered channel with capacity n and assigns it to sem.
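To see the counting-semaphore idea in isolation, here is a minimal sketch. It assumes a hypothetical limitedGroup type that keeps only the sem logic of Go and done (no error handling, no context, and no special case for a negative limit). The helper maxConcurrency, also hypothetical, records how many subtasks were ever running at once; with a limit of 3 it should never exceed 3.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

type token struct{}

// limitedGroup is a hypothetical sketch of the SetLimit idea: a buffered
// channel with capacity n acts as a counting semaphore for goroutines.
type limitedGroup struct {
	wg  sync.WaitGroup
	sem chan token
}

func newLimitedGroup(n int) *limitedGroup {
	return &limitedGroup{sem: make(chan token, n)}
}

// Go blocks while n subtasks are active, then runs f in a new goroutine.
func (g *limitedGroup) Go(f func()) {
	g.sem <- token{} // acquire a slot
	g.wg.Add(1)
	go func() {
		defer func() {
			<-g.sem // release the slot, as done() does
			g.wg.Done()
		}()
		f()
	}()
}

// Wait blocks until every subtask has finished.
func (g *limitedGroup) Wait() { g.wg.Wait() }

// maxConcurrency runs `tasks` subtasks under the given limit and returns the
// highest number of subtasks observed running at the same time.
func maxConcurrency(limit, tasks int) int64 {
	g := newLimitedGroup(limit)
	var active, peak int64
	for i := 0; i < tasks; i++ {
		g.Go(func() {
			cur := atomic.AddInt64(&active, 1)
			// Raise peak to cur if cur is larger (lock-free max).
			for {
				old := atomic.LoadInt64(&peak)
				if cur <= old || atomic.CompareAndSwapInt64(&peak, old, cur) {
					break
				}
			}
			runtime.Gosched() // give other subtasks a chance to overlap
			atomic.AddInt64(&active, -1)
		})
	}
	g.Wait()
	return atomic.LoadInt64(&peak)
}

func main() {
	fmt.Println("peak concurrency:", maxConcurrency(3, 50))
}
```

Because a slot is taken before a goroutine starts and returned only after f finishes, at most limit calls of f can overlap, whatever the scheduler does.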

Now recall what Go does with sem and what the deferred done call does; the picture becomes much clearer. Let's walk through it:

First, the Group struct contains the sem field. It is used only for signaling; its elements are empty structs that carry no meaning:

type Group struct {
	cancel func()

	wg sync.WaitGroup

	sem chan token

	errOnce sync.Once
	err     error
}

If you don't need to limit the Group, simply don't call SetLimit; the original capabilities of errgroup still cover your needs.

However, if you want to keep the errgroup's goroutines under an upper limit, call SetLimit before calling the Go method, so that the group's sem is assigned a channel with capacity n.

Then, when you call the Go method, look at the code below. Since sem is no longer nil, Go sends a token into it, which amounts to requesting a goroutine slot.

func (g *Group) Go(f func() error) {
	if g.sem != nil {
		g.sem <- token{}
	}

	g.wg.Add(1)
	go func() {
		defer g.done()
		// ... (run f and record the first error, as shown above)
	}()
}

If the number of goroutines has already reached the limit, this send blocks until another goroutine finishes its work and a token is received from sem, freeing a slot.

When each goroutine finishes, the deferred done method releases its slot:

func (g *Group) done() {
	if g.sem != nil {
		<-g.sem
	}
	g.wg.Done()
}

This ties together the whole use of sem. The number of goroutines is controlled with a fixed-capacity buffered channel. Since we don't care about the elements actually stored in the channel, an empty struct is used to save space. This is an excellent design, worth borrowing in your own code.

TryGo

TryGo and SetLimit are capabilities that Ou Changkun contributed to errgroup not long ago.

As with TryXXX functions in general, TryGo does not block. What it does is simple: just like the Go method, it takes a func() error to execute.

The difference from Go is that if the limit has been reached, TryGo does not block; it returns false immediately, meaning the function was not started. Everything else is exactly the same.

// TryGo calls the given function in a new goroutine only if the number of
// active goroutines in the group is currently below the configured limit.
//
// The return value reports whether the goroutine was started.
func (g *Group) TryGo(f func() error) bool {
	if g.sem != nil {
		select {
		case g.sem <- token{}:
			// Note: this allows barging iff channels in general allow barging.
		default:
			return false
		}
	}

	g.wg.Add(1)
	go func() {
		defer g.done()

		if err := f(); err != nil {
			g.errOnce.Do(func() {
				g.err = err
				if g.cancel != nil {
					g.cancel()
				}
			})
		}
	}()
	return true
}
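The non-blocking acquire can be exercised on its own. The sketch below uses a hypothetical tryGroup with a limit of 1 that keeps only the select/default logic of TryGo (errors and context are omitted); demo is likewise an illustrative helper. The first call takes the only slot, the second is rejected at once instead of blocking, and a third call succeeds after the slot is freed.

```go
package main

import (
	"fmt"
	"sync"
)

// tryGroup is a hypothetical sketch of TryGo's non-blocking acquire;
// it is not the real errgroup type and ignores error handling.
type tryGroup struct {
	wg  sync.WaitGroup
	sem chan struct{}
}

// TryGo starts f only if a slot is free right now; otherwise it returns false.
func (g *tryGroup) TryGo(f func()) bool {
	select {
	case g.sem <- struct{}{}: // got a slot
	default:
		return false // limit reached: do not block
	}
	g.wg.Add(1)
	go func() {
		defer func() {
			<-g.sem // free the slot before signaling completion
			g.wg.Done()
		}()
		f()
	}()
	return true
}

// demo fills the only slot, tries again, then retries after the slot is freed.
func demo() (first, second, third bool) {
	g := &tryGroup{sem: make(chan struct{}, 1)}
	release := make(chan struct{})
	first = g.TryGo(func() { <-release }) // occupies the single slot
	second = g.TryGo(func() {})           // slot busy: rejected immediately
	close(release)
	g.wg.Wait()
	third = g.TryGo(func() {}) // slot free again
	g.wg.Wait()
	return first, second, third
}

func main() {
	fmt.Println(demo()) // true false true
}
```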

How to use

Let's first look at the most common usage: each task in a set runs in its own goroutine, and the err returned by Wait is taken as the err of the whole Group.

package main

import (
	"errors"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	var g errgroup.Group

	// Start the first subtask, it executes successfully
	g.Go(func() error {
		time.Sleep(5 * time.Second)
		fmt.Println("exec #1")
		return nil
	})

	// Start the second subtask, it fails to execute
	g.Go(func() error {
		time.Sleep(10 * time.Second)
		fmt.Println("exec #2")
		return errors.New("failed to exec #2")
	})

	// Start the third subtask, it executes successfully
	g.Go(func() error {
		time.Sleep(15 * time.Second)
		fmt.Println("exec #3")
		return nil
	})

	// Wait for all three tasks to be completed
	if err := g.Wait(); err == nil {
		fmt.Println("Successfully exec all")
	} else {
		fmt.Println("failed:", err)
	}
}

When you run it, you will find that the error printed at the end is the one from the second subtask.

Of course, only one task failed in the example above. What if several tasks actually fail?

For completeness, is there any way to return the errors of multiple tasks?

In fact, errgroup does not support this well, and developers have to make some adjustments themselves. Because Group holds only one err variable, this cannot be achieved with Group alone.

A common approach is to create a slice that stores the err from each f() call.

package main

import (
	"errors"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	var g errgroup.Group
	var result = make([]error, 3)

	// Start the first subtask, it executes successfully
	g.Go(func() error {
		time.Sleep(5 * time.Second)
		fmt.Println("exec #1")
		result[0] = nil // Save the success or failure result
		return nil
	})

	// Start the second subtask, it fails to execute
	g.Go(func() error {
		time.Sleep(10 * time.Second)
		fmt.Println("exec #2")
		result[1] = errors.New("failed to exec #2") // Save the success or failure result
		return result[1]
	})

	// Start the third subtask, it executes successfully
	g.Go(func() error {
		time.Sleep(15 * time.Second)
		fmt.Println("exec #3")
		result[2] = nil // Save the success or failure result
		return nil
	})

	if err := g.Wait(); err == nil {
		fmt.Printf("Successfully exec all. result: %v\n", result)
	} else {
		fmt.Printf("failed: %v\n", result)
	}
}

As you can see, we declare a result slice of length 3. There is no need to worry about data races here, because each goroutine writes only to its own fixed, unique index, and the slice is read only after Wait returns, when all goroutines have finished.

In essence, each f() not only returns its err to the Group but also saves a copy. When something goes wrong, we may ignore the error returned by Wait and use the error slice we collected instead.

The example in the official Go documentation that uses errgroup to implement a pipeline is also very instructive: one subtask walks the files in a directory and hands the paths to 20 goroutines, which compute the MD5 of the files in parallel.

Here is a simplified version of that code for study.

package main

import (
	"context"
	"crypto/md5"
	"fmt"
	"log"
	"os"
	"path/filepath"

	"golang.org/x/sync/errgroup"
)

// Pipeline demonstrates the use of a Group to implement a multi-stage
// pipeline: a version of the MD5All function with bounded parallelism from
// /pipelines.
func main() {
	m, err := MD5All(context.Background(), ".")
	if err != nil {
		log.Fatal(err)
	}

	for k, sum := range m {
		fmt.Printf("%s:\t%x\n", k, sum)
	}
}

type result struct {
	path string
	sum  [md5.Size]byte
}

// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents. If the directory walk
// fails or any read operation fails, MD5All returns an error.
func MD5All(ctx context.Context, root string) (map[string][md5.Size]byte, error) {
	// ctx is canceled when g.Wait() returns. When this version of MD5All returns
	// - even in case of error! - we know that all of the goroutines have finished
	// and the memory they were using can be garbage-collected.
	g, ctx := errgroup.WithContext(ctx)
	paths := make(chan string)

	g.Go(func() error {
		defer close(paths)
		return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
			if err != nil {
				return err
			}
			if !info.Mode().IsRegular() {
				return nil
			}
			select {
			case paths <- path:
			case <-ctx.Done():
				return ctx.Err()
			}
			return nil
		})
	})

	// Start a fixed number of goroutines to read and digest files.
	c := make(chan result)
	const numDigesters = 20
	for i := 0; i < numDigesters; i++ {
		g.Go(func() error {
			for path := range paths {
				data, err := os.ReadFile(path) // replaces the deprecated ioutil.ReadFile
				if err != nil {
					return err
				}
				select {
				case c <- result{path, md5.Sum(data)}:
				case <-ctx.Done():
					return ctx.Err()
				}
			}
			return nil
		})
	}
	go func() {
		g.Wait()
		close(c)
	}()

	m := make(map[string][md5.Size]byte)
	for r := range c {
		m[r.path] = r.sum
	}
	// Check whether any of the goroutines failed. Since g is accumulating the
	// errors, we don't need to send them (or check for them) in the individual
	// results sent on the channel.
	if err := g.Wait(); err != nil {
		return nil, err
	}
	return m, nil
}

Channels play a crucial role in this example. If you have time, I recommend reading the source article: ://…

It is very helpful for implementing the pipeline pattern with errgroup.

Conclusion

Today we studied the errgroup source code, including its new SetLimit capability. Across the sync-related libraries we have looked at, the techniques used differ quite a bit, and many of the design ideas are very sophisticated; for example, errgroup uses a fixed-capacity channel to control the number of goroutines and an empty struct to save memory.

In addition, the implementation of sync packages is generally very concise, such as once, singleflight, semaphore, etc. It is recommended that you go through it yourself if you have time, and your understanding of concurrency and design patterns will be taken to a higher level.

errgroup itself is not complicated, and there are many packaging implementations in the industry. You can compare the source code and think about what else can be improved.
