1. Channel concurrent control
1.1 Channel slice control Ctrip execution
By creating a slice channel, control the concurrent execution of multiple coroutines, and collect data and error information obtained by Ctrip execution.
type ResultDto struct { Err error Data interface{} } func main() { channel := make([]chan *ResultDto, 10) for i := 0; i < 10; i++ { channel[i] = make(chan *ResultDto) temp := i go Process(temp, channel[i]) } for _, ch := range channel { (<-ch) } } func Process(i int, ch chan *ResultDto) { // Do some work... if i == 1 { ch <- &ResultDto{Err: ("do work err")} } else { ch <- &ResultDto{Data: i} } }
1.2 channel controls the number of concurrency
Control the number of concurrent execution of Ctrip through a channel with buffer. Note that it is necessary to cooperate here.Use it together, otherwise when the execution reaches i to 7 8 9, the main Ctrip will exit before the execution of the child Ctrip has been completed.
func main() { wg := &{} ch := make(chan struct{}, 3) for i := 0; i < 10; i++ { ch <- struct{}{} (1) // Execute Ctrip temp := i go Process(wg, temp, ch) } () } func Process(wg *, i int, ch chan struct{}) { defer func() { <-ch () }() // Do some work... (1 * ) (i) }
2. WaitGroup concurrent control
2.1 WaitGroup controls coroutine parallelism
WaitGroup is a concurrent control technology that is often used in Golang application development.
WaitGroup, which can be understood as Wait-Goroutine-Group, is waiting for a group of goroutines to end. For example, if a certain goroutine needs to wait for several other goroutines to complete, then it can be easily implemented using WaitGroup.
func main() { wg := &{} for i := 0; i < 10; i++ { (1) temp := i go Process(wg, temp) } () } func Process(wg *, i int) { defer func() { () }() // Do some work... (1 * ) (i) }
Simply put, wg maintains a counter in the above program:
- Before starting goroutine, set the counter to the number of goroutines to be started via Add(2).
- After starting goroutine, use the Wait() method to block yourself and wait for the counter to change to 0.
- Each goroutine execution ends and the counter is reduced by 1 via the Done() method.
- After the counter becomes 0, the blocking goroutine is awakened.
2.2 WaitGroup encapsulates general functions
waitGroup controls concurrent execution, limit concurrency limit, collect errors return
func main() { funcList := []ExeFunc{ func(ctx ) error { ("5 Start") (5 * ) ("5 End") return nil }, func(ctx ) error { ("3 Start") (3 * ) ("3 End") return nil }, } err := GoExeAll((), 2, funcList...) if err != nil { (err) } } type ExeFunc func(ctx ) error // GoExeAll executes all concurrently, limit is the concurrency limit, collect all errors and returnfunc GoExeAll(ctx , limit int, fs ...ExeFunc) (errs []error) { wg := &{} ch := make(chan struct{}, limit) errCh := make(chan error, len(fs)) for _, f := range fs { fTmp := f (1) ch <- struct{}{} go func() { defer func() { if panicErr := recover(); panicErr != nil { errCh <- ("execution panic:" + ("%v", panicErr)) } () <-ch }() if err := fTmp(ctx); err != nil { errCh <- err } }() } () close(errCh) close(ch) for chErr := range errCh { errs = append(errs, chErr) } return }
3. Context
Golang context is a commonly used concurrency control technology for Golang application development. The biggest difference between it and WaitGroup is that context has stronger control over derived goroutines, which can control multiple levels of goroutines.
3.1 Context-defined interface
The context actually only defines the interface. All classes that implement the interface can be called a kind of context. Several commonly used contexts are implemented in the official package, which can be used in different scenarios.
type Context interface { Deadline() (deadline , ok bool) Done() <-chan struct{} Err() error Value(key interface{}) interface{} }
Deadline()
This method returns a deadline and identifies whether the bool value of deadline has been set. If deadline is not set, ok == false. At this time, deadline is the value of the initial value.
Done()
This method returns a channel, which needs to be used in the select-case statement, such as "case <-():".
When the context is closed, Done() returns a closed pipeline, and the closed pipeline is still readable. According to this, goroutine can receive a closing request; when the context is not closed, Done() returns nil.
Err()
This method describes the reason why the context is closed. The reason for closing is controlled by context and does not require user settings. For example, if the Deadline context is closed, it may be because of deadline, or it may be closed actively in advance, so the reason for closing will be different:
Value()
There is a context that is not used to control goroutines distributed in a tree, but to pass information between goroutines distributed in a tree
3.2 Context control coroutine ends
func main() { wg := &{} ctx, cancelFunc := (()) for i := 0; i < 10; i++ { (1) temp := i go Process(ctx, wg, temp) } (5 * ) cancelFunc() () } func Process(ctx , wg *, i int) { defer () ch := make(chan error) go DoWork(ctx, ch, i) select { case <-(): ("cancelFunc") return case <-ch: return } } func DoWork(ctx , ch chan error, i int) { defer func() { ch <- nil }() ((i) * ) (i) }
4. ErrorGroup
A third-party library can be used/x/sync/errgroup
Heap multiple assists concurrent execution for control
4.1 errorGroup is executed concurrently, limit is the concurrency limit, timeout timeout
func main() { funcList := []ExeFunc{ func(ctx ) error { ("5 Start") (5 * ) ("5 End") return nil }, func(ctx ) error { ("3 Start") (3 * ) ("3 End") return nil }, } err := GoExe((), 2, 10*, funcList...) if err != nil { (err) } } type ExeFunc func(ctx ) error // GoExe executes concurrently, limit is the concurrency upper limit, any of which reports an error, other interrupts, timeout is 0 and does not timeoutfunc GoExe(ctx , limit int, timeout , fs ...ExeFunc) error { eg, ctx := (ctx) (limit) var timeCh <-chan if timeout > 0 { timeCh = (timeout) } for _, f := range fs { fTmp := f (func() (err error) { ch := make(chan error) defer close(ch) go DoWorkFunc(ctx, ch, fTmp) select { case <-(): return () case <-timeCh: return ("execution timeout") case err = <-ch: return err } }) } if err := (); err != nil { return err } return nil } func DoWorkFunc(ctx , ch chan error, fs ExeFunc) { var err error defer func() { if panicErr := recover(); panicErr != nil { err = ("execution panic:" + ("%v", panicErr)) } ch <- err }() err = fs(ctx) return }
V. General coroutine control tool packaging
import ( "context" "errors" "fmt" "/x/sync/errgroup" "sync" "time" ) // ExeFunc function or method to be executedtype ExeFunc func(ctx ) error // SeqExe executes in sequence, and returns if an error is encounteredfunc SeqExe(ctx , fs ...ExeFunc) error { for _, f := range fs { if err := f(ctx); err != nil { return err } } return nil } // GoExe executes concurrently, limit is the concurrency upper limit, any of which reports an error, other interrupts, timeout is 0 and does not timeoutfunc GoExe(ctx , limit int, timeout , fs ...ExeFunc) error { eg, ctx := (ctx) (limit) var timeCh <-chan if timeout > 0 { timeCh = (timeout) } for _, f := range fs { fTmp := f (func() (err error) { ch := make(chan error) defer close(ch) go DoWorkFunc(ctx, ch, fTmp) select { case <-(): return () case <-timeCh: return ("execution timeout") case err = <-ch: return err } }) } if err := (); err != nil { return err } return nil } func DoWorkFunc(ctx , ch chan error, fs ExeFunc) { var err error defer func() { if panicErr := recover(); panicErr != nil { err = ("execution panic:" + ("%v", panicErr)) } ch <- err }() err = fs(ctx) return } // SeqExeAll executes all in sequence, collects all errors and returnsfunc SeqExeAll(ctx , fs ...ExeFunc) (errs []error) { for _, f := range fs { if err := f(ctx); err != nil { errs = append(errs, err) } } return errs } // GoExeAll executes all concurrently, limit is the concurrency limit, collect all errors and returnfunc GoExeAll(ctx , limit int, fs ...ExeFunc) (errs []error) { wg := &{} ch := make(chan struct{}, limit) errCh := make(chan error, len(fs)) for _, f := range fs { fTmp := f (1) ch <- struct{}{} go func() { defer func() { if panicErr := recover(); panicErr != nil { errCh <- ("execution panic:" + ("%v", panicErr)) } () <-ch }() if err := fTmp(ctx); err != nil { errCh <- err } }() } () close(errCh) close(ch) for chErr := range errCh { errs = append(errs, chErr) } return }
The above is a detailed explanation of the common ways to implement concurrent control in Go. For more information about Go concurrent control, please pay attention to my other related articles!