Rate limiting is a common tool in projects. It is typically used to cap how often a user can send requests, and it can also protect a system from crashing under sudden traffic spikes or keep message processing at a steady rate. Sometimes the limit has to be enforced across several instances (distributed rate limiting); a common approach is to use Redis as the central store.
This article implements the common rate limiting algorithms with Go and Redis. If you want to understand the principles behind each algorithm, see the article "Go implements common current limiting algorithms".
The code below uses the go-redis client.
Fixed window
Implementing a fixed window with Redis is simple, because only one window exists at any given time. When the first request enters a window, we use the pexpire command to set the key's expiration to the window size, so the window disappears when the key expires. We use the incr command to increase the window count.
Because the expiration has to be set exactly when the count goes from 0 to 1 (counter == 1), and the check and the update must be atomic, we implement the logic in a simple Lua script.
const fixedWindowLimiterTryAcquireRedisScript = `
-- ARGV[1]: window size (milliseconds)
-- ARGV[2]: maximum requests per window
local window = tonumber(ARGV[1])
local limit = tonumber(ARGV[2])

-- Get the current count (nil when the key does not exist)
local counter = tonumber(redis.call("get", KEYS[1]))
if counter == nil then
    counter = 0
end
-- If the limit has been reached, the request fails
if counter >= limit then
    return 0
end
-- Increment the window count
redis.call("incr", KEYS[1])
-- First request in this window: start the expiration
if counter == 0 then
    redis.call("pexpire", KEYS[1], window)
end
return 1
`
package redis

import (
	"context"
	"errors"
	"time"

	"github.com/go-redis/redis/v8"
)

// ErrAcquireFailed is referenced below; it is declared elsewhere in this package.

// FixedWindowLimiter is a fixed-window rate limiter.
type FixedWindowLimiter struct {
	limit  int           // maximum requests per window
	window int           // window size in milliseconds
	client *redis.Client // Redis client
	script *redis.Script // TryAcquire script
}

func NewFixedWindowLimiter(client *redis.Client, limit int, window time.Duration) (*FixedWindowLimiter, error) {
	// Redis expiration has millisecond precision, so the window must be a whole number of milliseconds
	if window%time.Millisecond != 0 {
		return nil, errors.New("the window unit must not be less than millisecond")
	}

	return &FixedWindowLimiter{
		limit:  limit,
		window: int(window / time.Millisecond),
		client: client,
		script: redis.NewScript(fixedWindowLimiterTryAcquireRedisScript),
	}, nil
}

func (l *FixedWindowLimiter) TryAcquire(ctx context.Context, resource string) error {
	success, err := l.script.Run(ctx, l.client, []string{resource}, l.window, l.limit).Bool()
	if err != nil {
		return err
	}
	// If the window limit has been reached, the request fails
	if !success {
		return ErrAcquireFailed
	}
	return nil
}
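To make the script's control flow easier to follow without a Redis server, here is a minimal in-memory sketch of the same fixed-window logic. The `fixedWindow` type and its fields are hypothetical stand-ins for the Redis key and its expiration, and the clock is passed in as a millisecond timestamp; this is an illustration of the technique, not part of the limiter package.

```go
package main

import "fmt"

// fixedWindow is a hypothetical in-memory stand-in for the Redis key:
// a counter plus the time at which the key would expire.
type fixedWindow struct {
	limit     int
	windowMs  int64
	counter   int
	expiresAt int64 // millisecond timestamp; 0 means "key does not exist"
}

// tryAcquire mirrors the script: read the counter, reject at the limit,
// otherwise increment and start the expiration on the first request.
func (w *fixedWindow) tryAcquire(nowMs int64) bool {
	if w.expiresAt != 0 && nowMs >= w.expiresAt {
		w.counter, w.expiresAt = 0, 0 // the key has expired
	}
	if w.counter >= w.limit {
		return false
	}
	w.counter++
	if w.counter == 1 {
		w.expiresAt = nowMs + w.windowMs
	}
	return true
}

func main() {
	w := &fixedWindow{limit: 2, windowMs: 1000}
	fmt.Println(w.tryAcquire(0), w.tryAcquire(10), w.tryAcquire(20)) // true true false
	fmt.Println(w.tryAcquire(1000))                                 // true: a new window has started
}
```

In the Redis version the reset step is not needed, because the key simply disappears when its expiration fires.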
Sliding window
Hash implementation
We use a Redis hash to store the count of each small window. On every request, the script sums the counts of all valid small windows, uses hdel to delete the expired ones, and finally checks whether the total has reached the limit.
Almost all of the logic lives in the Lua script. The dominant cost is traversing the hash, which is O(N) where N is the number of small windows, so it is best not to have too many small windows.
const slidingWindowLimiterTryAcquireRedisScriptHashImpl = `
-- ARGV[1]: window size (milliseconds)
-- ARGV[2]: maximum requests per window
-- ARGV[3]: current small window value
-- ARGV[4]: starting small window value
local window = tonumber(ARGV[1])
local limit = tonumber(ARGV[2])
local currentSmallWindow = tonumber(ARGV[3])
local startSmallWindow = tonumber(ARGV[4])

-- Calculate the total number of requests in the current window
local counters = redis.call("hgetall", KEYS[1])
local count = 0
for i = 1, #(counters) / 2 do
    local smallWindow = tonumber(counters[i * 2 - 1])
    local counter = tonumber(counters[i * 2])
    if smallWindow < startSmallWindow then
        -- This small window has slid out of the window: delete it
        redis.call("hdel", KEYS[1], smallWindow)
    else
        count = count + counter
    end
end

-- If the limit has been reached, the request fails
if count >= limit then
    return 0
end

-- Otherwise increment the current small window's counter; the request succeeds
redis.call("hincrby", KEYS[1], currentSmallWindow, 1)
redis.call("pexpire", KEYS[1], window)
return 1
`
package redis

import (
	"context"
	"errors"
	"time"

	"github.com/go-redis/redis/v8"
)

// SlidingWindowLimiter is a sliding-window rate limiter.
type SlidingWindowLimiter struct {
	limit        int           // maximum requests per window
	window       int64         // window size in milliseconds
	smallWindow  int64         // small window size in milliseconds
	smallWindows int64         // number of small windows
	client       *redis.Client // Redis client
	script       *redis.Script // TryAcquire script
}

func NewSlidingWindowLimiter(client *redis.Client, limit int, window, smallWindow time.Duration) (
	*SlidingWindowLimiter, error) {
	// Redis expiration has millisecond precision, so both sizes must be a whole number of milliseconds
	if window%time.Millisecond != 0 || smallWindow%time.Millisecond != 0 {
		return nil, errors.New("the window unit must not be less than millisecond")
	}

	// The window must be an exact multiple of the small window
	if window%smallWindow != 0 {
		return nil, errors.New("window cannot be split by integers")
	}

	return &SlidingWindowLimiter{
		limit:        limit,
		window:       int64(window / time.Millisecond),
		smallWindow:  int64(smallWindow / time.Millisecond),
		smallWindows: int64(window / smallWindow),
		client:       client,
		script:       redis.NewScript(slidingWindowLimiterTryAcquireRedisScriptHashImpl),
	}, nil
}

func (l *SlidingWindowLimiter) TryAcquire(ctx context.Context, resource string) error {
	// Current small window value: the current time rounded down to a small window boundary
	currentSmallWindow := time.Now().UnixMilli() / l.smallWindow * l.smallWindow
	// Starting small window value: the oldest small window still inside the window
	startSmallWindow := currentSmallWindow - l.smallWindow*(l.smallWindows-1)

	success, err := l.script.Run(
		ctx, l.client, []string{resource}, l.window, l.limit, currentSmallWindow, startSmallWindow).Bool()
	if err != nil {
		return err
	}
	// If the window limit has been reached, the request fails
	if !success {
		return ErrAcquireFailed
	}
	return nil
}
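The two values passed to the script come from simple integer arithmetic on the millisecond timestamp. As a small worked example, the sketch below isolates that calculation in a hypothetical helper, `smallWindowBounds`, which is not part of the limiter itself, and evaluates it for a 1 s window split into 100 ms small windows.

```go
package main

import (
	"fmt"
	"time"
)

// smallWindowBounds returns the aligned current small window and the oldest
// small window still inside the sliding window. All values are milliseconds.
func smallWindowBounds(nowMs, smallWindowMs, smallWindows int64) (current, start int64) {
	// Integer division rounds the timestamp down to a small window boundary.
	current = nowMs / smallWindowMs * smallWindowMs
	start = current - smallWindowMs*(smallWindows-1)
	return current, start
}

func main() {
	window, smallWindow := time.Second, 100*time.Millisecond
	n := int64(window / smallWindow) // 10 small windows
	current, start := smallWindowBounds(1234, int64(smallWindow/time.Millisecond), n)
	fmt.Println(current, start) // 1200 300
}
```

Any hash field smaller than the starting value (300 here) belongs to a small window that has already slid out and is deleted by the script.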
List implementation
If the number of small windows is particularly large, a list can be used instead to reduce the time complexity. The list is structured as follows:
[counter, smallWindow1, count1, smallWindow2, count2, smallWindow3, count3...]
That is, the first element of the list stores the total counter, and each small window is represented by two elements: the first holds the small window value and the second holds that small window's count. The small window value and its count are not packed into a single element because Lua scripts in Redis have no built-in function for splitting strings.
The specific steps are:
1. Get the list length.
2. If the length is 0, create the counter element (length + 1).
3. If the length is greater than 1, get the second and third elements.
- If the second element (the oldest small window value) is less than the starting small window value, subtract the third element from counter, delete the second and third elements, and reduce the length by 2.
4. If counter is greater than or equal to limit, the request fails.
5. If the length is greater than 1, get the second-to-last element.
- If the second-to-last element (the newest small window value) is greater than or equal to the current small window value, the request either belongs to that small window or was delayed by the network, so that its own small window is already out of date when it reaches the server. Treat the newest small window as the current one (because it is more recent) and add 1 to the last element.
- Otherwise, append the new small window value and a new count (1), and update the expiration time.
6. Otherwise, append the new small window value and a new count (1), and update the expiration time.
7. counter + 1.
8. Return success.
const slidingWindowLimiterTryAcquireRedisScriptListImpl = `
-- ARGV[1]: window size (milliseconds)
-- ARGV[2]: maximum requests per window
-- ARGV[3]: current small window value
-- ARGV[4]: starting small window value
local window = tonumber(ARGV[1])
local limit = tonumber(ARGV[2])
local currentSmallWindow = tonumber(ARGV[3])
local startSmallWindow = tonumber(ARGV[4])

-- Get the list length
local len = redis.call("llen", KEYS[1])
-- If the length is 0, create the counter element (length + 1)
local counter = 0
if len == 0 then
    redis.call("rpush", KEYS[1], 0)
    redis.call("pexpire", KEYS[1], window)
    len = len + 1
else
    -- Otherwise read the counter and the oldest small window value (second element)
    local smallWindow1 = tonumber(redis.call("lindex", KEYS[1], 1))
    counter = tonumber(redis.call("lindex", KEYS[1], 0))
    -- If the oldest small window is older than the starting small window value
    -- (the nil check covers a list that holds only the counter)
    if smallWindow1 ~= nil and smallWindow1 < startSmallWindow then
        local count1 = redis.call("lindex", KEYS[1], 2)
        -- counter minus the value of the third element
        counter = counter - count1
        -- length - 2
        len = len - 2
        -- Delete the second and third elements
        redis.call("lrem", KEYS[1], 1, smallWindow1)
        redis.call("lrem", KEYS[1], 1, count1)
    end
end

-- If the limit has been reached, the request fails
if counter >= limit then
    return 0
end

-- If the length is greater than 1, get the second-to-last element
if len > 1 then
    local smallWindown = tonumber(redis.call("lindex", KEYS[1], -2))
    -- If the newest small window value is greater than or equal to the current small window value
    if smallWindown >= currentSmallWindow then
        -- Treat the newest small window as the current one (it is more recent)
        -- and add 1 to the last element
        local countn = redis.call("lindex", KEYS[1], -1)
        redis.call("lset", KEYS[1], -1, countn + 1)
    else
        -- Otherwise append the new small window value and a new count (1), and update the expiration
        redis.call("rpush", KEYS[1], currentSmallWindow, 1)
        redis.call("pexpire", KEYS[1], window)
    end
else
    -- Otherwise append the new small window value and a new count (1), and update the expiration
    redis.call("rpush", KEYS[1], currentSmallWindow, 1)
    redis.call("pexpire", KEYS[1], window)
end

-- counter + 1, written back to the first element
redis.call("lset", KEYS[1], 0, counter + 1)
return 1
`
The algorithm only operates on the head or tail of the list, so the time complexity is close to O(1).
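The list steps can be traced on an ordinary slice. The following is a minimal sketch, assuming a well-formed layout of `[counter, smallWindow1, count1, ...]`; the function `tryAcquire` and its parameters are hypothetical and only model the script, they do not talk to Redis. The example uses a limit of 2 with a 300 ms window split into 100 ms small windows, so the starting small window is always the current one minus 200.

```go
package main

import "fmt"

// tryAcquire applies the list steps to a slice laid out as
// [counter, smallWindow1, count1, smallWindow2, count2, ...].
func tryAcquire(list []int64, limit, current, start int64) ([]int64, bool) {
	if len(list) == 0 {
		list = append(list, 0) // create the counter element
	}
	// Drop the oldest small window if it has slid out of the window.
	if len(list) > 1 && list[1] < start {
		list[0] -= list[2]
		list = append(list[:1], list[3:]...)
	}
	if list[0] >= limit {
		return list, false
	}
	if len(list) > 1 && list[len(list)-2] >= current {
		list[len(list)-1]++ // the newest small window is the current one
	} else {
		list = append(list, current, 1) // open a new small window
	}
	list[0]++
	return list, true
}

func main() {
	var list []int64
	var ok bool
	list, ok = tryAcquire(list, 2, 1000, 800)
	fmt.Println(list, ok) // [1 1000 1] true
	list, ok = tryAcquire(list, 2, 1000, 800)
	fmt.Println(list, ok) // [2 1000 2] true
	list, ok = tryAcquire(list, 2, 1100, 900)
	fmt.Println(list, ok) // [2 1000 2] false
	list, ok = tryAcquire(list, 2, 1300, 1100)
	fmt.Println(list, ok) // [1 1300 1] true
}
```

Note that, like the script, this removes at most one expired small window per request.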
Leaky bucket
A leaky bucket needs to store the current water level and the time of the last leak, so we use a hash to hold both values.
const leakyBucketLimiterTryAcquireRedisScript = `
-- ARGV[1]: highest water level
-- ARGV[2]: water flow rate per second
-- ARGV[3]: current time (seconds)
local peakLevel = tonumber(ARGV[1])
local currentVelocity = tonumber(ARGV[2])
local now = tonumber(ARGV[3])

local lastTime = tonumber(redis.call("hget", KEYS[1], "lastTime"))
local currentLevel = tonumber(redis.call("hget", KEYS[1], "currentLevel"))
-- Initialization
if lastTime == nil then
    lastTime = now
    currentLevel = 0
    redis.call("hmset", KEYS[1], "currentLevel", currentLevel, "lastTime", lastTime)
end

-- Try to leak water
-- Time since the last leak
local interval = now - lastTime
if interval > 0 then
    -- New level = current level - time since the last leak (seconds) * flow rate
    local newLevel = currentLevel - interval * currentVelocity
    if newLevel < 0 then
        newLevel = 0
    end
    currentLevel = newLevel
    redis.call("hmset", KEYS[1], "currentLevel", newLevel, "lastTime", now)
end

-- If the highest water level has been reached, the request fails
if currentLevel >= peakLevel then
    return 0
end
-- Otherwise raise the water level by 1; the request succeeds
redis.call("hincrby", KEYS[1], "currentLevel", 1)
-- EXPIRE only accepts an integer number of seconds, so round up
redis.call("expire", KEYS[1], math.ceil(peakLevel / currentVelocity))
return 1
`
package redis

import (
	"context"
	"time"

	"github.com/go-redis/redis/v8"
)

// LeakyBucketLimiter is a leaky-bucket rate limiter.
type LeakyBucketLimiter struct {
	peakLevel       int           // highest water level
	currentVelocity int           // water flow rate per second
	client          *redis.Client // Redis client
	script          *redis.Script // TryAcquire script
}

func NewLeakyBucketLimiter(client *redis.Client, peakLevel, currentVelocity int) *LeakyBucketLimiter {
	return &LeakyBucketLimiter{
		peakLevel:       peakLevel,
		currentVelocity: currentVelocity,
		client:          client,
		script:          redis.NewScript(leakyBucketLimiterTryAcquireRedisScript),
	}
}

func (l *LeakyBucketLimiter) TryAcquire(ctx context.Context, resource string) error {
	// Current time in seconds
	now := time.Now().Unix()
	success, err := l.script.Run(ctx, l.client, []string{resource}, l.peakLevel, l.currentVelocity, now).Bool()
	if err != nil {
		return err
	}
	// If the bucket is full, the request fails
	if !success {
		return ErrAcquireFailed
	}
	return nil
}
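The water-level arithmetic in the script can be checked by hand. Below is a minimal sketch with two hypothetical pure functions, `leak` and `tryAcquire`, that model only the level calculation (no Redis, no expiration); times are in seconds as in the script.

```go
package main

import "fmt"

// leak returns the water level after draining since lastTime (seconds)
// at rate units per second, never dropping below zero.
func leak(level, rate, lastTime, now int64) int64 {
	if interval := now - lastTime; interval > 0 {
		level -= interval * rate
		if level < 0 {
			level = 0
		}
	}
	return level
}

// tryAcquire drains the bucket, then adds one unit if there is room.
func tryAcquire(level, peak, rate, lastTime, now int64) (newLevel int64, ok bool) {
	level = leak(level, rate, lastTime, now)
	if level >= peak {
		return level, false
	}
	return level + 1, true
}

func main() {
	// Full bucket, no time elapsed: rejected.
	fmt.Println(tryAcquire(10, 10, 2, 100, 100)) // 10 false
	// Three seconds later 6 units have leaked, so 10 - 6 + 1 = 5.
	fmt.Println(tryAcquire(10, 10, 2, 100, 103)) // 5 true
}
```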
Token bucket
A token bucket can be seen as the mirror image of a leaky bucket: with a leaky bucket each request pours water into the bucket, while with a token bucket each request takes a token out of the bucket.
const tokenBucketLimiterTryAcquireRedisScript = `
-- ARGV[1]: capacity
-- ARGV[2]: token issuance rate per second
-- ARGV[3]: current time (seconds)
local capacity = tonumber(ARGV[1])
local rate = tonumber(ARGV[2])
local now = tonumber(ARGV[3])

local lastTime = tonumber(redis.call("hget", KEYS[1], "lastTime"))
local currentTokens = tonumber(redis.call("hget", KEYS[1], "currentTokens"))
-- Initialization: the bucket starts full
if lastTime == nil then
    lastTime = now
    currentTokens = capacity
    redis.call("hmset", KEYS[1], "currentTokens", currentTokens, "lastTime", lastTime)
end

-- Try to issue tokens
-- Time since tokens were last issued
local interval = now - lastTime
if interval > 0 then
    -- New tokens = current tokens + time since the last issuance (seconds) * issuance rate
    local newTokens = currentTokens + interval * rate
    if newTokens > capacity then
        newTokens = capacity
    end
    currentTokens = newTokens
    redis.call("hmset", KEYS[1], "currentTokens", newTokens, "lastTime", now)
end

-- If there is no token, the request fails
if currentTokens == 0 then
    return 0
end
-- Otherwise take one token; the request succeeds
redis.call("hincrby", KEYS[1], "currentTokens", -1)
-- EXPIRE only accepts an integer number of seconds, so round up
redis.call("expire", KEYS[1], math.ceil(capacity / rate))
return 1
`
package redis

import (
	"context"
	"time"

	"github.com/go-redis/redis/v8"
)

// TokenBucketLimiter is a token-bucket rate limiter.
type TokenBucketLimiter struct {
	capacity int           // capacity
	rate     int           // token issuance rate per second
	client   *redis.Client // Redis client
	script   *redis.Script // TryAcquire script
}

func NewTokenBucketLimiter(client *redis.Client, capacity, rate int) *TokenBucketLimiter {
	return &TokenBucketLimiter{
		capacity: capacity,
		rate:     rate,
		client:   client,
		script:   redis.NewScript(tokenBucketLimiterTryAcquireRedisScript),
	}
}

func (l *TokenBucketLimiter) TryAcquire(ctx context.Context, resource string) error {
	// Current time in seconds
	now := time.Now().Unix()
	success, err := l.script.Run(ctx, l.client, []string{resource}, l.capacity, l.rate, now).Bool()
	if err != nil {
		return err
	}
	// If there is no token left, the request fails
	if !success {
		return ErrAcquireFailed
	}
	return nil
}
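Because the bucket starts full, a token bucket allows a burst of up to `capacity` requests before it starts rejecting. The sketch below illustrates this with a hypothetical in-memory `bucket` type that mirrors the two hash fields used by the script; it is an illustration under those assumptions rather than the limiter itself.

```go
package main

import "fmt"

// bucket is a hypothetical in-memory stand-in for the Redis hash
// (fields currentTokens and lastTime); times are in seconds.
type bucket struct {
	tokens, capacity, rate, lastTime int64
}

// newBucket starts full, as the script does on first use.
func newBucket(capacity, rate, now int64) *bucket {
	return &bucket{tokens: capacity, capacity: capacity, rate: rate, lastTime: now}
}

// tryAcquire refills for the elapsed time, capped at capacity, then takes one token.
func (b *bucket) tryAcquire(now int64) bool {
	if interval := now - b.lastTime; interval > 0 {
		b.tokens += interval * b.rate
		if b.tokens > b.capacity {
			b.tokens = b.capacity
		}
		b.lastTime = now
	}
	if b.tokens == 0 {
		return false
	}
	b.tokens--
	return true
}

func main() {
	b := newBucket(3, 1, 0)
	fmt.Println(b.tryAcquire(0), b.tryAcquire(0), b.tryAcquire(0)) // true true true
	fmt.Println(b.tryAcquire(0))                                   // false
	fmt.Println(b.tryAcquire(2))                                   // true
}
```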
Sliding log
The algorithm follows the same flow as the sliding window, except that multiple strategies can be specified, and when a request fails the caller is told which strategy rejected it.
const slidingLogLimiterTryAcquireRedisScriptHashImpl = `
-- ARGV[1]: current small window value
-- ARGV[2]: window size of the first strategy (milliseconds)
-- ARGV[i * 2 + 1]: starting small window value of each strategy
-- ARGV[i * 2 + 2]: maximum requests per window of each strategy
local currentSmallWindow = tonumber(ARGV[1])
-- Window size of the first strategy
local window = tonumber(ARGV[2])
-- Starting small window value of the first strategy
local startSmallWindow = tonumber(ARGV[3])
local strategiesLen = #(ARGV) / 2 - 1

-- Calculate the total number of requests in the current window of each strategy
local counters = redis.call("hgetall", KEYS[1])
local counts = {}
-- Initialize counts
for j = 1, strategiesLen do
    counts[j] = 0
end

for i = 1, #(counters) / 2 do
    local smallWindow = tonumber(counters[i * 2 - 1])
    local counter = tonumber(counters[i * 2])
    if smallWindow < startSmallWindow then
        redis.call("hdel", KEYS[1], smallWindow)
    else
        for j = 1, strategiesLen do
            if smallWindow >= tonumber(ARGV[j * 2 + 1]) then
                counts[j] = counts[j] + counter
            end
        end
    end
end

-- If a strategy's limit has been reached, the request fails; return the index of the violated strategy
for i = 1, strategiesLen do
    if counts[i] >= tonumber(ARGV[i * 2 + 2]) then
        return i - 1
    end
end

-- If no limit has been reached, increment the current small window's counter; the request succeeds
redis.call("hincrby", KEYS[1], currentSmallWindow, 1)
redis.call("pexpire", KEYS[1], window)
return -1
`
package redis

import (
	"context"
	"errors"
	"fmt"
	"sort"
	"time"

	"github.com/go-redis/redis/v8"
)

// ViolationStrategyError is returned when a strategy's limit is violated.
type ViolationStrategyError struct {
	Limit  int           // maximum requests per window
	Window time.Duration // window size
}

func (e *ViolationStrategyError) Error() string {
	return fmt.Sprintf("violation strategy that limit = %d and window = %d", e.Limit, e.Window)
}

// SlidingLogLimiterStrategy is one strategy of the sliding-log rate limiter.
type SlidingLogLimiterStrategy struct {
	limit        int   // maximum requests per window
	window       int64 // window size
	smallWindows int64 // number of small windows
}

func NewSlidingLogLimiterStrategy(limit int, window time.Duration) *SlidingLogLimiterStrategy {
	return &SlidingLogLimiterStrategy{
		limit:  limit,
		window: int64(window),
	}
}

// SlidingLogLimiter is a sliding-log rate limiter.
type SlidingLogLimiter struct {
	strategies  []*SlidingLogLimiterStrategy // list of strategies
	smallWindow int64                        // small window size in milliseconds
	client      *redis.Client                // Redis client
	script      *redis.Script                // TryAcquire script
}

func NewSlidingLogLimiter(client *redis.Client, smallWindow time.Duration, strategies ...*SlidingLogLimiterStrategy) (
	*SlidingLogLimiter, error) {
	// Copy the slice so that sorting does not modify the caller's slice
	strategies = append(make([]*SlidingLogLimiterStrategy, 0, len(strategies)), strategies...)

	// At least one strategy is required
	if len(strategies) == 0 {
		return nil, errors.New("must be set strategies")
	}

	// Redis expiration has millisecond precision, so every size must be a whole number of milliseconds
	if smallWindow%time.Millisecond != 0 {
		return nil, errors.New("the window unit must not be less than millisecond")
	}
	smallWindow = smallWindow / time.Millisecond
	for _, strategy := range strategies {
		if strategy.window%int64(time.Millisecond) != 0 {
			return nil, errors.New("the window unit must not be less than millisecond")
		}
		strategy.window = strategy.window / int64(time.Millisecond)
	}

	// Sort strategies: larger windows first; for equal windows, larger limits first
	sort.Slice(strategies, func(i, j int) bool {
		a, b := strategies[i], strategies[j]
		if a.window == b.window {
			return a.limit > b.limit
		}
		return a.window > b.window
	})

	for i, strategy := range strategies {
		// As the window gets smaller, the limit must get smaller too
		if i > 0 {
			if strategy.limit >= strategies[i-1].limit {
				return nil, errors.New("the smaller window should be the smaller limit")
			}
		}
		// The window must be an exact multiple of the small window
		if strategy.window%int64(smallWindow) != 0 {
			return nil, errors.New("window cannot be split by integers")
		}
		strategy.smallWindows = strategy.window / int64(smallWindow)
	}

	return &SlidingLogLimiter{
		strategies:  strategies,
		smallWindow: int64(smallWindow),
		client:      client,
		script:      redis.NewScript(slidingLogLimiterTryAcquireRedisScriptHashImpl),
	}, nil
}

func (l *SlidingLogLimiter) TryAcquire(ctx context.Context, resource string) error {
	// Current small window value
	currentSmallWindow := time.Now().UnixMilli() / l.smallWindow * l.smallWindow
	args := make([]interface{}, len(l.strategies)*2+2)
	args[0] = currentSmallWindow
	args[1] = l.strategies[0].window
	// Starting small window value and limit of each strategy
	for i, strategy := range l.strategies {
		args[i*2+2] = currentSmallWindow - l.smallWindow*(strategy.smallWindows-1)
		args[i*2+3] = strategy.limit
	}

	index, err := l.script.Run(
		ctx, l.client, []string{resource}, args...).Int()
	if err != nil {
		return err
	}
	// If a strategy's limit has been reached, the request fails
	if index != -1 {
		return &ViolationStrategyError{
			Limit: l.strategies[index].limit,
			// window is stored in milliseconds; convert it back to a time.Duration
			Window: time.Duration(l.strategies[index].window) * time.Millisecond,
		}
	}
	return nil
}
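The per-strategy counting in the script can be modelled with a map in place of the Redis hash. The following is a minimal sketch: the `strategy` type here is a simplified, hypothetical version that carries only a limit and a precomputed starting small window, and `violated` returns the same index convention as the script (the position of the first violated strategy, or -1).

```go
package main

import "fmt"

// strategy is a hypothetical, simplified policy: at most limit requests
// across the small windows at or after start (milliseconds).
type strategy struct {
	limit int64
	start int64
}

// violated returns the index of the first strategy whose limit is reached, or -1.
func violated(counters map[int64]int64, strategies []strategy) int {
	counts := make([]int64, len(strategies))
	for smallWindow, n := range counters {
		for j, s := range strategies {
			// A small window counts toward every strategy whose window still covers it.
			if smallWindow >= s.start {
				counts[j] += n
			}
		}
	}
	for j, s := range strategies {
		if counts[j] >= s.limit {
			return j
		}
	}
	return -1
}

func main() {
	// Small window value -> request count, with the current small window at 1900.
	counters := map[int64]int64{1000: 3, 1500: 2, 1900: 2}
	strategies := []strategy{
		{limit: 10, start: 1000}, // 10 requests per 1s window
		{limit: 2, start: 1900},  // 2 requests per 100ms window
	}
	fmt.Println(violated(counters, strategies)) // 1
}
```

Here the 1 s strategy has seen 7 of its 10 requests, but the 100 ms strategy has already reached its limit of 2, so index 1 is reported.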
Summary
Redis offers rich, high-performance data types, so implementing rate limiting algorithms on top of it is not difficult. However, every algorithm here relies on a Lua script, so if you are not familiar with Lua you may run into some pitfalls.
The complete code and the test code are available at: /jiaxwu/limiter/tree/main/redis