SoFunction
Updated on 2025-03-04

Writing a concurrent work queue using golang

In Go, a single function is enough to build a concurrent queue. This article goes a step further and writes a flexible, controllable work queue.

First, define a worker

type Worker struct {
    ID      int
    RepJobs chan int64
    SM      *SM
    quit    chan bool
}

A worker holds its own ID and a channel, RepJobs, that carries the IDs of the jobs it should execute; quit is the channel used to stop it. SM holds the actual task logic. That part depends on your business, so write your own SM.
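The initialization code later in the article calls NewWorker without showing it. A minimal sketch of what such a constructor might look like follows; the SM fields and the choice of unbuffered channels are assumptions made for illustration, not the author's code.

```go
package main

import "fmt"

// SM is a stand-in for the business-specific task logic.
// The JobID field is hypothetical.
type SM struct {
	JobID int64 // ID of the job currently being handled
}

// Worker mirrors the struct defined in the article.
type Worker struct {
	ID      int
	RepJobs chan int64
	SM      *SM
	quit    chan bool
}

// NewWorker builds a worker with unbuffered channels (an assumption).
func NewWorker(id int) *Worker {
	return &Worker{
		ID:      id,
		RepJobs: make(chan int64),
		SM:      &SM{},
		quit:    make(chan bool),
	}
}

func main() {
	w := NewWorker(1)
	fmt.Printf("worker %d ready, current job: %d\n", w.ID, w.SM.JobID)
}
```

With unbuffered channels, sending a job ID blocks until the worker is actually ready to receive it, which keeps a job from sitting unnoticed inside a busy worker.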

Then define the worker pool

type workerPool struct {
    workerChan chan *Worker
    workerList []*Worker
}

workerList is a slice holding all workers, so the number of workers is configurable and more can be added later. workerChan is a channel that acts as a queue of workers.
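To make the roles of the two fields concrete, here is a small sketch in which the buffered channel serves as a queue of idle workers while the slice keeps track of every worker. The newPool helper and the pared-down Worker type are hypothetical and exist only for this illustration.

```go
package main

import "fmt"

// Worker is reduced to an ID for this sketch.
type Worker struct{ ID int }

type workerPool struct {
	workerChan chan *Worker // idle workers waiting for a job
	workerList []*Worker    // every worker, idle or busy
}

// newPool creates n workers and marks all of them idle (hypothetical helper).
func newPool(n int) *workerPool {
	p := &workerPool{
		workerChan: make(chan *Worker, n),
		workerList: make([]*Worker, 0, n),
	}
	for i := 0; i < n; i++ {
		w := &Worker{ID: i}
		p.workerList = append(p.workerList, w)
		p.workerChan <- w // does not block: the channel has capacity n
	}
	return p
}

func main() {
	p := newPool(3)
	w := <-p.workerChan // take an idle worker out of the queue
	fmt.Println("took worker", w.ID, "idle left:", len(p.workerChan))
	p.workerChan <- w // hand it back once its job is done
	fmt.Println("idle now:", len(p.workerChan), "total:", len(p.workerList))
}
```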

With these defined, the worker pool can be initialized.

func InitWorkerPool() error {
    n := 3
    WorkerPool = &workerPool{
        workerChan: make(chan *Worker, n),
        workerList: make([]*Worker, 0, n),
    }
    for i := 0; i < n; i++ {
        worker := NewWorker(i)
        WorkerPool.workerList = append(WorkerPool.workerList, worker)
        worker.Start()
        log.Printf("worker %d started", worker.ID)
    }
    return nil
}

Here the number of workers is hard-coded to 3; it could of course be read from a configuration file or passed in as a parameter. Each worker is started as soon as it is created.
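As one way to avoid the hard-coded count, the value could be taken from an environment variable with a fallback. This is only a sketch: the workerCount helper and the WORKER_COUNT variable name are made up for illustration.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// workerCount parses raw as a worker count and falls back to def
// when the value is empty, malformed, or not positive.
func workerCount(raw string, def int) int {
	n, err := strconv.Atoi(raw)
	if err != nil || n <= 0 {
		return def
	}
	return n
}

func main() {
	// WORKER_COUNT is a hypothetical variable name.
	n := workerCount(os.Getenv("WORKER_COUNT"), 3)
	fmt.Println("starting", n, "workers")
}
```

The resulting n would then replace the literal 3 in InitWorkerPool.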

worker.Start() is the key:

func (w *Worker) Start() {
    go func() {
        for {
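            // Register as idle, then wait for a job or a quit signal.
            WorkerPool.workerChan <- w
            select {
            case jobID := <-w.RepJobs:
                log.Printf("worker: %d, will handle job: %d", w.ID, jobID)
                // handleRepJob (assumed name) runs the task for this job ID.
                w.handleRepJob(jobID)
            case q := <-w.quit:
                if q {
                    log.Printf("worker: %d, will stop.", w.ID)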
                    return
                }
            }
        }
    }()
}

Start launches a goroutine. On each pass through the loop the worker first puts itself into workerChan, marking itself idle, then blocks until a job ID or a quit signal arrives. Once the job has been handled, the loop repeats and the worker puts itself back into the queue.
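The loop described above can be tried out end to end with a small self-contained program. It is a sketch under a few assumptions: the pool and a handler function are passed to Start as parameters instead of using a global WorkerPool, SM is left out, and runJobs is a hypothetical helper that feeds jobs in and then shuts the workers down.

```go
package main

import (
	"fmt"
	"sync"
)

type Worker struct {
	ID      int
	RepJobs chan int64
	quit    chan bool
}

type workerPool struct {
	workerChan chan *Worker
	workerList []*Worker
}

// Start registers the worker as idle, then waits for a job or a quit signal.
func (w *Worker) Start(pool *workerPool, handle func(workerID int, jobID int64)) {
	go func() {
		for {
			pool.workerChan <- w // announce: this worker is idle
			select {
			case jobID := <-w.RepJobs:
				handle(w.ID, jobID)
			case <-w.quit:
				return
			}
		}
	}()
}

// runJobs pushes jobs through n workers and returns how many were handled.
func runJobs(n int, jobs []int64) int {
	pool := &workerPool{workerChan: make(chan *Worker, n)}
	var (
		mu      sync.Mutex
		wg      sync.WaitGroup
		handled int
	)
	handle := func(workerID int, jobID int64) {
		defer wg.Done()
		mu.Lock()
		handled++ // real task logic would go here
		mu.Unlock()
	}
	for i := 0; i < n; i++ {
		w := &Worker{ID: i, RepJobs: make(chan int64), quit: make(chan bool)}
		pool.workerList = append(pool.workerList, w)
		w.Start(pool, handle)
	}
	wg.Add(len(jobs))
	for _, id := range jobs {
		w := <-pool.workerChan // wait for an idle worker
		w.RepJobs <- id
	}
	wg.Wait() // every job has been handled
	for _, w := range pool.workerList {
		w.quit <- true // each worker is back in its select by now or soon will be
	}
	return handled
}

func main() {
	fmt.Println("handled:", runJobs(3, []int64{1, 2, 3, 4, 5})) // prints "handled: 5"
}
```

Because a worker appears in workerChan at most once at a time, a channel with capacity n never makes a worker block while registering itself.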

So to get a task executed, just send its ID into the job queue:

func Dispatch() {
    for {
        select {
        case job := <-jobQueue:
            go func(jobID int64) {
                log.Printf("Trying to dispatch job: %d", jobID)
                worker := <-WorkerPool.workerChan // wait for an idle worker
                worker.RepJobs <- jobID
            }(job)
        }
    }
}

Dispatch takes an idle worker out of workerChan and sends the job ID to that worker's RepJobs channel for execution.
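The dispatcher side can also be sketched on its own. The variant below is not the article's code: it ranges over the job queue instead of looping forever, so it returns once the queue is closed, and it uses a pared-down worker type plus a hypothetical process helper to wire everything together. Like the original, it starts one goroutine per job, so waiting for an idle worker never stalls the dispatch loop.

```go
package main

import (
	"fmt"
	"sync"
)

// worker is a pared-down stand-in for the article's Worker.
type worker struct {
	id   int
	jobs chan int64
}

// dispatch hands every queued job ID to the next idle worker and returns
// once jobQueue is closed and all jobs have been handed over.
func dispatch(jobQueue <-chan int64, idle <-chan *worker) {
	var wg sync.WaitGroup
	for jobID := range jobQueue {
		wg.Add(1)
		go func(jobID int64) {
			defer wg.Done()
			w := <-idle // blocks only this goroutine, not the loop
			w.jobs <- jobID
		}(jobID)
	}
	wg.Wait()
}

// process runs n workers over ids and returns the sum of the handled IDs.
func process(n int, ids []int64) int64 {
	idle := make(chan *worker, n)
	results := make(chan int64, len(ids))
	workers := make([]*worker, 0, n)
	for i := 0; i < n; i++ {
		w := &worker{id: i, jobs: make(chan int64)}
		workers = append(workers, w)
		go func() {
			for {
				idle <- w // register as idle
				jobID, ok := <-w.jobs
				if !ok {
					return // jobs channel closed: shut down
				}
				results <- jobID // "handling" a job just reports its ID
			}
		}()
	}

	jobQueue := make(chan int64)
	go func() {
		for _, id := range ids {
			jobQueue <- id
		}
		close(jobQueue)
	}()
	dispatch(jobQueue, idle)

	var sum int64
	for range ids {
		sum += <-results
	}
	for _, w := range workers {
		close(w.jobs) // safe: dispatch has finished all sends
	}
	return sum
}

func main() {
	fmt.Println(process(2, []int64{1, 2, 3, 4})) // prints 10
}
```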

Of course, you can also stop a worker, or even stop specific jobs:

func (w *Worker) Stop() {
    go func() {
        w.quit <- true
    }()
}
func (wp *workerPool) StopJobs(jobs []int64) {
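    log.Printf("Workers working on jobs: %v will be stopped", jobs)
    for _, id := range jobs {
        for _, w := range wp.workerList {
            // JobID and Stop are assumed members of SM; adapt them to your own SM.
            if w.SM.JobID == id {
                log.Printf("found a worker whose job ID is %d, will try to stop it", id)
                w.SM.Stop(id)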
            }
        }
    }
}
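Stop sends the quit signal from a separate goroutine, presumably because the send on an unbuffered channel would otherwise block the caller until the worker returns to its select. The sketch below shows that behavior in isolation. The pool is left out for brevity, and the done channel is an addition made for this example so the caller can confirm that the worker has actually exited.

```go
package main

import "fmt"

type Worker struct {
	ID      int
	RepJobs chan int64
	quit    chan bool
	done    chan struct{} // closed when the goroutine exits (added for this sketch)
}

func NewWorker(id int) *Worker {
	return &Worker{
		ID:      id,
		RepJobs: make(chan int64),
		quit:    make(chan bool),
		done:    make(chan struct{}),
	}
}

// Start loops until a quit signal arrives.
func (w *Worker) Start() {
	go func() {
		defer close(w.done)
		for {
			select {
			case jobID := <-w.RepJobs:
				fmt.Printf("worker %d handled job %d\n", w.ID, jobID)
			case q := <-w.quit:
				if q {
					return
				}
			}
		}
	}()
}

// Stop sends the quit signal without blocking the caller, as in the article.
func (w *Worker) Stop() {
	go func() {
		w.quit <- true
	}()
}

func main() {
	w := NewWorker(0)
	w.Start()
	w.RepJobs <- 42
	w.Stop()
	<-w.done // wait until the worker has really exited
	fmt.Println("worker stopped")
}
```

Note that the quit signal is only noticed between jobs; a job that is already running is not interrupted by it, which is why the article stops running jobs separately through SM.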

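As a supplement, here are the conversions between int, int64 and string, all from the strconv package.

string to int

i, err := strconv.Atoi(s)

string to int64

i64, err := strconv.ParseInt(s, 10, 64)

int to string

s := strconv.Itoa(i)

int64 to string

s := strconv.FormatInt(i64, 10)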
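Since job IDs in this queue are int64 values, these conversions are likely to show up wherever IDs arrive as text. A short sketch with error handling follows; parseJobID and formatJobID are hypothetical wrapper names, while the strconv calls are the standard library ones.

```go
package main

import (
	"fmt"
	"strconv"
)

// parseJobID converts a decimal string into an int64 job ID.
func parseJobID(s string) (int64, error) {
	return strconv.ParseInt(s, 10, 64)
}

// formatJobID converts an int64 job ID back into a decimal string.
func formatJobID(id int64) string {
	return strconv.FormatInt(id, 10)
}

func main() {
	id, err := parseJobID("1024")
	if err != nil {
		fmt.Println("bad job ID:", err)
		return
	}
	fmt.Println("next job ID:", formatJobID(id+1))

	// Malformed input is reported through the error value.
	if _, err := parseJobID("12x"); err != nil {
		fmt.Println("rejected:", err)
	}
}
```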

The above is based on my personal experience. I hope it serves as a useful reference, and I appreciate your support. If there are mistakes or anything I have not fully considered, corrections are welcome.