SoFunction
Updated on 2025-03-05

Design and implementation analysis of Golang infinite cache channel

1. Introduction

Go has two types of channels: channels without a cache (unbuffered) and channels with a cache (buffered). For a channel with a cache, however, the cache length is fixed at creation and cannot be expanded afterwards, which is inconvenient in certain business scenarios.

A typical scenario is as follows:

A crawler wants to fetch every URL that can be reached from a given page.

Pending URLs are stored in a channel.

A group of worker goroutines read URLs from the channel, download and parse the pages, extract new URLs, and then put those URLs back into the channel.

In this scenario, a message queue or the sync package can solve the problem, but the result is fairly complicated. A channel with an unlimited cache may be a better solution.
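To make the limitation concrete, here is a minimal sketch of what happens when a fixed-size channel fills up. The helper `trySend` and the example URLs are hypothetical and not part of the article's code; the sketch only uses a non-blocking `select` to show that a full cache rejects further writes.

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether the value was
// accepted. It returns false when the channel's cache is already full.
// (Hypothetical helper, for illustration only.)
func trySend(ch chan<- string, url string) bool {
	select {
	case ch <- url:
		return true
	default:
		return false
	}
}

func main() {
	// A pending-URL queue with room for only two entries.
	pending := make(chan string, 2)

	fmt.Println(trySend(pending, "https://example.com/a")) // true
	fmt.Println(trySend(pending, "https://example.com/b")) // true
	// The cache is full and nobody is reading: a plain `pending <- url`
	// would block here.
	fmt.Println(trySend(pending, "https://example.com/c")) // false
}
```

In the crawler, every worker both reads from and writes to the same queue, so if all workers block on a write like the third one, nothing drains the channel any more.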

2. Design

Based on the scenario above, a channel with an unlimited cache should meet the following requirements:

Unlimited cache. This is the core requirement.

Writes must not block. A write to an ordinary channel blocks because the cache is full, and this problem should not exist in a channel with an unlimited cache.

Reads block when there is no data. This behavior is the same as for an ordinary channel.

Reads and writes should go through channels, for two reasons: first, the channel <- operator is convenient and keeps the syntax of ordinary channels; second, the internal cache must not be exposed.

After the channel is closed, unread data should still be readable. This behavior is consistent with an ordinary channel.

The cache should expand and shrink automatically with the data volume. When the data volume is large, it must expand adaptively. When the data volume becomes small again, it must shrink adaptively to avoid wasting memory.
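The "readable after close" requirement mirrors what ordinary channels already do. The following sketch shows that baseline behavior on a standard buffered channel; `drainAfterClose` is a hypothetical helper name used only for this illustration.

```go
package main

import "fmt"

// drainAfterClose fills a buffered channel, closes it, and then reads
// everything that was still queued at the time of the close.
func drainAfterClose(values []int) []int {
	ch := make(chan int, len(values))
	for _, v := range values {
		ch <- v
	}
	close(ch)

	var got []int
	for v := range ch { // range ends once the closed channel is empty
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(drainAfterClose([]int{1, 2, 3})) // prints [1 2 3]
}
```

The unlimited cache channel is expected to behave the same way from the reader's point of view: closing the write side must not discard values that have not been read yet.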

To meet these requirements, the design is as follows:

There are two ordinary channels inside, one for writing and one for reading. We call them In and Out: data is written into In and read from Out.

There is a buf that can expand and shrink adaptively. When the Out channel is full and cannot accept more data, the data is written into this buf.

There is an internal worker goroutine that continuously moves the data from In into Out or buf.

The internal adaptive buf can be implemented as a bidirectional ring linked list.

Compared with an array-based implementation, this has the following advantages:

The size of an array is limited, so at the language level an array cannot provide a truly unlimited cache.

Expanding an array is expensive, whereas a bidirectional ring linked list only needs a new node to be added. The same is true for shrinking.
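As a rough illustration of the expansion cost mentioned above, the sketch below grows an array-backed buffer by allocating a larger backing array and copying every element into it. `growByCopy` is a hypothetical helper, not part of the article's code, and doubling the capacity is only an assumed growth policy.

```go
package main

import "fmt"

// growByCopy doubles the capacity of an array-backed buffer. Every existing
// element has to be copied into the new backing array.
func growByCopy(buf []int) (grown []int, copied int) {
	grown = make([]int, len(buf), 2*cap(buf))
	copied = copy(grown, buf) // copy reports how many elements were moved
	return grown, copied
}

func main() {
	buf := make([]int, 1024)
	grown, copied := growByCopy(buf)
	fmt.Println(cap(grown), copied) // prints 2048 1024
}
```

With a ring of fixed-size nodes, growing only allocates one new node and updates a few pointers; the data already stored stays where it is.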

type T interface{}

type UnlimitSizeChan struct {
	bufCount int64       // Number of elements in buf, accessed atomically
	In       chan<- T    // Write channel
	Out      <-chan T    // Read channel
	buffer   *RingBuffer // Adaptively expanding and shrinking buf
}

How does the bidirectional ring linked list write and read data and expand adaptively?

The structure of the bidirectional ring linked list is similar to a bracelet. Each bead on the bracelet is a node (a cell), and each cell holds a fixed-size array.

The ring buf has a read pointer and a write pointer, readCell and writeCell, which point to the cells where the next read and the next write will take place.

readCell always chases writeCell. When writeCell has gone around the ring and reaches a cell that is still marked full, the buf is full and the expansion operation is performed.

The expansion operation inserts a newly created empty cell immediately before the cell that the write pointer has reached, and then points writeCell at the new cell.

When there is no data left in buf, the traffic peak has most likely passed, and the shrink operation is performed.

The shrink operation only modifies the linked list pointers so that buf returns to its original state and keeps just two cells. Since the other cells are no longer referenced, they are reclaimed automatically by the GC.

Each cell also has a read pointer r and a write pointer w, which are responsible for reading and writing inside the cell. The read pointer r always chases the write pointer w.

type cell struct {
	Data     []T   // Data section
	fullFlag bool  // Set when the cell is full
	next     *cell // Points to the next cell
	pre      *cell // Points to the previous cell

	r int // Index of the next element to read
	w int // Index of the next element to write
}

type RingBuffer struct {
	cellCount int // Number of cells

	readCell  *cell // The next cell to read
	writeCell *cell // The next cell to write
}
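The expand and shrink steps described above can be sketched with the standard library's `container/ring` package. This is only an illustration of the pointer manipulation, not the article's implementation: the article uses its own `cell` type, and the helper names `grow`, `shrink`, and `demoSizes` below are hypothetical.

```go
package main

import (
	"container/ring"
	"fmt"
)

// grow inserts one new empty node immediately before w and returns it,
// mirroring "insert a new cell and point the write pointer at it".
func grow(w *ring.Ring) *ring.Ring {
	n := ring.New(1)
	w.Prev().Link(n) // n now sits between the old w.Prev() and w
	return n
}

// shrink keeps r and r.Next() and unlinks every other node, so the ring
// goes back to two nodes and the rest become garbage.
func shrink(r *ring.Ring) {
	r.Next().Unlink(r.Len() - 2)
}

// demoSizes records the ring size initially, after grow, and after shrink.
func demoSizes() [3]int {
	r := ring.New(2) // the initial two-node ring
	var sizes [3]int
	sizes[0] = r.Len()
	grow(r)
	sizes[1] = r.Len()
	shrink(r)
	sizes[2] = r.Len()
	return sizes
}

func main() {
	fmt.Println(demoSizes()) // prints [2 3 2]
}
```

Neither operation touches the data already stored in the existing nodes, which is the property the design relies on.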

How is the FIFO order of the data guaranteed?

The goroutine inside the unlimited cache channel is called the Worker.

When the Out channel is not full and there is no data in buf, the Worker reads data from In and puts it into Out until Out is full.

When there is data in buf, the data read from In is written directly into buf, regardless of whether Out is full. This preserves the FIFO order of the data.

When a cell is marked full, it cannot be written to again until all of its data has been read, even if part of the data in the cell has already been read. This also preserves the FIFO order of the data.
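The FIFO rule can be tried out with a deliberately simplified worker. The sketch below is not the article's Worker: it uses a plain slice instead of the ring buffer and always queues incoming values behind older ones, so a new value can never overtake a buffered one. The names `forward` and `runFIFO` are hypothetical.

```go
package main

import "fmt"

// forward is a simplified stand-in for the Worker. New values are always
// appended behind older ones, and only the oldest value is offered to out.
func forward(in <-chan int, out chan<- int) {
	defer close(out)
	var buf []int
	for in != nil || len(buf) > 0 {
		// A nil channel never becomes ready, so the send case is only
		// active while buf holds data.
		var send chan<- int
		var head int
		if len(buf) > 0 {
			send, head = out, buf[0]
		}
		select {
		case v, ok := <-in:
			if !ok {
				in = nil // input closed: keep draining buf
				continue
			}
			buf = append(buf, v)
		case send <- head:
			buf = buf[1:]
		}
	}
}

// runFIFO pushes 0..n-1 through forward and returns them in arrival order.
func runFIFO(n int) []int {
	in, out := make(chan int), make(chan int, 1)
	go forward(in, out)
	go func() {
		for i := 0; i < n; i++ {
			in <- i
		}
		close(in)
	}()
	var got []int
	for v := range out {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(runFIFO(5)) // prints [0 1 2 3 4]
}
```

The article's Worker adds one optimization on top of this idea: when buf is empty it sends straight to Out, which is safe because nothing older is waiting.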

3. Implementation

1. Bidirectional ring linked list implementation

package unlimitSizeChan

import (
	"errors"
)

var ErrRingIsEmpty = errors.New("ringbuffer is empty")

// CellInitialSize is the capacity of each cell.
var CellInitialSize = 1024

// CellInitialCount is the initial number of cells.
var CellInitialCount = 2

type cell struct {
	Data     []T   // Data section
	fullFlag bool  // Set when the cell is full
	next     *cell // Points to the next cell
	pre      *cell // Points to the previous cell

	r int // Index of the next element to read
	w int // Index of the next element to write
}

type RingBuffer struct {
	cellCount int // Number of cells

	readCell  *cell // The next cell to read
	writeCell *cell // The next cell to write
}

// NewRingBuffer creates a new ring buffer containing two cells.
func NewRingBuffer() *RingBuffer {
	rootCell := &cell{
		Data: make([]T, CellInitialSize),
	}
	lastCell := &cell{
		Data: make([]T, CellInitialSize),
	}
	// Link the two cells into a bidirectional ring.
	rootCell.pre = lastCell
	lastCell.pre = rootCell
	rootCell.next = lastCell
	lastCell.next = rootCell

	return &RingBuffer{
		cellCount: CellInitialCount,
		readCell:  rootCell,
		writeCell: rootCell,
	}
}

// Read reads one element and advances the read pointer.
func (r *RingBuffer) Read() (T, error) {
	// No data
	if r.IsEmpty() {
		return nil, ErrRingIsEmpty
	}

	// Read the data and move the read pointer one position to the right
	value := r.readCell.Data[r.readCell.r]
	r.readCell.r++

	// This cell has been read completely
	if r.readCell.r == CellInitialSize {
		// Reset the read pointer and mark the cell as not full
		r.readCell.r = 0
		r.readCell.fullFlag = false
		// Point readCell to the next cell
		r.readCell = r.readCell.next
	}

	return value, nil
}

// Pop reads one element and advances the read pointer. It panics if the
// ring buffer is empty.
func (r *RingBuffer) Pop() T {
	value, err := r.Read()
	if err != nil {
		panic(err.Error())
	}
	return value
}

// Peek returns the next element without moving the read pointer. It panics
// if the ring buffer is empty.
func (r *RingBuffer) Peek() T {
	if r.IsEmpty() {
		panic(ErrRingIsEmpty.Error())
	}

	// Read only
	value := r.readCell.Data[r.readCell.r]
	return value
}

// Write writes one element.
func (r *RingBuffer) Write(value T) {
	// Write the data at position r.writeCell.w and move the pointer one
	// position to the right
	r.writeCell.Data[r.writeCell.w] = value
	r.writeCell.w++

	// The current cell is full
	if r.writeCell.w == CellInitialSize {
		// Reset the pointer, mark the cell as full, and move to the next cell
		r.writeCell.w = 0
		r.writeCell.fullFlag = true
		r.writeCell = r.writeCell.next
	}

	// The next cell is also full: expand
	if r.writeCell.fullFlag {
		r.grow()
	}
}

// grow expands the ring by one cell.
func (r *RingBuffer) grow() {
	// Create a new cell
	newCell := &cell{
		Data: make([]T, CellInitialSize),
	}

	// Three cells are involved: writeCell, preCell, newCell
	// Original relationship: preCell <===> writeCell
	// After inserting newCell: preCell <===> newCell <===> writeCell
	pre := r.writeCell.pre
	pre.next = newCell
	newCell.pre = pre
	newCell.next = r.writeCell
	r.writeCell.pre = newCell

	// Point writeCell to the newly created cell
	r.writeCell = r.writeCell.pre

	// Increase the cell count by one
	r.cellCount++
}
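
// IsEmpty reports whether the ring buffer is empty.
func (r *RingBuffer) IsEmpty() bool {
	// readCell and writeCell point to the same cell, the read and write
	// pointers of that cell point to the same position, and the cell is
	// not marked full
	return r.readCell == r.writeCell && r.readCell.r == r.readCell.w && !r.readCell.fullFlag
}

// Capacity returns the capacity of the ring buffer.
func (r *RingBuffer) Capacity() int {
	return r.cellCount * CellInitialSize
}

// Reset shrinks the ring back to two cells. It is meant to be called when
// the ring buffer is empty.
func (r *RingBuffer) Reset() {
	lastCell := r.readCell.next

	lastCell.w = 0
	lastCell.r = 0
	r.readCell.r = 0
	r.readCell.w = 0
	r.cellCount = CellInitialCount

	lastCell.next = r.readCell
	// Also repair the back pointer so that a later grow() inserts the new
	// cell into the two-cell ring rather than after a dropped cell.
	r.readCell.pre = lastCell
}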

2. Unlimited cache channel implementation

package unlimitSizeChan

import "sync/atomic"

type T interface{}

// UnlimitSizeChan is a channel with an unlimited cache.
type UnlimitSizeChan struct {
	bufCount int64       // Number of elements in buf, accessed atomically
	In       chan<- T    // Write channel
	Out      <-chan T    // Read channel
	buffer   *RingBuffer // Adaptively expanding and shrinking buf
}

// Len returns the total number of elements in uc.
// A pointer receiver is used so that the struct is not copied, which would
// read bufCount without atomic protection.
func (uc *UnlimitSizeChan) Len() int {
	return len(uc.In) + uc.BufLen() + len(uc.Out)
}

// BufLen returns the number of elements in uc's buf.
func (uc *UnlimitSizeChan) BufLen() int {
	return int(atomic.LoadInt64(&uc.bufCount))
}

// NewUnlimitSizeChan creates a new unlimited cache channel in which In and
// Out have the same size.
func NewUnlimitSizeChan(initCapacity int) *UnlimitSizeChan {
	return NewUnlitSizeChanSize(initCapacity, initCapacity)
}

// NewUnlitSizeChanSize creates a new unlimited cache channel in which In and
// Out can have different sizes.
func NewUnlitSizeChanSize(initInCapacity, initOutCapacity int) *UnlimitSizeChan {
	in := make(chan T, initInCapacity)
	out := make(chan T, initOutCapacity)
	ch := UnlimitSizeChan{In: in, Out: out, buffer: NewRingBuffer()}

	go process(in, out, &ch)

	return &ch
}

// process is the internal Worker goroutine.
func process(in, out chan T, ch *UnlimitSizeChan) {
	defer close(out) // Once in is closed and all data has been forwarded, close out as well

	// Continuously read data from in and put it into out or the ring buffer
loop:
	for {
		// Step 1: read data from in
		value, ok := <-in
		if !ok {
			// in is closed, exit the loop
			break loop
		}

		// Step 2: store the data in out or buf
		if atomic.LoadInt64(&ch.bufCount) > 0 {
			// When buf holds data, new data goes into buf first to preserve FIFO order
			ch.buffer.Write(value)
			atomic.AddInt64(&ch.bufCount, 1)
		} else {
			// out is not full: put the data into out
			select {
			case out <- value:
				continue
			default:
			}

			// out is full: put the data into buf
			ch.buffer.Write(value)
			atomic.AddInt64(&ch.bufCount, 1)
		}

		// Step 3: process buf, moving its data into out until buf is empty
		for !ch.buffer.IsEmpty() {
			select {
			// To avoid blocking writers on in, keep reading from in as well
			case val, ok := <-in:
				if !ok {
					// in is closed, exit the loop
					break loop
				}
				// buf still holds data, so the new data goes directly into buf
				ch.buffer.Write(val)
				atomic.AddInt64(&ch.bufCount, 1)

			// Move data from buf into out
			case out <- ch.buffer.Peek():
				ch.buffer.Pop()
				atomic.AddInt64(&ch.bufCount, -1)

				if ch.buffer.IsEmpty() { // Shrink to avoid holding on to memory
					ch.buffer.Reset()
					atomic.StoreInt64(&ch.bufCount, 0)
				}
			}
		}
	}

	// After in is closed and the loop has exited, buf may still hold
	// unprocessed data. Put it into out and reset buf.
	for !ch.buffer.IsEmpty() {
		out <- ch.buffer.Pop()
		atomic.AddInt64(&ch.bufCount, -1)
	}

	ch.buffer.Reset()
	atomic.StoreInt64(&ch.bufCount, 0)
}

4. Usage

ch := NewUnlimitSizeChan(1000)
// or ch := NewUnlitSizeChanSize(100,200)

go func() {
    for ...... {
        ...
        ch.In <- ... // send values
        ...
    }

    close(ch.In) // close In channel
}()

for v := range ch.Out { // read values
    fmt.Println(v)
}
