Preface
When goroutines are used to process tasks concurrently, the degree of concurrency often cannot grow without bound, because resources such as network requests and database queries have their own limits. For throughput, concurrency should be as high as possible while staying within what the dependent services can bear, which means capping the maximum number of concurrent goroutines. This article explores two ways to implement such a cap: one uses a buffered channel, the other uses locks.
1. Use a buffered channel to limit concurrency
1.1 Solution details
The code is as follows; the logic is very simple.
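    package golimit

    type GoLimit struct {
        ch chan int
    }

    func NewGoLimit(max int) *GoLimit {
        return &GoLimit{ch: make(chan int, max)}
    }

    // Add takes one slot; it blocks while the channel buffer is full.
    func (g *GoLimit) Add() {
        g.ch <- 1
    }

    // Done releases one slot.
    func (g *GoLimit) Done() {
        <-g.ch
    }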
A buffered channel is created whose capacity equals the maximum allowed concurrency. Before starting a goroutine, call Add() to write one value into the channel; when the goroutine finishes, it calls Done() to read one value back out. If the write cannot proceed, the channel is full, which means the number of running goroutines has reached the allowed maximum. Add() then blocks and no new goroutine can be started until some goroutine finishes and its Done() call reads a value from the channel.
The following is a usage example:
    package main

    import (
        "golimit"
        "log"
        "time"
    )

    func main() {
        log.Println("Start the test...")
        g := golimit.NewGoLimit(2) // max_num (maximum allowed concurrency) is set to 2
        for i := 0; i < 10; i++ {
            // Try to add a goroutine. If the maximum concurrency has been reached, this blocks.
            g.Add()
            go func(g *golimit.GoLimit, i int) {
                defer g.Done() // one concurrent goroutine has completed
                time.Sleep(time.Second * 2)
                log.Println(i, "done")
            }(g, i)
        }
        log.Println("End of loop")
        time.Sleep(time.Second * 3) // wait for the remaining goroutines to complete
        log.Println("Test end")
    }
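The example above waits for the last goroutines with a fixed sleep. As a minimal sketch of another option, the channel-based limiter can be combined with a sync.WaitGroup, and the peak number of simultaneously running tasks can be recorded to check that the limit holds. The helper runTasks and the 10 ms simulated work are my own assumptions for illustration, not part of the original code.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// GoLimit mirrors the channel-based limiter from section 1.1.
type GoLimit struct {
	ch chan int
}

func NewGoLimit(max int) *GoLimit { return &GoLimit{ch: make(chan int, max)} }
func (g *GoLimit) Add()           { g.ch <- 1 }
func (g *GoLimit) Done()          { <-g.ch }

// runTasks (hypothetical helper) starts n tasks with at most max running
// at once and returns the highest number of tasks seen running together.
func runTasks(n, max int) int64 {
	g := NewGoLimit(max)
	var wg sync.WaitGroup
	var running, peak int64

	for i := 0; i < n; i++ {
		g.Add() // blocks while max tasks are already running
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer g.Done() // deferred calls run last-in first-out: slot is freed first

			cur := atomic.AddInt64(&running, 1)
			// Record the peak with a compare-and-swap loop.
			for {
				old := atomic.LoadInt64(&peak)
				if cur <= old || atomic.CompareAndSwapInt64(&peak, old, cur) {
					break
				}
			}
			time.Sleep(10 * time.Millisecond) // simulated work
			atomic.AddInt64(&running, -1)
		}()
	}
	wg.Wait() // returns once every task has finished
	return peak
}

func main() {
	fmt.Println("peak concurrency:", runTasks(10, 2))
}
```

The reported peak should never exceed the configured maximum, because a task only counts itself as running while it holds a slot.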
1.2 Evaluation summary
Advantages: The implementation is simple and clear, and easy to understand and maintain. If it meets the requirements, this solution is the first choice in general scenarios.
Concerns: The size of the channel's buffer represents the maximum concurrency. If the allowed concurrency is large, say tens of thousands or more, I am not sure whether the channel's performance and memory overhead become a problem. If anyone knows, please let me know.
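On the memory question: a buffered channel reserves room for its whole buffer when it is created, so make(chan int, 100000) takes roughly 800 KB on a 64-bit platform no matter how many slots are in use. A common Go idiom, which is not part of the original code, is to use chan struct{} instead. Its elements have size zero, so as far as I know the current Go runtime allocates no buffer storage for them. The sketch below shows that variant; Sem, NewSem, InUse and Max are hypothetical names chosen for illustration.

```go
package main

import (
	"fmt"
	"unsafe"
)

// Sem is a hypothetical variant of the channel-based GoLimit that stores
// empty structs instead of ints.
type Sem struct {
	ch chan struct{}
}

func NewSem(max int) *Sem { return &Sem{ch: make(chan struct{}, max)} }

// Add takes one slot; it blocks while all slots are in use.
func (s *Sem) Add() { s.ch <- struct{}{} }

// Done releases one slot.
func (s *Sem) Done() { <-s.ch }

// InUse reports how many slots are currently taken.
func (s *Sem) InUse() int { return len(s.ch) }

// Max reports the capacity chosen at construction.
func (s *Sem) Max() int { return cap(s.ch) }

func main() {
	fmt.Println("size of one int element:     ", unsafe.Sizeof(int(0)))
	fmt.Println("size of one struct{} element:", unsafe.Sizeof(struct{}{}))

	s := NewSem(100000)
	s.Add()
	s.Add()
	s.Done()
	fmt.Println(s.InUse(), "of", s.Max(), "slots in use")
}
```

This only addresses the size of the buffer; whether channel send and receive cost changes with capacity is something I would still want to measure rather than assume.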
Shortcomings: It is difficult to adjust the maximum concurrency at run time. Some scenarios need this. For example, service B, which service A depends on, has been scaled up or down; service A cannot be stopped, yet the maximum concurrency of its requests to B's interfaces must be adjusted.
2. Use locks to limit the concurrency of goroutines
2.1 Solution details
Again, the code comes first (note: I have open-sourced this code at github/zh-five/golimit).
    // Goroutine concurrency limit library
    package golimit

    import (
        "sync"
    )

    type GoLimit struct {
        max       uint             // maximum concurrency
        count     uint             // current concurrency
        isAddLock bool             // whether addLock is currently held to block Add()
        zeroChan  chan interface{} // closed as a broadcast when count reaches 0
        addLock   sync.Mutex       // lock for increasing the concurrency count
        dataLock  sync.Mutex       // lock for modifying data
    }

    func NewGoLimit(max uint) *GoLimit {
        return &GoLimit{max: max, count: 0, isAddLock: false, zeroChan: nil}
    }

    // Add increments the concurrency count by 1.
    // If count >= max, it blocks until count < max.
    func (g *GoLimit) Add() {
        g.addLock.Lock()
        g.dataLock.Lock()

        g.count++

        if g.count < g.max {
            // Below the limit: unlock so that later Add() calls can proceed.
            g.addLock.Unlock()
        } else {
            // Limit reached: keep addLock held and mark it.
            // It is unlocked once the count drops.
            g.isAddLock = true
        }

        g.dataLock.Unlock()
    }

    // Done decrements the concurrency count by 1.
    // If count < max afterwards, a blocked Add() is quickly unblocked.
    func (g *GoLimit) Done() {
        g.dataLock.Lock()

        g.count--

        // Unlock
        if g.isAddLock && g.count < g.max {
            g.isAddLock = false
            g.addLock.Unlock()
        }

        // Broadcast at 0
        if g.count == 0 && g.zeroChan != nil {
            close(g.zeroChan)
            g.zeroChan = nil
        }

        g.dataLock.Unlock()
    }

    // SetMax updates the maximum concurrency.
    // If it is increased, a blocked Add() is quickly unblocked.
    func (g *GoLimit) SetMax(n uint) {
        g.dataLock.Lock()

        g.max = n

        // Unlock
        if g.isAddLock && g.count < g.max {
            g.isAddLock = false
            g.addLock.Unlock()
        }

        // Lock
        // Caution: addLock is acquired here while dataLock is held, the
        // reverse of the order used in Add(), so a concurrent Add() can
        // deadlock with this call.
        if !g.isAddLock && g.count >= g.max {
            g.isAddLock = true
            g.addLock.Lock()
        }

        g.dataLock.Unlock()
    }

    // WaitZero returns immediately if the concurrency count is 0;
    // otherwise it blocks until the count reaches 0.
    func (g *GoLimit) WaitZero() {
        g.dataLock.Lock()

        // No need to wait
        if g.count == 0 {
            g.dataLock.Unlock()
            return
        }

        // No broadcast channel yet: create one
        if g.zeroChan == nil {
            g.zeroChan = make(chan interface{})
        }

        // Copy the channel before unlocking, to avoid reading from nil
        c := g.zeroChan
        g.dataLock.Unlock()

        <-c
    }

    // Count returns the current concurrency count.
    func (g *GoLimit) Count() uint {
        return g.count
    }

    // Max returns the maximum concurrency.
    func (g *GoLimit) Max() uint {
        return g.max
    }
Two locks are used. The first is the data lock (dataLock), which keeps data modification safe: it is locked before and unlocked after every modification. The second is the add lock (addLock). Add() must first acquire addLock and then increments the concurrency count. If the count is still below the maximum, Add() releases addLock. Otherwise it leaves addLock held, so subsequent Add() calls block on it, which caps the number of concurrent goroutines. The lock is released later, by Done() or SetMax(), once the count is below the maximum again. An example of use follows:
    package main

    import (
        "github.com/zh-five/golimit"
        "log"
        "time"
    )

    func main() {
        log.Println("Start the test...")
        g := golimit.NewGoLimit(2) // max_num (maximum allowed concurrency) is set to 2
        for i := 0; i < 10; i++ {
            // Increment the concurrency count by 1.
            // If count >= max_num, this blocks until count < max_num.
            g.Add()

            // The maximum concurrency can be modified at any time while running
            // g.SetMax(3)

            go func(g *golimit.GoLimit, i int) {
                defer g.Done() // decrement the concurrency count by 1
                time.Sleep(time.Second * 2)
                log.Println(i, "done")
            }(g, i)
        }
        log.Println("End of loop")
        g.WaitZero() // block until all concurrent goroutines have completed
        log.Println("Test end")
    }
Besides the SetMax() method for changing the maximum concurrency, the GoLimit of solution 2 also has a WaitZero() method. I added it partly for fun and partly out of laziness (callers could easily implement the same thing themselves); it blocks until all concurrent goroutines have completed. It suits scenarios like this: a large number of URLs must be fetched concurrently under a limit, and the main program simply calls WaitZero() to block and wait for the fetching goroutines to finish.
2.2 Evaluation summary
- Advantages: The implementation shows that performance and resource consumption do not grow linearly with the maximum concurrency. There is also plenty of room for extension.
- Disadvantages: The implementation logic is relatively complex.
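If the complexity is a concern, one possibly simpler way to get an adjustable limit is a single mutex with a condition variable (sync.Cond). This is a different technique from the two-lock design above, shown only as a sketch; CondLimit and NewCondLimit are hypothetical names, and the method set simply copies the one used in this article.

```go
package main

import (
	"fmt"
	"sync"
)

// CondLimit is a hypothetical alternative to the two-lock GoLimit:
// one mutex guards the counters and a condition variable wakes waiters.
type CondLimit struct {
	mu    sync.Mutex
	cond  *sync.Cond
	max   uint
	count uint
}

func NewCondLimit(max uint) *CondLimit {
	l := &CondLimit{max: max}
	l.cond = sync.NewCond(&l.mu)
	return l
}

// Add blocks while count >= max, then takes one slot.
func (l *CondLimit) Add() {
	l.mu.Lock()
	for l.count >= l.max {
		l.cond.Wait() // releases mu while waiting, re-locks before returning
	}
	l.count++
	l.mu.Unlock()
}

// Done releases one slot and wakes goroutines blocked in Add or WaitZero.
func (l *CondLimit) Done() {
	l.mu.Lock()
	l.count--
	l.mu.Unlock()
	l.cond.Broadcast()
}

// SetMax changes the limit at run time; raising it wakes blocked Add calls.
func (l *CondLimit) SetMax(n uint) {
	l.mu.Lock()
	l.max = n
	l.mu.Unlock()
	l.cond.Broadcast()
}

// WaitZero blocks until no slot is in use.
func (l *CondLimit) WaitZero() {
	l.mu.Lock()
	for l.count != 0 {
		l.cond.Wait()
	}
	l.mu.Unlock()
}

// Count returns the number of slots currently in use.
func (l *CondLimit) Count() uint {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.count
}

func main() {
	l := NewCondLimit(2)
	results := make([]int, 10)
	for i := 0; i < 10; i++ {
		l.Add()
		go func(i int) {
			defer l.Done()
			results[i] = i * i
		}(i)
	}
	l.WaitZero()
	fmt.Println(results) // prints [0 1 4 9 16 25 36 49 64 81]
}
```

Because every method takes the same single mutex, there is no lock ordering to get wrong. The trade-off is that Broadcast wakes every waiter each time, which may matter when many goroutines are blocked in Add.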
Other
Actually, I would really like to benchmark the two solutions against each other, especially when the maximum concurrency is large, but I have not found a good way to test this. If anyone has a method or idea, please share it.
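One possible starting point is the standard library's testing.Benchmark function, which runs a benchmark from an ordinary program. The sketch below is only an assumption about how such a comparison could be set up: the limiter interface, chanLimit, newChanLimit and benchLimiter are hypothetical names, and only the channel-based limiter is wired in. The lock-based GoLimit has the same Add() and Done() methods, so it could be passed to benchLimiter in the same way.

```go
package main

import (
	"fmt"
	"sync"
	"testing"
)

// limiter is the method set shared by both solutions in this article.
type limiter interface {
	Add()
	Done()
}

// chanLimit is the buffered-channel limiter from section 1.
type chanLimit struct{ ch chan int }

func newChanLimit(limit int) *chanLimit { return &chanLimit{ch: make(chan int, limit)} }
func (g *chanLimit) Add()               { g.ch <- 1 }
func (g *chanLimit) Done()              { <-g.ch }

// benchLimiter (hypothetical helper) measures b.N rounds of
// Add, start an empty goroutine, Done.
func benchLimiter(newLimiter func() limiter) testing.BenchmarkResult {
	return testing.Benchmark(func(b *testing.B) {
		l := newLimiter()
		var wg sync.WaitGroup
		b.ResetTimer() // exclude limiter construction from the timing
		for i := 0; i < b.N; i++ {
			l.Add()
			wg.Add(1)
			go func() {
				defer wg.Done()
				defer l.Done()
			}()
		}
		wg.Wait()
	})
}

func main() {
	// Compare a small and a large maximum concurrency.
	for _, limit := range []int{2, 100000} {
		limit := limit
		r := benchLimiter(func() limiter { return newChanLimit(limit) })
		fmt.Printf("chan limit=%d: %s\t%s\n", limit, r.String(), r.MemString())
	}
}
```

The numbers depend heavily on the machine and on the work done inside each goroutine, so an empty goroutine body like this one mostly measures the limiter's own overhead.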