What is pipeline
In Redis, Pipeline is an optimization mechanism for communication between clients and servers, aiming to reduce network round trip time and improve command execution efficiency. The following are the specific definitions and features of Redis Pipeline:
1. Batch sending and receiving:
- When using Pipeline, the client no longer sends commands one by one, but instead packages multiple commands into a request packet and sends them to the Redis server at one time. Correspondingly, after receiving this request packet, the server does not immediately return the execution result of each command, but first executes all commands in turn, and then packages all the results into a response packet and returns them to the client.
- This approach significantly reduces the number of network communications between clients and servers, especially in scenarios where a large number of commands are required, which can greatly reduce the impact of network latency.
2. Asynchronous execution
- Although all commands in Redis Pipeline are executed sequentially on the server side, since communication between the client and the server is done in batches, the client can start processing other tasks immediately after sending a batch of commands without waiting for a separate response to each command. This asynchronous processing method can better utilize the client's computing resources and improve the concurrency performance of the overall application.
3. Command Isolation
- In Pipeline, the execution of each command does not affect each other, that is, the execution result of a command will not affect the execution of subsequent commands. This means that even if a command fails to execute, the execution of subsequent commands will not be blocked. When the client parses the response packet, it can judge the execution result of each command based on the response content.
4. Usage scenarios
- Pipeline is mainly suitable for operations that require a large number of commands to Redis, such as batch data import, large-scale data updates, complex queries, etc. Without Pipeline, these operations may result in a significant increase in overall execution time due to network latency.
- For operations involving transactions, although Pipeline can also be used to package commands, it should be noted that Pipeline does not provide atomicity and consistency guarantees of transactions. If you need to ensure that a set of commands is executed as an atomic unit, you should use Redis's MULTI/EXEC command to start the transaction.
5. Things to note
- Although Pipeline can significantly improve the efficiency of command execution, the number of commands sent at one time should not be too large, otherwise it may lead to excessive data packets, increase network transmission pressure, and even exceed the buffer limit of the Redis server or client, causing an error. Reasonable command packaging size needs to be adjusted according to the actual environment and network conditions.
- When using Pipeline, since the response of the command is delayed, the client needs to do a good job of error handling and retry policies, especially when the network is unstable or the server load is high.
In summary, Redis Pipeline is a technology for efficient communication between clients and servers. By sending and receiving commands in batches, it reduces the number of network round trips and improves command execution efficiency, especially suitable for scenarios where a large number of command operations are performed. When using it, you need to pay attention to the control of the size of the command package and error handling.
Scenario 1: I want to add a large batch of data to redis
Redis Pipeline allows multiple commands to be sent to the Redis server at once without waiting for the response of each command, significantly reducing network round trip time and potential latency. In Spring Boot applications, you can use the executePipelined() method of RedisTemplate:
@Autowired private StringRedisTemplate redisTemplate public void batchInsertUsersWithPipeline(List<User> users, String keyPrefix, long ttlSeconds) { ((RedisCallback<Object>) connection -> { for (User user : users) { String key = generateKey(keyPrefix, ()); String value = (user); ((), (int) ttlSeconds, ()); } return null; }); }
Batch processing
Although Pipeline improves efficiency, sending all commands at once can cause memory overflow or network blockage for tens of millions of data. Therefore, it is recommended to process the data in batches, each batch contains an appropriate amount of records (such as 1,000 items) and send it to Redis batch by batch:
public void insertUsersInBatches(List<User> users, String keyPrefix, long ttlSeconds, int batchSize) { int start = 0; while (start < ()) { int end = (start + batchSize, ()); List<User> batch = (start, end); batchInsertUsersWithPipeline(batch, keyPrefix, ttlSeconds); start = end; } }
batchInsertUsersWithPipeline method uses the Redis Pipeline mechanism to send batch commands, which can improve the concurrency of insertion operations to a certain extent and reduce network round trip time and overall time consumption. However, Pipeline itself cannot strictly guarantee that all commands will succeed or fail at the same time. Its main features are as follows:
1. Atomicity:
- The Redis command is atomic inside Pipeline, that is, the execution of a single command will not be interrupted by other commands.
- Note: This does not mean that all commands of the entire Pipeline are atomic as a whole. The commands in Pipeline are still executed sequentially, but the communication process between the client and the server is optimized.
2. Response order
- The Redis server returns the results in the order in which the commands were received. Even if multiple commands are sent concurrently in Pipeline, the responses received by the client will be arranged in the order in which the commands are sent.
3. Fault handling
- If a command in Pipeline fails to execute (such as syntax errors, key does not exist, etc.), subsequent commands will usually continue to be executed.
- Error information will be included in the response to the corresponding command. The client can judge which commands are executed successfully and which fail based on this information.
To sum up, the batchInsertUsersWithPipeline method cannot strictly ensure that all commands succeed or fail at the same time. In actual use, if you need to ensure that a batch of data is either inserted successfully or rolled back all fails:
Transactions (MULTI/EXEC/DISCARD):
- redis provides a transaction function. Through commands such as MULTI, EXEC and DISCARD, a set of commands can be packaged together and executed. The entire transaction will be submitted only when all commands can be successfully executed; otherwise, any failure of any command will cause the entire transaction to roll back.
- Although Redis transactions do not support rollback to a specific state (i.e., isolation is not guaranteed), in batch insert scenarios, it can meet the "all or nothing" requirements.
Lua script:
- Use Lua scripts to write batch insertion logic, and the scripts are executed on the Redis server side, which is atomic. Even in abnormal situations such as network interruption or server restart, the script will either be executed completely or not at all, and there will be no partial success or partial failure.
What is the difference between the various methods in the connection in batchInsertUsersWithPipeline method?
((), (int) ttlSeconds, ());
This line calls the setEx method of RedisConnection, which is used to set a key-value pair with an expiration time (Time To Live, TTL). The parameters are as follows:
- (): Convert the given key (string) to a byte array, which is the format required by Redis's underlying communication protocol.
- (int) ttlSeconds: Converts expiration time (in seconds) to an integer type, indicating that the key-value pair automatically expires after the specified number of seconds and is deleted.
- (): Also convert the given value (JSON string serialized by the user object) into a byte array
The setEx method ensures that a key-value pair is set to an expiration time. If the key already exists, the method updates the key's value and expiration time. This operation is atomic in Pipeline mode, that is, only one setEx command is executed at the same time.
(); and ();
These two methods involve the Transaction function of Redis. In Pipeline mode, these two methods are generally not used because we want to maintain high performance. But if you really need to ensure the atomicity of a batch of commands, you can use the following method:
- (): Open a transaction block, and all subsequent commands will be put into this transaction until the exec method is called. In Pipeline mode, calling the multi method may destroy the original performance optimization effect.
- (): Submit and execute all commands in the transaction. If any command in the transaction fails to execute, the other commands will be cancelled and the entire transaction is considered to be a failure.
multi and exec are not used in your batchInsertUsersWithPipeline method, because Pipeline already provides an efficient batch execution mechanism, and the purpose here is to improve insertion performance rather than implement strict transaction behavior.
To sum up, the setEx method is directly used in the batchInsertUsersWithPipeline method, and Pipeline is used to efficiently insert key-value pairs with expiration time. If stricter transaction control is required, transactions using Redis (MULTI/EXEC) or Lua scripts should be considered, but this usually sacrifices some performance and is not fully compatible with the Pipeline mechanism. In practical application scenarios, the appropriate operation method should be selected based on business needs.
What is the difference between () and
() and () are both key-value pair setting methods for Redis. Their main differences are whether there are conditions and how to deal with existing keys:
(key, value)
This is the most basic way to set key-value pairs. Regardless of whether the key exists or not, the corresponding key-value pair will be directly overwritten (or created). The parameters are as follows:
- key: The key to set.
- value: The value to be associated.
Behavior characteristics:
- Unconditional settings: The setting operation will be performed regardless of whether the key exists or not.
- Overwrite existing keys: If the key already exists, its original value will be overwritten by the new value.
- Create a new key: If the key does not exist, a new key-value pair is created.
(key, value)
This is a key-value pair setting with conditions, which is set only if the key does not exist. The parameters are the same as set():
- key: The key to set
- value: The value to be associated.
Behavioral characteristics
- Conditional settings: Only perform the setting operation if the key does not exist.
- Do not overwrite existing keys: If the key already exists, the method will not have any actions, neither change the key's value nor throw an error.
- Create a new key: If the key does not exist, a new key-value pair is created.
In summary, () sets or updates the key-value pairs unconditionally, while () sets the key-value pairs only when the key does not exist. If the key already exists, no operation will be performed. The former is suitable for regular key-value updates or insertions, while the latter is often used to implement lock mechanisms, uniqueness checks and other scenarios to ensure that the value of a certain key is only valid when it is set for the first time. In your batchInsertUsersWithPipeline method, since the goal is to batch insert new data, the setEx method (set with expiration time) is used, ensuring that each user data is added to Redis as a new key-value pair. If you need to check the uniqueness of the key before insertion, consider using the setNx method. However, for batch insert scenarios, it is often assumed that the data is new and the key does not exist, so it is more common to use setEx directly.
Scenario 2: Delete data in redis in large quantities
public void batchDeleteKeysWithPipeline(List<String> keys) { ((RedisCallback<Object>) connection -> { for (String key : keys) { (()); } return null; }); }
- The () method creates a Pipeline context that allows you to send multiple commands within the callback function without waiting for a response.
- The callback function traverses the list of keys to be deleted and calls (()) for each key. The del method is used to delete the specified key, convert the key name into a byte array and pass it to Redis.
- All del commands are sent continuously to the Redis server in Pipeline, during which the client will not wait for any response.
- When the callback function is executed and returned, the commands in Pipeline will be sent to Redis at one time and receive the responses to all commands. Since commands are sent in batches in a single network roundtrip, it is more efficient than executing each delete command individually.
Scenario 3: Delete tens of millions of data in redis
1. Batch Delete Policy
- Use the SCAN command in combination with the DEL command to achieve batch deletion.
The SCAN command is used to iterate the dataset incrementally to avoid memory overflow from obtaining all keys at once.
The DEL command is used to delete a single or multiple keys.
2. Parallel processing
- Use multi-threading or asynchronous tasks to spread batch deletion operations into multiple worker threads to improve deletion efficiency.
Client optimization:
- Choose a Redis client library that supports high performance, batch operations and pipeline capabilities, such as Jedis or Lettuce.
4. Monitoring and failure recovery:
- Pay close attention to Redis' performance metrics (such as CPU, memory, network bandwidth, etc.) and the status of client programs when performing large-scale deletion operations.
- Prepare to deal with possible abnormal situations, such as disconnection and retry, data consistency check, etc.
Based on Jedis client implementation
import ; import ; import ; public class RedisDataDeleter { private static final int SCAN_BATCH_SIZE = 1000; // Can be adjusted according to actual conditions private static final String MATCH_PATTERN = "*"; // Match all keys public void deleteAllKeys(Jedis jedis) { ScanParams scanParams = new ScanParams().count(SCAN_BATCH_SIZE).match(MATCH_PATTERN); String cursor = "0"; while (true) { ScanResult<String> scanResult = (cursor, scanParams); cursor = (); List<String> keysToDelete = (); if (!()) { // Use Pipeline Batch Delete Key Pipeline pipeline = (); for (String key : keysToDelete) { (key); } (); // Execute batch commands } if ("0".equals(cursor)) { break; // Scan complete } } } }
Notice
- Make sure the SCAN_BATCH_SIZE parameter is adjusted appropriately in production so that it can make full use of system resources without putting too much pressure on the Redis server.
- Before performing a large-scale deletion operation, it is best to back up important data and operate during off-peak periods to reduce the impact on the business.
If conditions permit, it is recommended to upgrade to the Redis version and enable the activedefrag configuration item, which helps to defragment in time after deleting a large amount of data and maintain efficient utilization of Redis memory. At the same time, monitor Redis's memory usage and fragmentation rate, and manually trigger the BGREWRITEAOF or BGSAVE operation if necessary.
maven
<dependencies> <!-- ... Other dependencies ... --> <dependency> <groupId></groupId> <artifactId>jedis</artifactId> <version>3.7.0</version> <!-- Adjust according to the actual version number --> </dependency> </dependencies>
jedis connection pool configuration
=192.168.1.100 =6379 =mysecretpassword # If you have a password, please fill in it # Jedis connection pool configuration-active=10 -idle=6 -idle=2 -wait=2000ms
jedisConfig
@Configuration public class JedisConfig { @Value("${}") private String host; @Value("${}") private int port; @Value("${}") private String password; @Bean public JedisPool jedisPool() { JedisPoolConfig poolConfig = new JedisPoolConfig(); ((("-active"))); ((("-idle"))); ((("-idle"))); ((("-wait"))); return new JedisPool(poolConfig, host, port, Protocol.DEFAULT_TIMEOUT, password); } }
Implement Redis data deletion service
@Service public class RedisDataDeleterService { @Autowired private JedisPool jedisPool; public void deleteAllKeys() { try (Jedis jedis = ()) { ScanParams scanParams = new ScanParams().match("*").count(1000); String cursor = "0"; while (true) { ScanResult<String> scanResult = (cursor, scanParams); cursor = (); List<String> keysToDelete = (); if (!()) { Pipeline pipeline = (); for (String key : keysToDelete) { (key); } (); } if ("0".equals(cursor)) { break; } } } } }
Call the delete service
@RestController @RequestMapping("/redis") public class RedisController { @Autowired private RedisDataDeleterService redisDataDeleterService; @GetMapping("/delete-all-keys") public ResponseEntity<?> deleteAllKeys() { (); return ().build(); } }
Based on Lettuce
maven
<dependencies> <!-- ... Other dependencies ... --> <dependency> <groupId></groupId> <artifactId>lettuce-core</artifactId> <version>6.2.¼</version> <!-- Adjust according to the actual version number --> </dependency> </dependencies>
Configure Lettuce
Spring Boot automatic configuration provides connection pooling support for Lettuce. Configure Redis connection information in or:
=192.168.1.100 =6379 =mysecretpassword # If you have a password, please fill in it
Use the Lettuce client to perform batch deletion:
@Service public class RedisDataDeleterService { @Autowired private RedisConnectionFactory connectionFactory; public void deleteAllKeys() { RedisAsyncCommands<String, String> asyncCommands = ().async(); ScanArgs scanArgs = ("*").count(1000); RedisFuture<ScanResult<String>> scanFuture = (, scanArgs); AtomicBoolean isRunning = new AtomicBoolean(true); AtomicReference<ScanCursor> lastCursor = new AtomicReference<>(); // Asynchronously process scan results (scanResult -> { (()); List<String> keysToDelete = (); if (!()) { RedisFuture<Long> delFuture = ((new String[0])); (count -> { if (()) { // If it is still running, continue scanning deleteAllKeysRecursive(asyncCommands, scanArgs, lastCursor, isRunning); } }); } else { (false); } }); // Set the timeout time (can be adjusted according to actual conditions) (() -> { try { (120000); // 2 minutes timeout } catch (InterruptedException e) { ().interrupt(); } (false); }); } private void deleteAllKeysRecursive(RedisAsyncCommands<String, String> asyncCommands, ScanArgs scanArgs, AtomicReference<ScanCursor> lastCursor, AtomicBoolean isRunning) { if (()) { ((), scanArgs).thenAccept(scanResult -> { (()); List<String> keysToDelete = (); if (!()) { ((new String[0])).thenAccept(count -> { if (()) { deleteAllKeysRecursive(asyncCommands, scanArgs, lastCursor, isRunning); } }); } else { (false); } }); } } }
Call
@RestController @RequestMapping("/redis") public class RedisController { @Autowired private RedisDataDeleterService redisDataDeleterService; @GetMapping("/delete-all-keys") public ResponseEntity<?> deleteAllKeys() { (); return ().build(); } }
This is the end of this article about the implementation of pipeline operation in Redis. For more related content on pipeline operation in Redis, please search for my previous articles or continue browsing the related articles below. I hope everyone will support me in the future!