1. Introduction to the concept of lock
First of all, why do you need a lock?
In concurrent programming, multiple threads or processes may access and modify the same shared resource (such as variables, data structures, files), etc. If a suitable synchronization mechanism is not introduced, the following problems will be caused:
- Data competition: Multiple threads modify a resource at the same time, and the final result is related to the execution order of the threads, and the result is unpredictable.
- Data inconsistent: One thread is modifying the resource, while another thread reads unmodified data, resulting in reading the wrong data.
- Resource competition: Multi-thread competition for the same resource is wasted system performance.
Therefore, we need a lock to ensure that only one person can write data at the same time and ensure the correctness and consistency of shared resources under concurrent access.
Here, two common concurrency control processing mechanisms are introduced, namely optimistic lock and pessimistic lock:
Optimistic lock: Assuming that in concurrent operations, resource preemption is not very fierce and the possibility of data being modified is not very high. At this time, there is no need to lock the shared resource area before operation. Instead, the data is modified first, and finally determine whether the data has been modified. If it has not been modified, the modification is submitted, otherwise try again.
Pessimistic lock: In contrast to optimistic lock, it assumes that the resource competition in the scenario is fierce, and access to shared resource areas must require holding locks.
For different scenarios, we need to adopt strategies that adapt to local conditions, and compare optimistic and pessimistic factors. Their advantages and disadvantages are obvious:
Strategy | advantage | shortcoming |
---|---|---|
Optimistic lock | No need for actual locks, high performance | If there is a conflict, it is necessary to perform again. Retrying multiple times may cause significant performance degradation. |
Pessimistic lock | Accessing data must be held to ensure the correctness of data in concurrent scenarios | During locking, other threads waiting for lock need to be blocked, with low performance |
2、
Go's implementation of stand-alone locks takes into account the changes in the degree of resource competition between coroutines in the actual environment, and formulates a set of lock upgrade processes. The specific plan is as follows:
- First, take an optimistic attitude, Goroutine will maintain its spin state and try to acquire the lock through CAS operation.
- When multiple acquisitions fail, the optimistic attitude will turn from a pessimistic attitude, determine the current competition for concurrent resources is fierce, and enter the blocking state and wait to be awakened.
The judgment rules for turning from optimism to pessimism are as follows:
- Goroutine spin attempts exceed 4 times
- There is a G waiting for execution in the execution queue of P (avoiding spin affecting GMP scheduling performance)
- The CPU is single-core (other Goroutines cannot execute, spin is meaningless)
In addition, in order to prevent blocked coroutines from waiting for too long and not getting locks, resulting in a decline in the overall user experience, the concept of hunger was introduced:
- Hungry state: If the Goroutine is blocked and waited for >1ms, the coroutine is considered to be in a hungry state
- Hunger mode: Indicates whether the current lock is in a specific mode. In this mode, the handover of the lock is fair and handed over to the coroutine that has been waiting for the longest time in order.
The rules for the transition between hunger mode and normal mode are as follows:
Normal mode -> Hunger mode: There is a blocking coroutine, the blocking time exceeds 1ms
Hungry mode -> Normal mode: if the blocking queue is cleared, or the waiting time of the coroutine that acquires the lock is less than 1ms, then restore
Next, enter the source code and watch the specific implementation.
2.1. Data structure
Located in the package sync/, the definition of the lock is as follows:
type Mutex struct { state int32 sema uint32 }
- state: Identify the status information of the current lock, including whether it is in hungry mode, whether there is a wake-up blocking coroutine, whether it is locked, and how many coroutines are waiting for locks.
- seme: Semaphore used to block and wake up coroutines.
Think of state as a binary string, and the rules for storing information are as follows:
- The first bit sign is locked, 0 means no, 1 means locked (mutexLocked)
- The second bit identifies whether there is a wake-up blocking coroutine (mutexWoken)
- The third digit is in starvation mode (mutexStarving)
- Starting from the fourth bit, the number of coroutines in the blocking state is recorded
const ( mutexLocked = 1 << iota // mutex is locked mutexWoken mutexStarving mutexWaiterShift = iota starvationThresholdNs = 1e6 //Hunger Threshold)
2.2. Obtain the lock Lock()
func (m *Mutex) Lock() { if atomic.CompareAndSwapInt32(&, 0, mutexLocked) { return } () }
Try to directly acquire the lock through the CAS operation. If it succeeds, it will return. Otherwise, it means that the lock has been acquired and enter LockSlow.
2.3、LockSlow()
The source code is long, please explain it in splitting:
var waitStartTime int64 starving := false awoke := false iter := 0 old :=
(1) The basic constants are defined, with the following meanings:
- waitStartTime: Record the current coroutine waiting time, and will only be used if it is blocked.
- awoke: Identifies whether the current coroutine is awakened by Unlock
- Iter: Record the current number of coroutine spin attempts
- old: record the status information of the old lock
for { //In locked and not in hungry state, and the current coroutine allows to continue spinning if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&, old, old|mutexWoken) { awoke = true } runtime_doSpin() iter++ old = continue } //... }
(2) Enter the loop trying to acquire the lock, and two ifs represent:
- If lockedLocked, and notIn a state of hunger, and the current coroutineAllow to continue spinning(Non-single-core CPU, spin number <=4, and the local queue of scheduler P does not have G waiting for execution), then enter:
- If the current coroutineNot awakened from the waiting queue,andThere is no awakened waiting coroutine,andThere are coroutines located in blocking, trySet the mutexWoken flag to 1, if successful:
- Identifies the current coroutine as the awakened coroutine. (Although it is not actually awakened from the blockage)
- Tell P that the current coroutine is in a spin state
- renew
iter
Counter, withold
Recorded status information of the current lock, perform the next retry loop
- If the current coroutineNot awakened from the waiting queue,andThere is no awakened waiting coroutine,andThere are coroutines located in blocking, trySet the mutexWoken flag to 1, if successful:
The only doubt that exists here is why awoke should be identified as true?
First of all, because the current lock is not in starvation mode, the current preemption lock mode is unfair. If the current lock's blocking queue has not been awakened, it is required not to wake up and try to get the currently trying coroutine to acquire the lock to avoid awakening the coroutine for resource competition.
for { //... new := old if old&mutexStarving == 0 { new |= mutexLocked } if old&(mutexLocked|mutexStarving) != 0 { new += 1 << mutexWaiterShift } if starving && old&mutexLocked != 0 { new |= mutexStarving } if awoke { new &^= mutexWoken } //... }
(3) Status update:
When the coroutine comes out of step 2, it can only be stated that it is in one of the following two states:
- Can't spin, or the lock enters hunger mode, the lock needs to be given to others, anywayCan't get the lock(pessimistic).
- The lock was released.
Regardless of the matter, some status updates are required to prepare for the next plan.
Use new to store the new status information that a lock is about to enter, and update the rules:
- If the lock is not in hunger mode: it means that the lock may be released or it may be that it has spinned too much, regardless of whether the lock can be obtained next.Locks will be acquired by a coroutine, therefore
mutexLocked
is 1. - If the lock may be in a hungry state, or the lock is not released: that means you can't get the lock.Coming into blocking state, blocking coroutine counter +1.
- If the current coroutine is awakened and is already in a hungry state and the lock is still locked:Locks enter absolutely fair hunger mode。
- If the current coroutine is awakened: Clear
mutexWoken
Identify bits, because nextCoroutines may need to be awakened(Hunger Mode).
Although there are a lot of updates,In summary:
- If the lock is released, then just mark it as the lock will be acquired next.
- If the lock is not released and the current coroutine has been waiting for a long time, the lock will enter a hungry state, and the blocking coroutine needs to be awakened next.
(4) Try to update the information:
if atomic.CompareAndSwapInt32(&, old, new) { //... } else { old = }
Next, try to update new into state. If the update fails, it means that another coroutine has intervened. In order to prevent data consistency loss, you must do it all again.
(5) The status update is successful. The specific judgment is whether to sleep or acquire the lock successfully:
Step into the if main branch of step 4, there are two states:
if atomic.CompareAndSwapInt32(&, old, new) { if old&(mutexLocked|mutexStarving) == 0 { break // locked the mutex with CAS } //... } else { //... }
Because of the current state, it may be that the lock has been released, check whether the lock has been released before it is updated and it is not hungry mode. If it is, it means that the lock has been successfully acquired and the function is over.
if atomic.CompareAndSwapInt32(&, old, new) { if old&(mutexLocked|mutexStarving) == 0 { break // locked the mutex with CAS } // If we were already waiting before, queue at the front of the queue. queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } runtime_SemacquireMutex(&, queueLifo, 2) //.... } else { //... }
Otherwise, it means that the current coroutine is about to enter a blocking state. Record the time when the blocking starts. This is used to wake up to determine whether you are hungry. Then enter the blocking sleep.
(6) If step 5 enters blocking, then after being awakened:
if atomic.CompareAndSwapInt32(&, old, new) { if old&(mutexLocked|mutexStarving) == 0 { break // locked the mutex with CAS } // If we were already waiting before, queue at the front of the queue. queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } runtime_SemacquireMutex(&, queueLifo, 2) //wake starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = //If the lock is in hunger mode if old&mutexStarving != 0 { //Exception handling of lock if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") } //The semaphore to be updated delta := int32(mutexLocked - 1<<mutexWaiterShift) if !starving || old>>mutexWaiterShift == 1 { delta -= mutexStarving } atomic.AddInt32(&, delta) break } awoke = true iter = 0 //.... } else { //... }
Wake up from blocking, first calculate the blocking time of some coroutines, as well as the current latest lock state.
If the lock is in hungry mode: then the current coroutine will directly acquire the lock. The current coroutine is awakened because of hungry mode, and no other coroutine preempts the lock. Then update the semaphore, record the number of blocking coroutines -1, and set the locked state of the lock to 1. If the currently awakened coroutine from hunger mode has less than 1ms or the last waiting coroutine is the last waiting coroutine, the lock will be converted from hunger mode to normal mode. At this point, the function is successfully obtained and exited.
Otherwise, it's just a normal random wake-up, so you start trying to preempt and return to step 1.
2.4. Release the lock Unlock()
func (m *Mutex) Unlock() { //Directly release the lock new := atomic.AddInt32(&, -mutexLocked) if new != 0 { (new) } }
Through atomic operation, the mutexLocked flag of the lock is set to 0 directly. If the lock status is not 0 after setting 0, it means that there is a coroutine that needs to acquire the lock and enter unlockSlow.
2.5、unlockSlow()
func (m *Mutex) unlockSlow(new int32) { if (new+mutexLocked)&mutexLocked == 0 { fatal("sync: unlock of unlocked mutex") } if new&mutexStarving == 0 { old := new for { if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } new = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&, old, new) { runtime_Semrelease(&, false, 2) return } old = } } else { runtime_Semrelease(&, true, 2) } }
(1) First, the exception state processing is performed. If a lock has been released, then the fatal is directly and the program terminates.
if (new+mutexLocked)&mutexLocked == 0 { fatal("sync: unlock of unlocked mutex") }
(2) If the lock is not in a hungry state:
- If the number of waiting coroutines at this time is 0, or the lock is locked, contains the awakened coroutines, and the lock is in hunger mode: it means that a new coroutine has intervened in the process and has completed the handover, and you can exit directly.
- Wake up a coroutine in a blocked state.
Otherwise, in a state of hunger, awaken the coroutine that has been waiting for the longest time.
3、
For the operations of shared resource areas, they can be divided into two categories: read and write. Suppose in a scenario, the operation of continuing to read on the shared resource area is much larger than the operation of write. If each coroutine read operation needs to acquire a mutex, the performance loss will be very large.
RWMutex is a lock that can be used to improve performance in read operations > write operations, which can be regarded as consisting of a read lock and a write lock. The operating rules are as follows:
- A read lock allows multiple read coroutines to read the shared resource area at the same time. If a coroutine needs to modify the data in the resource area, it needs to be blocked.
- Write locks are strictly exclusive, and no other goroutine is allowed to be accessed when the shared resource area is loaded with a write lock.
- It can be seen that in the worst case, when all coroutines require write operations, the read and write lock will degenerate into ordinary Mutex.
3.1. Data structure
type RWMutex struct { w Mutex // held if there are pending writers writerSem uint32 // semaphore for writers to wait for completing readers readerSem uint32 // semaphore for readers to wait for completing writers readerCount atomic.Int32 // number of pending readers readerWait atomic.Int32 // number of departing readers } const rwmutexMaxReaders = 1 << 30 //The maximum number of read coroutines
- w: A mutually exclusive write lock
- writerSem: The semaphore associated with blocked write coroutines
- readerSem: The semaphore associated with blocked read coroutine
- readerCount: Under normal circumstances, record the number of coroutines being read; but if the current write coroutine is holding a lock, then the actual number of coroutines being recorded is readerCount - rwmutexMaxReader
- readerWait: record the number of release of the next write coroutine and waiting for the read coroutine to complete.
3.2. Read lock process RLock()
func (rw *RWMutex) RLock() { if (1) < 0 { // A writer is pending, wait for it. runtime_SemacquireRWMutexR(&, false, 0) } }
For readerCount+1, it means that a new read coroutine is added. If the result is <0, it means that the current lock is being occupied by the write coroutine, causing the current read coroutine to block.
3.3. Read and release lock process RUnlock()
func (rw *RWMutex) RUnlock() { if r := (-1); r < 0 { // Outlined slow-path to allow the fast-path to be inlined (r) } }
For readerCount-1, it means that one read coroutine is reduced. If the result is <0, it means that the current lock is being occupied by the write coroutine and enters runlockslow.
3.4、rUnlockSlow()
func (rw *RWMutex) rUnlockSlow(r int32) { if r+1 == 0 || r+1 == -rwmutexMaxReaders { () fatal("sync: RUnlock of unlocked RWMutex") } if (-1) == 0 { // The last reader unblocks the writer. runtime_Semrelease(&, false, 1) } }
First, error processing is performed. If you find that the current coroutine occupies an overread lock, or the number of coroutines in the read process is upper limit, the system will have an exception, fatal.
Otherwise, for readerWait-1, if the result is 0, it means that the current coroutine is the last coroutine to intervene in the read lock process, and a write lock needs to be released at this time.
3.5. Write lock process Lock()
func (rw *RWMutex) Lock() { // First, resolve competition with other writers. () // Announce to readers there is a pending writer. r := (-rwmutexMaxReaders) + rwmutexMaxReaders // Wait for active readers. if r != 0 && (r) != 0 { runtime_SemacquireRWMutex(&, false, 0) } }
First try to obtain the write lock. If the acquisition is successful, you need to use readerCount - the maximum number of read coroutines, indicating that the lock is now occupied by the read coroutines.
r represents the number of coroutines in the reading process. If r is not 0, add readerWait to r, and then write them after all these coroutines are read. Block this write coroutine. (Read and write locks are not fair to read and write, and read coroutines are preferred.)
3.6. Write release lock process Unlock()
func (rw *RWMutex) Unlock() { // Announce to readers there is no active writer. r := (rwmutexMaxReaders) if r >= rwmutexMaxReaders { () fatal("sync: Unlock of unlocked RWMutex") } // Unblock blocked readers, if any. for i := 0; i < int(r); i++ { runtime_Semrelease(&, false, 0) } // Allow other writers to proceed. () }
Reset readerCount to normal finger, indicating that the write lock has been released. If the read coroutine exceeds the maximum limit, it is abnormal.
Then wake up all blocked read coroutines. (Reading coroutines is preferred)
Unlock.
The above is the detailed explanation of the specific implementation of the single-machine lock in golang. For more information about the single-machine lock, please pay attention to me.