SoFunction
Updated on 2025-03-05

Again, discuss the channel method of go to implement infinite buffers

Preface

As we all know,goThere are only two types of themchannel, one is an unbuffered channel, which is declared as

ch := make(chan interface{})

The other is buffered channel, which is declared as

bufferSize := 5
ch := make(chan interface{},bufferSize)

For a buffered channel, no matter how big its buffer is, it has its limits. This limit is the bufferSize specified when the channel is initially made.

There is a limit to the size of jojo,buffer channel, I will not make a channel anymore.

oncechannelIf it is full, adding elements to it will block.

so how can we make a infinite buffer channel?

This article refers to an article above medinum. Interested students can directly read the original text.

accomplish

Interface design

First of all, of course, build onestruct, with the help of Baidu Translation, we will use thisstructNamedInfiniteChannel

type InfiniteChannel struct {
}

Think about itchannelThere are actually two core behaviors, one in (Fan in) and the other out (Fan out), so we add the following methods.

func (c *InfiniteChannel) In(val interface{}) {
	// todo
}

func (c *InfiniteChannel) Out() interface{} {
	// todo
}

Internal implementation

passIn()The data received must be stored in a place. We can use onesliceCome and store, even if it is usedIn()Added a lot of elements to it, you can also use itappend()Go to expandslicesliceThe capacity can be expanded infinitely (if the memory is sufficient), sochannelTooinfiniteInfiniteChannelThe first member was finalized.

type InfiniteChannel struct {
	data    []interface{}
}

User callIn()andOut()When it may be a concurrent environment,goHow to perform concurrent programming in the process, the easiest thing to think of ischannelSo we prepare two internallychannel,oneinChan,oneoutChan,useinChanTo receive data, useoutChanto leak data.

type InfiniteChannel struct {
	inChan  chan interface{}
	outChan chan interface{}
	data    []interface{}
}

func (c *InfiniteChannel) In(val interface{}) {
	 <- val
}

func (c *InfiniteChannel) Out() interface{} {
	return <-
}

in,inChanandoutChanAll are unbuffered channels.

In addition, it is definitely necessary to have aselectTo process frominChanandoutChanEvents on the body. So we set up a new coroutine and do it in itselectoperate.

func (c *InfiniteChannel) background() {
	for true {
		select {
		case newVal := &lt;-:
			 = append(, newVal)
        case  &lt;- ():		// pop() will remove the first element of the queue		}
	}
}
func NewInfiniteChannel() *InfiniteChannel {
	c := &amp;InfiniteChannel{
		inChan:  make(chan interface{}),
		outChan: make(chan interface{}),
	}
	go ()	// Note that there is another coroutine here	return c
}

ps: I feel that this is also a routine for go concurrent programming. Right now

  1. When new struct, go to a select coroutine, execute a for loop within the select coroutine, constantly select, and listen to events of one or more channels.
  2. The method provided by struct only operates the channel in struct (in this case inChan and outChan), and does not operate other data in struct (in this case, neither In() nor Out() directly operates data).
  3. After the channel event is triggered, the data is updated by the select coroutine (in this case data). Because only the select coroutine reads and writes data members except channel, and go ensures that concurrent read and writes of the channel are safe, the code is concurrently safe.
  4. If struct is exported, the user may cross new and make a struct manually. Consider setting struct to unexported and lowercase its initial letter.

pop()The implementation is also very simple.

// Take out the first element of the queue. If the queue is empty, a nil will be returnedfunc (c *InfiniteChannel) pop() interface{} {
	if len() == 0 {
		return nil
	}
	val := [0]
	 = [1:]
	return val
}

Test it

One coroutine produces one data every second, and another coroutine consumes one data every half second, and prints it.

func main() {
	c := NewInfiniteChannel()
	go func() {
		for i := 0; i < 20; i++ {
			(i)
			()
		}
	}()

	for i := 0; i < 50; i++ {
		val := ()
		(val)
		( * 500)
	}
}
// out
<nil>0<nil>1<nil>23<nil>4<nil><nil>5<nil>67<nil><nil>89<nil><nil>1011<nil>12<nil>13<nil>14<nil>15<nil>16<nil>17<nil><nil>1819<nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil>
Process finished with the exit code 0

You can see thatInfiniteChannelWhen there is no data available for consumption, callOut()Will return anil, but this is also expected, becausepop()When the queue is empty, nil will be returned.

at presentInfiniteChannelbehavior and standards ofchannelThere are differences in behaviors.goIn-housechannel, if there is no data but you still have to fetch data, it will be blocked. How to achieve this effect?

optimization

I think this is the most skillful part of the whole article. When I saw it for the first time, I couldn't help but slap the censor.

First, put the original onebackground()Pick it out

func (c *InfiniteChannel) background() {
	for true {
		select {
		case newVal := <-:
			 = append(, newVal)
		case  <- ():
		}
	}
}

rightoutChanMake a simple package

func (c *InfiniteChannel) background() {
	for true {
		select {
		case newVal := <-:
			 = append(, newVal)
		case () <- ():
		}
	}
}
func (c *InfiniteChannel) outChanWrapper() chan interface{} {
	return 
}

So far, everything is as usual.

The finishing touch is here:

func (c *InfiniteChannel) outChanWrapper() chan interface{} {
	if len() == 0 {
		return nil
	}
	return 
}

existWhen empty, return anil

existbackground()When executedcase <- ():When it actually becomes:

case nil <- nil:

existgoIn the middle, it is impossible to go to onenilofchannelsend element in. For example

func main() {
	var c chan interface{}
	select {
	case c <- 1:
	}
}
// fatal error: all goroutines are asleep - deadlock!
func main() {
	var c chan interface{}
	select {
	case c <- 1:
	default:
		("hello world")
	}
}
// hello world

Therefore, for

select {
case newVal := <-:
	 = append(, newVal)
case () <- ():
}

Will be blocked all the timeselectThere, untilinChanThe data is coming.

Test it again

012345678910111213141516171819fatal error: all goroutines are asleep - deadlock!

Finally, the programpanicBecause it is deadlocked.

Replenish

In factchannelApart fromIn()andOut()In addition, there is another behavior, that is,close(), if the element is still taken from channel close, the default value of this type will be taken out.

func main() {
	c := make(chan interface{})
	close(c)
	for true {
		v := <-c
		(v)
		()
	}
}
// output
// <nil>
// <nil>
// <nil>
// <nil>
func main() {
	c := make(chan interface{})
	close(c)
	for true {
		v, isOpen := <-c
		(v, isOpen)
		()
	}
}
// output
// <nil> false
// <nil> false
// <nil> false
// <nil> false

We also need to achieve the same effect.

func (c *InfiniteChannel) Close() {
	close()
}

func (c *InfiniteChannel) background() {
	for true {
		select {
		case newVal, isOpen := &lt;-:
			if isOpen {
				 = append(, newVal)
			} else {
				 = false
			}
		case () &lt;- ():
		}
	}
}

func NewInfiniteChannel() *InfiniteChannel {
	c := &amp;InfiniteChannel{
		inChan:  make(chan interface{}),
		outChan: make(chan interface{}),
		isOpen:  true,
	}
	go ()
	return c
}

func (c *InfiniteChannel) outChanWrapper() chan interface{} {
    // Here is a judgment on	if  &amp;&amp; len() == 0 {
		return nil
	}
	return 
}

Test it again

func main() {
	c := NewInfiniteChannel()
	go func() {
		for i := 0; i &lt; 20; i++ {
			(i)
			()
		}
		()		// Close is called here	}()

	for i := 0; i &lt; 50; i++ {
		val := ()
		(val)
		( * 500)
	}
}
// output
012345678910111213141516171819<nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil>
Process finished with the exit code 0

Meet expectations

Pity

It looks perfect now, but it is standardchannelCompared, there is still a gap. Because of the standardchannelThere is this usage

v,isOpen := <- ch

Can be passedisOpenVariables to getchannelopening and closing situation.

thereforeInfiniteChannelA similar one should also be providedmethod

func (c *InfiniteChannel) OutAndIsOpen() (interface{}, bool) {
	// todo
}

Unfortunately, I want to knowInfiniteChannelIs it trueOpen, you must visitInfiniteChannelInsideisOpenmember.

type InfiniteChannel struct {
	inChan  chan interface{}
	outChan chan interface{}
	data    []interface{}
	isOpen  bool
}

andisOpenNotchannelType, according to previous routines,This is notchannelMembers of the type should only beselectCoroutine access. Once multiple coroutines are accessed, concurrency problems will occur unless locked.

I can't accept it! So I won't provide this method anymore, hehe.

Complete code

func main() {
	c := NewInfiniteChannel()
	go func() {
		for i := 0; i &lt; 20; i++ {
			(i)
			()
		}
		()
	}()

	for i := 0; i &lt; 50; i++ {
		val := ()
		(val)
		( * 500)
	}
}

type InfiniteChannel struct {
	inChan  chan interface{}
	outChan chan interface{}
	data    []interface{}
	isOpen  bool
}

func (c *InfiniteChannel) In(val interface{}) {
	 &lt;- val
}

func (c *InfiniteChannel) Out() interface{} {
	return &lt;-
}

func (c *InfiniteChannel) Close() {
	close()
}

func (c *InfiniteChannel) background() {
	for true {
		select {
		case newVal, isOpen := &lt;-:
			if isOpen {
				 = append(, newVal)
			} else {
				 = false
			}
		case () &lt;- ():
		}
	}
}

func NewInfiniteChannel() *InfiniteChannel {
	c := &amp;InfiniteChannel{
		inChan:  make(chan interface{}),
		outChan: make(chan interface{}),
		isOpen:  true,
	}
	go ()
	return c
}

// Take out the first element of the queue. If the queue is empty, a nil will be returnedfunc (c *InfiniteChannel) pop() interface{} {
	if len() == 0 {
		return nil
	}
	val := [0]
	 = [1:]
	return val
}

func (c *InfiniteChannel) outChanWrapper() chan interface{} {
	if  &amp;&amp; len() == 0 {
		return nil
	}
	return 
}

refer to

/capital-one-tech/building-an-unbounded-channel-in-go-789e175cd2cd

The above is another detailed discussion on the channel method of go to implement infinite buffer. For more information about go to the channel channel, please follow my other related articles!