SoFunction
Updated on 2025-04-07

Principles and usage of semaphore for concurrency control in Go

Today we introduce semaphore, an extended concurrency primitive provided by Go's official x library (golang.org/x/sync). It is called a "semaphore" because, like a signal, it coordinates multiple goroutines.

Concept explanation

For readers who are unfamiliar with semaphores, here is a brief introduction to the concept.

A real-life example: suppose a restaurant has 10 dining tables and each customer occupies 1 table. Then at most 10 people can dine at the same time, and anyone beyond that has to wait in line. When a customer finishes dining, the first customer waiting in line can take the freed table.

Expressed in terms of a semaphore:

  • The 10 dining tables are the limited resource, i.e. the initial value of the semaphore is 10.
  • When a customer enters the restaurant and a table is available, the customer is assigned 1 table and the value of the semaphore is decreased by 1.
  • If there is no empty table, customers wait in line until one becomes available (when the semaphore value is 0, new customers must wait).
  • When a customer finishes dining, the table becomes empty and the value of the semaphore is increased by 1. A new customer (the first one waiting in line, or a newly arriving one when nobody is in line) can then be assigned a table.

This is the typical use case for a semaphore: it tracks the number of free tables and ensures that the restaurant never admits more customers than it can seat at any time.
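The restaurant analogy can be sketched in a few lines of Go. This is only an illustration under stated assumptions: it uses a buffered channel from the standard library as a stand-in counting semaphore (not the semaphore package discussed below), and the function name serveCustomers and the numbers are made up for the example.

```go
package main

import (
	"fmt"
	"sync"
)

// serveCustomers seats `customers` goroutines at `tables` tables and returns
// the highest number of customers that were seated at the same time.
// (Hypothetical helper; the buffered channel plays the role of the semaphore.)
func serveCustomers(tables, customers int) int {
	sem := make(chan struct{}, tables) // one slot per free table
	var (
		mu     sync.Mutex
		seated int
		peak   int
		wg     sync.WaitGroup
	)
	for c := 0; c < customers; c++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			sem <- struct{}{} // take a table; blocks while all tables are occupied

			mu.Lock()
			seated++
			if seated > peak {
				peak = seated
			}
			mu.Unlock()

			// ... the customer eats here ...

			mu.Lock()
			seated--
			mu.Unlock()
			<-sem // leave: the table becomes free again
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	peak := serveCustomers(10, 25)
	fmt.Println("peak seated <= 10:", peak <= 10) // prints "peak seated <= 10: true"
}
```

However the goroutines are scheduled, the peak can never exceed the number of tables, because a customer is only counted as seated while holding a slot in the channel.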

In a Go program, we can use a semaphore to implement a task pool that limits how many goroutines execute tasks concurrently. We will come back to that later; first, let's look at how the semaphore source code is implemented.

Source code interpretation

The package documentation lists the exported type and methods that semaphore defines:

//x/[email protected]/semaphore#pkg-index

type Weighted
    func NewWeighted(n int64) *Weighted
    func (s *Weighted) Acquire(ctx context.Context, n int64) error
    func (s *Weighted) Release(n int64)
    func (s *Weighted) TryAcquire(n int64) bool

Weighted is a struct representing a semaphore object. NewWeighted is its constructor, used to create a semaphore that holds n resources, i.e. the initial value of the semaphore is n.

Weighted implements 3 methods:

  • Acquire: requests n resources, i.e. decreases the value of the semaphore by n. If resources are insufficient, it blocks until enough resources are available or ctx is cancelled.
  • Release: releases n resources, i.e. increases the value of the semaphore by n.
  • TryAcquire: requests n resources, i.e. decreases the value of the semaphore by n. Unlike Acquire, TryAcquire never blocks: it returns true on success and false on failure.

Now, can you map the Weighted object and its methods onto the earlier restaurant example?

Next, let's look at how the Weighted struct is defined:

/golang/sync/blob/v0.9.0/semaphore/

// Weighted is the semaphore struct
type Weighted struct {
	size    int64      // Total number of resources
	cur     int64      // Number of resources currently in use
	mu      sync.Mutex // Mutex that makes operations on the other fields concurrency-safe
	waiters list.List  // Waiter queue, implemented with list.List
}

The Weighted struct contains 4 fields:

  • size is the total number of resources. In the restaurant example, this is the 10 dining tables.
  • cur records the number of resources currently in use. In the restaurant example, this is the number of tables occupied by customers. size - cur is the number of remaining resources, i.e. the number of empty tables.
  • mu is a mutex that makes operations on the other fields concurrency-safe.
  • waiters is the waiter queue, which records everyone waiting in line. In the restaurant example, once all 10 tables are taken, new customers join the waiter queue.

The constructor NewWeighted is implemented as follows:

// NewWeighted constructs a semaphore object
func NewWeighted(n int64) *Weighted {
	w := &Weighted{size: n}
	return w
}

In addition, the semaphore package defines a struct for a waiter, named waiter:

// waiter is the waiter struct
type waiter struct {
	n     int64           // Number of requested resources
	ready chan<- struct{} // Closed once the resources are acquired, which wakes up this waiter
}

The waiter struct contains 2 fields:

  • n records the number of resources requested by this waiter.
  • ready is a channel. When the request can be satisfied, ready is closed and the waiter wakes up.

You will see its usefulness later.
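As a minimal sketch of the wake-up mechanism behind the ready field: a goroutine blocked on a channel receive is released as soon as the channel is closed. The function name wakeByClose is made up for this illustration and is not part of the semaphore package.

```go
package main

import "fmt"

// wakeByClose parks a goroutine on a ready channel and wakes it by closing
// the channel, the same signalling idea as the waiter's ready field.
// (Hypothetical helper for illustration only.)
func wakeByClose() string {
	ready := make(chan struct{})
	woke := make(chan string)

	go func() {
		<-ready // blocks until ready is closed
		woke <- "awake"
	}()

	close(ready) // a receive on a closed channel returns immediately
	return <-woke
}

func main() {
	fmt.Println(wakeByClose()) // prints "awake"
}
```

Closing rather than sending means no value has to be handed over, and a later non-blocking receive on ready can still tell that the wake-up already happened.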

Now let's go back to the Weighted struct and look at the implementation of its first method, Acquire:

// Acquire requests n resources.
// If resources are insufficient, it blocks until enough resources are available or ctx is cancelled.
// It returns nil on success; on failure it returns ctx.Err() and leaves the number of resources unchanged.
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
	done := ctx.Done()

	s.mu.Lock() // Lock to ensure concurrency safety
	// If ctx was already cancelled before any resources were allocated, return ctx.Err() directly
	select {
	case <-done:
		s.mu.Unlock()
		return ctx.Err()
	default:
	}

	// If enough resources remain and there are no other waiters, the request succeeds: add n to cur and return
	if s.size-s.cur >= n && s.waiters.Len() == 0 {
		s.cur += n
		s.mu.Unlock()
		return nil
	}

	// If the request exceeds the total number of resources it can never be satisfied:
	// block until ctx is cancelled, then return ctx.Err()
	if n > s.size {
		// Don't make other Acquire calls block on one that's doomed to fail.
		s.mu.Unlock()
		<-done
		return ctx.Err()
	}

	// Not enough resources, or other waiters are already queued: continue

	// Join the waiter queue
	ready := make(chan struct{})    // Channel stored in the waiter so that it can be woken up later
	w := waiter{n: n, ready: ready} // Construct a waiter object
	elem := s.waiters.PushBack(w)   // Append the waiter to the waiter queue
	s.mu.Unlock()

	// Use select to block and wait
	select {
	case <-done: // ctx was cancelled
		s.mu.Lock()
		select {
		case <-ready: // Check whether this waiter has been woken up
			// Reaching here means the waiter was woken up after ctx was cancelled.
			// Treat it as not woken up and give the acquired resources back.
			s.cur -= n
			s.notifyWaiters() // Notify the waiter queue: check whether the next waiter can now be satisfied
		default: // Remove this waiter from the waiter queue
			isFront := s.waiters.Front() == elem // Is this waiter at the front of the queue?
			s.waiters.Remove(elem)               // Remove it from the queue
			// If this waiter was at the front of the queue and resources are still left
			if isFront && s.size > s.cur {
				s.notifyWaiters() // Notify the waiter queue: check whether the next waiter can now be satisfied
			}
		}
		s.mu.Unlock()
		return ctx.Err()

	case <-ready: // Woken up
		select {
		case <-done: // Check again whether ctx was cancelled
			// Reaching here means ctx was cancelled after the waiter was woken up; handle it as if it had not been woken up
			s.Release(n) // Release the resources
			return ctx.Err()
		default:
		}
		return nil // Success: return nil
	}
}

This is the most complex method of the Weighted struct, but the detailed comments in the code should help you follow it.

The main logic of Acquire is:

  • First check whether ctx has been cancelled. If ctx was cancelled before any resources were allocated, return ctx.Err() directly.
  • Check whether the remaining resources cover the requested amount. If there are enough resources and no other waiter exists, the request succeeds: n is added to cur and the method returns.
  • Check whether the requested amount is valid. If it is greater than the total number of resources, it can never be satisfied, so the method blocks until ctx is cancelled and then returns ctx.Err().
  • If resources are currently insufficient, or there are other waiters, execution continues into the blocking logic:
    • First construct a waiter object and append it to the waiter queue.
    • Then block with select. There are two possible outcomes: ctx is cancelled, or the waiter is woken up.
      • If ctx is cancelled, check once more whether the waiter has been woken up. If it has, the cancellation of ctx still takes precedence: the waiter is treated as not woken up, the acquired resources are given back, and s.notifyWaiters() is called to check whether the next waiter in the queue can be satisfied. If it has not been woken up, the waiter is removed from the waiter queue; if it was at the front of the queue and resources are still left, s.notifyWaiters() is also called to check whether the next waiter can be satisfied.
      • If the waiter is woken up, check once more whether ctx has been cancelled. If it has, the cancellation of ctx takes precedence and s.Release(n) is called to release the resources this waiter acquired. Otherwise nil is returned to indicate that the request succeeded.
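The first step above, checking ctx without blocking, relies on a select with a default branch. The sketch below isolates that idiom using only the standard library; checkDone and demo are hypothetical names introduced for this illustration.

```go
package main

import (
	"context"
	"fmt"
)

// checkDone reports ctx.Err() if ctx is already done and nil otherwise.
// It never blocks, mirroring the first select in Acquire.
// (Hypothetical helper for illustration only.)
func checkDone(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
		return nil
	}
}

// demo returns the result of checkDone before and after cancellation.
func demo() (before, after error) {
	ctx, cancel := context.WithCancel(context.Background())
	before = checkDone(ctx)
	cancel()
	after = checkDone(ctx)
	return before, after
}

func main() {
	before, after := demo()
	fmt.Println(before, after) // prints "<nil> context canceled"
}
```

The same non-blocking pattern is used again on the ready channel after ctx is cancelled, to find out whether the waiter was woken up in the meantime.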

Next, let's look at the logic of the Release method, which releases resources:

// Release releases n resources
func (s *Weighted) Release(n int64) {
	s.mu.Lock() // Lock to ensure concurrency safety
	s.cur -= n  // Release the resources
	if s.cur < 0 {
		s.mu.Unlock()
		panic("semaphore: released more than held")
	}
	s.notifyWaiters() // Notify the waiter queue: check whether the next waiter can now be satisfied
	s.mu.Unlock()
}

The logic here is much simpler. First, s.cur -= n reduces the number of resources currently in use; this is the actual release. Note that s.cur is then checked against 0, and the method panics if it has become negative, so always release exactly as many resources as you acquired. Finally, s.notifyWaiters() is called to check whether the next waiter in the queue can be satisfied.

Now it is time to see how the notifyWaiters method is implemented:

// notifyWaiters checks whether the next waiter in the queue can be satisfied
func (s *Weighted) notifyWaiters() {
	// Loop: check whether the request of the next waiter can be satisfied.
	// If so, dequeue it; if not, stop.
	for {
		next := s.waiters.Front() // Get the element at the head of the queue
		if next == nil {
			break // No waiters: the queue is empty, stop the loop
		}

		w := next.Value.(waiter)
		if s.size-s.cur < w.n { // The current waiter cannot be satisfied: exit the loop
			// Do not look further down the queue for a request that could be satisfied, to avoid starvation
			break
		}

		// Enough resources: wake up the waiter
		s.cur += w.n           // Record the resources as in use
		s.waiters.Remove(next) // Remove the waiter from the queue
		close(w.ready)         // Closing the channel wakes up the waiter
	}
}

The core logic of notifyWaiters is a loop that checks whether the request of the next waiter can be satisfied: if so, the waiter is dequeued; if not, the loop stops.

Since next is always the head element, the waiter queue is first in, first out (FIFO).

Another point to note is that when the waiter at the head cannot be satisfied, the loop exits immediately and does not look at whether later waiters in the queue could be satisfied. For example, suppose 5 resources are currently available and the queue holds two waiters, waiter1 and waiter2, where waiter1 requests 10 resources and waiter2 requests 1. The loop exits, both waiter1 and waiter2 keep waiting, and no resources are allocated to waiter2. This prevents later waiters (waiter3, waiter4, ...) with requests smaller than waiter1's from repeatedly overtaking it, which would block waiter1 for a long time and cause starvation.
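The head-of-queue rule can be modelled with a small pure function. This is a simplified sketch, not the library code: it has no locking or channels, represents each waiter only by its requested amount, and the name wakeFront and the total of 12 resources are assumptions chosen so that 5 resources are free, as in the example above.

```go
package main

import "fmt"

// wakeFront is a hypothetical, lock-free model of the notifyWaiters loop.
// queue holds the requested amounts in arrival order. It returns the requests
// that get woken, the requests still waiting, and the updated cur.
func wakeFront(size, cur int64, queue []int64) (woken, waiting []int64, newCur int64) {
	for len(queue) > 0 {
		next := queue[0]
		if size-cur < next {
			break // the head cannot be satisfied: stop, do not skip ahead
		}
		cur += next
		woken = append(woken, next)
		queue = queue[1:]
	}
	return woken, queue, cur
}

func main() {
	// 12 resources in total, 7 in use, so 5 are free; waiter1 wants 10, waiter2 wants 1.
	woken, waiting, cur := wakeFront(12, 7, []int64{10, 1})
	fmt.Println(woken, waiting, cur) // prints "[] [10 1] 7"

	// Once only 1 resource is in use, 11 are free and both waiters are woken in order.
	woken, waiting, cur = wakeFront(12, 1, []int64{10, 1})
	fmt.Println(woken, waiting, cur) // prints "[10 1] [] 12"
}
```

In the first call waiter2 stays blocked even though its single resource is available, which is exactly the trade-off described above: some throughput is given up so that large requests cannot be starved.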

Weighted has one last method, TryAcquire. Let's see how it is implemented:

// TryAcquire tries to request n resources.
// It never blocks: it returns true on success, and false on failure without changing the number of resources.
func (s *Weighted) TryAcquire(n int64) bool {
	s.mu.Lock() // Lock to ensure concurrency safety
	// If enough resources remain and there are no other waiters, the request succeeds
	success := s.size-s.cur >= n && s.waiters.Len() == 0
	if success {
		s.cur += n // Record the resources as in use
	}
	s.mu.Unlock()
	return success
}

This method is also very simple. It checks whether enough resources remain and whether there are no other waiters. If both hold, the request succeeds: the number of resources in use is increased with s.cur += n and true is returned. Otherwise it returns false.
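The success condition has two parts, and the second one is easy to overlook: TryAcquire fails when someone is already queued, even if enough resources are free. A hedged sketch as a pure function (tryAcquire here is a hypothetical model with the queue length passed in as a plain integer, not the library method):

```go
package main

import "fmt"

// tryAcquire is a hypothetical pure-function model of TryAcquire's check:
// it succeeds only if enough resources are free AND nobody is queued.
func tryAcquire(size, cur, n int64, queued int) (ok bool, newCur int64) {
	if size-cur >= n && queued == 0 {
		return true, cur + n
	}
	return false, cur
}

func main() {
	fmt.Println(tryAcquire(10, 4, 3, 0)) // prints "true 7"
	fmt.Println(tryAcquire(10, 4, 7, 0)) // prints "false 4" (only 6 free)
	fmt.Println(tryAcquire(10, 4, 3, 1)) // prints "false 4" (a waiter is queued)
}
```

Refusing to jump the queue keeps TryAcquire consistent with the FIFO behaviour of Acquire.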

That completes the walkthrough of the semaphore package's source code.

Example of usage

Once you are familiar with the source code of the semaphore package, using it is straightforward. Below is the worker pool example from the semaphore package documentation, which demonstrates how to use a semaphore to limit the number of goroutines running tasks in parallel. The code is as follows:

//x/[email protected]/semaphore#example-package-WorkerPool

package main

import (
	"context"
	"fmt"
	"log"
	"runtime"

	"golang.org/x/sync/semaphore"
)

func main() {
	ctx := context.TODO()

	var (
		maxWorkers = runtime.GOMAXPROCS(0)                    // Maximum number of workers in the pool; defaults to the number of logical CPUs
		sem        = semaphore.NewWeighted(int64(maxWorkers)) // The total number of resources is the maximum number of workers
		out        = make([]int, 32)                          // Total number of tasks
	)

	// Compute the output using up to maxWorkers goroutines at a time.
	for i := range out {
		// When maxWorkers goroutines are already running, Acquire blocks until one of them completes
		if err := sem.Acquire(ctx, 1); err != nil { // Request a resource
			log.Printf("Failed to acquire semaphore: %v", err)
			break
		}

		// Start a new goroutine to perform the calculation task
		go func(i int) {
			defer sem.Release(1)         // Release the resource once the task is done
			out[i] = collatzSteps(i + 1) // Perform the Collatz step calculation
		}(i)
	}

	// Acquire all tokens to wait for every goroutine to finish
	if err := sem.Acquire(ctx, int64(maxWorkers)); err != nil {
		log.Printf("Failed to acquire semaphore: %v", err)
	}

	fmt.Println(out)

}

// collatzSteps computes the number of steps to reach 1 under the Collatz
// conjecture. (See /wiki/Collatz_conjecture.)
func collatzSteps(n int) (steps int) {
	if n <= 0 {
		panic("nonpositive input")
	}

	for ; n > 1; steps++ {
		if steps < 0 {
			panic("too many steps")
		}

		if n%2 == 0 {
			n /= 2
			continue
		}

		const maxInt = int(^uint(0) >> 1)
		if n > (maxInt-1)/3 {
			panic("overflow")
		}
		n = 3*n + 1
	}

	return steps
}

This code uses a semaphore to implement a worker pool that limits the maximum number of concurrent goroutines.

runtime.GOMAXPROCS(0) returns the current GOMAXPROCS setting without changing it; by default this is the number of logical CPUs on the machine (for example, 10 on my Apple M1 Pro). It is used as the maximum number of workers supported by the worker pool, which is also the total number of resources in the semaphore. The out slice has length 32, i.e. there are 32 tasks in total. A for loop starts a new goroutine for each calculation task collatzSteps(i + 1), but before starting each goroutine it calls sem.Acquire(ctx, 1) to request one resource from the worker pool. When maxWorkers goroutines are already running, Acquire blocks until one of them completes. When a task finishes, sem.Release(1) is called in a defer statement to release the resource.

NOTE:

We do not need to dig into the calculation task collatzSteps; it is enough to know that it is a time-consuming task. Briefly, the Collatz conjecture is a mathematical conjecture about a seemingly simple rule applied to any positive integer n: if n is even, divide n by 2; if n is odd, multiply n by 3 and add 1 (i.e. 3n + 1). The conjecture states that by repeating these operations every positive integer eventually reaches 1. The collatzSteps function computes how many steps a positive integer n needs to reach 1 under this rule.

It is worth noting that after the for loop has submitted all tasks, sem.Acquire(ctx, int64(maxWorkers)) acquires all resources in the semaphore, so the program only continues once every task has finished. Its effect is similar to wg.Wait() on a sync.WaitGroup.

Next, let's use sync.WaitGroup to achieve the same behavior:

/jianghushinian/blog-go-example/blob/main/x/sync/semaphore/waitgroup/

func main() {
	var (
		maxWorkers = runtime.GOMAXPROCS(0)           // Current GOMAXPROCS; defaults to the number of logical CPUs
		out        = make([]int, 32)                 // Stores the Collatz results
		wg         sync.WaitGroup                    // Used to wait for the goroutines to complete
		sem        = make(chan struct{}, maxWorkers) // Used to limit the maximum concurrency
	)

	for i := range out {
		// Limit concurrency through sem so that at most maxWorkers goroutines run at the same time
		sem <- struct{}{} // If sem is full, this blocks until a slot is free
		// Increase the WaitGroup count
		wg.Add(1)

		go func(i int) {
			defer wg.Done()          // Decrease the WaitGroup count when the goroutine completes
			defer func() { <-sem }() // Free a slot in sem when the goroutine completes
			// Perform the Collatz step calculation
			out[i] = collatzSteps(i + 1)
		}(i)
	}

	// Wait for all goroutines to complete
	wg.Wait()

	// Print the result
	fmt.Println(out)
}

Does comparing the sync.WaitGroup version with the semaphore version deepen your understanding of what semaphore does?

Finally, here is one more piece of homework: implement this sample program again using errgroup.

NOTE:

If you are not familiar with sync.WaitGroup or errgroup, please refer to my articles "Go concurrency control: sync.WaitGroup detailed explanation" and "Go concurrency control: errgroup details".

Summary

This article explained semaphore, an extended concurrency primitive in Go, walked through the implementation of its source code, and showed how to use it.

You may have noticed that if n is 1 when the semaphore is initialized, the semaphore is effectively equivalent to a mutex lock.
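To illustrate why a semaphore with a single resource behaves like a mutex, here is a hedged sketch. It assumes a buffered channel of capacity 1 as a stand-in binary semaphore (standard library only, not semaphore.NewWeighted(1) itself), and the function name countWithBinarySemaphore is made up for this example.

```go
package main

import (
	"fmt"
	"sync"
)

// countWithBinarySemaphore increments a shared counter from several
// goroutines, guarding it with a capacity-1 channel used as a lock.
// (Hypothetical helper; the channel stands in for a semaphore with n = 1.)
func countWithBinarySemaphore(workers, perWorker int) int {
	sem := make(chan struct{}, 1) // exactly one resource: at most one holder at a time
	var wg sync.WaitGroup
	counter := 0

	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < perWorker; i++ {
				sem <- struct{}{} // acquire the single resource ("lock")
				counter++         // critical section
				<-sem             // release it ("unlock")
			}
		}()
	}

	wg.Wait()
	return counter
}

func main() {
	fmt.Println(countWithBinarySemaphore(8, 1000)) // prints "8000"
}
```

Because only one goroutine can hold the single slot at a time, no increment is lost, which is the same guarantee a mutex would give here.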

semaphore can be used to implement the worker pool pattern, and its usage patterns and scenarios are also similar to those of sync.WaitGroup, so you can study the two side by side.
