Preface
In day-to-day development, we often run into scenarios like this:
- There is a pile of tasks to run (sending requests, processing data, crawling, and so on)
- We don't want to run them all at once, for fear of overwhelming the server or getting blocked
- We want to cap the concurrency and the rate, and also control retries and record failures
So, can we implement a "lightweight concurrent task scheduler" in Go? The answer is: of course!
In this article we will build a configurable task scheduler in Go from scratch, supporting:
- Maximum concurrency control (worker pool)
- Per-second rate limiting
- A simple retry-on-failure mechanism
- Result collection and error output
What it looks like
You can call our scheduler like this:
```go
scheduler := NewScheduler(5, 10) // concurrency 5, rate limit 10 tasks per second

for i := 0; i < 100; i++ {
	i := i // capture the loop variable (needed before Go 1.22)
	task := NewTask(func() error {
		// Simulate a network request or business logic
		fmt.Println("Task processing:", i)
		time.Sleep(300 * time.Millisecond)
		return nil
	})
	scheduler.Submit(task)
}

scheduler.Wait()
fmt.Println("All tasks are completed")
```
Core Component Design
1. Task
We abstract each task into a struct:
```go
type Task struct {
	fn    func() error // the work to execute
	retry int          // remaining retry attempts
}
```
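For example, a real workload such as an HTTP GET could be wrapped into a Task. This is only a minimal sketch: the newFetchTask helper and the URL handling are illustrative, and it assumes "net/http" and "fmt" are imported alongside the scheduler code.

```go
// newFetchTask wraps an HTTP GET into a Task (illustrative helper, not part of the scheduler).
func newFetchTask(url string) *Task {
	return NewTask(func() error {
		resp, err := http.Get(url)
		if err != nil {
			return err
		}
		defer resp.Body.Close()
		if resp.StatusCode != http.StatusOK {
			return fmt.Errorf("unexpected status: %s", resp.Status)
		}
		return nil
	})
}
```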
2. Scheduler
It maintains the task queue, the workers, and the rate limiter:
```go
type Scheduler struct {
	tasks       chan *Task       // task queue
	wg          sync.WaitGroup   // tracks in-flight tasks
	rateLimiter <-chan time.Time // ticks at the configured rate
}
```
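The rate limiter is just a receive-only channel that ticks at a fixed interval; receiving from it before running a task throttles throughput. Here is a standalone sketch of the idea, separate from the scheduler itself (the 5-per-second rate is arbitrary):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Tick 5 times per second; each receive blocks until the next tick,
	// so the loop body runs at most 5 times per second.
	ticks := time.Tick(time.Second / 5)
	for i := 0; i < 10; i++ {
		<-ticks
		fmt.Println("allowed at", time.Now().Format("15:04:05.000"))
	}
}
```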
Implementation
Below is the complete implementation (you can copy it directly):
```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type Task struct {
	fn    func() error
	retry int
}

func NewTask(fn func() error) *Task {
	return &Task{fn: fn, retry: 3} // up to 3 retries by default
}

type Scheduler struct {
	tasks       chan *Task
	wg          sync.WaitGroup
	rateLimiter <-chan time.Time
}

func NewScheduler(concurrency int, ratePerSecond int) *Scheduler {
	s := &Scheduler{
		tasks:       make(chan *Task, 100),
		rateLimiter: time.Tick(time.Second / time.Duration(ratePerSecond)),
	}
	for i := 0; i < concurrency; i++ {
		go s.worker()
	}
	return s
}

func (s *Scheduler) Submit(task *Task) {
	s.wg.Add(1)
	s.tasks <- task
}

func (s *Scheduler) worker() {
	for task := range s.tasks {
		<-s.rateLimiter // rate limit: wait for the next tick
		err := task.fn()
		if err != nil && task.retry > 0 {
			fmt.Println("Task failed, retrying...")
			task.retry--
			s.Submit(task) // re-queue the task; it counts as a new in-flight task
		} else if err != nil {
			fmt.Println("Task ultimately failed:", err)
		}
		s.wg.Done()
	}
}

func (s *Scheduler) Wait() {
	s.wg.Wait()
	close(s.tasks)
}
```
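To try it end to end, a small driver like the one below submits a mix of succeeding and failing tasks so the retry path gets exercised. This main function is only an example: the roughly 30% failure rate is arbitrary, and it assumes "math/rand" is added to the imports above.

```go
func main() {
	scheduler := NewScheduler(3, 5) // 3 workers, at most 5 tasks per second

	for i := 0; i < 20; i++ {
		i := i // capture the loop variable (needed before Go 1.22)
		scheduler.Submit(NewTask(func() error {
			// Roughly 30% of tasks fail, to exercise the retry path.
			if rand.Intn(10) < 3 {
				return fmt.Errorf("task %d: simulated failure", i)
			}
			fmt.Println("task", i, "done")
			return nil
		}))
	}

	scheduler.Wait()
	fmt.Println("All tasks are completed")
}
```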
Practical application scenarios
- Rate-limited web crawling
- Sending emails/SMS/requests in batches without tripping API rate limits
- Cloud-service task scheduling and batch automation
- Asynchronous data fetching and aggregation
Summary
Go's concurrency model is a great fit for "many tasks + rate control + retry on failure". The scheduler implemented in this article is very lightweight and works well as a basic building block in your own system.
If you need more, it can be extended with, for example:
- A failure callback
- Timeout control (a minimal sketch follows this list)
- Task priorities
- A background monitoring dashboard
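As a taste of the timeout idea, one option is to wrap the task function rather than change the scheduler. This is only a sketch, under the assumption that it is acceptable for the wrapped call to keep running in the background after a timeout; withTimeout and doWork are illustrative names, not part of the scheduler above.

```go
// withTimeout wraps a task function so it reports failure if it does not
// finish within d. Note: the wrapped call keeps running after a timeout.
func withTimeout(fn func() error, d time.Duration) func() error {
	return func() error {
		done := make(chan error, 1)
		go func() { done <- fn() }()
		select {
		case err := <-done:
			return err
		case <-time.After(d):
			return fmt.Errorf("task timed out after %v", d)
		}
	}
}

// Usage (doWork is a placeholder):
// scheduler.Submit(NewTask(withTimeout(doWork, 2*time.Second)))
```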
That concludes this walkthrough of implementing a lightweight, rate-limited concurrent task scheduler in Go.