SoFunction
Updated on 2025-03-04

Golang coroutine pool gopool design and implementation

Goroutine

A goroutine is a lightweight thread provided by Golang, which we usually call a "coroutine". Compared with threads, creating a coroutine is very cheap, so applications written in Golang often run thousands of coroutines concurrently.

Advantages of Goroutine:

  • Goroutines are cheap compared to threads.

Their stacks start at only a few KB and can grow and shrink according to the needs of the application, and context switches are fast. A thread's stack size, by contrast, must be specified up front and is fixed.

  • Goroutines are multiplexed onto a smaller number of OS threads.

A program with thousands of goroutines may run on only a single thread. If a goroutine on that thread blocks, for example while waiting for user input, the runtime creates another OS thread and moves the remaining goroutines to it. All of this is handled by the runtime, so as developers we do not have to worry about it, and we get a clean API for concurrency.

  • Goroutines use channels to communicate.

Channels are designed to help prevent race conditions when goroutines access shared memory. A channel can be thought of as a pipe through which goroutines communicate.
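As a small illustration of the channel point above, the following sketch starts one goroutine per input and collects the results over a channel instead of writing to a shared variable. The function name sumSquares is made up for this example.

```go
package main

import "fmt"

// sumSquares starts one goroutine per input value and gathers the
// results over a channel, so no shared variable is written concurrently.
func sumSquares(nums []int) int {
	results := make(chan int) // unbuffered: each send waits for a receive
	for _, n := range nums {
		go func(n int) {
			results <- n * n
		}(n)
	}
	total := 0
	for range nums {
		total += <-results // exactly one receive per goroutine started
	}
	return total
}

func main() {
	fmt.Println(sumSquares([]int{1, 2, 3, 4})) // prints 30
}
```

Only the receiving loop touches total, which is why no lock is needed here.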

Hereinafter, we will refer to Goroutine as "coroutine".

Coroutine Pool

In high-concurrency scenarios, we may start a large number of coroutines to handle business logic. A coroutine pool applies pooling to reuse coroutines and related objects, reducing the frequency of memory allocation and the overhead of coroutine creation, thereby improving execution efficiency.
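To make the general idea concrete, here is a minimal sketch of a fixed-size pool built only on the standard library: a few long-lived goroutines pull tasks from a channel, so the number of goroutines stays bounded no matter how many tasks arrive. This is the generic pattern, not gopool's design (gopool starts workers on demand, as described later); the name runAll is hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// runAll executes every task on a fixed set of worker goroutines
// and returns once all tasks have finished.
func runAll(workers int, tasks []func()) {
	queue := make(chan func())
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for f := range queue { // each worker is reused for many tasks
				f()
			}
		}()
	}
	for _, f := range tasks {
		queue <- f
	}
	close(queue) // lets the workers exit their range loops
	wg.Wait()
}

func main() {
	var mu sync.Mutex
	done := 0
	tasks := make([]func(), 100)
	for i := range tasks {
		tasks[i] = func() {
			mu.Lock()
			done++
			mu.Unlock()
		}
	}
	runAll(4, tasks)
	fmt.Println("tasks completed:", done) // prints "tasks completed: 100"
}
```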

I recently took the time to study gopool, the coroutine pool implementation provided by ByteDance's open-source gopkg library. It feels very high quality, the code is concise and clear, and Kitex also uses gopool under the hood to manage coroutines. Here we walk through its design and implementation.

gopool

Repository:/bytedance/gopkg/tree/develop/util/gopool

gopool is a high-performance goroutine pool which aims to reuse goroutines and limit the number of goroutines. It is an alternative to the go keyword.

If you read the official README, you will find that using gopool is very simple: just replace the familiar go func(){...}() with gopool.Go(func(){...}).

In that case gopool uses its default configuration to manage the coroutines you start. You can also configure the pool size and capacity limit to suit your business scenario.

old:

go func() {
	// do your job
}()

new:

import (
    "github.com/bytedance/gopkg/util/gopool"
)

gopool.Go(func(){
	// do your job
})

Core implementation

Let's take a look at how gopool implements coroutine pool management.

Pool

Pool is an interface that defines the capabilities of a coroutine pool.

type Pool interface {
	// The name of the pool
	Name() string

	// Set the goroutine capacity of the pool
	SetCap(cap int32)

	// Execute the function f
	Go(f func())

	// Execute the function f with ctx
	CtxGo(ctx context.Context, f func())

	// Set the function called when a panic occurs
	SetPanicHandler(f func(context.Context, interface{}))
}

gopool provides a default implementation of this interface (the pool struct introduced below), and this is what we rely on when we call gopool.Go directly.

This design pattern also appears frequently in Kitex: every dependency is designed as an interface to allow later extension, while the underlying layer exposes a default implementation, which is also very friendly to the caller.

type pool struct {
	// Pool name
	name string

	// The capacity of the pool, that is, the maximum number of goroutines working concurrently
	cap int32

	// Pool configuration
	config *Config

	// Linked list of tasks
	taskHead  *task
	taskTail  *task
	taskLock  sync.Mutex
	taskCount int32

	// The number of currently running workers
	workerCount int32

	// Called when a worker panics
	panicHandler func(context.Context, interface{})
}

// NewPool creates a new coroutine pool and initializes its name, capacity, and configuration
func NewPool(name string, cap int32, config *Config) Pool {
	p := &pool{
		name:   name,
		cap:    cap,
		config: config,
	}
	return p
}

Calling NewPool returns a pool struct in the form of the Pool interface.

Task

type task struct {
	ctx context.Context
	f   func()

	next *task
}

task is a linked-list node that can be understood as a task waiting to be executed. It contains the function f that the current node needs to execute and a pointer to the next task.

Combined with the pool struct from the previous section, we can see that one coroutine pool corresponds to one list of tasks.

pool maintains two pointers to the beginning and end of the linked list, taskHead and taskTail, along with the length of the list, taskCount, and the corresponding lock, taskLock.
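The head/tail bookkeeping described above can be sketched in isolation. The types node and queue below are simplified stand-ins invented for this example, not gopool's own types, and the count is kept as a plain int under the lock rather than updated atomically as gopool does.

```go
package main

import (
	"fmt"
	"sync"
)

// node is a hypothetical stand-in for a task: a function plus a next pointer.
type node struct {
	f    func()
	next *node
}

// queue mirrors the head, tail, lock and count fields described above.
type queue struct {
	mu    sync.Mutex
	head  *node
	tail  *node
	count int
}

// push appends a function to the tail of the list.
func (q *queue) push(f func()) {
	n := &node{f: f}
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.head == nil {
		q.head, q.tail = n, n
	} else {
		q.tail.next = n
		q.tail = n
	}
	q.count++
}

// pop removes the function at the head, or returns nil if the list is empty.
func (q *queue) pop() func() {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.head == nil {
		return nil
	}
	n := q.head
	q.head = n.next
	if q.head == nil {
		q.tail = nil
	}
	q.count--
	return n.f
}

func main() {
	q := &queue{}
	for i := 1; i <= 3; i++ {
		i := i
		q.push(func() { fmt.Println("task", i) })
	}
	for f := q.pop(); f != nil; f = q.pop() {
		f() // tasks run in insertion order: 1, 2, 3
	}
}
```

Appending at the tail and removing at the head gives first-in, first-out ordering with constant-time operations on both ends.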

Worker

type worker struct {
	pool *pool
}

A worker is a logical executor that belongs to exactly one coroutine pool. When a worker is run, it starts a goroutine that repeatedly takes tasks from the pool's task linked list and executes them.

func (w *worker) run() {
	go func() {
		for {
			// Declare the task to be executed
			var t *task

			// Lock before operating on the pool's task list
			w.pool.taskLock.Lock()
			if w.pool.taskHead != nil {
				// Take the taskHead to be executed
				t = w.pool.taskHead

				// Update the head and the length of the linked list
				w.pool.taskHead = w.pool.taskHead.next
				atomic.AddInt32(&w.pool.taskCount, -1)
			}
			// If no task was taken in the previous step, nothing needs to be executed. Clean up and return
			if t == nil {
				w.close()
				w.pool.taskLock.Unlock()
				w.Recycle()
				return
			}
			w.pool.taskLock.Unlock()

			// Execute the task, recover from any panic and call the configured handler
			func() {
				defer func() {
					if r := recover(); r != nil {
						msg := fmt.Sprintf("GOPOOL: panic in pool: %s: %v: %s", w.pool.name, r, debug.Stack())
						logger.CtxErrorf(t.ctx, msg)
						if w.pool.panicHandler != nil {
							w.pool.panicHandler(t.ctx, r)
						}
					}
				}()
				t.f()
			}()
			t.Recycle()
		}
	}()
}
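The panic handling inside the worker loop is worth isolating, because it is what keeps one failing task from killing the worker goroutine. The sketch below shows the same recover-in-a-deferred-closure pattern using only the standard library; safeRun and its onPanic parameter are names invented for this example.

```go
package main

import "fmt"

// safeRun executes f. If f panics, the panic is recovered and passed to
// onPanic (when one is set), so the caller keeps running afterwards.
func safeRun(f func(), onPanic func(r interface{})) {
	defer func() {
		if r := recover(); r != nil && onPanic != nil {
			onPanic(r)
		}
	}()
	f()
}

func main() {
	var caught []interface{}
	handler := func(r interface{}) { caught = append(caught, r) }

	safeRun(func() { panic("boom") }, handler)
	safeRun(func() { fmt.Println("second task still runs") }, handler)

	fmt.Println("panics caught:", len(caught)) // prints "panics caught: 1"
}
```

Wrapping each task in its own function call matters: recover only stops a panic when it is called from a deferred function in the panicking call chain, so the deferred closure has to be set up per task rather than once per worker.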

Overall

At this point we can string the whole process together. Let's see what the external interface CtxGo(context.Context, f func()) actually does.

func Go(f func()) {
	CtxGo(context.Background(), f)
}

func CtxGo(ctx context.Context, f func()) {
	defaultPool.CtxGo(ctx, f)
}

func (p *pool) CtxGo(ctx context.Context, f func()) {

	// Get a task object and assign ctx and the function to be executed
	t := taskPool.Get().(*task)
	t.ctx = ctx
	t.f = f

	// Append the task to the tail of the pool's linked list and update the task count
	p.taskLock.Lock()
	if p.taskHead == nil {
		p.taskHead = t
		p.taskTail = t
	} else {
		p.taskTail.next = t
		p.taskTail = t
	}
	p.taskLock.Unlock()
	atomic.AddInt32(&p.taskCount, 1)

	// Create a new worker and start it when both of the following hold:
	// 1. The number of tasks has reached the configured threshold
	// 2. The number of running workers is below the upper limit
	// or when no worker is running at all
	if (atomic.LoadInt32(&p.taskCount) >= p.config.ScaleThreshold && p.WorkerCount() < atomic.LoadInt32(&p.cap)) || p.WorkerCount() == 0 {

		// Worker count +1
		p.incWorkerCount()

		// Get a worker and assign the current pool to it
		w := workerPool.Get().(*worker)
		w.pool = p

		// Start the worker
		w.run()
	}
}

I believe that after reading the code comments, you will understand what happened.

gopool maintains a defaultPool of its own, a default pool struct that is initialized when the package is imported. When we call gopool.CtxGo() directly, we are essentially calling the method of the same name on defaultPool.

func init() {
	defaultPool = NewPool("", 10000, NewConfig())
}

const (
	defaultScalaThreshold = 1
)

// Config is used to config pool.
type Config struct {
	// The threshold that controls scaling up. Once the number of pending tasks reaches this value and the number of workers is below the upper limit, a new worker is started
	ScaleThreshold int32
}

// NewConfig creates a default Config.
func NewConfig() *Config {
	c := &Config{
		ScaleThreshold: defaultScalaThreshold,
	}
	return c
}

defaultPool is created with a fixed name, a pool capacity of 10,000, and a scale-up threshold of 1.

When we call CtxGo, gopool updates the task list it maintains and decides whether to add a worker.

  • If enough workers are already running (each worker corresponds to one goroutine underneath), it returns directly without adding one.
  • If it decides to scale up, it creates a new worker and starts it by calling its run() method. Each worker asynchronously checks whether the pool's task list still holds tasks to execute and, if so, executes them.
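The scaling decision can be read as a pure function of four numbers. The sketch below restates the condition from CtxGo with plain parameters so that a few cases can be checked by hand; needWorker is a hypothetical helper, not part of gopool.

```go
package main

import "fmt"

// needWorker reports whether a new worker should be started, following
// the condition used in CtxGo: enough pending tasks while below capacity,
// or no worker running at all.
func needWorker(taskCount, threshold, workerCount, capacity int32) bool {
	return (taskCount >= threshold && workerCount < capacity) || workerCount == 0
}

func main() {
	// First task submitted to an idle pool: a worker is started.
	fmt.Println(needWorker(1, 1, 0, 10000)) // true

	// Pool already at capacity: the task only waits in the list.
	fmt.Println(needWorker(5, 1, 10000, 10000)) // false

	// Below the threshold with workers already running: no new worker.
	fmt.Println(needWorker(0, 1, 3, 10000)) // false
}
```

With the default threshold of 1, nearly every submission below capacity starts a worker, so the capacity is what actually bounds the number of goroutines.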

Roles of the three components

  • task is a node representing a task to be executed; it also holds a pointer to the next task, forming a linked list;
  • worker is the executor that actually runs tasks; it starts a goroutine to execute the pending tasks in the coroutine pool;
  • pool is the logical coroutine pool; it owns a task linked list, maintains the state of that list, and creates new workers when needed.

Use sync.Pool for performance optimization

Up to this point, gopool is already a coroutine pool library with concise, clear code, but there is clearly room to improve performance, so the gopool authors use sync.Pool in several places to create pooled objects and reuse worker and task objects.

We suggest reading the source code directly here, although the relevant calls already appeared in the code above.

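  • Task pooling
var taskPool sync.Pool

func init() {
	taskPool.New = newTask
}

func newTask() interface{} {
	return &task{}
}

// Recycle resets the task and puts it back into the pool
func (t *task) Recycle() {
	t.zero()
	taskPool.Put(t)
}
  • Worker pooling
var workerPool sync.Pool

func init() {
	workerPool.New = newWorker
}

func newWorker() interface{} {
	return &worker{}
}

// Recycle resets the worker and puts it back into the pool
func (w *worker) Recycle() {
	w.zero()
	workerPool.Put(w)
}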
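The Get, reset, Put cycle can be tried on its own with the standard library. The sketch below uses a made-up job type, with getJob and putJob as hypothetical helpers; it clears the fields before returning the object to the pool, so that a reused object does not carry a stale function or keep its captured variables alive.

```go
package main

import (
	"fmt"
	"sync"
)

// job is a hypothetical pooled object, playing the role task plays in gopool.
type job struct {
	id int
	f  func()
}

// zero clears the fields so that a recycled job holds no stale references.
func (j *job) zero() {
	j.id = 0
	j.f = nil
}

// jobPool hands out *job values, allocating a new one only when it is empty.
var jobPool = sync.Pool{New: func() interface{} { return &job{} }}

// getJob takes a job from the pool and fills it in.
func getJob(id int, f func()) *job {
	j := jobPool.Get().(*job)
	j.id, j.f = id, f
	return j
}

// putJob resets a job and returns it to the pool for reuse.
func putJob(j *job) {
	j.zero()
	jobPool.Put(j)
}

func main() {
	j := getJob(1, func() { fmt.Println("running job") })
	j.f()
	putJob(j)

	again := getJob(2, func() {})
	fmt.Println("job id:", again.id) // prints "job id: 2"
	putJob(again)
}
```

Note that sync.Pool gives no guarantee that a later Get returns the object that was just Put, since pooled objects may be dropped at any time, so code must never rely on reuse for correctness, only for reducing allocations.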
