SoFunction
Updated on 2025-03-04

Understanding the semaphore source code in Go in one article

The runtime semaphore mechanism

Preface

I've been reading the Go source code recently and found that semaphore is used in many places.

This article is based on go version go1.13.15 darwin/amd64.

What it does

Below is the official description

// Semaphore implementation exposed to Go.
// Intended use is provide a sleep and wakeup
// primitive that can be used in the contended case
// of other synchronization primitives.
// Thus it targets the same goal as Linux's futex,
// but it has much simpler semantics.
//
// That is, don't think of these as semaphores.
// Think of them as a way to implement sleep and wakeup
// such that every sleep is paired with a single wakeup,
// even if, due to races, the wakeup happens before the sleep.

Put plainly: this is not meant as a general-purpose semaphore but as a sleep/wakeup primitive for the contended path of other synchronization primitives. Its goal matches that of futex on Linux, only with simpler semantics, and every sleep is paired with exactly one wakeup, even when a race makes the wakeup happen before the sleep.

The comment above says the goal is the same as that of futex. Some background on futex:

A futex (short for "fast userspace mutex") is the basic building block on Linux for implementing locks and higher-level locking abstractions such as semaphores and POSIX mutexes.

A futex consists of a piece of memory that can be shared by several processes (an aligned integer variable). The value of this integer can be incremented or decremented with the atomic instructions provided by the CPU, and a process can wait until the value becomes positive. Almost all futex operations complete in user space; only when there is contention that needs arbitration does the operation have to enter the kernel. This is what makes futex-based locking primitives so efficient: since most operations do not need arbitration between processes, they can run entirely in user space without a (relatively expensive) system call.

The semaphore in Go has the same goal as futex: it provides sleep and wakeup primitives that other synchronization primitives can use in the contended case. When a goroutine needs to sleep, it is parked in a central place; when it needs to be woken up, it is taken out and put back into the scheduler.

For example, in the implementation of the read-write lock, the mutual blocking and waking between read locks and write locks is done through sleep and wakeup: when a read lock is held, a newly arriving write lock blocks itself through the semaphore, and once the current read locks have been released, the blocked write lock is woken up through the semaphore.
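The behaviour described above can be observed from user code. The following is a minimal sketch using only the public sync.RWMutex API; the helper name writerWaitsForReader and the 50 ms wait are assumptions made for this illustration.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// writerWaitsForReader (hypothetical helper) reports whether the writer was
// still blocked while a read lock was held, and whether it got the lock
// after the read lock was released.
func writerWaitsForReader() (blocked bool, acquired bool) {
	var rw sync.RWMutex
	got := make(chan struct{})

	rw.RLock() // a reader is active

	go func() {
		rw.Lock() // cannot succeed while the read lock is held
		close(got)
		rw.Unlock()
	}()

	// The writer cannot hold the lock while the read lock is held,
	// so the timeout branch is taken regardless of scheduling.
	select {
	case <-got:
	case <-time.After(50 * time.Millisecond):
		blocked = true
	}

	rw.RUnlock() // releasing the last read lock lets the writer proceed
	<-got        // returns once the writer has taken the lock
	return blocked, true
}

func main() {
	blocked, acquired := writerWaitsForReader()
	fmt.Println("writer blocked while reader active:", blocked)
	fmt.Println("writer acquired after RUnlock:", acquired)
}
```

The example only shows the visible effect; the sleeping and waking itself happens inside the runtime functions discussed in the rest of the article.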

Write lock

// Lock acquires the lock for writing.
// It blocks until all read operations have finished (if there are any).
func (rw *RWMutex) Lock() {
	...
	// Atomically subtract rwmutexMaxReaders from readerCount.
	// The negative value announces that a write lock has arrived; the read lock side checks for it as well.
	r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
	// r != 0 means read locks were held before this write lock arrived.
	// Add r to readerWait, the number of read locks this write lock has to wait for.
	if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
		// Block the current write lock
		runtime_SemacquireMutex(&rw.writerSem, false, 0)
	}
	...
}

The current write lock is put to sleep through runtime_SemacquireMutex.

Read lock release

// RUnlock decrements the reader count, i.e. readerCount--.
// It wakes up the goroutine waiting to write (if there is one).
func (rw *RWMutex) RUnlock() {
	...
	// Atomically decrement readerCount.
	// 1. If the result is >= 0, no write lock is pending and the operation ends here.
	// 2. If the result is < 0, a write lock is pending (Lock has subtracted rwmutexMaxReaders) and may be blocked on the remaining read locks.
	if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
		// Try to wake up the blocked write lock
		rw.rUnlockSlow(r)
	}
	...
}

func (rw *RWMutex) rUnlockSlow(r int32) {
	...
	// readerWait--. If the result is 0, no read lock is left ahead of the write lock,
	// so wake up the blocked write lock through the writerSem semaphore.
	if atomic.AddInt32(&rw.readerWait, -1) == 0 {
		// Wake up the write lock
		runtime_Semrelease(&rw.writerSem, false, 1)
	}
}

Once the last read lock has been released, runtime_Semrelease is called to wake up the sleeping write lock.
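The counter arithmetic in Lock and RUnlock is easier to follow with concrete numbers. Below is a single-threaded sketch that replays it with plain integers instead of atomics; the type counters and its methods are hypothetical names for this illustration, and maxReaders uses the value 1 << 30 that rwmutexMaxReaders has in sync/rwmutex.go.

```go
package main

import "fmt"

// maxReaders has the same value as rwmutexMaxReaders in sync/rwmutex.go.
const maxReaders = 1 << 30

// counters is a hypothetical, single-threaded model of two RWMutex fields.
type counters struct {
	readerCount int32
	readerWait  int32
}

// writerAnnounce replays the two additions in Lock and reports
// whether the writer would have to sleep.
func (c *counters) writerAnnounce() bool {
	c.readerCount -= maxReaders
	r := c.readerCount + maxReaders // readers active when the writer arrived
	if r != 0 {
		c.readerWait += r
		return c.readerWait != 0
	}
	return false
}

// readerUnlock replays RUnlock and rUnlockSlow and reports
// whether this call would wake the writer.
func (c *counters) readerUnlock() bool {
	c.readerCount--
	if c.readerCount < 0 { // a writer is pending
		c.readerWait--
		return c.readerWait == 0
	}
	return false
}

func main() {
	c := counters{readerCount: 3} // three read locks are held
	fmt.Println("writer must sleep:", c.writerAnnounce())
	fmt.Println("readerCount is negative:", c.readerCount < 0, "readerWait:", c.readerWait)
	for i := 1; i <= 3; i++ {
		fmt.Printf("RUnlock #%d wakes writer: %v\n", i, c.readerUnlock())
	}
}
```

Only the third RUnlock brings readerWait down to 0, which is the point where the real code calls runtime_Semrelease.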

Several main methods

These methods are declared in go/src/sync/

// Semacquire waits until *s > 0 and then atomically decrements it.
// It is intended as a simple sleep primitive for use by the synchronization
// library and should not be used directly.
func runtime_Semacquire(s *uint32)

// SemacquireMutex is like Semacquire, but for profiling contended Mutexes.
// If lifo is true, queue waiter at the head of wait queue.
// skipframes is the number of frames to omit during tracing, counting from
// runtime_SemacquireMutex's caller.
func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)

// Semrelease atomically increments *s and notifies a waiting goroutine
// if one is blocked in Semacquire.
// It is intended as a simple wakeup primitive for use by the synchronization
// library and should not be used directly.
// If handoff is true, pass count directly to the first waiter.
// skipframes is the number of frames to omit during tracing, counting from
// runtime_Semrelease's caller.
func runtime_Semrelease(s *uint32, handoff bool, skipframes int)
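The effect of the lifo parameter can be sketched with a simple queue. This is an illustration only: waitQueue and the goroutine names are hypothetical, and a slice stands in for the linked list of waiters the runtime keeps per address.

```go
package main

import "fmt"

// waitQueue is a hypothetical stand-in for the list of waiters on one address.
type waitQueue struct {
	waiters []string
}

// queue adds a waiter at the head when lifo is true, at the tail otherwise.
func (q *waitQueue) queue(name string, lifo bool) {
	if lifo {
		q.waiters = append([]string{name}, q.waiters...)
		return
	}
	q.waiters = append(q.waiters, name)
}

// dequeue removes and returns the waiter at the head, or "" if there is none.
func (q *waitQueue) dequeue() string {
	if len(q.waiters) == 0 {
		return ""
	}
	head := q.waiters[0]
	q.waiters = q.waiters[1:]
	return head
}

func main() {
	var q waitQueue
	q.queue("g1", false)
	q.queue("g2", false)
	q.queue("g3", true) // jumps to the head of the queue
	fmt.Println(q.dequeue(), q.dequeue(), q.dequeue()) // g3 g1 g2
}
```

A release always wakes the waiter at the head, so a waiter queued with lifo set is the next one to be woken.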

The actual implementation is in go/src/runtime/

//go:linkname sync_runtime_Semacquire sync.runtime_Semacquire
func sync_runtime_Semacquire(addr *uint32) {
	semacquire1(addr, false, semaBlockProfile, 0)
}

//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) {
	semrelease1(addr, handoff, skipframes)
}

//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) {
	semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes)
}

How it is implemented

sudog cache

The semaphore implementation uses sudog, so let's look at that first.

A sudog is the runtime's higher-level abstraction for storing a goroutine in a blocked state, and it is one of the main mechanisms used to implement user-space semaphores. For example, when a goroutine has to block because it is waiting for data on a channel, a sudog records the goroutine together with the location it is waiting on, and is then linked into a wait queue or a balanced binary tree.

// sudogs are allocated from a special pool. Use acquireSudog and
// releaseSudog to allocate and free them.
type sudog struct {
	// The following fields are protected by the hchan.lock of the
	// channel this sudog is blocking on.

	g *g

	// isSelect indicates that g is participating in a select, so
	// g.selectDone must be CAS'd to win the wake-up race.
	isSelect bool
	next     *sudog
	prev     *sudog
	elem     unsafe.Pointer // data element (may point to stack)

	// The following fields are never accessed concurrently.
	// For channels, waitlink is only accessed by g.
	// For semaphores, all fields (including the ones above)
	// are only accessed when holding a semaRoot lock.

	acquiretime int64
	releasetime int64
	ticket      uint32
	parent      *sudog // semaRoot binary tree
	waitlink    *sudog // g.waiting list or semaRoot
	waittail    *sudog // semaRoot
	c           *hchan // channel
}

A sudog is acquired and returned according to the following strategy:

1. Acquire: take from the per-P cache first. If the per-P cache is empty, grab a batch from the global pool (up to half of the per-P capacity), then take the last sudog in the per-P cache;

2. Return: put it back into the per-P cache. If the per-P cache is full, first move half of the per-P cache to the global cache, then put the returned sudog into the per-P cache.

acquireSudog

1. If the per-P cache is empty, fetch up to half of its capacity from the global cache;

2. Then take the last sudog out of the per-P cache, clear its slot, and return it;

// go/src/runtime/
//go:nosplit
func acquireSudog() *sudog {
	// Delicate dance: the semaphore implementation calls acquireSudog,
	// acquireSudog calls new(sudog), new calls malloc, malloc can call
	// the garbage collector, and the garbage collector calls the
	// semaphore implementation in stopTheWorld.
	// Break the cycle by doing acquirem/releasem around new(sudog),
	// which keeps the garbage collector from being invoked.
	// Get the m the current g is running on
	mp := acquirem()
	// Get the pointer to p
	pp := mp.p.ptr()
	if len(pp.sudogcache) == 0 {
		lock(&sched.sudoglock)
		// First, try to grab a batch from the central cache.
		for len(pp.sudogcache) < cap(pp.sudogcache)/2 && sched.sudogcache != nil {
			s := sched.sudogcache
			sched.sudogcache = s.next
			s.next = nil
			pp.sudogcache = append(pp.sudogcache, s)
		}
		unlock(&sched.sudoglock)
		// If the central cache is empty, allocate a new one.
		if len(pp.sudogcache) == 0 {
			pp.sudogcache = append(pp.sudogcache, new(sudog))
		}
	}
	// Take the last one in the cache
	n := len(pp.sudogcache)
	s := pp.sudogcache[n-1]
	pp.sudogcache[n-1] = nil
	// Shrink the cache by the one just taken
	pp.sudogcache = pp.sudogcache[:n-1]
	if s.elem != nil {
		throw("acquireSudog: found s.elem != nil in cache")
	}
	releasem(mp)
	return s
}

releaseSudog

1. If the per-P cache is full, move half of its content to the global cache;

2. Then put the recycled sudog into the per-P cache.

// go/src/runtime/
//go:nosplit
func releaseSudog(s *sudog) {
	if s.elem != nil {
		throw("runtime: sudog with non-nil elem")
	}
	if s.isSelect {
		throw("runtime: sudog with non-false isSelect")
	}
	if s.next != nil {
		throw("runtime: sudog with non-nil next")
	}
	if s.prev != nil {
		throw("runtime: sudog with non-nil prev")
	}
	if s.waitlink != nil {
		throw("runtime: sudog with non-nil waitlink")
	}
	if s.c != nil {
		throw("runtime: sudog with non-nil c")
	}
	gp := getg()
	if gp.param != nil {
		throw("runtime: releaseSudog with non-nil gp.param")
	}
	mp := acquirem() // avoid rescheduling to another P
	pp := mp.p.ptr()
	// If the cache is full
	if len(pp.sudogcache) == cap(pp.sudogcache) {
		// Transfer half of the local cache to the central cache
		var first, last *sudog
		for len(pp.sudogcache) > cap(pp.sudogcache)/2 {
			n := len(pp.sudogcache)
			p := pp.sudogcache[n-1]
			pp.sudogcache[n-1] = nil
			pp.sudogcache = pp.sudogcache[:n-1]
			if first == nil {
				first = p
			} else {
				last.next = p
			}
			last = p
		}
		lock(&sched.sudoglock)
		last.next = sched.sudogcache
		sched.sudogcache = first
		unlock(&sched.sudoglock)
	}
	// Return the sudog to the per-P cache
	pp.sudogcache = append(pp.sudogcache, s)
	releasem(mp)
}
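The two-level strategy of acquireSudog and releaseSudog can be tried out in isolation. The following is a simplified, single-threaded sketch: the names item, twoLevelCache and newCache are hypothetical, the locks are left out, and the allocs counter exists only so the behaviour can be observed.

```go
package main

import "fmt"

// item is a hypothetical stand-in for sudog.
type item struct{ next *item }

// twoLevelCache models a bounded local cache backed by a global linked list.
type twoLevelCache struct {
	local  []*item // bounded slice, like the per-P cache
	global *item   // linked list, like the central cache
	allocs int     // number of fresh allocations, for observation only
}

// newCache creates a cache; capacity must be at least 1.
func newCache(capacity int) *twoLevelCache {
	return &twoLevelCache{local: make([]*item, 0, capacity)}
}

func (c *twoLevelCache) acquire() *item {
	if len(c.local) == 0 {
		// Refill up to half of the local capacity from the global list.
		for len(c.local) < cap(c.local)/2 && c.global != nil {
			it := c.global
			c.global = it.next
			it.next = nil
			c.local = append(c.local, it)
		}
		// Nothing available globally: allocate a new item.
		if len(c.local) == 0 {
			c.allocs++
			c.local = append(c.local, new(item))
		}
	}
	n := len(c.local)
	it := c.local[n-1]
	c.local[n-1] = nil
	c.local = c.local[:n-1]
	return it
}

func (c *twoLevelCache) release(it *item) {
	if len(c.local) == cap(c.local) {
		// Move half of the local cache to the global list.
		var first, last *item
		for len(c.local) > cap(c.local)/2 {
			n := len(c.local)
			p := c.local[n-1]
			c.local[n-1] = nil
			c.local = c.local[:n-1]
			if first == nil {
				first = p
			} else {
				last.next = p
			}
			last = p
		}
		last.next = c.global
		c.global = first
	}
	c.local = append(c.local, it)
}

// globalLen counts the items on the global list.
func (c *twoLevelCache) globalLen() int {
	n := 0
	for it := c.global; it != nil; it = it.next {
		n++
	}
	return n
}

func main() {
	c := newCache(4)
	for i := 0; i < 5; i++ {
		c.release(new(item))
	}
	fmt.Println("local:", len(c.local), "global:", c.globalLen()) // local: 3 global: 2
}
```

With a capacity of 4, the fifth release finds the local cache full, moves two items to the global list, and then stores the returned item locally.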

semaphore

// go/src/runtime/
// Asynchronous semaphore for sync.Mutex.

// A semaRoot holds a balanced tree of sudog with distinct addresses (s.elem).
// Each of those sudog may in turn point (through s.waitlink) to a list
// of other sudogs waiting on the same address.
// The operations on the inner lists of sudogs with the same address are all O(1).
// The scanning of the top-level semaRoot list is O(log n), where n is the number
// of distinct addresses with goroutines blocked on them that hash to the given semaRoot.
type semaRoot struct {
	lock mutex
	// The root node of the balanced tree of unique waiters
	treap *sudog
	// The number of waiters; read without the lock
	nwait uint32
}

// Prime to not correlate with any user patterns.
const semTabSize = 251

var semtable [semTabSize]struct {
	root semaRoot
	pad  [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte
}
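How an address ends up in one of the 251 semaRoot slots is not shown in the excerpts. As far as I recall, the runtime's semroot function shifts the address right by 3 bits and takes the remainder modulo semTabSize; treat that formula as an assumption. The helper bucketIndex below is a hypothetical name for this sketch.

```go
package main

import (
	"fmt"
	"unsafe"
)

const semTabSize = 251

// bucketIndex maps an address to a slot of a 251-entry table.
// Assumption: shift by 3, then take the remainder, modelled on semroot.
func bucketIndex(addr uintptr) int {
	return int((addr >> 3) % semTabSize)
}

func main() {
	var a, b uint32
	// The concrete slots depend on where the variables live in memory.
	fmt.Println(bucketIndex(uintptr(unsafe.Pointer(&a))), bucketIndex(uintptr(unsafe.Pointer(&b))))

	// Distinct addresses can share a slot; this is why each semaRoot
	// keeps a tree keyed by address rather than a single queue.
	fmt.Println(bucketIndex(8), bucketIndex(8+251*8)) // 1 1
}
```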

poll_runtime_Semacquire/sync_runtime_SemacquireMutex

// go/src/runtime/
//go:linkname poll_runtime_Semacquire internal/poll.runtime_Semacquire
func poll_runtime_Semacquire(addr *uint32) {
	semacquire1(addr, false, semaBlockProfile, 0)
}
//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) {
	semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes)
}


func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) {
	// Check that this goroutine is the one running on m
	gp := getg()
	if gp != gp.m.curg {
		throw("semacquire not on the G stack")
	}

	// Easy case: if *addr > 0, do *addr -= 1 and return
	if cansemacquire(addr) {
		return
	}

	// Harder case:
	//	increment the waiter count
	//	try cansemacquire once more, return if it succeeds
	//	enqueue itself as a waiter
	//	sleep
	//	(the waiter descriptor is dequeued by the signaler)
	// Get a sudog
	s := acquireSudog()
	root := semroot(addr)
	t0 := int64(0)
	s.releasetime = 0
	s.acquiretime = 0
	s.ticket = 0
	if profile&semaBlockProfile != 0 && blockprofilerate > 0 {
		t0 = cputicks()
		s.releasetime = -1
	}
	if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {
		if t0 == 0 {
			t0 = cputicks()
		}
		s.acquiretime = t0
	}
	for {
		lock(&root.lock)
		// Add ourselves to nwait to disable the "easy case" in semrelease
		atomic.Xadd(&root.nwait, 1)
		// Check cansemacquire to avoid a missed wakeup
		if cansemacquire(addr) {
			atomic.Xadd(&root.nwait, -1)
			unlock(&root.lock)
			break
		}
		// Any semrelease after the cansemacquire knows we are waiting
		// (because nwait is set), so go to sleep.
		// queue adds s to the blocked goroutines in semaRoot
		root.queue(addr, s, lifo)
		// Put the current goroutine into a waiting state and unlock the lock.
		// The goroutine can be made runnable again by calling goready(gp).
		goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes)
		if s.ticket != 0 || cansemacquire(addr) {
			break
		}
	}
	if s.releasetime > 0 {
		blockevent(s.releasetime-t0, 3+skipframes)
	}

	// Return the sudog
	releaseSudog(s)
}

func cansemacquire(addr *uint32) bool {
	for {
		v := atomic.Load(addr)
		if v == 0 {
			return false
		}
		if atomic.Cas(addr, v, v-1) {
			return true
		}
	}
}
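The load-then-CAS loop of cansemacquire can be reproduced outside the runtime with the public sync/atomic package. This is a minimal sketch: tryAcquire and race are hypothetical names, and the numbers 3 and 10 are arbitrary.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// tryAcquire decrements *addr if it is positive and reports whether it did.
// It never blocks; it only retries when another goroutine changed the value
// between the load and the compare-and-swap.
func tryAcquire(addr *uint32) bool {
	for {
		v := atomic.LoadUint32(addr)
		if v == 0 {
			return false
		}
		if atomic.CompareAndSwapUint32(addr, v, v-1) {
			return true
		}
	}
}

// race lets n goroutines compete for the given number of permits and
// returns how many succeeded and how many permits are left.
func race(permits uint32, n int) (int32, uint32) {
	sem := permits
	var won int32
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if tryAcquire(&sem) {
				atomic.AddInt32(&won, 1)
			}
		}()
	}
	wg.Wait()
	return won, atomic.LoadUint32(&sem)
}

func main() {
	won, left := race(3, 10)
	fmt.Println(won, left) // 3 0
}
```

Because a failed compare-and-swap simply retries with a fresh value, exactly as many goroutines succeed as there were permits, no matter how they interleave.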

sync_runtime_Semrelease

// go/src/runtime/
//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) {
	semrelease1(addr, handoff, skipframes)
}

func semrelease1(addr *uint32, handoff bool, skipframes int) {
	root := semroot(addr)
	atomic.Xadd(addr, 1)

	// Easy case: no waiters
	// This check must happen after the xadd to avoid a missed wakeup
	if atomic.Load(&root.nwait) == 0 {
		return
	}

	// Harder case: search for a waiter and wake it up
	lock(&root.lock)
	if atomic.Load(&root.nwait) == 0 {
		// The count has already been consumed by another goroutine,
		// so there is no need to wake up another goroutine.
		unlock(&root.lock)
		return
	}

	// Search for a waiter and wake it up
	s, t0 := root.dequeue(addr)
	if s != nil {
		atomic.Xadd(&root.nwait, -1)
	}
	unlock(&root.lock)
	if s != nil { // May be slow, so unlock first
		acquiretime := s.acquiretime
		if acquiretime != 0 {
			mutexevent(t0-acquiretime, 3+skipframes)
		}
		if s.ticket != 0 {
			throw("corrupted semaphore ticket")
		}
		if handoff && cansemacquire(addr) {
			s.ticket = 1
		}
		// goready(s.g, 5)
		// Mark the goroutine runnable; it waits to be rescheduled
		readyWithTime(s, 5+skipframes)
	}
}

A summary excerpted from "Synchronization Primitives"

This pair of functions, semacquire and semrelease, may not be very intuitive at first. To picture the contended case, assume the two functions run on two different Ms (threads), call them M1 and M2. When G1 on M1 reaches semacquire1 and the fast path succeeds, G1 has obtained the semaphore and simply continues. If the fast path fails and the slow path still cannot obtain it, G1 enters goparkunlock, which puts G1 on the wait queue and lets M1 switch to running other Gs. When G2 on M2 calls semrelease1, it merely takes the waiting G1 off the wait queue and puts it back on the scheduling queue. When G1 is scheduled again (assume it happens to land on M1 again), execution resumes right after goparkunlock and G1 competes for the semaphore once more. On success, the sudog is returned to the cache.
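The flow described above can be modelled in user space. The sketch below is an illustration under stated assumptions, not the runtime's implementation: toySema and runWorkers are hypothetical names, a sync.Mutex plays the role of the semaRoot lock, and a sync.Cond replaces the sudog queue together with gopark/goready. It keeps the two properties discussed in this article: the nwait check that lets release skip the slow path, and the guarantee that a wakeup arriving before the sleep is not lost.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// toySema is a hypothetical user-space model of the runtime semaphore.
type toySema struct {
	count uint32     // plays the role of *addr
	nwait uint32     // number of sleepers, like semaRoot.nwait
	mu    sync.Mutex // plays the role of the semaRoot lock
	cond  *sync.Cond // stands in for the wait queue plus park/ready
}

func newToySema() *toySema {
	s := &toySema{}
	s.cond = sync.NewCond(&s.mu)
	return s
}

// tryAcquire is the fast path: decrement count if it is positive.
func (s *toySema) tryAcquire() bool {
	for {
		v := atomic.LoadUint32(&s.count)
		if v == 0 {
			return false
		}
		if atomic.CompareAndSwapUint32(&s.count, v, v-1) {
			return true
		}
	}
}

func (s *toySema) acquire() {
	if s.tryAcquire() { // easy case
		return
	}
	s.mu.Lock()
	atomic.AddUint32(&s.nwait, 1) // announce the sleeper before re-checking
	for !s.tryAcquire() {
		s.cond.Wait() // sleep until a release signals
	}
	atomic.AddUint32(&s.nwait, ^uint32(0)) // nwait--
	s.mu.Unlock()
}

func (s *toySema) release() {
	atomic.AddUint32(&s.count, 1)
	if atomic.LoadUint32(&s.nwait) == 0 { // easy case: nobody is sleeping
		return
	}
	s.mu.Lock()
	s.cond.Signal() // wake one sleeper, which then retries tryAcquire
	s.mu.Unlock()
}

// runWorkers blocks n goroutines on a fresh semaphore, releases it n times,
// and returns how many goroutines got through.
func runWorkers(n int) int32 {
	s := newToySema()
	var done int32
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.acquire()
			atomic.AddInt32(&done, 1)
		}()
	}
	for i := 0; i < n; i++ {
		s.release()
	}
	wg.Wait()
	return done
}

func main() {
	s := newToySema()
	s.release() // the wakeup arrives first
	s.acquire() // the later sleep returns immediately
	fmt.Println("wakeup before sleep is not lost")
	fmt.Println("workers released:", runWorkers(4))
}
```

In this model a woken goroutine always competes again through tryAcquire, which corresponds to the case where handoff is false; the ticket mechanism used for handoff is not modelled.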

References

[Synchronization Primitives] /under-the-hood/zh-cn/part2runtime/ch06sched/sync/
[Go concurrent programming in practice: how to use semaphores and how they are implemented] /post/6906677772479889422
[Semaphore] /cch123/golang-notes/blob/master/
[Semaphore mechanism (PV operations) and three classic synchronization problems in process synchronization] /SpeedMe/article/details/17597373
