SoFunction
Updated on 2025-03-01

Detailed explanation of Redis example of Golang distributed application

text

Redis is a high-performance in-memory database, which is often used in distributed systems. In addition to being a distributed cache or a simple in-memory database, there are some special application scenarios. This article combines Golang to write corresponding middleware.

See all codes in this article/qingwave/go…

Distributed lock

We can use it in a stand-alone systemTo protect critical resources, there is also such a requirement in distributed systems. When multiple hosts seize the same resource, corresponding "distributed locks" need to be added.

In Redis we can passsetnxCommand to implement

  • If the key does not exist, the corresponding value can be set. If the setting is successful, the lock will be successfully added. If the key does not exist, the return failure will be made.
  • Release the lock can be passeddelaccomplish.

The main logic is as follows:

type RedisLock struct {
	client     *
	key        string
	expiration  // Expiration time to prevent downtime or abnormality}
func NewLock(client *, key string, expiration ) *RedisLock {
	return &RedisLock{
		client:     client,
		key:        key,
		expiration: expiration,
	}
}
// Locking will be successful and save the caller id to redisfunc (l *RedisLock) Lock(id string) (bool, error) {
	return ((), , id, ).Result()
}
const unLockScript = `
if (("get", KEYS[1]) == KEYS[2]) then
	("del", KEYS[1])
	return true
end
return false
`
// Unlock the lua script to ensure atomicity, and can only unlock the lock added by the current callerfunc (l *RedisLock) UnLock(id string) error {
	_, err := ((), unLockScript, []string{, id}).Result()
	if err != nil && err !=  {
		return err
	}
	return nil
}

An additional timeout is required to prevent system downtime or deadlock caused by abnormal requests. The timeout time is twice the maximum estimated running time.

When unlocking, the atomicity is guaranteed through the lua script, and the caller will only unlock the lock he has added. Avoid confusion caused by timeouts, for example: Process A acquires the lock at time t1, but due to slow execution, the lock timeout at time t2 fails, and Process B acquires the lock at t3. This is to cancel process B's lock if process A executes and unlocks it.

Run the test

func main() {
    client := (&{
		Addr:     "localhost:6379",
		Password: "123456",
		DB:       0, // use default DB
	})
	lock := NewLock(client, "counter", 30*)
    counter := 0
	worker := func(i int) {
		for {
			id := ("worker%d", i)
			ok, err := (id)
			("worker %d attempt to obtain lock, ok: %v, err: %v", i, ok, err)
			if !ok {
				(100 * )
				continue
			}
			defer (id)
			counter++
			("worker %d, add counter %d", i, counter)
			break
		}
	}
	wg := {}
	for i := 1; i <= 5; i++ {
		(1)
		id := i
		go func() {
			defer ()
			worker(id)
		}()
	}
	()
}

The result of the operation can be seen withUsed similar

2022/07/22 09:58:09 worker 5 attempt to obtain lock, ok: true, err: <nil>
2022/07/22 09:58:09 worker 5, add counter 1
2022/07/22 09:58:09 worker 4 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:09 worker 1 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:09 worker 2 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:09 worker 3 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 3 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 1 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 2 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 4 attempt to obtain lock, ok: true, err: <nil>
2022/07/22 09:58:10 worker 4, add counter 2
2022/07/22 09:58:10 worker 1 attempt to obtain lock, ok: true, err: <nil>
2022/07/22 09:58:10 worker 1, add counter 3
2022/07/22 09:58:10 worker 3 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 2 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 2 attempt to obtain lock, ok: true, err: <nil>
2022/07/22 09:58:10 worker 2, add counter 4
2022/07/22 09:58:10 worker 3 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 3 attempt to obtain lock, ok: true, err: <nil>
2022/07/22 09:58:10 worker 3, add counter 5

It is particularly noteworthy that in a distributed Redis cluster, if an exception occurs (main node downtime), the availability of distributed locks may be reduced, and it can be achieved through strongly consistent components, etcd, ZooKeeper, etc.

Distributed filters

Suppose we want to develop a crawler service to crawl million-level web pages. How to determine whether a web page has been crawled? In addition to using the database and HashMap, we can use the Bronze filter to do it. Compared to other methods, the Bloom filter takes up very low space and the insertion query time is very fast.

Bloom filter is used to determine whether an element is in the set, using BitSet

  • Hash the value multiple times when inserting data, and set the corresponding position of BitSet 1.
  • When querying, we also perform Hash multiple times to compare whether all bits are 1, and if so, it exists.

The Bloom filter has a certain misjudgment rate and is not suitable for precise query scenarios. Also, deletion of elements is not supported. It is usually suitable for scenarios such as URL deduplication, spam filtering, and preventing cache breakdown.

In Redis, we can use the built-in BitSet implementation, and also use the atomicity of lua scripts to avoid inconsistent data query multiple times.

const (
	// Insert data, call setbit to set the corresponding bit	setScript = `
for _, offset in ipairs(ARGV) do
	("setbit", KEYS[1], offset, 1)
end
`
	// Query the data, return true if all bits are 1	getScript = `
for _, offset in ipairs(ARGV) do
	if tonumber(("getbit", KEYS[1], offset)) == 0 then
		return false
	end
end
return true
`
)
type BloomFilter struct {
	client *
	key    string // key in redis	bits   uint // BitSet size	maps   uint // Number of hash times}
func NewBloomFilter(client *, key string, bits, maps uint) *BloomFilter {
	((), key)
	if maps == 0 {
		maps = 14
	}
	return &amp;BloomFilter{
		key:    key,
		client: client,
		bits:   bits,
		maps:   maps,
	}
}
// Hash multiple times to get the location listfunc (f *BloomFilter) getLocations(data []byte) []uint {
	locations := make([]uint, )
	for i := 0; i &lt; int(); i++ {
		val := murmur3.Sum64(append(data, byte(i)))
		locations[i] = uint(val) % 
	}
	return locations
}
func (f *BloomFilter) Add(data []byte) error {
	args := getArgs((data))
	_, err := ((), setScript, []string{}, args).Result()
	if err != nil &amp;&amp; err !=  {
		return err
	}
	return nil
}
func (f *BloomFilter) Exists(data []byte) (bool, error) {
	args := getArgs((data))
	resp, err := ((), getScript, []string{}, args).Result()
	if err != nil {
		if err ==  {
			return false, nil
		}
		return false, err
	}
	exists, ok := resp.(int64)
	if !ok {
		return false, nil
	}
	return exists == 1, nil
}
func getArgs(locations []uint) []string {
	args := make([]string, 0)
	for _, l := range locations {
		args = append(args, (uint64(l), 10))
	}
	return args
}

Run the test

func main() {
	bf := NewBloomFilter(client,"bf-test", 2^16, 14)
	exists, err := ([]byte("test1"))
	("exist %t, err %v", exists, err)
	if err := ([]byte("test1")); err != nil {
		("add err: %v", err)
	}
	exists, err = ([]byte("test1"))
	("exist %t, err %v", exists, err)
	exists, err = ([]byte("test2"))
	("exist %t, err %v", exists, err)
// output
// 2022/07/22 10:05:58 exist false, err <nil>
// 2022/07/22 10:05:58 exist true, err <nil>
// 2022/07/22 10:05:58 exist false, err <nil>
}

Distributed current limiter

exist/x/time/rateThe package provides a token bucket-based current limiter. If you want to implement current limiting in a distributed environment, you can implement it based on Redis Lua scripts.

The main principles of token buckets are as follows:

  • Suppose a token bucket capacity is burnt, and put the token in it at the rate of qps per second
  • At the beginning, the token is filled, and the token will be discarded directly. When requesting the token, if there are enough tokens in the bucket, it will be allowed, otherwise it will be rejected.
  • When burst==qps, strictly follow the qps flow limit; when burst>qps, a certain burst traffic can be allowed

Here we mainly refer to the officialrateThe implementation of the package changes the core logic to Lua implementation.

--- RelatedKey
--- limit rate keyvalue,correspondvalueThe current number of tokens
local limit_key = KEYS[1]
--- Enter parameters
--[[
qps: Number of requests per second;
burst: Token bucket capacity;
now: currentTimestamp;
cost: Number of requested tokens;
max_wait: Maximum waiting time
--]]
local qps = tonumber(ARGV[1])
local burst = tonumber(ARGV[2])
local now = ARGV[3]
local cost = tonumber(ARGV[4])
local max_wait = tonumber(ARGV[5])
--- GetredisNumber of tokens in
local tokens = ("hget", limit_key, "token")
if not tokens then
	tokens = burst
end
--- Last modified time
local last_time = ("hget", limit_key, "last_time")
if not last_time then
	last_time = 0
end
--- Latest waiting time
local last_event = ("hget", limit_key, "last_event")
if not last_event then
	last_event = 0
end
--- 通过currenthour间与Last modified time的差value,qps计算出currenthour间得令牌数
local delta = (0, now-last_time)
local new_tokens = (burst, delta * qps + tokens)
new_tokens = new_tokens - cost --- Latest token count,Reduce request tokens
--- 如果Latest token count小于0,Calculate the time to wait
local wait_period = 0
if new_tokens &lt; 0 and qps &gt; 0 then
	wait_period = wait_period - new_tokens / qps
end
wait_period = (wait_period)
local time_act = now + wait_period --- Time stamp satisfying the waiting interval
--- There are two situations where requests are allowed
--- 当Number of requested tokens小于burst, 等待hour间不超过Maximum waiting time,Requests can be satisfied by supplementary tokens
--- qpsfor0hour,只要Latest token count不小于0Just
local ok = (cost &lt;= burst and wait_period &lt;= max_wait and qps &gt; 0) or (qps == 0 and new_tokens &gt;= 0)
--- 设置correspondvalue
if ok then
	("set", limit_key, new_tokens)
	("set", last_time_key, now)
	("set", last_event_key, time_act)
end
--- Return to the list,{Whether it is allowed, 等待hour间}
return {ok, wait_period}

The related interfaces Allow, AllowN, Wait, etc. in Golang are all implemented by calling reserveN

// Call the lua scriptfunc (lim *RedisLimiter) reserveN(now , n int, maxFutureReserveSecond int) (*Reservation, error) {
	// ...
	res, err := ((), reserveNScript, []string{}, , , (), n, maxFutureReserveSecond).Result()
	if err != nil &amp;&amp; err !=  {
		return nil, err
	}
	//...
	return &amp;Reservation{
		ok:        allow == 1,
		lim:       lim,
		tokens:    n,
		timeToAct: ((wait) * ),
	}, nil
}

Run the test

func main() {
	rdb := (&{
		Addr:     "localhost:6379",
		Password: "123456",
		DB:       0, // use default DB
	})
	r, err := NewRedisLimiter(rdb, 1, 2, "testrate")
	if err != nil {
		(err)
	}
	()
	for i := 0; i < 5; i++ {
		err := (())
		("worker %d allowed: %v", i, err)
	}
}
// output
// 2022/07/22 12:50:31 worker 0 allowed: <nil>
// 2022/07/22 12:50:31 worker 1 allowed: <nil>
// 2022/07/22 12:50:32 worker 2 allowed: <nil>
// 2022/07/22 12:50:33 worker 3 allowed: <nil>
// 2022/07/22 12:50:34 worker 4 allowed: <nil>

The first two requests are in burst and can be obtained directly, and the subsequent requests are generated at the rate of qps.

other

In addition, Redis can also be used as global counting, deduplication, publishing and subscription scenarios. Redis official also provides some general modules, and filtering, current limiting and other features can be achieved by loading these modules. Referencemodules

refer to

/

/qingwave/gocorex

The above is the detailed explanation of the Redis example of Go distributed application. For more information about Go distributed Redis, please pay attention to my other related articles!