Overview
The rwlock in go is written preferred, which can avoid writing lock hunger.
Read locks and write locks hold locks according to the first-come and then-come rules. Once a coroutine holds the write lock, the subsequent coroutine can only get the read lock after the write lock is released.
Similarly, once >= 1 coroutine writes the read lock, the subsequent coroutine can only get the write lock after all these read locks are released.
Let’s learn how Go’s RWMutex is implemented. The following code is taken from go1.17.2/src/sync/ and deleted the race-related code.
PS: The code of rwmutex is quite short, but reading the source code is not that scary...
The structure of RWMutex
RWMutex is generally implemented through: normal locks and conditional variables
type RWMutex struct { w Mutex // held if there are pending writers writerSem uint32 // semaphore for writers to wait for completing readers readerSem uint32 // semaphore for readers to wait for completing writers readerCount int32 // number of pending readers readerWait int32 // number of departing readers }
Lock
func (rw *RWMutex) Lock() { // First, resolve competition with other writers. () // Announce to readers there is a pending writer. r := atomic.AddInt32(&, -rwmutexMaxReaders) + rwmutexMaxReaders // Wait for active readers. if r != 0 && atomic.AddInt32(&, r) != 0 { runtime_SemacquireMutex(&, false, 0) } }
Unlock
const rwmutexMaxReaders = 1 << 30 func (rw *RWMutex) Unlock() { // Announce to readers there is no active writer. r := atomic.AddInt32(&, rwmutexMaxReaders) // Unblock blocked readers, if any. for i := 0; i < int(r); i++ { runtime_Semrelease(&, false, 0) } // Allow other writers to proceed. () }
RLock
func (rw *RWMutex) RLock() { if atomic.AddInt32(&, 1) < 0 { // A writer is pending, wait for it. runtime_SemacquireMutex(&, false, 0) } }
RUnlock
func (rw *RWMutex) RUnlock() { if r := atomic.AddInt32(&, -1); r < 0 { // Outlined slow-path to allow the fast-path to be inlined (r) } } func (rw *RWMutex) rUnlockSlow(r int32) { // A writer is pending. if atomic.AddInt32(&, -1) == 0 { // The last reader unblocks the writer. runtime_Semrelease(&, false, 1) } }
Q1: Multiple coroutines take read locks concurrently. How to ensure that these read lock coroutines will not be blocked?
func (rw *RWMutex) RLock() { if atomic.AddInt32(&, 1) < 0 { // A writer is pending, wait for it. runtime_SemacquireMutex(&, false, 0) } }
When you take a read lock, only readerCount will be added, so the read locks can be concurrently normal.
Q2: Multiple coroutines receive write locks concurrently. How to ensure that only one coroutine will receive the write lock?
func (rw *RWMutex) Lock() { // First, resolve competition with other writers. () // Announce to readers there is a pending writer. r := atomic.AddInt32(&, -rwmutexMaxReaders) + rwmutexMaxReaders // Wait for active readers. if r != 0 && atomic.AddInt32(&, r) != 0 { runtime_SemacquireMutex(&, false, 0) } }
When you take a write lock, you will get it, which naturally guarantees that there will be only one write lock at the same time.
Q3: When the read lock is obtained, the new coroutine will take the write lock. If the write lock is guaranteed to be ready, it will be blocked?
func (rw *RWMutex) Lock() { // First, resolve competition with other writers. () // Announce to readers there is a pending writer. r := atomic.AddInt32(&, -rwmutexMaxReaders) + rwmutexMaxReaders // Wait for active readers. if r != 0 && atomic.AddInt32(&, r) != 0 { runtime_SemacquireMutex(&, false, 0) } }
Assuming that 5 coroutines get the read lock at this time, readerCount = 5, and assuming rwmutexMaxReaders = 100.
At this time, there is a new coroutine w1 that wants to get the write lock.
Execution
r := atomic.AddInt32(&, -rwmutexMaxReaders) + rwmutexMaxReaders
After, = -95, r = 5.
Execution
atomic.AddInt32(&, r)
After, = 5.
readerWait
Records how many coroutines hold read locks at the moment of acquiring the write lock.After this moment, even if a new coroutine tries to acquire the read lock, it will only add readerCount and will not move to readerWait.
Then execute runtime_SemacquireMutex() and sleep on the writerSem semaphore.
Q4: When the read lock is obtained, the new coroutine will block the write lock when the old read lock coroutine is released. How to wake up the waiting write lock coroutine
func (rw *RWMutex) RUnlock() { if r := atomic.AddInt32(&, -1); r < 0 { // Outlined slow-path to allow the fast-path to be inlined (r) } } func (rw *RWMutex) rUnlockSlow(r int32) { // A writer is pending. if atomic.AddInt32(&, -1) == 0 { // The last reader unblocks the writer. runtime_Semrelease(&, false, 1) } }
Continuing with the previous scene, readerCount will be subtracted by 1 whenever RUnlock is executed. When readerCount is negative, it means that a coroutine is holding or is waiting to hold a write lock.
Four of the previous five read coroutines, after each RUnlock(), readerCount = -95 - 4 = -99, readerWait = 5 - 4 = 1.
When the last read coroutine calls RUnlock(), readerCount becomes -100 and readerWait becomes 0. At this time, the sleeping coroutine w1 on the writerSem will be awakened.
Q5: When the write lock is obtained, the new coroutine takes the read lock. How to make the new coroutine block?
func (rw *RWMutex) RLock() { if atomic.AddInt32(&, 1) < 0 { // A writer is pending, wait for it. runtime_SemacquireMutex(&, false, 0) } }
Continuing with the above scene, readerCount = -100 + 1 = -99 < 0.
The new read coroutine r1 is sleeping under readerSem.
Suppose that another read coroutine r2 is available at this time, then readerCount = -98 and is still sleeping.
Q6: When the write lock is obtained, the new coroutine takes the read lock and the write lock coroutine is released. How to wake up the waiting read lock coroutine?
Continue with the above scene, at which time coroutine w1 releases the write lock
func (rw *RWMutex) Unlock() { // Announce to readers there is no active writer. r := atomic.AddInt32(&, rwmutexMaxReaders) // Unblock blocked readers, if any. for i := 0; i < int(r); i++ { runtime_Semrelease(&, false, 0) } // Allow other writers to proceed. () }
Execution
atomic.AddInt32(&, rwmutexMaxReaders)
After, r = readerCount = -98 + 100 = 2, which means that there are two read coroutines, r1 and r2, waiting
ps: If some new coroutines want to get the read lock at this time, they will execute smoothly because readerCount = 2 + 1 = 3 > 0 and will not be blocked
After that, the for loop is executed twice, awakening both coroutine r1 and coroutine r2.
Q7: When the write lock is obtained, there are two coroutines to grab the read lock and the write lock respectively. When the write lock is released, who will win the two coroutines?
func (rw *RWMutex) Unlock() { // Announce to readers there is no active writer. r := atomic.AddInt32(&, rwmutexMaxReaders) // Unblock blocked readers, if any. for i := 0; i < int(r); i++ { runtime_Semrelease(&, false, 0) } // Allow other writers to proceed. () }
Since you wake up the read lock first and then call (), it is definitely the first time you read the coroutine to win!
Two points that I think are more clever
-
The entanglement between readerCount and rwmutexMaxReaders
pass
readerCount + rwmutexMaxReaders
as well asreaderCount - rwmutexMaxReaders
These two operations can tell whether there is currently a coroutine waiting/holding a write lock and the number of coroutines currently waiting/holding a read lock. -
The entanglement between readerCount and readerWait
When Lock(), the value of readerCount is assigned to readerWait. When readerWait = 0 instead of readerCount = 0, it wakes up the write coroutine, which can prevent the read coroutines achieved later in Lock() from being executed before the write coroutine.
This is the end of this article about the implementation example of go RWMutex. For more related go RWMutex content, please search for my previous articles or continue browsing the related articles below. I hope everyone will support me in the future!