Introduction
In real-time data processing systems, we often need to count the number of occurrences of an event in a specific time window, for example:
- Counting visits per user per hour
- Limiting the number of requests per device per minute
- Counting ad impressions per hour
Such needs often face two core challenges:
- High concurrency counting: multiple servers read and write the same counter at the same time
- Precise time windows: data must expire automatically at the window boundary so that stale keys do not accumulate
This article explains in detail how to build a high-performance, highly available counting scheme on Redis, and provides a complete Java implementation.
1. Choosing a Redis Counting Scheme
1.1 Why choose Redis
Approach | QPS | Data consistency | Implementation complexity |
---|---|---|---|
Database + transactions | ~1K | Strong consistency | High |
Local cache | ~100K | Eventual consistency | Medium |
Redis atomic operations | 50K+ | Strong consistency | Low |
Redis executes commands on a single thread, so commands such as INCR and INCRBY are atomic without any extra locking, which makes it a natural fit for counting scenarios.
1.2 Key Design Principles
// Format: businessPrefix:appId:deviceId:ip:timeWindow
String key = "flow:count:app123:device456:127.0.0.1:2023080117";
- Contains all dimension information
- Time window is divided by hours (adjustable)
- Add business prefix to avoid conflicts
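The key layout above can be sketched as a small helper. This is a minimal illustration, not the article's own code: the class name `WindowKeys` and the method `hourlyKey` are hypothetical, and the event time is passed in explicitly so the window suffix is easy to check.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Builds hourly counter keys of the form prefix:appId:deviceId:ip:yyyyMMddHH (hypothetical helper). */
final class WindowKeys {
    // Hour-granularity window suffix, e.g. 2023080117 for the 17:00 hour on 2023-08-01.
    private static final DateTimeFormatter HOUR_WINDOW = DateTimeFormatter.ofPattern("yyyyMMddHH");

    private WindowKeys() {}

    static String hourlyKey(String prefix, String appId, String deviceId, String ip, LocalDateTime eventTime) {
        // Every dimension is part of the key, so each combination gets its own counter.
        return String.join(":", prefix, appId, deviceId, ip, eventTime.format(HOUR_WINDOW));
    }
}
```

Calling `WindowKeys.hourlyKey("flow:count", "app123", "device456", "127.0.0.1", LocalDateTime.of(2023, 8, 1, 17, 30))` yields the example key shown earlier. Changing the pattern (for example to `yyyyMMddHHmm`) would change the window size. Note that IPv6 addresses contain colons themselves, so a key that needs to be parsed back into fields may want a different separator.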
2. Basic Implementation
2.1 Simple INCRBY implementation
public void incrementCount(String key, int delta) {
    redisTemplate.opsForValue().increment(key, delta);
}
Problem: without an expiration time, keys accumulate indefinitely.
2.2 Adding an expiration time
public void incrementWithExpire(String key, int delta, long ttlSeconds) {
    redisTemplate.opsForValue().increment(key, delta);
    redisTemplate.expire(key, ttlSeconds, TimeUnit.SECONDS);
}
New problem: the TTL is set again on every operation, which costs a redundant Redis call each time.
3. Optimized Solution: Accurate TTL Control
3.1 Detecting the first write to a key
The TTL should be set only once, when the key is created. There are two ways to implement this:
Solution A: Lua script (recommended)
private static final String LUA_SCRIPT =
    "local current = redis.call('INCRBY', KEYS[1], ARGV[1])\n" +
    "if current == tonumber(ARGV[1]) then\n" +
    "    redis.call('EXPIRE', KEYS[1], ARGV[2])\n" +
    "end\n" +
    "return current";

public Long incrementAtomically(String key, int delta, long ttl) {
    return redisTemplate.execute(
        new DefaultRedisScript<>(LUA_SCRIPT, Long.class),
        Collections.singletonList(key),
        String.valueOf(delta),
        String.valueOf(ttl)
    );
}
Advantages:
- Completely atomic execution
- Single network round-trip
- Reliable detection of the first write
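To make the script's first-write rule easier to reason about, here is a minimal in-memory model of the same logic. It is only a sketch for illustration: it does not talk to Redis, the class name `ExpiringCounterModel` is hypothetical, and the current time is passed in as a parameter instead of being read from a clock.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory model of "INCRBY, then EXPIRE only on first write". Not a Redis client. */
final class ExpiringCounterModel {
    private final Map<String, Long> values = new HashMap<>();
    private final Map<String, Long> expiresAt = new HashMap<>(); // deadline in seconds

    /** Mirrors the script: increment, and set the TTL only if the result equals delta. */
    synchronized long increment(String key, long delta, long ttlSeconds, long nowSeconds) {
        evictIfExpired(key, nowSeconds);
        long current = values.merge(key, delta, Long::sum);
        if (current == delta) { // the key did not exist before this call (or was 0)
            expiresAt.put(key, nowSeconds + ttlSeconds);
        }
        return current;
    }

    /** Remaining lifetime in seconds, or -2 if the key does not exist (the value Redis TTL reports). */
    synchronized long ttl(String key, long nowSeconds) {
        evictIfExpired(key, nowSeconds);
        Long deadline = expiresAt.get(key);
        return deadline == null ? -2 : deadline - nowSeconds;
    }

    private void evictIfExpired(String key, long nowSeconds) {
        Long deadline = expiresAt.get(key);
        if (deadline != null && nowSeconds >= deadline) {
            values.remove(key);
            expiresAt.remove(key);
        }
    }
}
```

In this model a second increment 30 seconds after the first leaves 30 seconds of a 60-second TTL, because only the first write sets the deadline. One consequence of the `current == delta` test is that a counter whose value has returned to 0 is treated as new on the next write, which only matters if negative deltas are used.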
Solution B: SETNX+INCRBY
public void incrementWithNX(String key, int delta, long ttl) {
    redisTemplate.execute((RedisCallback<Object>) connection -> {
        // This cast is valid when redisTemplate is a StringRedisTemplate
        StringRedisConnection conn = (StringRedisConnection) connection;
        conn.setNX(key, "0"); // Try to initialize the counter
        conn.incrBy(key, delta);
        if (conn.setNX(key + ":lock", "1")) { // Simple lock: only the first caller sets the TTL
            conn.expire(key, ttl);
            conn.expire(key + ":lock", 10);
        }
        return null;
    });
}
Applicable when: the Redis version is below 2.6, which does not support Lua scripting.
4. Complete Production-Grade Implementation
4.1 Time window calculation
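public long calculateTtlToNextHour() {
    LocalDateTime now = LocalDateTime.now();
    LocalDateTime nextHour = now.plusHours(1).truncatedTo(ChronoUnit.HOURS);
    // Never return 0: EXPIRE with a non-positive TTL deletes the key immediately
    return Math.max(1L, ChronoUnit.SECONDS.between(now, nextHour));
}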
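A variant that takes the current time as a parameter is easier to unit-test than one that calls `LocalDateTime.now()` internally. The following is a sketch under that assumption; the names `WindowTtl` and `secondsToNextHour` are hypothetical.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

/** Computes the TTL that makes an hourly counter expire at the next hour boundary (hypothetical helper). */
final class WindowTtl {
    private WindowTtl() {}

    /** Whole seconds from 'now' until the next hour boundary, never less than 1. */
    static long secondsToNextHour(LocalDateTime now) {
        LocalDateTime nextHour = now.truncatedTo(ChronoUnit.HOURS).plusHours(1);
        long seconds = Duration.between(now, nextHour).getSeconds();
        // In the last fraction of a second before the boundary this would be 0;
        // clamp to 1 because EXPIRE with a non-positive TTL deletes the key.
        return Math.max(1L, seconds);
    }
}
```

For 17:30:00 the result is 1800 seconds, and exactly on the hour it is 3600. The calculation uses the server's local time, so all application servers need reasonably synchronized clocks for the windows to line up.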
4.2 Kafka Consumer Integration
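@Component
@RequiredArgsConstructor
public class FlowCounter {
    private final RedisTemplate<String, String> redisTemplate;
    private static final String KEY_PREFIX = "flow:count:";

    @KafkaListener(topics = "${}")
    public void handleMessages(List<Message> messages) {
        // Pre-aggregate the batch so each key costs one Redis call
        Map<String, Integer> countMap = messages.stream()
            .collect(Collectors.toMap(
                this::buildKey,
                msg -> 1,
                Integer::sum
            ));
        countMap.forEach((k, v) ->
            incrementAtomically(k, v, calculateTtlToNextHour())
        );
    }

    private String buildKey(Message msg) {
        // Accessor names and the time source below are assumed; the originals were lost
        return String.format("%s%s:%s:%s:%s",
            KEY_PREFIX,
            msg.getAppId(),
            msg.getDeviceId(),
            msg.getIp(),
            LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHH"))
        );
    }
}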
4.3 Query interface
public long getCurrentCount(String appId, String deviceId, String ip) {
    String key = buildKey(appId, deviceId, ip);
    String val = redisTemplate.opsForValue().get(key);
    return val != null ? Long.parseLong(val) : 0L;
}
5. Performance Optimization Tips
5.1 Pipeline batch processing
redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
    StringRedisConnection conn = (StringRedisConnection) connection;
    countMap.forEach((k, v) -> {
        conn.incrBy(k, v); // Can be further optimized in combination with Lua scripts
    });
    return null;
});
5.2 Local pre-aggregation
// First merge the counts of the same key in memory
Map<String, Integer> localCount = messages.stream()
    .collect(Collectors.toMap(
        this::buildKey,
        m -> 1,
        Integer::sum
    ));
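The pre-aggregation step can be tried out on its own without Kafka or Redis. The sketch below is generic over the message type; `PreAggregator` and `countByKey` are hypothetical names, and `groupingBy` with `counting` is used here as an equivalent alternative to the `toMap` merge.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Collapses a batch of messages into one (key, count) pair per distinct key (hypothetical helper). */
final class PreAggregator {
    private PreAggregator() {}

    static <T> Map<String, Long> countByKey(List<T> batch, Function<T, String> keyOf) {
        // Each distinct key ends up as one entry, so it needs only one INCRBY later.
        return batch.stream().collect(Collectors.groupingBy(keyOf, Collectors.counting()));
    }
}
```

A batch of 500 messages that map to 20 distinct keys then needs 20 Redis calls instead of 500. The benefit depends on how often keys repeat within a batch, so it grows with batch size and key skew.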
5.3 Cluster deployment considerations
Use a {} hash tag to force related keys into the same hash slot, and therefore onto the same node:
"{flow}:count:app123:..."
Because every key that shares a tag lands on one node, choose the tag and the sharding strategy carefully to avoid hot spots.
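To see how a hash tag affects placement, the slot calculation can be reproduced locally. The sketch below follows the rule documented for Redis Cluster (CRC16 of the key, or of the hash tag when a non-empty one is present, modulo 16384); the class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;

/** Local reimplementation of the Redis Cluster key-to-slot mapping, for illustration only. */
final class ClusterSlots {
    private ClusterSlots() {}

    /** CRC16, XMODEM variant: polynomial 0x1021, initial value 0, no bit reflection. */
    static int crc16(byte[] data) {
        int crc = 0;
        for (byte b : data) {
            crc ^= (b & 0xFF) << 8;
            for (int i = 0; i < 8; i++) {
                crc = (crc & 0x8000) != 0 ? (crc << 1) ^ 0x1021 : crc << 1;
                crc &= 0xFFFF;
            }
        }
        return crc;
    }

    /** The part of the key that is hashed: the first non-empty {...} tag, else the whole key. */
    static String hashedPart(String key) {
        int open = key.indexOf('{');
        if (open >= 0) {
            int close = key.indexOf('}', open + 1);
            if (close > open + 1) {
                return key.substring(open + 1, close);
            }
        }
        return key;
    }

    static int slot(String key) {
        return crc16(hashedPart(key).getBytes(StandardCharsets.UTF_8)) % 16384;
    }
}
```

With a single `{flow}` tag every counter maps to the same slot, which is exactly the hot-spot risk mentioned above. Tagging on a narrower dimension, for example `flow:count:{app123}:...`, keeps one application's keys together while letting different applications spread across slots.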
6. Exception handling and monitoring
6.1 Redis retry mechanism
@Retryable(maxAttempts = 3, backoff = @Backoff(delay = 100))
public void safeIncrement(String key, int delta) {
    // Business logic
}
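For readers not using Spring Retry, the same policy (up to 3 attempts with a fixed 100 ms pause) can be approximated by hand. This is a simplified sketch, not how the annotation is implemented; `SimpleRetry` and `call` are hypothetical names.

```java
import java.util.concurrent.Callable;

/** Minimal fixed-delay retry helper (hypothetical; a simplified stand-in for a retry library). */
final class SimpleRetry {
    private SimpleRetry() {}

    /** Runs the action up to maxAttempts times, sleeping delayMillis after each failed attempt but the last. */
    static <T> T call(Callable<T> action, int maxAttempts, long delayMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e; // remember the failure and try again if attempts remain
                if (attempt < maxAttempts) {
                    Thread.sleep(delayMillis);
                }
            }
        }
        throw last; // all attempts failed: surface the last error
    }
}
```

One caveat applies to any retry around a counter: an increment is not idempotent. If the first call reached Redis but the response was lost, retrying counts the event twice, so retries trade a small risk of over-counting for fewer lost counts.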
6.2 Monitoring indicators
# TYPE redis_operations_total counter
redis_operations_total{operation="incr"} 12345
redis_operations_total{operation="expire"} 678
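The counters behind such output can be kept with a few lines of plain Java. The sketch below is a hand-rolled illustration with hypothetical names (`RedisOpMetrics`, `record`, `render`); in a real service a metrics library would normally handle both the counting and the exposition.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.LongAdder;

/** Thread-safe per-operation counters rendered in the Prometheus text format (illustrative only). */
final class RedisOpMetrics {
    // Sorted map so that the rendered output has a stable order.
    private final Map<String, LongAdder> counts = new ConcurrentSkipListMap<>();

    /** Call once per Redis operation, e.g. record("incr") or record("expire"). */
    void record(String operation) {
        counts.computeIfAbsent(operation, op -> new LongAdder()).increment();
    }

    String render() {
        StringBuilder out = new StringBuilder("# TYPE redis_operations_total counter\n");
        counts.forEach((op, n) -> out.append("redis_operations_total{operation=\"")
                .append(op).append("\"} ").append(n.sum()).append('\n'));
        return out.toString();
    }
}
```

Comparing the `incr` and `expire` counters is a quick sanity check on the first-write logic: with hourly windows, `expire` should track the number of new keys per hour, not the number of increments.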
6.3 Data compensation
@Scheduled(fixedRate = 3600000)
public void checkDataConsistency() {
    // Compare the counts stored in the DB with those in Redis
}
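The comparison itself can be written as a pure function over two snapshots, which keeps the scheduled job thin. This is a sketch under the assumption that both sides can be loaded as key-to-count maps; `CountReconciler` and `diff` are hypothetical names.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Finds keys whose counts differ between two snapshots (hypothetical helper). */
final class CountReconciler {
    private CountReconciler() {}

    /** Returns key -> (redisCount - dbCount) for every key that differs; a missing key counts as 0. */
    static Map<String, Long> diff(Map<String, Long> dbCounts, Map<String, Long> redisCounts) {
        Set<String> keys = new HashSet<>(dbCounts.keySet());
        keys.addAll(redisCounts.keySet());
        Map<String, Long> drift = new TreeMap<>();
        for (String key : keys) {
            long delta = redisCounts.getOrDefault(key, 0L) - dbCounts.getOrDefault(key, 0L);
            if (delta != 0) {
                drift.put(key, delta);
            }
        }
        return drift;
    }
}
```

A positive value means Redis is ahead of the database and a negative one means it is behind. Which side is treated as the source of truth, and whether small drift is tolerated, is a business decision that this sketch leaves open.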
7. Comparison and Summary
Approach | Advantages | Disadvantages | Applicable scenarios |
---|---|---|---|
Lua script | Strong atomicity, best performance | Requires Redis 2.6+ | First choice for new projects |
SETNX+INCR | Compatible with old versions | Risk of race conditions | Legacy systems |
Plain INCR+TTL | Simple to implement | Redundant TTL calls | Not recommended for production |
Conclusion
The solution described in this article achieves:
- 50K+ QPS counting capability on a single node
- Time-window control accurate to the hour
- Strong consistency in distributed environments
Best Practice Recommendations:
- Prefer the Lua script approach in production environments
- For ultra-high-concurrency scenarios (such as the Double 11 shopping festival), add a local cache layer in front of Redis
- Regularly check Redis memory usage
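One way to realize the local cache layer mentioned above is a small in-process buffer that accumulates deltas and is flushed to Redis on a timer. The following is a minimal sketch with hypothetical names (`LocalCountBuffer`, `add`, `drain`); the flush to Redis itself is left to the caller.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Buffers increments in memory so that many local writes become one Redis write per key. */
final class LocalCountBuffer {
    private final ConcurrentHashMap<String, Long> pending = new ConcurrentHashMap<>();

    /** Hot path: an atomic in-memory add, no network call. */
    void add(String key, long delta) {
        pending.merge(key, delta, Long::sum);
    }

    /** Removes and returns everything buffered so far; the caller sends each entry to Redis. */
    Map<String, Long> drain() {
        Map<String, Long> snapshot = new HashMap<>();
        for (String key : pending.keySet()) {
            // remove() is atomic per key, so an add() racing with it starts a fresh entry
            // and is picked up by the next drain instead of being lost.
            Long value = pending.remove(key);
            if (value != null) {
                snapshot.put(key, value);
            }
        }
        return snapshot;
    }
}
```

The trade-off is accuracy in time: counts read from Redis lag by up to one flush interval, and anything still buffered is lost if the process crashes. That makes this layer better suited to statistics than to strict rate limiting.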