Updated on 2025-03-03

Detailed explanation of the use of ErrGroup in Go language

In concurrent programming, the WaitGroup primitive is used very frequently, typically for collaborative waiting: goroutine A waits at a checkpoint for all the worker goroutines that perform tasks. Until every worker goroutine has finished, goroutine A blocks at the checkpoint; only then can it continue to execute.

What if a worker goroutine hits an error during execution and we want to handle it? WaitGroup does not propagate errors. For this scenario, the Go extension library (golang.org/x/sync) provides the ErrGroup concurrency primitive in its errgroup package. It builds on WaitGroup and adds error propagation and context cancellation.

The errgroup package exposes the ErrGroup primitive through one function and two methods:

func WithContext(ctx context.Context) (*Group, context.Context)
func (g *Group) Go(f func() error)
func (g *Group) Wait() error

Next, we let the main goroutine use ErrGroup instead of WaitGroup to wait for the subtasks to complete. One feature of ErrGroup is that Wait returns the first error encountered by the goroutines that execute the tasks. Try running the following program and observe its output.

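package main

import (
	"fmt"
	"net/http"

	"golang.org/x/sync/errgroup"
)

func main() {
	// The URLs are truncated in the source; substitute real ones before running.
	var urls = []string{
		"/",
		"/",
		"http:///",
	}
	g := new(errgroup.Group)
	for _, url := range urls {
		url := url // capture the loop variable for the closure
		g.Go(func() error {
			resp, err := http.Get(url)
			if err != nil {
				fmt.Println(err)
				return err
			}
			fmt.Printf("get [%s] success: [%d] \n", url, resp.StatusCode)
			return resp.Body.Close()
		})
	}
	// Wait blocks until every task has returned and yields the first error.
	if err := g.Wait(); err != nil {
		fmt.Println(err)
	} else {
		fmt.Println("All success!")
	}
}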

Output:

Get "http:///": dial tcp: lookup : no such host
get [/] success: [200]
Get "/": dial tcp 172.217.24.113:80: connectex: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond.
Get "http:///": dial tcp: lookup : no such host

The next program shows this feature more clearly: Wait returns only the first error encountered by the goroutines that execute the tasks.

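package main

import (
	"fmt"
	"log"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	var eg errgroup.Group // the zero value is valid and does not cancel on error
	for i := 0; i < 100; i++ {
		i := i
		eg.Go(func() error {
			// The duration unit is elided in the source; time.Second is assumed.
			time.Sleep(2 * time.Second)
			if i > 90 {
				fmt.Println("Error:", i)
				return fmt.Errorf("Error occurred: %d", i)
			}
			fmt.Println("End:", i)
			return nil
		})
	}
	if err := eg.Wait(); err != nil {
		log.Fatal(err)
	}
}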

In the above program, every task with i greater than 90 returns an error, but Wait returns only the first error that was recorded; the rest are dropped. The output of the program is roughly as follows:

Output:

......
End: 35
End: 38
End: 28
End: 37
End: 32
2023/06/29 14:18:03 Error occurred: 98
Error: 92
End: 23
End: 30
Error: 95
Error: 94
End: 74
End: 25
......

The first goroutine to fail produced "Error occurred: 98", and that is the error Wait returned, but the other tasks were not stopped and kept running. So what should we do if the program needs to terminate the other subtasks when an error occurs? We can use the WithContext function to create an ErrGroup with context cancellation.

Pay attention to its two characteristics when using it:

  • After an error occurs or Wait returns, the cancel function of the Context is called, which broadcasts the cancel signal.
  • Only the first error is returned; the remaining errors are discarded.
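
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	eg, ctx := errgroup.WithContext(context.Background())
	for i := 0; i < 100; i++ {
		i := i
		eg.Go(func() error {
			// The duration unit is elided in the source; time.Second is assumed.
			time.Sleep(2 * time.Second)
			select {
			case <-ctx.Done():
				// Another task already failed, so skip the work.
				fmt.Println("Canceled:", i)
				return nil
			default:
				if i > 90 {
					fmt.Println("Error:", i)
					return fmt.Errorf("Error: %d", i)
				}
				fmt.Println("End:", i)
				return nil
			}
		})
	}
	if err := eg.Wait(); err != nil {
		log.Fatal(err)
	}
}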

Each goroutine started by the Go method runs the function passed in as a parameter. If that function returns an error, the error is stored in the err field held by the Group (only the first time), and the cancel function is called right away so that the other subtasks are notified through the context to stop. After running the updated program above, the output looks similar to this:

......
Canceled: 87
Canceled: 34
Canceled: 92
Canceled: 86
Canceled: 78
Canceled: 46
Canceled: 45
2023/06/29 14:22:07 Error: 99
Canceled: 44
Canceled: 77
Canceled: 43
Canceled: 50
Canceled: 42
Canceled: 25
Canceled: 76
Canceled: 24
Canceled: 75
Canceled: 40
......

errgroup source code:

In the above example, after an error occurs in a sub goroutine, the other subtasks are canceled, yet our code never calls a cancel function for ctx. Let's look at the source code to see how this is handled internally. The design of errgroup is very concise; the code (from an earlier version of the package) is as follows:

package errgroup

import (
    "context"
    "sync"
)

// A Group is a collection of goroutines working on subtasks that are part of
// the same overall task.
//
// A zero Group is valid and does not cancel on error.
type Group struct {
    cancel func()

    wg sync.WaitGroup

    errOnce sync.Once
    err     error
}

// WithContext returns a new Group and an associated Context derived from ctx.
//
// The derived Context is canceled the first time a function passed to Go
// returns a non-nil error or the first time Wait returns, whichever occurs
// first.
func WithContext(ctx context.Context) (*Group, context.Context) {
    ctx, cancel := context.WithCancel(ctx)
    return &Group{cancel: cancel}, ctx
}

// Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them.
func (g *Group) Wait() error {
    g.wg.Wait()
    if g.cancel != nil {
        g.cancel()
    }
    return g.err
}

// Go calls the given function in a new goroutine.
//
// The first call to return a non-nil error cancels the group; its error will be
// returned by Wait.
func (g *Group) Go(f func() error) {
    g.wg.Add(1)

    go func() {
        defer g.wg.Done()

        if err := f(); err != nil {
            g.errOnce.Do(func() {
                g.err = err
                if g.cancel != nil {
                    g.cancel()
                }
            })
        }
    }()
}

As you can see, errgroup is implemented by the Group struct. It wraps a sync.WaitGroup to reuse its behavior: the Go() method starts a new subtask goroutine, and the Wait() method blocks by calling Wait on the wrapped WaitGroup.

Meanwhile, Group uses sync.Once to guarantee that only the first error from a child goroutine is retained.

Group also stores the cancel function created by context.WithCancel in WithContext. When a sub goroutine returns an error, Group calls this function, which promptly propagates the Context's cancel signal.

Let’s take a look at another practical example:

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	g, ctx := errgroup.WithContext(context.Background())
	dataChan := make(chan int, 20)

	// Data producer task sub-goroutine
	g.Go(func() error {
		defer close(dataChan)
		for i := 1; ; i++ {
			if i == 10 {
				return fmt.Errorf("data 10 is wrong")
			}
			dataChan <- i
			fmt.Println(fmt.Sprintf("sending %d", i))
		}
	})

	// Data consumer task sub-goroutines
	for i := 0; i < 3; i++ {
		g.Go(func() error {
			for {
				select {
				case <-ctx.Done():
					// The producer failed, so stop consuming.
					return ctx.Err()
				case number := <-dataChan:
					fmt.Println(fmt.Sprintf("receiving %d", number))
				}
			}
		})
	}

	// The main task goroutine waits for the pipeline to end the data flow
	err := g.Wait()
	if err != nil {
		fmt.Println(err)
	}
	fmt.Println("main goroutine done!")
}

Output

sending 1
sending 2
sending 3
sending 4
sending 5
sending 6
sending 7
sending 8
sending 9
receiving 2
receiving 1
receiving 3
data 10 is wrong
main goroutine done!

Implement an ErrGroup by yourself:

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

const (
	M = 2 // maximum number of tasks running concurrently
	N = 8 // total number of tasks
)

func main() {
	// The duration unit is elided in the source; time.Second is assumed.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*50)
	defer cancel()

	result := make([]int, N+1)
	errCh := make(chan error, 1)
	var firstSendErr int32
	wg := new(sync.WaitGroup)
	done := make(chan struct{}, 1)
	limit := make(chan struct{}, M)

	for i := 1; i <= N; i++ {
		limit <- struct{}{}
		var quit bool
		select {
		// The context has been canceled, no new goroutine is needed
		case <-ctx.Done():
			quit = true
		default:
		}
		if quit {
			break
		}

		wg.Add(1)
		go func(x int) {
			defer func() {
				wg.Done()
				<-limit
			}()

			if ret, err := doTask(ctx, x); err != nil {
				// Only the first failing task sends its error.
				if atomic.CompareAndSwapInt32(&firstSendErr, 0, 1) {
					errCh <- err
					// cancel other requests
					cancel()
				}
			} else {
				result[x] = ret
			}
		}(i)
	}

	go func() {
		wg.Wait()
		close(done)
	}()

	select {
	case err := <-errCh:
		// Wait for the running tasks to finish before reading result.
		<-done
		handleErr(err, result[1:])
	case <-done:
		if len(errCh) > 0 {
			err := <-errCh
			handleErr(err, result[1:])
			return
		}
		fmt.Println("success handle all task:", result[1:])
	}
}

func handleErr(err error, result []int) {
	fmt.Println("task err occurs: ", err, "result", result)
}

func doTask(ctx context.Context, i int) (ret int, err error) {
	fmt.Println("task start", i)
	defer func() {
		fmt.Println("task done", i, "err", err)
	}()

	select {
	// Simulate the task time (unit assumed, see main)
	case <-time.After(time.Second * time.Duration(i)):
	// The task must support context cancellation; otherwise it would run
	// to completion before returning.
	case <-ctx.Done():
		fmt.Println("task canceled", i)
		return -1, ctx.Err()
	}

	// Simulate an error
	if i == 6 {
		return -1, errors.New("err test")
	}
	return i, nil
}

Output

task start 2
task start 1
task done 1 err <nil>
task start 3
task done 2 err <nil>
task start 4
task done 3 err <nil>
task start 5
task done 4 err <nil>
task start 6
task done 5 err <nil>
task start 7
task done 6 err err test
task canceled 7
task done 7 err context canceled
task err occurs:  err test result [1 2 3 4 5 0 0 0]

Summary:

Pay attention to these characteristics when using ErrGroup:

It inherits the functionality of WaitGroup.

After an error occurs or Wait returns, the cancel function of the Context is called, which broadcasts the cancel signal.

Only the first error is returned; the remaining errors are discarded.

Context signal propagation: if a subtask goroutine contains loop logic, add a check on ctx.Done() inside the loop so that the cancel signal of the context can end the subtask early.
