When using goroutines to handle tasks such as network requests or database queries, the level of concurrency usually cannot grow without bound, because the services involved have their own limits. From an efficiency standpoint, concurrency should be as high as the related services can bear, which means capping the maximum number of concurrent goroutines.
In Go, several techniques can be used to control how many goroutines run at once and prevent excessive concurrency from causing resource exhaustion or performance degradation.
1. Use a semaphore
You can use a buffered channel in Go to implement a simple semaphore and limit the number of concurrent goroutines:
```go
package main

import (
	"fmt"
	"sync"
)

func worker(id int, sem chan struct{}) {
	sem <- struct{}{} // take a semaphore slot
	defer func() {
		<-sem // release the slot when the work is done
	}()
	// Perform the work
	fmt.Printf("Worker %d: Working...\n", id)
}

func main() {
	concurrency := 3
	sem := make(chan struct{}, concurrency)
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			worker(id, sem)
		}(i)
	}
	wg.Wait()
	close(sem)
}
```
`sem` is a buffered channel; by bounding the number of elements it can hold, it implements a simple counting semaphore.
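The channel-based semaphore also supports a non-blocking "try acquire": a `select` with a `default` branch fails fast instead of waiting when all slots are taken. A minimal sketch, where `tryAcquire` and `release` are illustrative helper names not taken from the example above:

```go
package main

import "fmt"

// tryAcquire attempts to take a slot without blocking.
// It returns true if a slot was taken, false if the semaphore is full.
func tryAcquire(sem chan struct{}) bool {
	select {
	case sem <- struct{}{}:
		return true
	default:
		return false
	}
}

// release gives a slot back to the semaphore.
func release(sem chan struct{}) { <-sem }

func main() {
	sem := make(chan struct{}, 2) // 2 slots

	fmt.Println(tryAcquire(sem)) // true: slot 1 taken
	fmt.Println(tryAcquire(sem)) // true: slot 2 taken
	fmt.Println(tryAcquire(sem)) // false: semaphore is full

	release(sem)
	fmt.Println(tryAcquire(sem)) // true: a slot was freed
}
```

This is useful when a caller would rather skip or queue work than block on a full semaphore.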
2. Use a goroutine pool
A fixed number of worker goroutines can be created, and tasks distributed to them for execution.
```go
package main

import (
	"fmt"
	"sync"
)

func worker(id int, jobs <-chan int, results chan<- int) {
	// Range over jobs until the main goroutine closes the channel
	for j := range jobs {
		fmt.Printf("Worker %d: processing job %d\n", id, j)
		results <- j
	}
}

func main() {
	const numJobs = 5    // number of jobs to process
	const numWorkers = 3 // size of the worker pool

	jobs := make(chan int, numJobs)
	results := make(chan int, numJobs)

	var wg sync.WaitGroup

	// Start the worker pool
	for i := 1; i <= numWorkers; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			worker(id, jobs, results)
		}(i)
	}

	// Submit the jobs
	for j := 1; j <= numJobs; j++ {
		jobs <- j
	}
	close(jobs)

	// Close results once all workers have finished
	go func() {
		wg.Wait()
		close(results)
	}()

	// Process the results
	for result := range results {
		fmt.Println("Result:", result)
	}
}
```
The `jobs` channel stores tasks and the `results` channel stores processing results. By creating a fixed number of worker goroutines, the number of concurrent executions is effectively bounded.
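The same pattern can be wrapped in a small reusable helper, which makes it easy to test. This is a sketch; `runPool` is an illustrative name, not part of the original example:

```go
package main

import (
	"fmt"
	"sync"
)

// runPool applies fn to every job, with at most `workers` goroutines
// running at the same time, and collects the results.
func runPool(workers int, jobs []int, fn func(int) int) []int {
	jobsCh := make(chan int, len(jobs))
	resultsCh := make(chan int, len(jobs))

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobsCh {
				resultsCh <- fn(j)
			}
		}()
	}

	for _, j := range jobs {
		jobsCh <- j
	}
	close(jobsCh)

	wg.Wait()
	close(resultsCh)

	results := make([]int, 0, len(jobs))
	for r := range resultsCh {
		results = append(results, r)
	}
	return results
}

func main() {
	doubled := runPool(3, []int{1, 2, 3, 4, 5}, func(n int) int { return n * 2 })
	fmt.Println(len(doubled)) // 5 results, in completion order
}
```

Note that results arrive in completion order, not submission order; callers that need ordering should carry an index alongside each result.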
3. Use the golang.org/x/sync/semaphore package
The golang.org/x/sync/semaphore package, part of the Go extended libraries, provides a more flexible, weighted semaphore implementation.
Case 1
Restrict concurrent requests to external APIs
Suppose an external API limits the number of concurrent requests it accepts, and we want to make sure this limit is never exceeded. We can use semaphore.Weighted to control concurrent access to the API:
```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"golang.org/x/sync/semaphore"
)

func main() {
	/*
		Notes on weighted semaphores:
		1. With the total capacity fixed, throughput can be tuned by changing how
		   much weight each request acquires (assuming the CPU can keep up).
		2. sem := semaphore.NewWeighted(n) creates a semaphore with total weight n.
		3. The more weight a goroutine must acquire, the longer it may have to wait
		   (acquiring 2 of 5 units is harder than acquiring 1).
		4. A semaphore runs short of weight when all units are occupied; any further
		   Acquire call blocks the goroutine until other callers release weight.
	*/

	// Record the start time
	startTime := time.Now()

	// Assume the external API allows at most 5 concurrent requests
	// (the semaphore's total capacity is 5 weight units).
	const maxConcurrentRequests = 5
	sem := semaphore.NewWeighted(maxConcurrentRequests)

	var wg sync.WaitGroup

	// Simulate 10 concurrent requests to the external API
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(requestID int) {
			defer wg.Done()
			// Each request acquires 2 units of weight
			if err := sem.Acquire(context.Background(), 2); err != nil {
				fmt.Printf("Request %d could not acquire the semaphore: %v\n", requestID, err)
				return
			}
			defer sem.Release(2) // release the weight once the request completes

			// Simulate the request to the API
			fmt.Printf("Request %d started...\n", requestID)
			time.Sleep(2 * time.Second) // simulate network latency
			fmt.Printf("Request %d completed.\n", requestID)
		}(i)
	}
	wg.Wait()

	// Calculate and print the total elapsed time
	fmt.Printf("Total time: %v\n", time.Since(startTime))
}
```
A semaphore runs out of available weight when all of its weight units are occupied, that is, when its current weight count reaches its total capacity. In that case, any call that tries to obtain more weight through Acquire will not complete immediately; the caller (usually a goroutine) blocks until other callers release some weight.
Here are some specific situations in which a semaphore can run short of available weight:
- Capacity set too small: if the semaphore's total capacity is small relative to the number of concurrent requests, weight will run out quickly.
- Weights held too long: if some goroutines hold weight units for a long time without releasing them, other goroutines cannot acquire weight, even if the holders are few.
- Uneven weight allocation: some goroutines may occupy a disproportionate share of the weight units, leaving others unable to obtain enough.
- Weights not released: if goroutines exit early due to errors or panics and never release the weight they hold, those units are never returned to the semaphore.
- High-frequency requests: a burst of goroutines requesting weight in a short period can collectively exceed the semaphore's total capacity, even if each individual request is small.
- Mismanaged weights: flawed weight-management logic, such as releasing too much weight or releasing it at the wrong time, can also leave the semaphore in an inconsistent state.
To avoid running out of available weight, the following measures can be taken:
- Set the capacity sensibly: choose the semaphore's total capacity according to resource limits and concurrency requirements.
- Release weights promptly: make sure each goroutine releases its weight as soon as it finishes; a deferred Release covers error paths as well.
- Use timeouts: pass a context with a timeout or deadline to Acquire so callers do not wait for weight indefinitely.
- Monitoring and logging: monitor semaphore usage and record key information so problems can be discovered and fixed promptly.
- Weight allocation strategy: Design a reasonable weight allocation strategy to ensure fair and efficient distribution of weights.
Through these measures, the use of semaphores can be better managed and concurrency problems caused by insufficient weight can be avoided.
Case 2
Suppose an online video platform needs to handle video transcoding tasks at different resolutions. Since transcoding high-definition video consumes more computing resources than standard-definition video, the platform wants a system that prioritizes the larger volume of standard-definition transcoding requests while not completely starving high-definition transcoding, maintaining overall service quality and effective use of resources.
```go
package main

import (
	"context"
	"fmt"
	"runtime"
	"sync"
	"time"

	"golang.org/x/sync/semaphore"
)

// VideoTranscodeJob describes a video transcoding task.
type VideoTranscodeJob struct {
	resolution string
	weight     int64
}

func main() {
	cpuCount := runtime.NumCPU()
	fmt.Printf("Current CPUs: %v\n", cpuCount)

	/*
		How weighted acquisition behaves here:
		1. A task's weight determines how much of the semaphore's capacity it
		   consumes, not how fast it runs; execution speed depends on the task's
		   own work and the system's load.
		2. If enough weight is free, Acquire returns immediately; otherwise the
		   goroutine blocks until running tasks release their weight.
		3. Acquiring weight never interrupts tasks that are already running.
	*/

	// Two semaphores, one for standard definition and one for high definition,
	// assuming 8 CPU cores are available in total.
	normalSem := semaphore.NewWeighted(6)  // SD tasks: 6 units, they consume fewer resources
	highDefSem := semaphore.NewWeighted(2) // HD tasks: 2 units, they consume more resources

	var wg sync.WaitGroup

	// Suppose 20 videos need to be transcoded.
	videoJobs := []VideoTranscodeJob{
		{"HD", 2}, {"HD", 2}, {"SD", 1}, {"SD", 1}, {"HD", 2},
		{"SD", 1}, {"SD", 1}, {"HD", 2}, {"SD", 1}, {"HD", 2},
		{"SD", 4}, {"SD", 4}, {"HD", 2}, {"SD", 1}, {"HD", 2},
		{"SD", 1}, {"SD", 4}, {"HD", 2}, {"SD", 6}, {"HD", 2},
	}

	for _, job := range videoJobs {
		wg.Add(1)
		go func(job VideoTranscodeJob) {
			defer wg.Done()
			var sem *semaphore.Weighted
			switch job.resolution {
			case "SD":
				// Larger capacity (6 units): SD tasks find free weight more easily,
				// but that does not directly make them execute faster.
				sem = normalSem
			case "HD":
				sem = highDefSem
			default:
				panic("invalid resolution")
			}
			if err := sem.Acquire(context.Background(), job.weight); err != nil {
				fmt.Printf("%s video could not acquire the semaphore: %v\n", job.resolution, err)
				return
			}
			defer sem.Release(job.weight) // release the acquired weight

			// Simulate transcoding; the weight also scales the sleep so that
			// HD (weight 2) naturally takes longer than SD (weight 1).
			fmt.Printf("Transcoding %s video (weight: %d)...\n", job.resolution, job.weight)
			time.Sleep(time.Duration(job.weight*100) * time.Millisecond)
			fmt.Printf("---------------------- %s video transcoding completed\n", job.resolution)
		}(job)
	}
	wg.Wait()
}
```
Standard-definition (SD) and high-definition (HD) tasks are assigned different weights. By creating two semaphores with different capacities, we can control how many tasks of each type execute simultaneously, letting standard-definition videos be processed quickly while still ensuring high-definition videos get transcoded without harming system stability. This demonstrates how weighted concurrency control can optimize task scheduling and execution efficiency under limited resources.
Note: the weight a goroutine requests must not be greater than the total capacity n passed to semaphore.NewWeighted(n); a larger request can never be satisfied.
Case 3
Tasks with different weights share a single semaphore with a total capacity of 20; here, a task's ID doubles as its weight:

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"golang.org/x/sync/semaphore"
)

type weightedTask struct {
	id     int
	weight int64
}

func main() {
	const maxTotalWeight = 20 // maximum total weight
	sem := semaphore.NewWeighted(maxTotalWeight)

	var wg sync.WaitGroup
	tasksCh := make(chan weightedTask, 10)

	// Submit the tasks; each task's ID is used as its weight.
	for i := 1; i <= 10; i++ {
		tasksCh <- weightedTask{id: i, weight: int64(i)}
	}
	close(tasksCh)

	// Start a processor goroutine per task
	for task := range tasksCh {
		wg.Add(1)
		go func(task weightedTask) {
			defer wg.Done()
			if err := sem.Acquire(context.Background(), task.weight); err != nil {
				fmt.Printf("Task %d could not acquire the semaphore: %v\n", task.id, err)
				return
			}
			defer sem.Release(task.weight) // release the weight

			// Simulate task execution; the duration scales with the weight.
			fmt.Printf("Task %d (weight: %d) running...\n", task.id, task.weight)
			time.Sleep(time.Duration(task.weight*100) * time.Millisecond)
			fmt.Printf("Task %d completed.\n", task.id)
		}(task)
	}
	wg.Wait()
}
```
Summary
Which method to choose depends on the specific scenario and requirements. A channel-based semaphore is simple and flexible, a goroutine pool suits batch-processing workloads, and the golang.org/x/sync/semaphore package provides a more flexible weighted semaphore beyond the standard library.
This concludes the article on controlling the number of concurrent goroutines in Go. For more on this topic, please search my previous articles or continue browsing the related articles below. I hope you will keep supporting me!