Today we will introduce oneGo official library xExtended concurrency primitives providedsemaphore
, translated as "semaphore". Because it controls collaboration between multiple goroutines just like a signal.
Concept explanation
I will first briefly introduce the concept of semaphores to supplement knowledge for unfamiliar readers.
An example in life: Suppose a restaurant has 10 dining tables in total, and each customer takes up 1 dining table, then a total of 10 people can be dining at the same time, and more than 10 people need to wait in line; if 1 customer has finished dining, the first customer who is waiting in line can come to eat.
If you add a semaphore, you can understand it like this:
- 10 dining tables are limited resources.That is, the initial value of the semaphore is 10。
- When a customer enters the restaurant, if the dining table is available, the customer can be assigned 1 dining table.The value of the semaphore is reduced by 1。
- If there is no empty dining table, customers need to wait in line until there is an empty dining table (When the semaphore value is 0, new customers must wait).
- After a customer has finished dining, the dining table is empty.The value of the semaphore is added to 1, new customers (customers waiting in line, or new customers when no one is in line) can be assigned a dining table.
This is the application scenario of semaphore, which helps manage the number of tables and ensures that the restaurant does not receive more than the number of customers it can accommodate at any time.
If placed in a Go program, we can use semaphores to implement the function of the task pool and control a limited number of goroutines to execute tasks concurrently. Let's put this in the future, let's take a look at it firstsemaphore
How is the source code implemented?
Source code interpretation
We cansemaphore
The exported method that is defined and implemented in the documentation:
//x/[email protected]/semaphore#pkg-index
type Weighted func NewWeighted(n int64) *Weighted func (s *Weighted) Acquire(ctx , n int64) error func (s *Weighted) Release(n int64) func (s *Weighted) TryAcquire(n int64) bool
is a structure representing a semaphore object.
NewWeighted
is its constructor, used to instantiate a containsn
The semaphore object of a resource, that is, the initial value of the semaphore isn
。
It implements 3 methods, namely:
-
Acquire
: Used to requestn
Resources, that is, the value of the semaphore is reducedn
operate. If resources are insufficient, block and wait until there are enough resources, orctx
Canceled. -
Release
: Used to releasen
resources, that is, add the value of the semaphoren
operate. -
TryAcquire
: Used to requestn
Resources, that is, the value of the semaphore is reducedn
operate. andAcquire
The difference is,TryAcquire
Will not block and wait, return successfullytrue
, failed to returnfalse
。
Now, can youDoes the object and its implementation correspond to the previous example of dining in restaurants?
Next, let's take a lookWeighted
How is the structure defined:
/golang/sync/blob/v0.9.0/semaphore/
// Weighted semaphore structuretype Weighted struct { size int64 // Total number of resources cur int64 // The number of resources currently used mu // Mutex locks ensure concurrent security of operations on other attributes waiters // Waiter queue, use list to implement}
Weighted
The structure contains 4 fields:
-
size
It is the total number of resources. In the example of a restaurant there are 10 dining tables. -
cur
Records the number of resources currently used. In the example of a restaurant, there are the number of tables that have been occupied by customers.size - cur
It is the number of remaining resources, that is, the number of empty dining tables. -
mu
It is a mutex lock, used to ensure that operations on other attributes are concurrent and safe. -
waiters
It is a waiting queue, recording all queued waiting users. In the restaurant example, when 10 dining tables are all filled, new customers are going to enter the waiting queue.
ConstructorNewWeighted
Implementation is as follows:
// NewWeighted Construct a semaphore objectfunc NewWeighted(n int64) *Weighted { w := &Weighted{size: n} return w }
also,semaphore
The package also defines a structure for the waiterwaiter
:
// Waiter structuretype waiter struct { n int64 // Number of requested resources ready chan<- struct{} // Closed when a resource is obtained, used to wake up the current waiter}
waiter
The structure contains 2 fields:
-
n
Records the number of resources requested by the current waiter. -
ready
It's onechannel
Type, when the resource satisfies,ready
Will beclose
If you drop, the waiter will be awakened.
You will see its usefulness later.
Let's look back and take a lookWeighted
The first method of structureAcquire
Implementation:
// Acquire requests n resources// If the resources are insufficient, block and wait until there are enough resources, or ctx is cancelled// Return nil successfully, return () after failure and does not change the number of resourcesfunc (s *Weighted) Acquire(ctx , n int64) error { done := () () // Locking to ensure concurrency security // If ctx has been cancelled before allocating resources, it will be returned directly () select { case <-done: () return () default: } // If the number of resources is sufficient and there are no other waiters, the requested resource is successful, add cur to n, and return if >= n && () == 0 { += n () return nil } // If the requested number of resources is greater than the total number of resources and cannot be satisfied, blocking waits for ctx to cancel and returns () if n > { // Don't make other Acquire calls block on one that's doomed to fail. () <-done return () } // If there are insufficient resources or other waiting users, continue to execute // Join the waiting queue ready := make(chan struct{}) // Create a channel as a property to record it into the waiter object waiter for subsequent notification of its wakeup w := waiter{n: n, ready: ready} // Construct a waiter object elem := (w) // Append waiter to waiter queue () // Use select to implement blocking and waiting select { case <-done: // Check whether ctx is cancelled () select { case <-ready: // Check whether the current waiter is awakened // Enter here, it means that the waiter is awakened after ctx is cancelled. -= n // Then just treat it as the waiter has not been awakened and return the requested resource count () // Notify the waiting queue and check whether the next waiter resource in the queue meets the default: // Remove the current waiter from the waiter queue isFront := () == elem // Is the current waiter the first waiter (elem) // Remove from the queue // If the current waiter is the first waiter in the queue and there are still resources left if isFront && > { () // Notify the waiting queue and check whether the next waiter resource in the queue meets the } } () return () case <-ready: // Wake up select { case <-done: // Check again whether ctx is cancelled // Enter here, it means that after the waiter is awakened, ctx is cancelled, and it is handled as if it has not been awakened (n) // Free up resources return () default: } return nil // Return to nil successfully } }
This isWeighted
The most complex method of implementing structures, but I wrote very detailed comments in the code to help you understand.
Acquire
The main logic is:
- Check first
ctx
Whether it is cancelled, if before allocation of resourcesctx
Cancel, return directly()
。 - Check whether the current remaining resources meet the requested resource number. If the resource number is sufficient and no other waiter exists, the requested resource will be successful and
cur
Plusn
, and return. - Verify whether the requested resource number is legal. If the requested resource number is greater than the total resource number, it cannot be satisfied, and it will block and wait.
ctx
Cancel and return()
。 - If the resources are not sufficient at present, or there are other waiters, the code will continue to execute and enter the blocking waiting logic:
- First construct a waiter object
waiter
and add it to the waiter queue. - Next, use
select
Implement blocking and waiting. There are two situations at this time.ctx
Cancel, orwaiter
Wake up.- in the case of
ctx
Cancelled, I'll check it againwaiter
Wake up, if awakened,Then stillctx
Cancelled shall prevail, will be treated aswaiter
Not awakened, return the requested resource count and call()
Notify the waiter queue and check the next one in the queuewaiter
Whether the resource number is satisfied; if it is not awakened, the current one will bewaiter
Remove from the waiter queue if the currentwaiter
It is the first waiter in the queue and there are still resources left, and it will also be called()
Notify the waiter queue and check the next one in the queuewaiter
Whether the number of resources is satisfied. - in the case of
waiter
Wake up, will check againctx
Whether it is cancelled, if it is cancelled,Thenctx
Cancelled shall prevail, will call(n)
Release the currentwaiter
Resource requested; if not cancelled, returnnil
Indicates that the requested resource is successful.
- in the case of
- First construct a waiter object
Then, let's take a look at the release of resourcesRelease
Method logic:
// Release releases n resourcesfunc (s *Weighted) Release(n int64) { () // Locking to ensure concurrency security -= n // Free up resources if < 0 { () panic("semaphore: released more than held") } () // Notify the waiting queue and check whether the next waiter resource in the queue meets the () }
The logic here is much simpler, first execute -= n
Reduce the number of resources currently used, that is, this step is to release the resource operation. Note that this is correct
It is judged whether it is less than 0, so when using it, you must release as much resources as you apply for, and don't use it wrong. Then call the same()
Notify the waiter queue and check the next one in the queuewaiter
Whether the number of resources is satisfied.
Now, it's time to seenotifyWaiters
How is the method implemented:
// Check whether the next waiter resource number in the queue is satisfiedfunc (s *Weighted) notifyWaiters() { // Loop checks whether the number of resources requested by the next waiter is satisfied. If it is satisfied, it will be dequeued. If it is not satisfied, it will be terminated. for { next := () // Get the team leader element if next == nil { break // There is no waiter, the queue is empty and terminates the loop } w := .(waiter) if < { // The current number of waiter resources is not satisfied, exit the loop // Do not continue to find whether the subsequent waiter request in the queue is satisfied to avoid hunger break } // The number of resources is satisfied, wake up the waiter += // Record the number of resources used (next) // Remove waiter from queue close() // Use the operation of closing the channel to wake up the waiter } }
notifyWaiters
The core logic inside the method is: loop check the next onewaiter
Whether the requested resource number is satisfied, if it is satisfied, it will be dequeued; if it is not satisfied, the loop will be terminated.
Can be found,next
It is the head element, so the waiter queue is first in first out (FIFO
)of.
Another point to note here is thatwaiter
When the number of resources is not satisfied, the loop is directly exited and no longer searches for subsequent searches in the queue.waiter
Whether the requested resource is satisfied. For example, the current number of available resources is 5, and there are two waiters in the waiter queue.waiter1
andwaiter2
,waiter1
The number of resources requested is 10.waiter2
The number of resources requested is 1, and the loop will be exited.waiter1
andwaiter2
All continue to wait, and will not allocate resources towaiter2
. This is done to avoid the subsequentwaiter3
、waiter4
... Always comparewaiter1
The number of resources requested is small, resulting inwaiter1
Long-term blockage, resulting in hunger.
Weighted
There is another last methodTryAcquire
Let's take a look at how it is implemented:
// TryAcquire tries to request n resources// No blocking, true for success, false for failure and no change in the number of resourcesfunc (s *Weighted) TryAcquire(n int64) bool { () // Locking to ensure concurrency security // If the remaining resources are sufficient and there are no other waiters, the requested resource will be successful success := >= n && () == 0 if success { += n // Record the number of resources currently used } () return success }
This method is also very simple to implement, which is to check whether the remaining resources are sufficient and there are no other waiters. If it is satisfied, the requested resource will be successful and the number of resources that have been used is increased. += n
, then returntrue
, otherwise returnfalse
。
At this point,semaphore
The source code of the package is explained.
Example of usage
Familiarsemaphore
The source code of the package is no problem how to use it. The following issemaphore
Provided in the package documentationworker pool
A pattern example that demonstrates how to use semaphores to limit the number of goroutines running in a parallel task. The code is as follows:
//x/[email protected]/semaphore#example-package-WorkerPool
package main import ( "context" "fmt" "log" "runtime" "/x/sync/semaphore" ) func main() { ctx := () var ( maxWorkers = (0) // The maximum number of workers supported by the worker pool, take the number of CPU cores of the current machine sem = (int64(maxWorkers)) // The total number of resources is the maximum number of workers out = make([]int, 32) // Total number of tasks ) // Start maxWorkers at a time. Number of goroutines. Calculate output. for i := range out { // When the maximum number of maxWorkers goroutines is being executed, Acquire blocks until one of the goroutines is completed if err := (ctx, 1); err != nil { // Request resources ("Failed to acquire semaphore: %v", err) break } // Open a new goroutine to perform calculation tasks go func(i int) { defer (1) // Release resources after task execution is completed out[i] = collatzSteps(i + 1) // Perform Collatz step calculation }(i) } // Get all tokens to wait for all goroutine execution to complete if err := (ctx, int64(maxWorkers)); err != nil { ("Failed to acquire semaphore: %v", err) } (out) } // collatzSteps computes the number of steps to reach 1 under the Collatz // conjecture. (See /wiki/Collatz_conjecture.) func collatzSteps(n int) (steps int) { if n <= 0 { panic("nonpositive input") } for ; n > 1; steps++ { if steps < 0 { panic("too many steps") } if n%2 == 0 { n /= 2 continue } const maxInt = int(^uint(0) >> 1) if n > (maxInt-1)/3 { panic("overflow") } n = 3*n + 1 } return steps }
This code uses semaphores (semaphore
) implement a working pool (worker pool
) to limit the maximum concurrent coroutine (goroutine
)number.
pass(0)
You can get the number of CPU cores of the current machine (for example, in my Apple M1 Pro machine this value is 10), asworker pool
The maximum supportedworker
Quantity, which is also the total number of resources of the semaphore.out
The slice length is 32, i.e. the total number of tasks is 32. usefor
Loop to start a new goroutine to perform calculation taskscollatzSteps(i + 1)
, but it will be called before opening a new goroutine(ctx, 1)
Towardsworker pool
Apply for a resource when the maximum number of jobsmaxWorkers
When a goroutine is being executed,Acquire
It will block until one of the goroutines is completed, and after the task is completed,defer
Called in a statement(1)
Free up resources.
NOTE:
About the calculation taskcollatzSteps
We don’t actually have to delve into it, we just need to know that this is a time-consuming task. Simply put, Collatz Conjecture is a mathematical conjecture that proposes a seemingly simple sequence rule for any positive integern
Perform an action ifn
is an even number, thenn
Divided by 2, ifn
It is an odd number, thenn
Multiply by 3 and add 1 (i.e.3n + 1
), after repeated execution of these operations, all integers will eventually reach 1.collatzSteps
Functions implement calculations in Collatz conjectures the next positive integern
How many steps are needed to reach 1.
It is worth noting that infor
After submitting all tasks in a loop, use(ctx, int64(maxWorkers))
All resources in the semaphore are obtained, so that all tasks will exit only after they are executed. Its function is similar.()
。
Then we useTo achieve the same function:
/jianghushinian/blog-go-example/blob/main/x/sync/semaphore/waitgroup/
func main() { var ( maxWorkers = (0) // Get the maximum number of CPU cores available to the system out = make([]int, 32) //Storing Collatz results wg // Used to wait for the goroutine to complete sem = make(chan struct{}, maxWorkers) // Used to limit the maximum number of concurrency ) for i := range out { // Manage concurrency through sem to ensure that at most maxWorkers goroutines are executed simultaneously sem <- struct{}{} // If sem is full, it will block until there is a free slot // Increasing WaitGroup count (1) go func(i int) { defer () // Reduce WaitGroup count when goroutine is completed defer func() { <-sem }() // When goroutine is completed, release a slot from the sem // Perform Collatz step calculation out[i] = collatzSteps(i + 1) }(i) } // Wait for all goroutines to complete () // Output result (out) }
Now compare it to useand
semaphore
Can the implemented code deepen your understanding of the function of semaphore?
Finally, let's leave another homework, please useerrgroup
Implement this sample program once.
NOTE:
If you're rightor
errgroup
If you are not familiar with it, please refer to my article "Go concurrency control: Detailed explanation"and"Go concurrency control: errgroup details」。
Summarize
This article provides extended concurrency primitives in Gosemaphore
I gave an explanation, and showed you the implementation of its source code and introduced how to use it.
I don't know if you've found it, initsemaphore
When passedn
If it is 1, then this semaphore is actually equivalent to a mutex lock。
semaphore
Can be used to implementworker pool
mode, and use routine or scene withIt's also similar, you can compare and learn.
The above is the detailed content of the principle and use of semaphore in concurrency control of Go language. For more information about Go semaphore, please follow my other related articles!