The semaphore mechanism in the Go runtime
Preface
I've been reading the Go source code recently and found that semaphore is used in many places.
This article is based on go version go1.13.15 darwin/amd64.
What is it for
Below is the official description
```go
// Semaphore implementation exposed to Go.
// Intended use is provide a sleep and wakeup
// primitive that can be used in the contended case
// of other synchronization primitives.
// Thus it targets the same goal as Linux's futex,
// but it has much simpler semantics.
//
// That is, don't think of these as semaphores.
// Think of them as a way to implement sleep and wakeup
// such that every sleep is paired with a single wakeup,
// even if, due to races, the wakeup happens before the sleep.
```
The comment above says the semaphore targets the same goal as futex. About futex:

futex (short for "fast userspace mutex") is a basic Linux facility for implementing locks and for building higher-level synchronization abstractions such as semaphores and POSIX mutexes.

A futex consists of a piece of memory that can be shared between processes (an aligned integer variable). The value of this integer can be incremented or decremented with the atomic instructions provided by the CPU, and a process can wait until the value becomes positive. Futex operations complete almost entirely in user space; only when the outcome is contended and arbitration is required does execution need to enter kernel space. This design makes locking primitives built on futex very efficient: since most operations do not require arbitration between processes, most of them run in user space without (relatively expensive) kernel system calls.
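To make the fast-path idea concrete, here is a minimal sketch of the same pattern the runtime uses in its `cansemacquire` helper: a CAS loop that only succeeds while the counter is positive. The name `tryAcquire` is mine, not the runtime's.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// tryAcquire is a user-space fast path in the spirit of futex:
// spin on compare-and-swap, decrementing the counter only while it
// is still positive. No kernel help is needed unless it returns false.
func tryAcquire(addr *uint32) bool {
	for {
		v := atomic.LoadUint32(addr)
		if v == 0 {
			return false // contended case: a real futex would now sleep in the kernel
		}
		if atomic.CompareAndSwapUint32(addr, v, v-1) {
			return true
		}
	}
}

func main() {
	var sem uint32 = 1
	fmt.Println(tryAcquire(&sem)) // true: fast path succeeds in user space
	fmt.Println(tryAcquire(&sem)) // false: this is where a sleep would be needed
}
```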
In Go, semaphore serves the same goal as futex: it provides sleep and wakeup primitives so that they can be used in the contended paths of other synchronization primitives. When a goroutine needs to sleep, it is stored in a central waiting structure; when it needs to be woken up, it is taken out of that structure and handed back to the scheduler.
For example, in the read-write lock implementation, readers and writers block and wake each other through sleep and wakeup: while read locks are held, a newly arriving write lock blocks itself on a semaphore, and when the last read lock is released, it uses the semaphore to wake the blocked write lock.
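This behavior can be observed directly with sync.RWMutex. The helper below (`writerWaitsForReader` is my own name, not part of any library) shows a writer that arrives while a reader holds the lock: the writer sleeps until the reader's RUnlock wakes it, so the recorded order is deterministic.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// writerWaitsForReader demonstrates the blocking described above: a writer
// that arrives while a reader holds the lock is put to sleep and is woken
// only when the reader calls RUnlock.
func writerWaitsForReader() string {
	var rw sync.RWMutex
	var mu sync.Mutex // protects order
	var order []string

	rw.RLock() // a reader holds the lock first

	done := make(chan struct{})
	go func() {
		rw.Lock() // blocks until the reader is gone
		mu.Lock()
		order = append(order, "writer")
		mu.Unlock()
		rw.Unlock()
		close(done)
	}()

	time.Sleep(10 * time.Millisecond) // give the writer time to block
	mu.Lock()
	order = append(order, "reader done")
	mu.Unlock()
	rw.RUnlock() // this wakes the blocked writer
	<-done
	return fmt.Sprint(order)
}

func main() {
	fmt.Println(writerWaitsForReader()) // [reader done writer]
}
```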
Write lock
```go
// Lock acquires the write lock, blocking until all
// in-flight read operations (if any) have finished.
func (rw *RWMutex) Lock() {
	...
	// Atomically subtract rwmutexMaxReaders from readerCount.
	// This announces to readers that a writer has arrived
	// (which is what the read-lock path checks for).
	r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
	// When r != 0, there are readers ahead of this write lock.
	// readerWait records how many readers the writer must wait for.
	if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
		// Block the current write lock.
		runtime_SemacquireMutex(&rw.writerSem, false, 0)
	}
	...
}
```
The current write lock is put to sleep via runtime_SemacquireMutex.
Read lock release
```go
// RUnlock decrements the read-operation count (readerCount--)
// and wakes a goroutine waiting for the write lock, if any.
func (rw *RWMutex) RUnlock() {
	...
	// Atomically decrement readerCount.
	// 1. If the result is >= 0, no writer is waiting, and we are done.
	// 2. If the result is < 0, a write lock is blocked behind the readers.
	if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
		// Try to wake the blocked write lock.
		rw.rUnlockSlow(r)
	}
	...
}

func (rw *RWMutex) rUnlockSlow(r int32) {
	...
	// Decrement readerWait. When it reaches 0, no readers remain
	// ahead of the write lock, so use the writerSem semaphore to
	// wake the first blocked write lock in the queue.
	if atomic.AddInt32(&rw.readerWait, -1) == 0 {
		// Wake one write lock.
		runtime_Semrelease(&rw.writerSem, false, 1)
	}
}
```
When the last reader ahead of the writer finishes, it calls runtime_Semrelease to wake the sleeping write lock.
Several main methods
These methods are declared in go/src/sync/:
```go
// Semacquire waits until *s > 0 and then atomically decrements it.
// It is intended as a simple sleep primitive for use by the
// synchronization library and should not be used directly.
func runtime_Semacquire(s *uint32)

// SemacquireMutex is like Semacquire, but for profiling contended mutexes.
// If lifo is true, the waiter is queued at the head of the wait queue.
// skipframes is the number of frames to omit during tracing, counting
// from runtime_SemacquireMutex's caller.
func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)

// Semrelease atomically increments *s and notifies a waiting goroutine
// if one is blocked in Semacquire.
// It is intended as a simple wakeup primitive for use by the
// synchronization library and should not be used directly.
// If handoff is true, pass count directly to the first waiter.
// skipframes is the number of frames to omit during tracing, counting
// from runtime_Semrelease's caller.
func runtime_Semrelease(s *uint32, handoff bool, skipframes int)
```
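These runtime_ functions are linked into package sync and cannot be called from ordinary user code. As a rough user-space analogy (not the runtime implementation), a buffered channel provides the same counting sleep/wakeup semantics: acquiring sleeps on a channel receive when no token is available, and releasing wakes one waiter. The `chanSem` type below is illustrative.

```go
package main

import "fmt"

// chanSem is a user-space stand-in for the runtime semaphore: acquire
// sleeps (on a channel receive) until a token is available, and release
// wakes exactly one waiter, pairing every sleep with a single wakeup.
type chanSem chan struct{}

func newChanSem(n int) chanSem {
	s := make(chanSem, n)
	for i := 0; i < n; i++ {
		s <- struct{}{} // pre-fill with n tokens
	}
	return s
}

func (s chanSem) acquire() { <-s }             // blocks when no token: the "sleep"
func (s chanSem) release() { s <- struct{}{} } // the paired "wakeup"

func main() {
	s := newChanSem(1)
	s.acquire()
	fmt.Println("acquired, tokens left:", len(s))
	s.release()
	fmt.Println("released, tokens left:", len(s))
}
```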
The actual implementations live in go/src/runtime/:
```go
//go:linkname sync_runtime_Semacquire sync.runtime_Semacquire
func sync_runtime_Semacquire(addr *uint32) {
	semacquire1(addr, false, semaBlockProfile, 0)
}

//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) {
	semrelease1(addr, handoff, skipframes)
}

//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) {
	semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes)
}
```
How it is implemented
sudog cache
The semaphore implementation uses sudog, so let's look at that first.
A sudog is an upper-level abstraction over a goroutine in a blocked state, and is one of the main mechanisms used to implement user-space semaphores. For example, when a goroutine needs to block waiting for data on a channel, a sudog records the goroutine and the location it is waiting on, and links it into a waiting queue or a balanced binary tree.
```go
// sudogs are allocated from a special pool. Use acquireSudog and
// releaseSudog to allocate and free them.
type sudog struct {
	// The following fields are protected by the hchan.lock.
	g *g

	// isSelect indicates g is participating in a select, so the
	// wake-up race must be won with a CAS.
	isSelect bool
	next     *sudog
	prev     *sudog
	elem     unsafe.Pointer // data element (may point to stack)

	// The following fields are never accessed concurrently.
	// For channels, waitlink is only accessed by g.
	// For semaphores, all fields (including the ones above)
	// are only accessed when holding a semaRoot lock.
	acquiretime int64
	releasetime int64
	ticket      uint32
	parent      *sudog // semaRoot binary tree
	waitlink    *sudog // g.waiting list or semaRoot
	waittail    *sudog // semaRoot
	c           *hchan // channel
}
```
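As a small illustration of when a sudog comes into play: a goroutine that receives from an empty channel is parked on a sudog inside the runtime until a sender wakes it. From user code this is only visible as ordinary blocking (the helper name `blockAndWake` is mine):

```go
package main

import (
	"fmt"
	"time"
)

// blockAndWake parks a goroutine on an empty channel receive (internally,
// the runtime records the goroutine in a sudog on the channel's wait queue)
// and then wakes it with a send.
func blockAndWake() string {
	ch := make(chan int)
	got := make(chan int)
	go func() {
		got <- <-ch // parks here until a sender arrives
	}()
	time.Sleep(10 * time.Millisecond) // let the receiver block first
	ch <- 42                          // dequeues the waiter and readies it
	return fmt.Sprint(<-got)
}

func main() {
	fmt.Println(blockAndWake()) // 42
}
```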
sudog acquisition and release follow these strategies:

1. On acquisition, take from the per-P cache first; if the per-P cache is empty, move half a cache's worth from the global pool into the per-P cache, then take the last one from the per-P cache.
2. On release, return to the per-P cache; if the per-P cache is full, move half of the per-P cache back to the global cache, then return the sudog to the per-P cache.
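The two-level strategy above can be sketched generically. In this sketch the type and field names are illustrative (not the runtime's), and sudogs are replaced by plain ints so the code is self-contained:

```go
package main

import "fmt"

// twoLevelCache models the per-P / global pool strategy: the local cache
// refills to half capacity from the global pool when empty, and spills
// half back to the global pool when full.
type twoLevelCache struct {
	local  []int // stands in for the per-P cache (p.sudogcache)
	global []int // stands in for the global pool (sched.sudogcache)
	cap    int
}

func (c *twoLevelCache) acquire() int {
	if len(c.local) == 0 {
		// Refill the local cache to half capacity from the global pool.
		for len(c.local) < c.cap/2 && len(c.global) > 0 {
			n := len(c.global)
			c.local = append(c.local, c.global[n-1])
			c.global = c.global[:n-1]
		}
		if len(c.local) == 0 {
			c.local = append(c.local, -1) // allocate fresh, like new(sudog)
		}
	}
	// Take the last one in the local cache.
	n := len(c.local)
	v := c.local[n-1]
	c.local = c.local[:n-1]
	return v
}

func (c *twoLevelCache) release(v int) {
	if len(c.local) == c.cap {
		// Spill half of the local cache back to the global pool.
		for len(c.local) > c.cap/2 {
			n := len(c.local)
			c.global = append(c.global, c.local[n-1])
			c.local = c.local[:n-1]
		}
	}
	c.local = append(c.local, v)
}

func main() {
	c := &twoLevelCache{global: []int{1, 2, 3, 4}, cap: 4}
	fmt.Println(c.acquire()) // refills local from the global pool, returns the last one
	fmt.Println(len(c.local), len(c.global))
}
```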
acquireSudog
1. If the per-P cache is empty, fetch half a cache's worth from the global cache;
2. Then take the last sudog in the per-P cache, clear its slot, and return it.
```go
// go/src/runtime/
//go:nosplit
func acquireSudog() *sudog {
	// Delicate dance: the semaphore implementation calls acquireSudog,
	// acquireSudog calls new(sudog), new calls malloc, malloc can call
	// the garbage collector, and the garbage collector calls the
	// semaphore implementation in stopTheWorld.
	// Break the cycle by doing acquirem/releasem around new(sudog),
	// which keeps the garbage collector from being invoked during it.

	// Get the m the current g is running on.
	mp := acquirem()
	// Get the pointer to its p.
	pp := mp.p.ptr()
	if len(pp.sudogcache) == 0 {
		lock(&sched.sudoglock)
		// First, try to grab a batch from the central cache.
		for len(pp.sudogcache) < cap(pp.sudogcache)/2 && sched.sudogcache != nil {
			s := sched.sudogcache
			sched.sudogcache = s.next
			s.next = nil
			pp.sudogcache = append(pp.sudogcache, s)
		}
		unlock(&sched.sudoglock)
		// If the central cache is empty, allocate a new one.
		if len(pp.sudogcache) == 0 {
			pp.sudogcache = append(pp.sudogcache, new(sudog))
		}
	}
	// Take the last one in the cache.
	n := len(pp.sudogcache)
	s := pp.sudogcache[n-1]
	pp.sudogcache[n-1] = nil
	// Shrink the cache to drop the one just taken.
	pp.sudogcache = pp.sudogcache[:n-1]
	if s.elem != nil {
		throw("acquireSudog: found s.elem != nil in cache")
	}
	releasem(mp)
	return s
}
```
releaseSudog
1. If the per-P cache is full, move half of it to the global cache;
2. Then put the recycled sudog into the per-P cache.
```go
// go/src/runtime/
//go:nosplit
func releaseSudog(s *sudog) {
	if s.elem != nil {
		throw("runtime: sudog with non-nil elem")
	}
	if s.isSelect {
		throw("runtime: sudog with non-false isSelect")
	}
	if s.next != nil {
		throw("runtime: sudog with non-nil next")
	}
	if s.prev != nil {
		throw("runtime: sudog with non-nil prev")
	}
	if s.waitlink != nil {
		throw("runtime: sudog with non-nil waitlink")
	}
	if s.c != nil {
		throw("runtime: sudog with non-nil c")
	}
	gp := getg()
	if gp.param != nil {
		throw("runtime: releaseSudog with non-nil gp.param")
	}
	// Avoid rescheduling to another P.
	mp := acquirem()
	pp := mp.p.ptr()
	// If the cache is full...
	if len(pp.sudogcache) == cap(pp.sudogcache) {
		// ...transfer half of the local cache to the central cache.
		var first, last *sudog
		for len(pp.sudogcache) > cap(pp.sudogcache)/2 {
			n := len(pp.sudogcache)
			p := pp.sudogcache[n-1]
			pp.sudogcache[n-1] = nil
			pp.sudogcache = pp.sudogcache[:n-1]
			if first == nil {
				first = p
			} else {
				last.next = p
			}
			last = p
		}
		lock(&sched.sudoglock)
		last.next = sched.sudogcache
		sched.sudogcache = first
		unlock(&sched.sudoglock)
	}
	// Return the sudog to the per-P cache.
	pp.sudogcache = append(pp.sudogcache, s)
	releasem(mp)
}
```
semaphore
```go
// go/src/runtime/

// Asynchronous semaphore for sync.Mutex.

// A semaRoot holds a balanced tree of sudog with distinct addresses (s.elem).
// Each of those sudog may in turn point (through s.waitlink) to a list
// of other sudogs waiting on the same address.
// Operations on the inner lists of sudogs with the same address are all O(1).
// Scanning the top-level semaRoot list is O(log n), where n is the number
// of distinct addresses with goroutines blocked on them that hash to the
// given semaRoot.
type semaRoot struct {
	lock  mutex
	treap *sudog // root of balanced tree of unique waiters
	nwait uint32 // number of waiters; read without the lock
}

// Prime to not correlate with any user patterns.
const semTabSize = 251

var semtable [semTabSize]struct {
	root semaRoot
	pad  [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte
}
```
poll_runtime_Semacquire/sync_runtime_SemacquireMutex
```go
// go/src/runtime/
//go:linkname poll_runtime_Semacquire internal/poll.runtime_Semacquire
func poll_runtime_Semacquire(addr *uint32) {
	semacquire1(addr, false, semaBlockProfile, 0)
}

//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) {
	semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes)
}

func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) {
	// Make sure we are running on the g bound to this m.
	gp := getg()
	if gp != gp.m.curg {
		throw("semacquire not on the G stack")
	}

	// Easy case: *addr > 0, so decrement it and return.
	if cansemacquire(addr) {
		return
	}

	// Harder case:
	//	increment the waiter count
	//	try cansemacquire once more; if it succeeds, we're done
	//	enqueue ourselves as a waiter
	//	sleep
	//	(the waiter descriptor is dequeued by the signaler)

	// Get a sudog.
	s := acquireSudog()
	root := semroot(addr)
	t0 := int64(0)
	s.releasetime = 0
	s.acquiretime = 0
	s.ticket = 0
	if profile&semaBlockProfile != 0 && blockprofilerate > 0 {
		t0 = cputicks()
		s.releasetime = -1
	}
	if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {
		if t0 == 0 {
			t0 = cputicks()
		}
		s.acquiretime = t0
	}
	for {
		lock(&root.lock)
		// Add ourselves to nwait to disable the "easy case" in semrelease.
		atomic.Xadd(&root.nwait, 1)
		// Check cansemacquire to avoid a missed wakeup.
		if cansemacquire(addr) {
			atomic.Xadd(&root.nwait, -1)
			unlock(&root.lock)
			break
		}
		// Any semrelease after the cansemacquire knows we're waiting
		// (we set nwait above), so go to sleep.
		// queue adds s to the blocked goroutines in semaRoot.
		root.queue(addr, s, lifo)
		// Park the current goroutine and release the lock;
		// goready makes it runnable again.
		goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes)
		if s.ticket != 0 || cansemacquire(addr) {
			break
		}
	}
	if s.releasetime > 0 {
		blockevent(s.releasetime-t0, 3+skipframes)
	}
	// Return the sudog.
	releaseSudog(s)
}

func cansemacquire(addr *uint32) bool {
	for {
		v := atomic.Load(addr)
		if v == 0 {
			return false
		}
		if atomic.Cas(addr, v, v-1) {
			return true
		}
	}
}
```
sync_runtime_Semrelease
```go
// go/src/runtime/
//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) {
	semrelease1(addr, handoff, skipframes)
}

func semrelease1(addr *uint32, handoff bool, skipframes int) {
	root := semroot(addr)
	atomic.Xadd(addr, 1)

	// Easy case: no waiters?
	// This check must happen after the xadd, to avoid a missed wakeup.
	if atomic.Load(&root.nwait) == 0 {
		return
	}

	// Harder case: search for a waiter and wake it.
	lock(&root.lock)
	if atomic.Load(&root.nwait) == 0 {
		// The count was already consumed by another goroutine,
		// so there is no need to wake one up.
		unlock(&root.lock)
		return
	}
	// Search for a waiter and wake it.
	s, t0 := root.dequeue(addr)
	if s != nil {
		atomic.Xadd(&root.nwait, -1)
	}
	unlock(&root.lock)
	if s != nil { // may be slow, so unlock first
		acquiretime := s.acquiretime
		if acquiretime != 0 {
			mutexevent(t0-acquiretime, 3+skipframes)
		}
		if s.ticket != 0 {
			throw("corrupted semaphore ticket")
		}
		if handoff && cansemacquire(addr) {
			s.ticket = 1
		}
		// readyWithTime calls goready(s.g, 5): it marks the goroutine
		// runnable and leaves it to be rescheduled.
		readyWithTime(s, 5+skipframes)
	}
}
```
The following summary is excerpted from "Synchronization Primitives" (see the references below):
This pairing of semacquire and semrelease may not be very intuitive. First, realize that the two functions must execute on two different Ms (threads), otherwise there would be no concurrency; call them M1 and M2. When G1 on M1 reaches semacquire1 and the fast path succeeds, G1 has grabbed the lock and can continue. But if it fails, and still cannot get the lock on the slow path, it enters goparkunlock, which puts G1 into the waiting queue and lets M1 switch to executing other Gs. When G2 on M2 later calls semrelease1, it simply moves the waiting G1 back into the scheduling queue. When G1 is rescheduled (say, by luck, on M1 again), execution resumes right after goparkunlock and G1 competes for the semaphore once more. If it succeeds, the sudog is released back to the cache and the function returns.
References
- [Synchronization Primitives] /under-the-hood/zh-cn/part2runtime/ch06sched/sync/
- [Practical Go concurrent programming: how semaphores are used and how they are implemented] /post/6906677772479889422
- [Semaphore] /cch123/golang-notes/blob/master/
- [The semaphore mechanism (PV operations) and three classic synchronization problems in process synchronization] /SpeedMe/article/details/17597373
This concludes this walk through the semaphore source code in Go. For more on the topic, please search my previous articles, and I hope you will continue to support me!