SoFunction
Updated on 2025-03-03

Example code for implementing a concurrent cache in Go

A concurrency-unsafe Memo

First, an example demonstrating function memoization:

// A Memo caches the results of calling a Func.
type Memo struct {
	f     Func
	cache map[string]result
}

// Func is the type of the function to memoize.
type Func func(key string) (interface{}, error)

type result struct {
	value interface{}
	err   error
}

func New(f Func) *Memo {
	return &Memo{f: f, cache: make(map[string]result)}
}

// NOTE: not concurrency-safe!
func (memo *Memo) Get(key string) (interface{}, error) {
	res, ok := memo.cache[key]
	if !ok {
		res.value, res.err = memo.f(key)
		memo.cache[key] = res
	}
	return res.value, res.err
}

Here f is a heavyweight function that is expensive to call, so we cache its results in a map to speed up subsequent calls; this is function memoization.
Each call to Get consults the memo; if no cached result is found, it must call f to compute one and record it in the cache.
The implementation above updates the cache without any synchronization, so Get as a whole is not concurrency-safe.
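To see the memoization itself at work, here is a minimal runnable sketch built around the Memo above. The counting function expensive and the calls counter are illustrative additions, not part of the original; and note that this version must only be used from a single goroutine.

```go
package main

import "fmt"

// Func, result, Memo, New and Get reproduce the concurrency-unsafe
// version from the text above.
type Func func(key string) (interface{}, error)

type result struct {
	value interface{}
	err   error
}

type Memo struct {
	f     Func
	cache map[string]result
}

func New(f Func) *Memo {
	return &Memo{f: f, cache: make(map[string]result)}
}

// NOTE: not concurrency-safe!
func (memo *Memo) Get(key string) (interface{}, error) {
	res, ok := memo.cache[key]
	if !ok {
		res.value, res.err = memo.f(key)
		memo.cache[key] = res
	}
	return res.value, res.err
}

// calls counts how often the expensive function actually runs.
var calls int

func expensive(key string) (interface{}, error) {
	calls++
	return len(key), nil // stand-in for a heavyweight computation
}

func main() {
	m := New(expensive)
	for i := 0; i < 3; i++ {
		v, _ := m.Get("hello") // computed once, then served from cache
		fmt.Println(v, calls)  // prints "5 1" each time
	}
}
```

Repeated Gets for the same key run the expensive function only once; that is the entire point of the cache.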

A safe but effectively serial Memo

Consider locking on every call to Get:

type Memo struct {
	f     Func
	mu    sync.Mutex // guards cache
	cache map[string]result
}

// Get is concurrency-safe.
func (memo *Memo) Get(key string) (value interface{}, err error) {
	memo.mu.Lock()
	res, ok := memo.cache[key]
	if !ok {
		res.value, res.err = memo.f(key)
		memo.cache[key] = res
	}
	memo.mu.Unlock()
	return res.value, res.err
}

Since every call acquires the mutex for the entire operation, parallel calls to Get are serialized again.

A Memo that can duplicate work

Consider the following improvements:

func (memo *Memo) Get(key string) (value interface{}, err error) {
	memo.mu.Lock()
	res, ok := memo.cache[key]
	memo.mu.Unlock()
	if !ok {
		res.value, res.err = memo.f(key)

		// Between the two critical sections, several goroutines
		// may race to compute f(key) and update the map.
		memo.mu.Lock()
		memo.cache[key] = res
		memo.mu.Unlock()
	}
	return res.value, res.err
}

This version acquires the lock twice: once to query the cache, and again to update the cache when the query finds nothing.
Between the two critical sections, several goroutines may compute f(key) for the same key, so work is duplicated. Ideally we would avoid this redundancy; the property that prevents it is sometimes called duplicate suppression.

Duplicate suppression with a channel

In the fourth version of the cache, each entry gains a new channel, ready. After the entry's result field has been set, the channel is closed; the waiting goroutines receive this broadcast and can then read the entry's result.

// Func is the type of the function to memoize.
type Func func(string) (interface{}, error)

type result struct {
	value interface{}
	err   error
}

type entry struct {
	res   result
	ready chan struct{} // closed when res is ready
}

func New(f Func) *Memo {
	return &Memo{f: f, cache: make(map[string]*entry)}
}

type Memo struct {
	f     Func
	mu    sync.Mutex // guards cache
	cache map[string]*entry // the cache now maps each key to an entry
}

func (memo *Memo) Get(key string) (value interface{}, err error) {
	memo.mu.Lock()
	e := memo.cache[key]
	if e == nil {
		// This is the first request for this key.
		// This goroutine becomes responsible for computing
		// the value and broadcasting the ready condition.
		e = &entry{ready: make(chan struct{})}
		memo.cache[key] = e
		memo.mu.Unlock()

		e.res.value, e.res.err = memo.f(key)

		close(e.ready) // broadcast ready condition
	} else {
		// This is a repeat request for this key.
		memo.mu.Unlock()

		<-e.ready // wait for ready condition
	}
	return e.res.value, e.res.err
}

When Get finds no record for a key in the memo's cache, it constructs an entry and puts it in the cache, but at that point the key's result has not yet been computed.
If other goroutines then call Get for the same key, they block at the <-e.ready receive, waiting for the channel. Only when the computation finishes and the goroutine responsible for it closes the channel can the other goroutines proceed and read the result.

  • When a goroutine queries a key with no cached result, it creates an entry, puts it in the cache, releases the lock, and calls f to compute the value. When the computation finishes, it updates the entry and closes the ready channel;
  • When a goroutine queries a key whose entry already exists, it immediately releases the lock and waits for that entry's channel to be closed.

An alternative design: share memory by communicating

The approach above shares variables and guards them with a lock. An alternative style is that of communicating sequential processes.
In the new design, the map variable is confined to a single monitor goroutine, and callers of Get send it messages.

// Func is the type of the function to memoize.
type Func func(key string) (interface{}, error)

// A result is the result of calling a Func.
type result struct {
	value interface{}
	err   error
}

type entry struct {
	res   result
	ready chan struct{} // closed when res is ready
}

// A request is a message requesting that the Func be applied to key.
type request struct {
	key      string
	response chan<- result // the client wants a single result
}

type Memo struct{ requests chan request }

// New returns a memoization of f.  Clients must subsequently call Close.
func New(f Func) *Memo {
	memo := &Memo{requests: make(chan request)}
	go memo.server(f)
	return memo
}

func (memo *Memo) Get(key string) (interface{}, error) {
	response := make(chan result)
	memo.requests <- request{key, response}
	res := <-response
	return res.value, res.err
}

func (memo *Memo) Close() { close(memo.requests) }

func (memo *Memo) server(f Func) {
	cache := make(map[string]*entry)
	for req := range memo.requests {
		e := cache[req.key]
		if e == nil {
			// This is the first request for this key.
			e = &entry{ready: make(chan struct{})}
			cache[req.key] = e
			go e.call(f, req.key) // call f(key)
		}
		go e.deliver(req.response)
	}
}

func (e *entry) call(f Func, key string) {
	// Evaluate the function.
	e.res.value, e.res.err = f(key)
	// Broadcast the ready condition.
	close(e.ready)
}

func (e *entry) deliver(response chan<- result) {
	// Wait for the ready condition.
	<-e.ready
	// Send the result to the client.
	response <- e.res
}

The Get method creates a response channel, puts it in a request, sends the request to the monitor goroutine, and then reads from its own response channel.

The monitor goroutine (the server method) reads from the requests channel in a loop until the channel is closed. For each request it first consults the cache, and if no entry is found it creates and inserts a new one.
When the monitor goroutine creates an entry and puts it in the cache, it calls go e.call(f, req.key) to start a goroutine that computes the result and closes the ready channel. At the same time it calls go e.deliver(req.response), which waits for the ready channel to close and then sends the result on the response channel.

If the monitor goroutine finds an entry for the key already in the cache, that entry's ready channel may already be closed, so the goroutine started by go e.deliver(req.response) can send the result to the response channel as soon as it is available.

To sum up, server is responsible for reading requests from the requests channel; for a key whose computation has not yet started, it creates a new goroutine to perform the work, and it answers every request through the response channel carried in the request.
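Putting the monitor-goroutine version together into one runnable program (the atomic calls counter and the inline memoized function are illustrative additions) shows the same duplicate suppression without any mutex:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type Func func(key string) (interface{}, error)

type result struct {
	value interface{}
	err   error
}

type entry struct {
	res   result
	ready chan struct{} // closed when res is ready
}

// A request is a message asking that Func be applied to key.
type request struct {
	key      string
	response chan<- result // the client wants a single result
}

type Memo struct{ requests chan request }

func New(f Func) *Memo {
	memo := &Memo{requests: make(chan request)}
	go memo.server(f)
	return memo
}

func (memo *Memo) Get(key string) (interface{}, error) {
	response := make(chan result)
	memo.requests <- request{key, response}
	res := <-response
	return res.value, res.err
}

func (memo *Memo) Close() { close(memo.requests) }

// server confines the cache map to a single monitor goroutine.
func (memo *Memo) server(f Func) {
	cache := make(map[string]*entry)
	for req := range memo.requests {
		e := cache[req.key]
		if e == nil {
			e = &entry{ready: make(chan struct{})}
			cache[req.key] = e
			go e.call(f, req.key)
		}
		go e.deliver(req.response)
	}
}

func (e *entry) call(f Func, key string) {
	e.res.value, e.res.err = f(key)
	close(e.ready)
}

func (e *entry) deliver(response chan<- result) {
	<-e.ready
	response <- e.res
}

var calls int64 // counts real invocations of the memoized function

func main() {
	m := New(func(key string) (interface{}, error) {
		atomic.AddInt64(&calls, 1)
		return len(key), nil
	})
	defer m.Close()

	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			m.Get("monitor")
		}()
	}
	wg.Wait()
	fmt.Println(calls) // prints 1: the monitor goroutine suppresses duplicates
}
```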

Further refinements could limit the number of goroutines performing computations, or use the context package to control the server's life cycle.
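One possible way to cap the number of concurrent computations is a counting semaphore built from a buffered channel; this wrapper is a sketch of my own, not part of the original design, and the limit of 2 is arbitrary.

```go
package main

import (
	"fmt"
	"sync"
)

// sem is a counting semaphore: at most cap(sem) computations run at once.
var sem = make(chan struct{}, 2)

// limited wraps a function so that concurrent calls are throttled by sem.
func limited(f func(string) (interface{}, error)) func(string) (interface{}, error) {
	return func(key string) (interface{}, error) {
		sem <- struct{}{}        // acquire a slot
		defer func() { <-sem }() // release it when done
		return f(key)
	}
}

func main() {
	f := limited(func(key string) (interface{}, error) { return len(key), nil })
	var wg sync.WaitGroup
	for _, k := range []string{"a", "bb", "ccc"} {
		wg.Add(1)
		go func(k string) {
			defer wg.Done()
			v, _ := f(k)
			fmt.Println(k, v)
		}(k)
	}
	wg.Wait()
}
```

Such a wrapper could be applied to f before handing it to New, leaving the Memo implementations above unchanged.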

This is the end of this article about example code for implementing a concurrent cache in Go. For more on concurrent caches in Go, please search for my previous articles or continue browsing the related articles below. I hope everyone will support me in the future!