Goroutine
A goroutine is a lightweight thread provided by Golang, which we usually call a "coroutine". Compared with threads, creating a coroutine is very cheap, so applications written in Golang often run thousands of coroutines concurrently.
Advantages of Goroutine:
- Goroutines are cheap compared to threads.
Their stack is only a few KB and can grow and shrink according to the needs of the application, and context switches are fast. With threads, by contrast, the stack size must be specified up front and is fixed.
- Goroutines are multiplexed onto a smaller number of OS threads.
A program containing thousands of Goroutines may have only one thread. If a Goroutine on that thread blocks, for example while waiting for user input, the runtime creates another OS thread and moves the remaining Goroutines to it. All of this is handled by the runtime; as developers we don't have to worry about it, and we get a clean API for concurrency.
- Goroutines communicate using channels.
Channels are designed to help prevent race conditions when Goroutines access shared memory. A channel can be thought of as a pipe through which Goroutines communicate.
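To make these points concrete, here is a minimal sketch that starts many goroutines and collects their results over a channel instead of a shared variable. The function name sumSquares is hypothetical and used only for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// sumSquares starts n goroutines; each one sends i*i over a channel,
// and the caller adds the received values up.
func sumSquares(n int) int {
	results := make(chan int)
	var wg sync.WaitGroup

	for i := 1; i <= n; i++ {
		wg.Add(1)
		go func(v int) {
			defer wg.Done()
			results <- v * v // communicate by sending, not by sharing a variable
		}(i)
	}

	// Close the channel once every sender has finished so the range loop ends.
	go func() {
		wg.Wait()
		close(results)
	}()

	total := 0
	for r := range results {
		total += r
	}
	return total
}

func main() {
	fmt.Println(sumSquares(1000)) // 333833500
}
```

Only the receiving loop touches total, so no lock is needed around it.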
Hereinafter, we will refer to Goroutine as "coroutine".
Coroutine Pool
In high-concurrency scenarios, we may start a large number of coroutines to handle business logic. A coroutine pool applies pooling to reuse objects, reducing the frequency of memory allocation and the overhead of creating coroutines, and thereby improving execution efficiency.
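As a rough illustration of the pooling idea in general (this is not how gopool is implemented), the sketch below processes a batch of jobs with a fixed number of long-lived goroutines instead of starting one goroutine per job. The name runWithWorkers and its parameters are assumptions made for this example.

```go
package main

import (
	"fmt"
	"sync"
)

// runWithWorkers processes jobs with a fixed number of long-lived goroutines
// instead of starting one goroutine per job.
func runWithWorkers(workers int, jobs []int, handle func(int) int) []int {
	out := make([]int, len(jobs))
	indexes := make(chan int)
	var wg sync.WaitGroup

	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Each worker is reused for many jobs until the channel is closed.
			for i := range indexes {
				out[i] = handle(jobs[i]) // every index is written by exactly one worker
			}
		}()
	}

	for i := range jobs {
		indexes <- i
	}
	close(indexes)
	wg.Wait()
	return out
}

func main() {
	doubled := runWithWorkers(4, []int{1, 2, 3, 4, 5}, func(v int) int { return v * 2 })
	fmt.Println(doubled) // [2 4 6 8 10]
}
```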
I recently took the time to study gopool, the coroutine pool implementation in ByteDance's official open-source gopkg library. The implementation is high quality and the code is concise and clear; Kitex also uses gopool under the hood to manage coroutines. Here we walk through its design and implementation.
gopool
Repository:/bytedance/gopkg/tree/develop/util/gopool
gopool is a high-performance goroutine pool which aims to reuse goroutines and limit the number of goroutines. It is an alternative to the go keyword.
If you read the official README, you will find that gopool is very simple to use: just replace the go func(){...} calls you used before with gopool.Go(func(){...}).
gopool will then manage the coroutines you start using its default configuration. You can also configure the pool size and capacity limit to suit your business scenario.
old:
    go func() {
        // do your job
    }()
new:
    import (
        "/bytedance/gopkg/util/gopool"
    )

    gopool.Go(func() {
        // do your job
    })
Core implementation
Let's take a look at how gopool implements coroutine pool management.
Pool
Pool is an interface that defines the capabilities of a coroutine pool.
    type Pool interface {
        // The name of the pool
        Name() string
        // Set the capacity of the Goroutine in the pool
        SetCap(cap int32)
        // Execute the f function
        Go(f func())
        // With ctx, execute the f function
        CtxGo(ctx context.Context, f func())
        // Set the function called when panic occurs
        SetPanicHandler(f func(context.Context, interface{}))
    }
gopool provides a default implementation of this interface (the pool struct introduced below), and this is what we rely on when we call the package-level functions directly.
This design pattern also appears often in Kitex: every dependency is designed as an interface so that it can be extended later, and the underlying layer exposes a default implementation, which is also very friendly to the caller.
    type pool struct {
        // Pool name
        name string
        // The capacity of the pool, that is, the maximum number of goroutines working concurrently
        cap int32
        // Pool configuration
        config *Config
        // Task linked list
        taskHead  *task
        taskTail  *task
        taskLock  sync.Mutex
        taskCount int32
        // Record the number of currently running workers
        workerCount int32
        // Called when a worker panics
        panicHandler func(context.Context, interface{})
    }

    // NewPool creates a new coroutine pool, initializes the name, capacity, and configuration
    func NewPool(name string, cap int32, config *Config) Pool {
        p := &pool{
            name:   name,
            cap:    cap,
            config: config,
        }
        return p
    }
Calling NewPool returns the pool struct in the form of the Pool interface.
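The "interface plus exposed default implementation" pattern described above can be sketched in isolation. Everything in this example (Runner, inlineRunner, NewRunner, Run) is a hypothetical stand-in chosen for illustration and is not part of gopool.

```go
package main

import "fmt"

// Runner is a hypothetical interface standing in for an interface like Pool.
type Runner interface {
	Name() string
	Run(f func())
}

// inlineRunner is an unexported default implementation.
type inlineRunner struct{ name string }

func (r *inlineRunner) Name() string { return r.name }

// Run executes f synchronously to keep the sketch simple.
func (r *inlineRunner) Run(f func()) { f() }

// NewRunner returns the default implementation behind the interface,
// so callers never depend on the concrete type.
func NewRunner(name string) Runner { return &inlineRunner{name: name} }

// defaultRunner backs the package-level helper below.
var defaultRunner = NewRunner("default")

// Run is a package-level helper that delegates to the default implementation.
func Run(f func()) { defaultRunner.Run(f) }

func main() {
	Run(func() { fmt.Println("ran on", defaultRunner.Name()) }) // ran on default
}
```

Callers that need different behavior can provide their own Runner, while most callers simply use the package-level helper.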
Task
    type task struct {
        ctx  context.Context
        f    func()
        next *task
    }
task is a linked-list node that can be understood as a task waiting to be executed. It holds the function f that the current node needs to execute and a pointer to the next task.
Combined with the pool from the previous section, we can see that one coroutine pool corresponds to one list of tasks.
pool maintains two pointers to the head and tail of the linked list, taskHead and taskTail, together with the length of the list, taskCount, and the corresponding lock, taskLock.
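The head/tail/count/lock layout described above can be sketched as a small standalone queue. The names node, queue, push and pop are hypothetical; gopool keeps the equivalent fields directly on its pool struct.

```go
package main

import (
	"fmt"
	"sync"
)

// node is one pending task in the singly linked list.
type node struct {
	f    func()
	next *node
}

// queue keeps head and tail pointers, a length, and a lock guarding them.
type queue struct {
	mu    sync.Mutex
	head  *node
	tail  *node
	count int
}

// push appends a task at the tail in O(1).
func (q *queue) push(f func()) {
	n := &node{f: f}
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.head == nil {
		q.head, q.tail = n, n
	} else {
		q.tail.next = n
		q.tail = n
	}
	q.count++
}

// pop removes the task at the head, or returns nil when the list is empty.
func (q *queue) pop() func() {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.head == nil {
		return nil
	}
	n := q.head
	q.head = n.next
	if q.head == nil {
		q.tail = nil
	}
	q.count--
	return n.f
}

func main() {
	q := &queue{}
	var order []int
	for i := 1; i <= 3; i++ {
		v := i
		q.push(func() { order = append(order, v) })
	}
	for f := q.pop(); f != nil; f = q.pop() {
		f()
	}
	fmt.Println(order, q.count) // [1 2 3] 0
}
```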
Worker
    type worker struct {
        pool *pool
    }
A worker is a logical executor that belongs to exactly one coroutine pool. When a worker is run, it starts a goroutine that keeps taking tasks from the pool's task linked list and executing them.
    func (w *worker) run() {
        go func() {
            for {
                // Declare the task to be executed
                var t *task
                // Lock before operating on the pool's task list
                w.pool.taskLock.Lock()
                if w.pool.taskHead != nil {
                    // Take the task at the head of the list
                    t = w.pool.taskHead
                    // Update the head and the length of the linked list
                    w.pool.taskHead = w.pool.taskHead.next
                    atomic.AddInt32(&w.pool.taskCount, -1)
                }
                // If no task was obtained above, nothing is left to execute: clean up and return
                if t == nil {
                    w.close()
                    w.pool.taskLock.Unlock()
                    w.Recycle()
                    return
                }
                w.pool.taskLock.Unlock()
                // Execute the task, recover from any panic and call the configured handler
                func() {
                    defer func() {
                        if r := recover(); r != nil {
                            msg := fmt.Sprintf("GOPOOL: panic in pool: %s: %v: %s", w.pool.name, r, debug.Stack())
                            logger.CtxErrorf(t.ctx, msg)
                            if w.pool.panicHandler != nil {
                                w.pool.panicHandler(t.ctx, r)
                            }
                        }
                    }()
                    t.f()
                }()
                t.Recycle()
            }
        }()
    }
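The panic handling inside the worker loop is worth isolating: wrapping each task in its own function with a deferred recover means a panicking task does not end the loop. Here is a minimal sketch of that idea, with hypothetical names runSafely and runTasks and without any of gopool's logging.

```go
package main

import "fmt"

// runSafely executes f and turns a panic into a call to onPanic,
// so the calling loop keeps going.
func runSafely(f func(), onPanic func(r interface{})) {
	defer func() {
		if r := recover(); r != nil && onPanic != nil {
			onPanic(r)
		}
	}()
	f()
}

// runTasks runs every task in order and returns the recovered panic values.
func runTasks(tasks []func()) []interface{} {
	var recovered []interface{}
	for _, t := range tasks {
		runSafely(t, func(r interface{}) { recovered = append(recovered, r) })
	}
	return recovered
}

func main() {
	ran := 0
	recovered := runTasks([]func(){
		func() { ran++ },
		func() { panic("boom") },
		func() { ran++ }, // still runs after the panic above
	})
	fmt.Println(ran, recovered) // 2 [boom]
}
```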
Overall
At this point you can piece the whole flow together. Let's look at what the external interface CtxGo(ctx context.Context, f func()) actually does.
    func Go(f func()) {
        CtxGo(context.Background(), f)
    }

    func CtxGo(ctx context.Context, f func()) {
        defaultPool.CtxGo(ctx, f)
    }

    func (p *pool) CtxGo(ctx context.Context, f func()) {
        // Create a task object and assign ctx and the function to be executed
        t := taskPool.Get().(*task)
        t.ctx = ctx
        t.f = f
        // Append the task to the tail of the pool's linked list and update the task count
        p.taskLock.Lock()
        if p.taskHead == nil {
            p.taskHead = t
            p.taskTail = t
        } else {
            p.taskTail.next = t
            p.taskTail = t
        }
        p.taskLock.Unlock()
        atomic.AddInt32(&p.taskCount, 1)
        // Create a new worker and start it when either holds:
        // 1. The number of pending tasks has reached the configured threshold
        //    and the number of running workers is below the capacity
        // 2. No worker is running
        if (atomic.LoadInt32(&p.taskCount) >= p.config.ScaleThreshold && p.WorkerCount() < atomic.LoadInt32(&p.cap)) || p.WorkerCount() == 0 {
            // Worker count +1
            p.incWorkerCount()
            // Take a worker and bind it to the current pool
            w := workerPool.Get().(*worker)
            w.pool = p
            // Start the worker
            w.run()
        }
    }
I believe that after reading the code comments, you will understand what happened.
gopool maintains its own defaultPool, a default pool struct that is initialized when the package is imported. When we call the package-level Go or CtxGo directly, we are essentially calling the method of the same name on defaultPool.
    func init() {
        defaultPool = NewPool("", 10000, NewConfig())
    }

    const (
        defaultScalaThreshold = 1
    )

    // Config is used to config pool.
    type Config struct {
        // The threshold for controlling capacity expansion. Once the number of
        // tasks to be executed reaches this value and the number of workers has
        // not reached the upper limit, start a new worker
        ScaleThreshold int32
    }

    // NewConfig creates a default Config.
    func NewConfig() *Config {
        c := &Config{
            ScaleThreshold: defaultScalaThreshold,
        }
        return c
    }
defaultPool is created with its own name, a pool capacity of 10,000, and a scale threshold of 1.
When we call CtxGo, gopool updates the task list it maintains and decides whether a new worker needs to be started:
- If enough workers have already been started (each worker corresponds to one goroutine underneath), it returns directly without scaling up.
- If it decides that scaling up is needed, it creates a new worker and calls its run() method to start it. Each worker asynchronously checks whether the pool's task list still has tasks to execute and, if so, executes them.
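The scaling decision can be isolated as a pure function, which makes the two branches above easy to check. This is a sketch that mirrors the condition in CtxGo; the function name shouldStartWorker and its parameter names are hypothetical.

```go
package main

import "fmt"

// shouldStartWorker mirrors the condition used when a task is submitted:
// start a worker if the backlog has reached the threshold and there is
// still room under the capacity, or if no worker is running at all.
func shouldStartWorker(pending, threshold, running, capacity int32) bool {
	return (pending >= threshold && running < capacity) || running == 0
}

func main() {
	fmt.Println(shouldStartWorker(1, 1, 0, 10000))     // true: no worker is running yet
	fmt.Println(shouldStartWorker(5, 1, 3, 10000))     // true: backlog and spare capacity
	fmt.Println(shouldStartWorker(5, 1, 10000, 10000)) // false: already at capacity
	fmt.Println(shouldStartWorker(0, 1, 3, 10000))     // false: running workers drained the list
}
```

With the default threshold of 1, almost every submission starts a new worker until the capacity is reached, so the capacity is the setting that actually bounds concurrency.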
Roles of the three components
- task is a node representing a task to be executed; it also holds a pointer to the next task, forming a linked list;
- worker is the executor that actually runs tasks; it starts a goroutine to execute the pool's pending tasks;
- pool is a logical coroutine pool that corresponds to one task linked list, is responsible for keeping the task state up to date, and creates new workers when needed.
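To see how the three roles fit together, here is a compact, self-contained sketch of the same idea. It is an illustration under simplifying assumptions, not gopool's code: the names miniTask, miniPool and runAll are hypothetical, there is no context, panic handling or object reuse, and Go is assumed to be called from a single goroutine.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// miniTask is a linked-list node holding the function to run.
type miniTask struct {
	f    func()
	next *miniTask
}

// miniPool owns the task list and decides when to start workers.
type miniPool struct {
	capacity    int32
	threshold   int32
	mu          sync.Mutex
	head, tail  *miniTask
	taskCount   int32
	workerCount int32
}

// Go enqueues f and starts a worker goroutine if one is needed.
func (p *miniPool) Go(f func()) {
	t := &miniTask{f: f}
	p.mu.Lock()
	if p.head == nil {
		p.head, p.tail = t, t
	} else {
		p.tail.next = t
		p.tail = t
	}
	p.mu.Unlock()
	atomic.AddInt32(&p.taskCount, 1)

	running := atomic.LoadInt32(&p.workerCount)
	if (atomic.LoadInt32(&p.taskCount) >= p.threshold && running < p.capacity) || running == 0 {
		atomic.AddInt32(&p.workerCount, 1)
		go p.work()
	}
}

// work is the worker loop: take tasks until the list is empty, then exit.
func (p *miniPool) work() {
	for {
		p.mu.Lock()
		t := p.head
		if t == nil {
			// Decrement while still holding the lock, so a submitter that
			// enqueues afterwards sees the updated worker count.
			atomic.AddInt32(&p.workerCount, -1)
			p.mu.Unlock()
			return
		}
		p.head = t.next
		atomic.AddInt32(&p.taskCount, -1)
		p.mu.Unlock()
		t.f()
	}
}

// runAll submits n tasks and returns how many of them actually ran.
func runAll(n int, capacity int32) int64 {
	p := &miniPool{capacity: capacity, threshold: 1}
	var wg sync.WaitGroup
	var done int64
	for i := 0; i < n; i++ {
		wg.Add(1)
		p.Go(func() {
			defer wg.Done()
			atomic.AddInt64(&done, 1)
		})
	}
	wg.Wait()
	return atomic.LoadInt64(&done)
}

func main() {
	fmt.Println(runAll(1000, 8)) // 1000
}
```

Workers are not kept alive when idle: each one exits as soon as it finds the list empty, and the next submission starts a new one.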
Using sync.Pool for performance optimization
By this point, gopool is already a coroutine pool library with concise and clear code, but there is clearly still room to improve performance, so the gopool authors use sync.Pool in several places to create pooled objects and reuse worker and task objects.
We suggest reading the source code directly for the details; the relevant calls already appear in the code above.
- task pooling
    var taskPool sync.Pool

    func init() {
        taskPool.New = newTask
    }

    func newTask() interface{} {
        return &task{}
    }

    func (t *task) Recycle() {
        t.zero()
        taskPool.Put(t)
    }
- Worker Pooling
    var workerPool sync.Pool

    func init() {
        workerPool.New = newWorker
    }

    func newWorker() interface{} {
        return &worker{}
    }

    func (w *worker) Recycle() {
        w.zero()
        workerPool.Put(w)
    }
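The recycle pattern is easy to try in isolation. The sketch below uses the standard library's sync.Pool with a hypothetical job type; the important detail is that fields are cleared before the object is put back, so a reused object never carries state from its previous use.

```go
package main

import (
	"fmt"
	"sync"
)

// job is a hypothetical pooled object.
type job struct {
	id int
	f  func()
}

// jobPool hands out *job values, allocating a new one only when it is empty.
var jobPool = sync.Pool{New: func() interface{} { return &job{} }}

// getJob takes an object from the pool and fills it in.
func getJob(id int, f func()) *job {
	j := jobPool.Get().(*job)
	j.id, j.f = id, f
	return j
}

// recycle clears the fields and returns the object to the pool.
func (j *job) recycle() {
	j.id, j.f = 0, nil
	jobPool.Put(j)
}

func main() {
	j := getJob(1, func() { fmt.Println("job 1") })
	j.f()
	j.recycle()

	// Whether this is the recycled object or a fresh one, its fields are zeroed.
	next := jobPool.Get().(*job)
	fmt.Println(next.id, next.f == nil) // 0 true
}
```

sync.Pool gives no guarantee that Get returns a previously stored object, so the code must behave the same whether it receives a recycled value or a newly allocated one.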