Introduction
There is a very famous saying in Go language that "share memory through communication means", channel is its best embodiment. Channel provides a mechanism that can synchronize two concurrently executed functions, and allows two functions to communicate by passing specific types of values to each other.
There are two initialization methods for channel, namely cached and cacheless:
make(chan int) // No cache chanmake(chan int, 10) // There is a cache chan
It is also very simple to use:
c := make(chan int) defer close(c) go func(){ c <- 5 // send }() n := <- c // recv
It has achieved very concise interaction between different coroutines.
Internal structure
The implementation of chan is in runtime/, which is a structure of hchan:
type hchan struct { qcount uint // Number of data in the queue dataqsiz uint // The size of the ring queue, the channel itself is a ring queue buf // Pointers that store actual data, use the storage address to avoid gc elemsize uint16 closed uint32 // Identify whether the channel is closed elemtype *_type // Data Element Type sendx uint // send index recvx uint // index of recv recvq waitq // Blocking the queue in recv sendq waitq // Blocking the queue in send lock mutex // Lock}
It can be seen that the channel itself is a ring buffer, and the data is stored on the heap. The synchronization of the channel is achieved through locks, not in the imagined lock-free method. There are two queues in the channel, one is the sending blocking queue and the other is the receiving blocking queue. When sending data to a full channel, it will be blocked, and the sending coroutine will be added to sendq. Similarly, when receiving data to an empty channel, the receiving coroutine will also be blocked and placed in recvq.
waitq is a linked list that simply encapsulates the g structure.
3. Create a channel
When we create a channel through make in the code, the following function is actually called:
CALL (SB)
The implementation of makechan is as follows:
func makechan(t *chantype, size int) *hchan { elem := //Judge the size of the element type if >= 1<<16 { throw("makechan: invalid channel element type") } // Determine alignment restrictions if hchanSize%maxAlign != 0 || > maxAlign { throw("makechan: bad alignment") } // Determine whether the size non-negative sum is greater than the maxAlloc limit mem, overflow := (, uintptr(size)) if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError("makechan: size out of range")) } var c *hchan switch { case mem == 0: // No buffer, that is, make has not set size c = (*hchan)(mallocgc(hchanSize, nil, true)) = () case == 0: // The data type does not contain pointers c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) = add((c), hchanSize) default: // If a pointer is included // Elements contain pointers. c = new(hchan) = mallocgc(mem, elem, true) } = uint16() = elem = uint(size) if debugChan { print("makechan: chan=", c, "; elemsize=", , "; dataqsiz=", size, "\n") } return c }
According to the above code, we can see that creating a channel is divided into three situations:
1. The first buffer size is 0. At this time, only the memory of hchansize is needed is OK.
2. The second buffer size is not 0, and the channel type does not contain a pointer. At this time, buf is a continuous memory of hchanSize+element size*number of elements
3. The third buffer size is not 0 and the channel type contains pointers, so memory cannot be simply applied for based on the size of the element. Memory needs to be allocated through mallocgc.
4. Send data
Sending data will call the following interface:
CALL runtime.chansend1(SB)
chansend1 will call the chansend interface, and the chansend method is signed as follows:
func chansend(c *hchan, ep , block bool, callerpc uintptr) bool
c is a specific channel, ep is the data sent, and block is true to indicate blocking transmission. Generally, data sent to the channel is blocked. If the channel data is full, it will be blocked here. However, if a case is listening for the sending of a channel in the select, the block parameter at this time is false, and the subsequent analysis of the select implementation will be discussed.
select { case <-c: // This is non-blocking sending // do some thing default: // do some thing }
The chansend interface will make judgments on some conditions
If you send data to a channel with nil, if it is blocking, it will block continuously:
if c == nil { if !block { return false } gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) throw("unreachable") }
First, locking will be added to ensure atomicity. If data is sent to a closed channel, it will be panic.
lock(&) if != 0 { unlock(&) panic(plainError("send on closed channel")) }
If there is a waiting coroutine in recvq at this time, directly call the send function to copy the data to the receiver, which is implemented as follows:
// sg is the receiver coroutine, ep is the sending elementfunc send(c *hchan, sg *sudog, ep , unlockf func(), skip int) { if raceenabled { if == 0 { racesync(c, sg) } else { qp := chanbuf(c, ) raceacquire(qp) racerelease(qp) raceacquireg(, qp) racereleaseg(, qp) ++ if == { = 0 } = // = (+1) % } } if != nil { sendDirect(, sg, ep) = nil } gp := unlockf() = (sg) if != 0 { = cputicks() } goready(gp, skip+1) }
If there is no waiting coroutine at this time and the data is not full, copy the data into the ring buffer and move the position back one by one.
if < { // If not full // Space is available in the channel buffer. Enqueue the element to send. qp := chanbuf(c, ) if raceenabled { raceacquire(qp) racerelease(qp) } typedmemmove(, qp, ep) ++ if == { = 0 } ++ unlock(&) return true }
If the ring buffer data is full at this time, if the sending is blocked, the sender will be placed in the sendq queue.
5. Receive data
Receiving data will call the following interface:
CALL runtime.chanrecv1(SB)
chanrecv1 will call the chanrecv interface, and the chanrecv method signature is as follows:
func chanrecv(c *hchan, ep , block bool) (selected, received bool)
c refers to the channel that needs to be operated. The received data will be written to ep. The block is the same as in send, indicating whether it is blocking reception or non-blocking reception. Non-blocking reception refers to the case receiving a channel value in select:
select { case a := <-c: // This is non-blocking reception, no data is returned directly // do some thing default: // do some thing }
First of all, chanrecv will also do some parameter verification
If the channel is nil and is non-blocking mode, return directly. If it is blocking mode, wait forever
if c == nil { if !block { return } gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) throw("unreachable") }
Locks are then added to prevent competition for reading and writing
lock(&)
If you receive data from a closed channel and there is still data in the channel at this time, then you can still receive data, which is a normal data reception situation.
If you receive data from a closed channel and there is no data in the channel, then the returned time is (true, false), indicating that there is a value to return, but not the value we need:
if != 0 && == 0 { if raceenabled { raceacquire(()) } unlock(&) if ep != nil { typedmemclr(, ep) // Set the memory block pointed to by ep 0 } return true, false }
Reception is also divided into three situations:
If the sender in sendq is blocking at this time, the recv function will be called:
func recv(c *hchan, sg *sudog, ep , unlockf func(), skip int) { if == 0 { if raceenabled { racesync(c, sg) } if ep != nil { recvDirect(, sg, ep) } } else { qp := chanbuf(c, ) if raceenabled { raceacquire(qp) racerelease(qp) raceacquireg(, qp) racereleaseg(, qp) } // copy data from queue to receiver if ep != nil { typedmemmove(, ep, qp) } // copy data from sender to queue typedmemmove(, qp, ) ++ if == { = 0 } = // = (+1) % } = nil gp := unlockf() = (sg) if != 0 { = cputicks() } goready(gp, skip+1) }
At this time, the sender is waiting, indicating that the data in the channel is full at this time. At this time, the data in the channel header will be copied to the receiver, and then the data of the sender in the sender's queue header will be copied to that location. This involves two copy operations.
The second case is that if there is no sender waiting, the data will be copied into the channel:
if > 0 { // Receive directly from queue qp := chanbuf(c, ) if raceenabled { raceacquire(qp) racerelease(qp) } if ep != nil { typedmemmove(, ep, qp) } typedmemclr(, qp) ++ if == { = 0 } -- unlock(&) return true, true }
In the third case, if there is no data in the channel, if it is a non-blocking reception, it will directly return false. If it is a blocking reception, it will put the receiver's coroutine into the channel's recvq.
6. Close channel
When closing the channel, the following interface will be called:
func closechan(c *hchan)
First, some data verification will be done:
if c == nil { panic(plainError("close of nil channel")) } lock(&) if != 0 { unlock(&) panic(plainError("close of closed channel")) } if raceenabled { callerpc := getcallerpc() racewritepc((), callerpc, funcPC(closechan)) racerelease(()) } = 1 //Set the close mark bit
If you initiate a close operation to a channel with nil or a closed channel, it will be panic.
Then all coroutines in recvq or sendq will be awakened:
var glist gList // release all readers for { sg := () if sg == nil { break } if != nil { typedmemclr(, ) = nil } if != 0 { = cputicks() } gp := = nil if raceenabled { raceacquireg(gp, ()) } (gp) } // release all writers (they will panic) for { sg := () if sg == nil { break } = nil if != 0 { = cputicks() } gp := = nil if raceenabled { raceacquireg(gp, ()) } (gp) } unlock(&)
If a recipient exists, the received data is set to 0 by typedmemclr.
If a sender exists, all senders will be panic.
7. Summary
To sum up the analysis, there are a few things to pay attention to when using channel
1. Make sure that all data is sent before closing the channel, and the sender will close it.
2. Do not close the channel repeatedly
3. Do not send values to channels that are nil
4. Do not receive values into channels that are nil
5. When receiving data, you can determine whether it is OK by the return value.
n , ok := <- c if ok{ // do some thing }
This prevents the channel from returning zero value after being closed, which will affect the business
This is the end of this article about the detailed explanation of the underlying code implementation of GoLang channel. For more related GoLang channel content, please search for my previous articles or continue browsing the related articles below. I hope everyone will support me in the future!