SoFunction
Updated on 2025-04-22

Implementing a lightweight concurrent task scheduler in Go (with rate limiting)

Preface

In day-to-day development, we often run into scenarios like these:

  • There are a bunch of tasks to run (for example: sending requests, processing data, or crawling)
  • You don't want to run them all at once, for fear of overloading the server or getting blocked
  • You want to cap the concurrency and the rate, and also control task retries and record failures

So, can we implement a "lightweight concurrent task scheduler" in Go? The answer is: sure!

Today we will build a configurable task scheduler in Go from scratch, supporting:

  • Maximum concurrency control (worker pool)
  • Rate limit per second
  • Simple failure retry mechanism
  • Result collection and error output

Usage preview

You can call our scheduler like this:

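scheduler := NewScheduler(5, 10) // Concurrency 5, rate limit 10 tasks per second
for i := 0; i < 100; i++ {
    i := i // capture the per-iteration value (needed before Go 1.22)
    task := NewTask(func() error {
        // Simulate a network request or business logic
        fmt.Println("Task processing:", i)
        time.Sleep(300 * time.Millisecond)
        return nil
    })
    scheduler.Submit(task)
}

scheduler.Wait()
fmt.Println("All tasks are completed")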

Core Component Design

1. Task

We abstract each task into a struct:

type Task struct {
    fn    func() error // the work to run
    retry int          // remaining retries after a failure
}
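
To make the role of the retry field concrete, here is a minimal sketch of the retry semantics, assuming the field counts the retries that remain after a failure (as the worker loop later in this article treats it). The helper runWithRetry and the errBoom value are hypothetical names used only for illustration; the real scheduler re-enqueues the task instead of looping in place.

```go
package main

import (
	"errors"
	"fmt"
)

// Task mirrors the struct above: a function plus a remaining-retry budget.
type Task struct {
	fn    func() error
	retry int
}

// errBoom is a hypothetical error used to simulate a failing task.
var errBoom = errors.New("boom")

// runWithRetry is a hypothetical helper (not part of the scheduler). It runs
// the task until it succeeds or the retry budget is exhausted, and returns
// the number of attempts made together with the last error.
func runWithRetry(t *Task) (attempts int, err error) {
	for {
		attempts++
		err = t.fn()
		if err == nil || t.retry == 0 {
			return attempts, err
		}
		t.retry-- // consume one retry before the next attempt
	}
}

func main() {
	alwaysFail := &Task{fn: func() error { return errBoom }, retry: 3}
	attempts, err := runWithRetry(alwaysFail)
	// A budget of 3 retries means 4 attempts in total: 1 initial run + 3 retries.
	fmt.Println(attempts, err)
}
```

Under this reading, a task created with retry: 3 that never succeeds runs four times before it is reported as failed.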

2. Scheduler

The scheduler maintains the task queue, the workers, and the rate limiter:

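type Scheduler struct {
    tasks       chan *Task       // queue of pending tasks
    wg          sync.WaitGroup   // tracks tasks that have not finished yet
    rateLimiter <-chan time.Time // delivers one tick per permitted task start
}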
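
The rateLimiter field is a receive-only channel of time values, which suggests a ticker-based limiter: each task waits for one tick before it starts. The following is a minimal sketch of that idea in isolation, assuming time.Tick is the source of the channel; the names newLimiter and paced are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// newLimiter returns a channel that delivers one tick per permitted
// operation. The name is hypothetical; ratePerSecond must be positive,
// otherwise the division below panics.
func newLimiter(ratePerSecond int) <-chan time.Time {
	return time.Tick(time.Second / time.Duration(ratePerSecond))
}

// paced performs n no-op operations, waiting for a tick before each one,
// and returns the elapsed wall-clock time.
func paced(limiter <-chan time.Time, n int) time.Duration {
	start := time.Now()
	for i := 0; i < n; i++ {
		<-limiter // blocks until the next tick arrives
	}
	return time.Since(start)
}

func main() {
	limiter := newLimiter(100) // one tick every 10ms
	elapsed := paced(limiter, 5)
	// Five ticks at 10ms intervals should take roughly 50ms.
	fmt.Println("elapsed:", elapsed)
}
```

Because every worker would receive from the same channel, a limiter of this kind caps the rate for the scheduler as a whole rather than per worker.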

Implementation

The complete implementation follows and can be copied directly:

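package main

import (
    "fmt"
    "sync"
    "time"
)

// Task is a unit of work plus the number of retries it has left.
type Task struct {
    fn    func() error
    retry int
}

// NewTask wraps fn in a Task that may be retried up to 3 times.
func NewTask(fn func() error) *Task {
    return &Task{fn: fn, retry: 3}
}

type Scheduler struct {
    tasks       chan *Task
    wg          sync.WaitGroup
    rateLimiter <-chan time.Time
}

// NewScheduler starts `concurrency` workers that together begin at most
// ratePerSecond tasks per second. ratePerSecond must be positive.
func NewScheduler(concurrency int, ratePerSecond int) *Scheduler {
    s := &Scheduler{
        tasks:       make(chan *Task, 100),
        rateLimiter: time.Tick(time.Second / time.Duration(ratePerSecond)),
    }

    for i := 0; i < concurrency; i++ {
        go s.worker()
    }

    return s
}

// Submit enqueues a task; it blocks while the queue is full.
func (s *Scheduler) Submit(task *Task) {
    s.wg.Add(1)
    s.tasks <- task
}

func (s *Scheduler) worker() {
    for task := range s.tasks {
        <-s.rateLimiter // Rate limit: wait for the next tick
        err := task.fn()
        if err != nil && task.retry > 0 {
            fmt.Println("Task failed, retrying...")
            task.retry--
            // Count the retry before this attempt is marked done, and
            // re-enqueue from a goroutine so that a full queue cannot
            // block (and deadlock) the workers.
            s.wg.Add(1)
            go func(t *Task) { s.tasks <- t }(task)
        } else if err != nil {
            fmt.Println("Task failed permanently:", err)
        }

        s.wg.Done()
    }
}

// Wait blocks until every submitted task has finished, then stops the workers.
func (s *Scheduler) Wait() {
    s.wg.Wait()
    close(s.tasks)
}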
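
The preface mentions failure records, while the worker above only prints errors. One possible way to keep them is a small collector that workers can write to concurrently. This is a sketch under that assumption, not part of the scheduler: errorLog, Record, Errors, runAll, and errOdd are all hypothetical names, and the goroutines in runAll merely stand in for scheduler workers.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errorLog is a hypothetical collector that is safe for concurrent use.
type errorLog struct {
	mu   sync.Mutex
	errs []error
}

// Record stores a non-nil error; nil errors are ignored.
func (l *errorLog) Record(err error) {
	if err == nil {
		return
	}
	l.mu.Lock()
	defer l.mu.Unlock()
	l.errs = append(l.errs, err)
}

// Errors returns a copy of everything recorded so far.
func (l *errorLog) Errors() []error {
	l.mu.Lock()
	defer l.mu.Unlock()
	return append([]error(nil), l.errs...)
}

// errOdd is a hypothetical error used to simulate failing tasks.
var errOdd = errors.New("odd task failed")

// runAll simulates n concurrent tasks in which every odd-numbered task
// fails, and returns the log of recorded failures.
func runAll(n int) *errorLog {
	log := &errorLog{}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			if id%2 == 1 {
				log.Record(errOdd)
			}
		}(i)
	}
	wg.Wait()
	return log
}

func main() {
	// Tasks 1, 3, 5, 7, and 9 fail, so 5 errors are recorded.
	fmt.Println(len(runAll(10).Errors()))
}
```

In the scheduler, a call such as Record(err) could replace the print statement in the branch that handles a task's final failure.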

Practical application scenarios

  • Rate-limited web crawling
  • Sending emails, SMS messages, or requests in batches without tripping API rate limits
  • Cloud service task scheduling and batch automation
  • Asynchronous data collection and aggregation

Summary

Go's concurrency model is well suited to workloads that combine a large number of tasks, rate control, and error retries. The scheduler implemented in this article is very lightweight and can be integrated into your own system as a basic component.

If you need more, the scheduler can be extended further, for example with:

  • A failure callback
  • Timeout control
  • Task priorities
  • A background monitoring dashboard
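
As one illustration of the timeout-control item listed above, a task function could be wrapped before it is handed to NewTask. The sketch below is only one possible approach; withTimeout and errTimeout are hypothetical names, and note that the wrapped function keeps running in its goroutine after the timeout fires, because a plain func() error offers no way to cancel it.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTimeout is a hypothetical sentinel returned when a task runs too long.
var errTimeout = errors.New("task timed out")

// withTimeout is a hypothetical wrapper: it returns a function that runs fn
// in a goroutine and gives up waiting after d.
func withTimeout(fn func() error, d time.Duration) func() error {
	return func() error {
		// Buffered so the goroutine can deliver its result and exit
		// even when nobody is waiting any more.
		done := make(chan error, 1)
		go func() { done <- fn() }()
		select {
		case err := <-done:
			return err
		case <-time.After(d):
			return errTimeout
		}
	}
}

func main() {
	slow := withTimeout(func() error {
		time.Sleep(200 * time.Millisecond)
		return nil
	}, 20*time.Millisecond)
	fast := withTimeout(func() error { return nil }, 20*time.Millisecond)

	fmt.Println("slow:", slow()) // exceeds the 20ms limit
	fmt.Println("fast:", fast()) // finishes well within the limit
}
```

Since the wrapper has the same func() error signature as a task function, a timed-out task would be treated like any other failure and go through the normal retry path.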
