Preface
As we all know,go
There are only two types of themchannel
, one is an unbuffered channel, which is declared as
ch := make(chan interface{})
The other is buffered channel, which is declared as
bufferSize := 5 ch := make(chan interface{},bufferSize)
For a buffered channel, no matter how big its buffer is, it has its limits. This limit is the bufferSize specified when the channel is initially made.
There is a limit to the size of jojo,buffer channel, I will not make a channel anymore.
oncechannel
If it is full, adding elements to it will block.
so how can we make a infinite buffer channel?
This article refers to an article above medinum. Interested students can directly read the original text.
accomplish
Interface design
First of all, of course, build onestruct
, with the help of Baidu Translation, we will use thisstruct
NamedInfiniteChannel
type InfiniteChannel struct { }
Think about itchannel
There are actually two core behaviors, one in (Fan in) and the other out (Fan out), so we add the following methods.
func (c *InfiniteChannel) In(val interface{}) { // todo } func (c *InfiniteChannel) Out() interface{} { // todo }
Internal implementation
passIn()
The data received must be stored in a place. We can use oneslice
Come and store, even if it is usedIn()
Added a lot of elements to it, you can also use itappend()
Go to expandslice
,slice
The capacity can be expanded infinitely (if the memory is sufficient), sochannel
Tooinfinite
。 InfiniteChannel
The first member was finalized.
type InfiniteChannel struct { data []interface{} }
User callIn()
andOut()
When it may be a concurrent environment,go
How to perform concurrent programming in the process, the easiest thing to think of ischannel
So we prepare two internallychannel
,oneinChan
,oneoutChan
,useinChan
To receive data, useoutChan
to leak data.
type InfiniteChannel struct { inChan chan interface{} outChan chan interface{} data []interface{} } func (c *InfiniteChannel) In(val interface{}) { <- val } func (c *InfiniteChannel) Out() interface{} { return <- }
in,inChan
andoutChan
All are unbuffered channels.
In addition, it is definitely necessary to have aselect
To process frominChan
andoutChan
Events on the body. So we set up a new coroutine and do it in itselect
operate.
func (c *InfiniteChannel) background() { for true { select { case newVal := <-: = append(, newVal) case <- (): // pop() will remove the first element of the queue } } } func NewInfiniteChannel() *InfiniteChannel { c := &InfiniteChannel{ inChan: make(chan interface{}), outChan: make(chan interface{}), } go () // Note that there is another coroutine here return c }
ps: I feel that this is also a routine for go concurrent programming. Right now
- When new struct, go to a select coroutine, execute a for loop within the select coroutine, constantly select, and listen to events of one or more channels.
- The method provided by struct only operates the channel in struct (in this case inChan and outChan), and does not operate other data in struct (in this case, neither In() nor Out() directly operates data).
- After the channel event is triggered, the data is updated by the select coroutine (in this case data). Because only the select coroutine reads and writes data members except channel, and go ensures that concurrent read and writes of the channel are safe, the code is concurrently safe.
- If struct is exported, the user may cross new and make a struct manually. Consider setting struct to unexported and lowercase its initial letter.
pop()
The implementation is also very simple.
// Take out the first element of the queue. If the queue is empty, a nil will be returnedfunc (c *InfiniteChannel) pop() interface{} { if len() == 0 { return nil } val := [0] = [1:] return val }
Test it
One coroutine produces one data every second, and another coroutine consumes one data every half second, and prints it.
func main() { c := NewInfiniteChannel() go func() { for i := 0; i < 20; i++ { (i) () } }() for i := 0; i < 50; i++ { val := () (val) ( * 500) } }
// out <nil>0<nil>1<nil>23<nil>4<nil><nil>5<nil>67<nil><nil>89<nil><nil>1011<nil>12<nil>13<nil>14<nil>15<nil>16<nil>17<nil><nil>1819<nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil> Process finished with the exit code 0
You can see thatInfiniteChannel
When there is no data available for consumption, callOut()
Will return anil
, but this is also expected, becausepop()
When the queue is empty, nil will be returned.
at presentInfiniteChannel
behavior and standards ofchannel
There are differences in behaviors.go
In-housechannel
, if there is no data but you still have to fetch data, it will be blocked. How to achieve this effect?
optimization
I think this is the most skillful part of the whole article. When I saw it for the first time, I couldn't help but slap the censor.
First, put the original onebackground()
Pick it out
func (c *InfiniteChannel) background() { for true { select { case newVal := <-: = append(, newVal) case <- (): } } }
rightoutChan
Make a simple package
func (c *InfiniteChannel) background() { for true { select { case newVal := <-: = append(, newVal) case () <- (): } } } func (c *InfiniteChannel) outChanWrapper() chan interface{} { return }
So far, everything is as usual.
The finishing touch is here:
func (c *InfiniteChannel) outChanWrapper() chan interface{} { if len() == 0 { return nil } return }
existWhen empty, return a
nil
existbackground()
When executedcase <- ():
When it actually becomes:
case nil <- nil:
existgo
In the middle, it is impossible to go to onenil
ofchannel
send element in. For example
func main() { var c chan interface{} select { case c <- 1: } } // fatal error: all goroutines are asleep - deadlock! func main() { var c chan interface{} select { case c <- 1: default: ("hello world") } } // hello world
Therefore, for
select { case newVal := <-: = append(, newVal) case () <- (): }
Will be blocked all the timeselect
There, untilinChan
The data is coming.
Test it again
012345678910111213141516171819fatal error: all goroutines are asleep - deadlock!
Finally, the programpanic
Because it is deadlocked.
Replenish
In factchannel
Apart fromIn()
andOut()
In addition, there is another behavior, that is,close()
, if the element is still taken from channel close, the default value of this type will be taken out.
func main() { c := make(chan interface{}) close(c) for true { v := <-c (v) () } } // output // <nil> // <nil> // <nil> // <nil> func main() { c := make(chan interface{}) close(c) for true { v, isOpen := <-c (v, isOpen) () } } // output // <nil> false // <nil> false // <nil> false // <nil> false
We also need to achieve the same effect.
func (c *InfiniteChannel) Close() { close() } func (c *InfiniteChannel) background() { for true { select { case newVal, isOpen := <-: if isOpen { = append(, newVal) } else { = false } case () <- (): } } } func NewInfiniteChannel() *InfiniteChannel { c := &InfiniteChannel{ inChan: make(chan interface{}), outChan: make(chan interface{}), isOpen: true, } go () return c } func (c *InfiniteChannel) outChanWrapper() chan interface{} { // Here is a judgment on if && len() == 0 { return nil } return }
Test it again
func main() { c := NewInfiniteChannel() go func() { for i := 0; i < 20; i++ { (i) () } () // Close is called here }() for i := 0; i < 50; i++ { val := () (val) ( * 500) } }
// output 012345678910111213141516171819<nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil> Process finished with the exit code 0
Meet expectations
Pity
It looks perfect now, but it is standardchannel
Compared, there is still a gap. Because of the standardchannel
There is this usage
v,isOpen := <- ch
Can be passedisOpen
Variables to getchannel
opening and closing situation.
thereforeInfiniteChannel
A similar one should also be providedmethod
func (c *InfiniteChannel) OutAndIsOpen() (interface{}, bool) { // todo }
Unfortunately, I want to knowInfiniteChannel
Is it trueOpen
, you must visitInfiniteChannel
InsideisOpen
member.
type InfiniteChannel struct { inChan chan interface{} outChan chan interface{} data []interface{} isOpen bool }
andisOpen
Notchannel
Type, according to previous routines,This is notchannel
Members of the type should only beselect
Coroutine access. Once multiple coroutines are accessed, concurrency problems will occur unless locked.
I can't accept it! So I won't provide this method anymore, hehe.
Complete code
func main() { c := NewInfiniteChannel() go func() { for i := 0; i < 20; i++ { (i) () } () }() for i := 0; i < 50; i++ { val := () (val) ( * 500) } } type InfiniteChannel struct { inChan chan interface{} outChan chan interface{} data []interface{} isOpen bool } func (c *InfiniteChannel) In(val interface{}) { <- val } func (c *InfiniteChannel) Out() interface{} { return <- } func (c *InfiniteChannel) Close() { close() } func (c *InfiniteChannel) background() { for true { select { case newVal, isOpen := <-: if isOpen { = append(, newVal) } else { = false } case () <- (): } } } func NewInfiniteChannel() *InfiniteChannel { c := &InfiniteChannel{ inChan: make(chan interface{}), outChan: make(chan interface{}), isOpen: true, } go () return c } // Take out the first element of the queue. If the queue is empty, a nil will be returnedfunc (c *InfiniteChannel) pop() interface{} { if len() == 0 { return nil } val := [0] = [1:] return val } func (c *InfiniteChannel) outChanWrapper() chan interface{} { if && len() == 0 { return nil } return }
refer to
/capital-one-tech/building-an-unbounded-channel-in-go-789e175cd2cd
The above is another detailed discussion on the channel method of go to implement infinite buffer. For more information about go to the channel channel, please follow my other related articles!