Preface
Goroutines are often created in large numbers with no limit at all; I won't dwell on the consequences of that here. This article mainly discusses several ways to control the number of goroutines.
Control the number of goroutines
By channel+sync
```go
package main

import (
	"fmt"
	"sync"
)

var (
	// Length of the channel
	poolCount = 5
	// Number of multiplexed goroutines
	goroutineCount = 10
)

func pool() {
	jobsChan := make(chan int, poolCount)

	// workers
	var wg sync.WaitGroup
	for i := 0; i < goroutineCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for item := range jobsChan {
				// ...
				fmt.Println(item)
			}
		}()
	}

	// senders
	for i := 0; i < 1000; i++ {
		jobsChan <- i
	}

	// Close the channel; once the worker goroutines have drained it,
	// they exit one by one through wg.Done
	close(jobsChan)
	wg.Wait()
}
```
A fixed number of goroutines is started, each tracked by a WaitGroup and listening on the channel. The sender pushes jobs into the channel; once everything has been sent, the channel is closed and the goroutines exit in turn after draining it.
Using semaphore
```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"golang.org/x/sync/semaphore"
)

const (
	// The maximum number of goroutines to run simultaneously
	Limit = 3
	// The weight of each acquisition
	Weight = 1
)

func main() {
	names := []string{
		"noob",
		"Little Red",
		"Xiao Ming",
		"Little Li",
		"Little Flower",
	}
	sem := semaphore.NewWeighted(Limit)
	var wg sync.WaitGroup
	for _, name := range names {
		wg.Add(1)
		go func(name string) {
			sem.Acquire(context.Background(), Weight)
			// ... specific business logic
			fmt.Println(name, "-It's time to eat")
			time.Sleep(2 * time.Second)
			sem.Release(Weight)
			wg.Done()
		}(name)
	}
	wg.Wait()
	fmt.Println("ending--------")
}
```
The semaphore package under golang.org/x/sync can likewise be used to restrict the number of goroutines running at the same time.
Coroutine pool
Goroutines in Go are already very lightweight, so whether a coroutine pool is worthwhile should be judged against the specific scenario.
For small scenarios, channel+sync is enough; for more complex scenarios, consider a third-party coroutine pool library:
panjf2000/ants
go-playground/pool
Jeffail/tunny
Design of several open source coroutine pools
Coroutine pool implementation in fasthttp
An important reason why fasthttp is many times more efficient than net/http is its use of a coroutine pool. Let's take a look at the author's design ideas.
1. Goroutines are created on demand, up to a maximum count, and each one listens on a workerChan; the server puts every accepted connection into a workerChan so that the listening goroutine can consume it.
2. A list of ready workerChans (the ready slice) is maintained locally; when a ch cannot be taken from this list, one is fetched from the sync.Pool cache.
3. If workersCount has not reached the upper limit, a new goroutine running workerFunc is started to listen on the fetched workerChan.
4. workerChans in the ready list that exceed the maximum idle time are cleaned up periodically.
See the specific implementation
```go
// workerPool serves incoming connections via a pool of workers
// in FILO (first in, last out) order: the most recently stopped worker
// serves the next incoming connection.
//
// Such a scheme keeps CPU caches hot (in theory).
type workerPool struct {
	// Function for serving a connection.
	// It must leave c unclosed.
	WorkerFunc ServeHandler

	// The maximum number of workers
	MaxWorkersCount int

	LogAllErrors bool

	MaxIdleWorkerDuration time.Duration

	Logger Logger

	lock sync.Mutex
	// The number of current workers
	workersCount int
	// Worker stop flag
	mustStop bool

	// workerChans waiting to be used; may be cleaned up
	ready []*workerChan

	// Used to mark start and stop
	stopCh chan struct{}

	// Cache pool for workerChan, implemented with sync.Pool
	workerChanPool sync.Pool

	connState func(net.Conn, ConnState)
}

// The workerChan structure
type workerChan struct {
	lastUseTime time.Time
	ch          chan net.Conn
}
```
Start
```go
func (wp *workerPool) Start() {
	// Determine whether it has already been started
	if wp.stopCh != nil {
		panic("BUG: workerPool already started")
	}
	// Create stopCh
	wp.stopCh = make(chan struct{})
	stopCh := wp.stopCh
	wp.workerChanPool.New = func() interface{} {
		// On a single-core CPU the workerChan is blocking;
		// otherwise it is non-blocking with capacity 1
		return &workerChan{
			ch: make(chan net.Conn, workerChanCap),
		}
	}
	go func() {
		var scratch []*workerChan
		for {
			wp.clean(&scratch)
			select {
			// Receive the exit signal and exit
			case <-stopCh:
				return
			default:
				time.Sleep(wp.getMaxIdleWorkerDuration())
			}
		}
	}()
}

// If GOMAXPROCS=1, the capacity of workerChan is 0 and it becomes
// a blocking channel; otherwise the capacity is 1 (non-blocking)
var workerChanCap = func() int {
	if runtime.GOMAXPROCS(0) == 1 {
		return 0
	}
	return 1
}()
```
Clarify the process:
1. First, check whether stopCh is nil; if it is not nil, the pool has already been started;
2. Initialize wp.stopCh = make(chan struct{}). stopCh is purely a signal; struct{} is used instead of bool because an empty struct occupies 0 bytes of memory while a bool occupies 1, squeezing a little more out of the server's memory (see the snippet after this list);
3. Set the New function of workerChanPool so that Get can create a workerChan on demand. On a single-core CPU (GOMAXPROCS=1) the workerChan is blocking (capacity 0); otherwise it is non-blocking with capacity 1;
4. Start a goroutine that performs the clean operation periodically and exits when the stop signal is received.
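A quick way to verify the size claim in step 2 (a throwaway snippet for illustration, not part of fasthttp):

```go
package main

import (
	"fmt"
	"unsafe"
)

func main() {
	var b bool
	var s struct{}
	fmt.Println(unsafe.Sizeof(b)) // 1 byte
	fmt.Println(unsafe.Sizeof(s)) // 0 bytes
}
```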
Stop
```go
func (wp *workerPool) Stop() {
	// Like Start, Stop can only be triggered once
	if wp.stopCh == nil {
		panic("BUG: workerPool wasn't started")
	}
	// Close stopCh
	close(wp.stopCh)
	// Set stopCh to nil
	wp.stopCh = nil

	// Stop all workers waiting for a connection.
	// There is no need to wait for running workers: mustStop is set
	// to true, and they exit after serving their current connection
	wp.lock.Lock()
	ready := wp.ready
	// Loop over ready and stop each workerChan
	for i := range ready {
		ready[i].ch <- nil
		ready[i] = nil
	}
	wp.ready = ready[:0]
	// Set mustStop to true
	wp.mustStop = true
	wp.lock.Unlock()
}
```
Clarify the process:
1. Check that the pool was started; Stop can only be triggered once;
2. Close stopCh and set it to nil;
3. Stop all workers that are waiting for a connection. There is no need to wait for running workers to exit: mustStop is set to true, and they exit on their own after serving their current connection.
clean
```go
func (wp *workerPool) clean(scratch *[]*workerChan) {
	maxIdleWorkerDuration := wp.getMaxIdleWorkerDuration()

	// Clean up the least recently used workers if they haven't served
	// a connection for more than maxIdleWorkerDuration
	criticalTime := time.Now().Add(-maxIdleWorkerDuration)

	wp.lock.Lock()
	ready := wp.ready
	n := len(ready)

	// Use binary search to find the rightmost workerChan that can be
	// cleaned up; released workerChans are appended to the tail of
	// ready, so ready is sorted by lastUseTime
	l, r, mid := 0, n-1, 0
	for l <= r {
		mid = (l + r) / 2
		if criticalTime.After(ready[mid].lastUseTime) {
			l = mid + 1
		} else {
			r = mid - 1
		}
	}
	i := r
	if i == -1 {
		wp.lock.Unlock()
		return
	}

	// Move the first i+1 entries of ready into scratch
	*scratch = append((*scratch)[:0], ready[:i+1]...)
	m := copy(ready, ready[i+1:])
	for i = m; i < n; i++ {
		ready[i] = nil
	}
	wp.ready = ready[:m]
	wp.lock.Unlock()

	// Notify the obsolete workers to stop.
	// This notification must be outside wp.lock, since ch.ch may block
	// and may take a lot of time if many workers are located
	// on non-local CPUs
	tmp := *scratch
	for i := range tmp {
		tmp[i].ch <- nil
		tmp[i] = nil
	}
}
```
clean mainly removes the least recently used workers: any workerChan that has been idle for longer than maxIdleWorkerDuration stops being kept in service.
getCh
Get a workerChan
```go
func (wp *workerPool) getCh() *workerChan {
	var ch *workerChan
	createWorker := false

	wp.lock.Lock()
	ready := wp.ready
	n := len(ready) - 1
	// If ready is empty
	if n < 0 {
		if wp.workersCount < wp.MaxWorkersCount {
			createWorker = true
			wp.workersCount++
		}
	} else {
		// ready is non-empty: take one from its tail
		ch = ready[n]
		ready[n] = nil
		wp.ready = ready[:n]
	}
	wp.lock.Unlock()

	// If no ch was obtained from ready
	if ch == nil {
		if !createWorker {
			return nil
		}
		// Get a ch from the cache pool
		vch := wp.workerChanPool.Get()
		ch = vch.(*workerChan)
		go func() {
			// The specific worker loop
			wp.workerFunc(ch)
			// Put it back into the pool
			wp.workerChanPool.Put(vch)
		}()
	}
	return ch
}
```
Clarify the process:
1. getCh obtains a usable workerChan. If ready is empty and workersCount has not reached MaxWorkersCount, workersCount is incremented and the current call is marked createWorker = true;
2. If ready is not empty, a workerChan is taken directly from its tail;
3. If nothing was taken from ready but a worker may be created, a workerChan is fetched from the workerChanPool cache (and put back into the pool after the worker exits);
4. For a newly fetched workerChan, a goroutine running workerFunc is started to listen on it and handle the business logic.
workerFunc
```go
func (wp *workerPool) workerFunc(ch *workerChan) {
	var c net.Conn

	var err error
	// Listen on the workerChan
	for c = range ch.ch {
		if c == nil {
			break
		}

		// Specific business logic ...

		c = nil

		// Release the workerChan back to ready;
		// the loop is exited when mustStop is set
		if !wp.release(ch) {
			break
		}
	}

	wp.lock.Lock()
	wp.workersCount--
	wp.lock.Unlock()
}

// Serve puts the Conn into the worker's channel
func (wp *workerPool) Serve(c net.Conn) bool {
	ch := wp.getCh()
	if ch == nil {
		return false
	}
	ch.ch <- c
	return true
}

func (wp *workerPool) release(ch *workerChan) bool {
	// Refresh lastUseTime
	ch.lastUseTime = time.Now()
	wp.lock.Lock()
	// If the pool must stop, return directly
	if wp.mustStop {
		wp.lock.Unlock()
		return false
	}
	// Put ch back into ready
	wp.ready = append(wp.ready, ch)
	wp.lock.Unlock()
	return true
}
```
Clarify the process:
1. workerFunc listens on its workerChan and releases the workerChan back to ready after serving each connection;
2. Serve puts the connection into a workerChan, so that workerFunc receives the connection requests it needs to process through that workerChan;
3. The for loop exits when the value workerFunc reads from the workerChan is nil or mustStop has been set to true.
panjf2000/ants
Let's take a look at some examples first.
Example 1
```go
package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/panjf2000/ants"
)

func demoFunc() {
	time.Sleep(10 * time.Millisecond)
	fmt.Println("Hello World!")
}

func main() {
	defer ants.Release()

	runTimes := 1000

	var wg sync.WaitGroup
	syncCalculateSum := func() {
		demoFunc()
		wg.Done()
	}
	for i := 0; i < runTimes; i++ {
		wg.Add(1)
		_ = ants.Submit(syncCalculateSum)
	}
	wg.Wait()
	fmt.Printf("running goroutines: %d\n", ants.Running())
	fmt.Printf("finish all tasks.\n")
}
```
Example 2
```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"

	"github.com/panjf2000/ants"
)

var sum int32

func myFunc(i interface{}) {
	n := i.(int32)
	atomic.AddInt32(&sum, n)
	fmt.Printf("run with %d\n", n)
}

func main() {
	var wg sync.WaitGroup
	runTimes := 1000

	// Use the pool with a function:
	// set 10 as the capacity of the goroutine pool and 1 second for the expiry duration.
	p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
		myFunc(i)
		wg.Done()
	})
	defer p.Release()
	// Submit tasks one by one.
	for i := 0; i < runTimes; i++ {
		wg.Add(1)
		_ = p.Invoke(int32(i))
	}
	wg.Wait()
	fmt.Printf("running goroutines: %d\n", p.Running())
	fmt.Printf("finish all tasks, result is %d\n", sum)
	if sum != 499500 {
		panic("the final result is wrong!!!")
	}
}
```
Design ideas
Overall design ideas
Sorting out the ideas:
1. First, the pool is initialized with a capacity; then, when a task comes in, it is assigned a goWorker;
2. Obtaining a goWorker falls into the following cases:
- there is a free goWorker in the local cache: take it directly;
- there is none in the local cache: get a goWorker from the sync.Pool;
3. If the pool is already full, non-blocking mode returns nil directly, while blocking mode loops until a goWorker is successfully obtained;
4. Meanwhile, expired goWorkers are cleaned up periodically, and goroutines blocked waiting for a worker are woken up;
5. A goWorker whose task has completed is returned to the pool for reuse (a simplified sketch follows this list).
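To make the flow above concrete, here is a heavily simplified sketch in the same spirit. It is not ants' actual code; the names miniPool and goWorker are invented for illustration. It shows the three paths for obtaining a worker: reuse from the local cache, spawn while under capacity, and fail fast when the pool is full (the retry loop in main stands in for blocking mode); periodic expiry cleanup is omitted.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// goWorker and miniPool are invented names for this sketch,
// not types from the ants library.
type goWorker struct {
	task chan func()
}

type miniPool struct {
	mu       sync.Mutex
	capacity int
	running  int
	idle     []*goWorker // local cache of free workers
}

// Submit assigns one task to one goWorker, mirroring steps 1-3 above.
func (p *miniPool) Submit(task func()) error {
	p.mu.Lock()
	var w *goWorker
	if n := len(p.idle); n > 0 {
		// A free goWorker exists in the local cache: take it directly.
		w = p.idle[n-1]
		p.idle = p.idle[:n-1]
	} else if p.running < p.capacity {
		// Still under capacity: spawn a new goWorker.
		w = &goWorker{task: make(chan func())}
		p.running++
		go func() {
			for t := range w.task {
				t()
				// Task finished: return the worker to the cache.
				p.mu.Lock()
				p.idle = append(p.idle, w)
				p.mu.Unlock()
			}
		}()
	}
	p.mu.Unlock()
	if w == nil {
		// Pool full: non-blocking mode fails immediately.
		return errors.New("pool overload")
	}
	w.task <- task
	return nil
}

func main() {
	p := &miniPool{capacity: 3}
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		task := func(n int) func() {
			return func() { defer wg.Done(); fmt.Println("task", n) }
		}(i)
		// Stand-in for blocking mode: retry until a worker frees up.
		for p.Submit(task) != nil {
			time.Sleep(time.Millisecond)
		}
	}
	wg.Wait()
}
```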
For specific design details, please refer to the author's article "Goroutine Concurrent Scheduling Model In-depth Analysis: Hand-Rolling a High-Performance Goroutine Pool".
go-playground/pool
go-playground/pool, by contrast, starts its workers up front, when the pool is created.
First, a couple of usage demos.
Per Unit Work
```go
package main

import (
	"fmt"
	"time"

	"gopkg.in/go-playground/pool.v3"
)

func main() {
	p := pool.NewLimited(10)
	defer p.Close()

	user := p.Queue(getUser(13))
	other := p.Queue(getOtherInfo(13))

	user.Wait()
	if err := user.Error(); err != nil {
		// handle error
	}

	// do stuff with user
	username := user.Value().(string)
	fmt.Println(username)

	other.Wait()
	if err := other.Error(); err != nil {
		// handle error
	}

	// do stuff with other
	otherInfo := other.Value().(string)
	fmt.Println(otherInfo)
}

func getUser(id int) pool.WorkFunc {
	return func(wu pool.WorkUnit) (interface{}, error) {
		// simulate waiting for something, like TCP connection to be established
		// or connection from pool grabbed
		time.Sleep(time.Second * 1)

		if wu.IsCancelled() {
			// return values not used
			return nil, nil
		}

		// ready for processing...
		return "Joeybloggs", nil
	}
}

func getOtherInfo(id int) pool.WorkFunc {
	return func(wu pool.WorkUnit) (interface{}, error) {
		// simulate waiting for something, like TCP connection to be established
		// or connection from pool grabbed
		time.Sleep(time.Second * 1)

		if wu.IsCancelled() {
			// return values not used
			return nil, nil
		}

		// ready for processing...
		return "Other Info", nil
	}
}
```
Batch Work
```go
package main

import (
	"fmt"
	"time"

	"gopkg.in/go-playground/pool.v3"
)

func main() {
	p := pool.NewLimited(10)
	defer p.Close()

	batch := p.Batch()

	// for max speed Queue in another goroutine,
	// but it is not required; you just can't start reading results
	// until all items are Queued
	go func() {
		for i := 0; i < 10; i++ {
			batch.Queue(sendEmail("email content"))
		}

		// DO NOT FORGET THIS OR GOROUTINES WILL DEADLOCK;
		// if calling Cancel() it calls QueueComplete() internally
		batch.QueueComplete()
	}()

	for email := range batch.Results() {
		if err := email.Error(); err != nil {
			// handle error
			// maybe call batch.Cancel()
		}

		// use return value
		fmt.Println(email.Value().(bool))
	}
}

func sendEmail(email string) pool.WorkFunc {
	return func(wu pool.WorkUnit) (interface{}, error) {
		// simulate waiting for something, like TCP connection to be established
		// or connection from pool grabbed
		time.Sleep(time.Second * 1)

		if wu.IsCancelled() {
			// return values not used
			return nil, nil
		}

		// ready for processing...
		return true, nil // everything ok, send nil, error if not
	}
}
```
See the implementation
workUnit
workUnit is passed over the work channel and carries the task that a worker currently needs to execute.
```go
// WorkUnit contains a single unit of work's values
type WorkUnit interface {
	// Wait blocks until the current task is completed or cancelled
	Wait()
	// Value returns the result of the executed function
	Value() interface{}
	// Error returns the Work Unit's error
	Error() error
	// Cancel cancels the current executable task
	Cancel()
	// IsCancelled reports whether the current work unit has been cancelled
	IsCancelled() bool
}

var _ WorkUnit = new(workUnit)

// workUnit contains a single unit of work's values
type workUnit struct {
	// The result of task execution
	value interface{}
	// Error message
	err error
	// Notifies that the task is completed
	done chan struct{}
	// Task function to be executed
	fn WorkFunc
	// Whether the task has been cancelled
	cancelled atomic.Value
	// Whether the task is being cancelled
	cancelling atomic.Value
	// Whether the task is being executed
	writing atomic.Value
}
```
limitedPool
```go
var _ Pool = new(limitedPool)

// limitedPool contains all information for a limited pool instance.
type limitedPool struct {
	// Number of concurrent workers
	workers uint
	// The work channel
	work chan *workUnit
	// Channel used to notify cancellation
	cancel chan struct{}
	// Whether the pool is closed
	closed bool
	// Read-write lock
	m sync.RWMutex
}

// NewLimited initializes a pool
func NewLimited(workers uint) Pool {
	if workers == 0 {
		panic("invalid workers '0'")
	}

	// Initialize the pool with its number of workers
	p := &limitedPool{
		workers: workers,
	}

	// Initialize the pool's internals
	p.initialize()

	return p
}

func (p *limitedPool) initialize() {
	// The length of the channel is twice the number of workers
	p.work = make(chan *workUnit, p.workers*2)
	p.cancel = make(chan struct{})
	p.closed = false

	// fire up workers here
	for i := 0; i < int(p.workers); i++ {
		p.newWorker(p.work, p.cancel)
	}
}

// The work and cancel channels are passed to newWorker() to avoid
// any potential race condition between reading and writing
func (p *limitedPool) newWorker(work chan *workUnit, cancel chan struct{}) {
	go func(p *limitedPool) {

		var wu *workUnit

		defer func(p *limitedPool) {
			// Catch the panic, finish the failed work unit,
			// and fire up a new worker to replace this one
			if err := recover(); err != nil {

				trace := make([]byte, 1<<16)
				n := runtime.Stack(trace, true)

				s := fmt.Sprintf(errRecovery, err, string(trace[:int(math.Min(float64(n), float64(7000)))]))

				iwu := wu
				iwu.err = &ErrRecovery{s: s}
				close(iwu.done)

				// Restart
				p.newWorker(p.work, p.cancel)
			}
		}(p)

		var value interface{}
		var err error

		// Listen on the channel and read its content
		for {
			select {
			// Extract a work unit from the channel
			case wu = <-work:

				// Prevent reading a zero value after the channel is closed
				if wu == nil {
					continue
				}

				// Support both single and batch cancellation
				if wu.cancelled.Load() == nil {

					// Execute our business function
					value, err = wu.fn(wu)

					wu.writing.Store(struct{}{})

					// Need to check again in case the WorkFunc cancelled
					// this unit of work, to prevent a race condition
					if wu.cancelled.Load() == nil && wu.cancelling.Load() == nil {
						wu.value, wu.err = value, err

						// Execution is completed; close the done channel
						close(wu.done)
					}
				}

			// If cancelled, exit
			case <-cancel:
				return
			}
		}

	}(p)
}

// Queue puts a task onto the work channel and returns its WorkUnit
func (p *limitedPool) Queue(fn WorkFunc) WorkUnit {

	// Initialize a workUnit with a done channel
	w := &workUnit{
		done: make(chan struct{}),
		// The specific function to execute
		fn: fn,
	}

	go func() {
		p.m.RLock()
		// If the pool is closed, notify via the done channel
		if p.closed {
			w.err = &ErrPoolClosed{s: errClosed}
			if w.cancelled.Load() == nil {
				close(w.done)
			}
			p.m.RUnlock()
			return
		}

		// Pass the work unit to the pool's work channel
		p.work <- w

		p.m.RUnlock()
	}()

	return w
}
```
Clarify the process:
1. First, the size of the pool is initialized;
2. Then the corresponding number of workers is started according to the pool size; each blocks waiting for an executable function to be pushed into the work channel;
3. An executable function is wrapped into a workUnit and passed through the channel to a blocked worker.
Batch execution methods are also provided.
batch
```go
// batch contains all information for a batch run of WorkUnits
type batch struct {
	pool Pool
	m    sync.Mutex
	// Slice of WorkUnits
	units []WorkUnit
	// The result set; after execution each workUnit updates its value
	// and error, which can be read from this channel
	results chan WorkUnit
	// Notifies that batch queueing is complete
	done   chan struct{}
	closed bool
	wg     *sync.WaitGroup
}

// newBatch initializes a Batch
func newBatch(p Pool) Batch {
	return &batch{
		pool:    p,
		units:   make([]WorkUnit, 0, 4),
		results: make(chan WorkUnit),
		done:    make(chan struct{}),
		wg:      new(sync.WaitGroup),
	}
}

// Queue wraps a WorkFunc into a WorkUnit and keeps a reference to it
// for cancellation and output of results
func (b *batch) Queue(fn WorkFunc) {

	b.m.Lock()
	if b.closed {
		b.m.Unlock()
		return
	}

	// Queue returns a WorkUnit
	wu := b.pool.Queue(fn)

	// Keep it in the WorkUnit slice
	b.units = append(b.units, wu)

	// Track goroutine completion through the WaitGroup
	b.wg.Add(1)
	b.m.Unlock()

	// Wait for the task and write its result into results
	go func(b *batch, wu WorkUnit) {
		wu.Wait()
		b.results <- wu
		b.wg.Done()
	}(b, wu)
}

// QueueComplete lets the batch know that no more work units will be queued,
// so that the results channel can be closed once all work is completed.
// WARNING: if this function is not called, the results channel will never
// be exhausted but will always block waiting for more results.
func (b *batch) QueueComplete() {
	b.m.Lock()
	b.closed = true
	close(b.done)
	b.m.Unlock()
}

// Cancel cancels the batch tasks
func (b *batch) Cancel() {
	b.QueueComplete()
	b.m.Lock()

	// Cancel the units one by one, in reverse order
	for i := len(b.units) - 1; i >= 0; i-- {
		b.units[i].Cancel()
	}

	b.m.Unlock()
}

// Results returns the channel of completed work units
func (b *batch) Results() <-chan WorkUnit {

	// Start a goroutine that waits for queueing to complete,
	// blocks until every queued work unit above has finished and exited,
	// and finally closes the results channel
	go func(b *batch) {
		<-b.done
		b.m.Lock()
		b.wg.Wait()
		b.m.Unlock()
		close(b.results)
	}(b)

	return b.results
}
```
Clarify the process:
1. First, a Batch is initialized on top of the pool;
2. Queue wraps each WorkFunc into a WorkUnit and queues it for execution, with the result written into results; once everything has been queued, QueueComplete is called to announce that queueing is complete;
3. Results streams out the full result set: it waits until all work units have finished, then closes the channel and exits.
Summary
There are two ways to control the number of goroutines:
- For simple scenarios, plain sync+channel is enough (see the sketch after this list);
- For complex scenarios, a goroutine pool can be used.
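As a complement to the worker-pool form shown in the channel+sync section, the channel itself can also act as a counting semaphore. A minimal sketch (the limit of 3 and the job count of 10 are arbitrary):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	limit := make(chan struct{}, 3) // at most 3 goroutines run at once
	var wg sync.WaitGroup

	for i := 0; i < 10; i++ {
		wg.Add(1)
		limit <- struct{}{} // acquire a slot (blocks while 3 are running)
		go func(i int) {
			defer wg.Done()
			defer func() { <-limit }() // release the slot
			fmt.Println("job", i)
		}(i)
	}
	wg.Wait()
}
```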
References
【Does Golang need coroutine pools for development? 】/question/302981392
【Come on, control the number of concurrency in Goroutine】/a/1190000017956396
【golang coroutine pool design】/a/1190000018193161
【Coroutine pool implementation in fasthttp】/a/1190000009133154
【panjf2000/ants】/panjf2000/ants