How to use a thread pool and Redis in Java for efficient database writes
Writing data to a database is a challenging task in a high-concurrency environment.
This article shows how to use a thread pool and Redis to cache data in real time and write it to the database in batches, so that the system stays fast and stable.
Introduction to main ideas and components
Overview of ideas
Under high concurrency, writing data to the database has to solve two main problems: real-time responsiveness and stability.
Data is first stored in the Redis cache, so that a large volume of requests can be accepted and answered quickly. A thread pool then writes the cached data to the database in batches, either when a batch fills up or when it times out, which reduces database pressure and improves overall performance.
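The idea above can be sketched in a few lines. This is a minimal in-memory illustration, not the implementation analyzed later: BatchBuffer, its sink callback, and the numbers used are hypothetical, and a plain list stands in for the Redis cache.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Buffers records and hands them to a sink in batches (hypothetical helper). */
class BatchBuffer<T> {
    private final int maxBatchCount;
    private final Consumer<List<T>> sink;   // stands in for the batch database write
    private List<T> current = new ArrayList<>();

    BatchBuffer(int maxBatchCount, Consumer<List<T>> sink) {
        this.maxBatchCount = maxBatchCount;
        this.sink = sink;
    }

    /** Fast path: append to the buffer and flush only when the batch is full. */
    synchronized void add(T record) {
        current.add(record);
        if (current.size() >= maxBatchCount) {
            flush();
        }
    }

    /** Hands the buffered records to the sink; also used for the timeout path. */
    synchronized void flush() {
        if (current.isEmpty()) {
            return;
        }
        List<T> batch = current;
        current = new ArrayList<>();
        sink.accept(batch);
    }
}

class BatchBufferDemo {
    public static void main(String[] args) throws InterruptedException {
        BatchBuffer<String> buffer =
                new BatchBuffer<>(3, batch -> System.out.println("write " + batch));
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        // Timeout path: flush whatever is buffered every 200 ms
        timer.scheduleAtFixedRate(buffer::flush, 200, 200, TimeUnit.MILLISECONDS);
        for (int i = 1; i <= 7; i++) {
            buffer.add("record-" + i);
        }
        Thread.sleep(500);   // give the timer a chance to flush the last record
        timer.shutdown();
    }
}
```

In the demo, the first six records leave in two full batches and the seventh is flushed by the timer. In a real service the sink would hand the batch to a thread pool rather than write it while holding the lock.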
Main components
- BatchDataStorageService: The core service class, responsible for caching data in real time and writing it to the database in batches.
- CacheService: A simple cache service class that uses ConcurrentHashMap as an in-memory cache for fast data access.
- RedisUtils: Wraps Redis operations, used to store data and read it back quickly.
- BatchWorker: Implements the Runnable interface; reads a buffered batch and writes it to the database in bulk.
- BatchTimeoutCommitThread: Watches the batch timeout and triggers the database write when a batch times out before reaching the configured size.
Detailed code analysis
- BatchDataStorageService
// The package declaration and the project-specific imports (DeviceRealTimeDTO,
// DeviceRealTimeEntity, DeviceRealTimeService, RedisUtils) are elided in the source.
// Assumed helper libraries, because the original calls are elided: Hutool BeanUtil and fastjson JSON.
import cn.hutool.core.bean.BeanUtil;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.Resource; // jakarta.annotation.Resource on Spring Boot 3+
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Batch data storage service
 */
@Component
@Slf4j
public class BatchDataStorageService implements InitializingBean {

    // The property keys are elided in the source; only the defaults remain.

    /** Maximum number of records per batch */
    @Value("${:800}")
    private int maxBatchCount;

    /** Maximum number of threads */
    @Value("${:100}")
    private int maxBatchThreads;

    /** Batch timeout in milliseconds */
    @Value("${:3000}")
    private int batchTimeout;

    /** Number of records in the current batch */
    private int batchCount = 0;

    /** Current batch number */
    private static long batchNo = 0;

    /** Thread pool executor */
    private ExecutorService executorService = null;

    /** Cache service */
    @Resource
    private CacheService cacheService;

    /** Device real-time data service */
    @Resource
    private DeviceRealTimeService deviceRealTimeService;

    /** Redis utility class */
    @Resource
    private RedisUtils redisUtils;

    /** Initialize the thread pool */
    @Override
    public void afterPropertiesSet() {
        executorService = Executors.newFixedThreadPool(maxBatchThreads);
    }

    /**
     * Save real-time data of a device
     *
     * @param deviceRealTimeDTO device real-time data transfer object
     */
    public void saveRealTimeData(DeviceRealTimeDTO deviceRealTimeDTO) {
        final String failedCacheKey = "device:real_time:failed_records";
        try {
            // Build the cache keys for the current batch and its start time
            String durationKey = "device:real_time:batchDuration" + batchNo;
            String batchKey = "device:real_time:batch" + batchNo;
            // First record of a new batch: record the start time and start the timeout thread
            if (!cacheService.exists(durationKey)) {
                cacheService.put(durationKey, System.currentTimeMillis());
                new BatchTimeoutCommitThread(batchKey, durationKey, failedCacheKey).start();
            }
            // Add the record to the current batch
            cacheService.lPush(batchKey, deviceRealTimeDTO);
            if (++batchCount >= maxBatchCount) {
                // The batch is full: write it to the database
                dataStorage(durationKey, batchKey, failedCacheKey);
            }
        } catch (Exception ex) {
            log.error("[DB:FAILED] Failed to add the device record to the batch: " + ex.getMessage()
                    + ", DeviceRealTimeDTO: " + JSON.toJSONString(deviceRealTimeDTO), ex);
            cacheService.lPush(failedCacheKey, deviceRealTimeDTO);
        } finally {
            updateRealTimeData(deviceRealTimeDTO);
        }
    }

    /**
     * Update real-time data in Redis
     *
     * @param deviceRealTimeDTO device real-time data transfer object
     */
    private void updateRealTimeData(DeviceRealTimeDTO deviceRealTimeDTO) {
        // Assumed names: redisUtils.set and getDeviceId() are elided in the source
        redisUtils.set("real_time:" + deviceRealTimeDTO.getDeviceId(), JSON.toJSONString(deviceRealTimeDTO));
    }

    /**
     * Close the current batch and submit it for storage
     *
     * @param durationKey    batch start-time key
     * @param batchKey       batch key
     * @param failedCacheKey key of the failed-record list
     */
    private void dataStorage(String durationKey, String batchKey, String failedCacheKey) {
        batchNo++;
        batchCount = 0;
        cacheService.del(durationKey);
        if (batchNo >= Long.MAX_VALUE) {
            batchNo = 0;
        }
        executorService.execute(new BatchWorker(batchKey, failedCacheKey));
    }

    /**
     * Batch worker task
     */
    private class BatchWorker implements Runnable {
        private final String failedCacheKey;
        private final String batchKey;

        public BatchWorker(String batchKey, String failedCacheKey) {
            this.batchKey = batchKey;
            this.failedCacheKey = failedCacheKey;
        }

        @Override
        public void run() {
            final List<DeviceRealTimeDTO> deviceRealTimeDTOList = new ArrayList<>();
            try {
                // Drain the batch from the cache
                DeviceRealTimeDTO deviceRealTimeDTO = cacheService.lPop(batchKey);
                while (deviceRealTimeDTO != null) {
                    deviceRealTimeDTOList.add(deviceRealTimeDTO);
                    deviceRealTimeDTO = cacheService.lPop(batchKey);
                }
                long timeMillis = System.currentTimeMillis();
                try {
                    // Convert the DTOs to entities and write them to the database in one batch.
                    // Assumed names: BeanUtil.copyToList and insertBatch are elided in the source.
                    List<DeviceRealTimeEntity> deviceRealTimeEntityList =
                            BeanUtil.copyToList(deviceRealTimeDTOList, DeviceRealTimeEntity.class);
                    deviceRealTimeService.insertBatch(deviceRealTimeEntityList);
                } finally {
                    cacheService.del(batchKey);
                    log.info("[DB:BATCH_WORKER] Batch: " + batchKey + ", device records saved: "
                            + deviceRealTimeDTOList.size() + ", elapsed: "
                            + (System.currentTimeMillis() - timeMillis) + "ms");
                }
            } catch (Exception e) {
                log.error("[DB:FAILED] Batch write of device records failed: " + e.getMessage()
                        + ", DeviceRealTimeDTO: " + deviceRealTimeDTOList.size(), e);
                for (DeviceRealTimeDTO deviceRealTimeDTO : deviceRealTimeDTOList) {
                    cacheService.lPush(failedCacheKey, deviceRealTimeDTO);
                }
            }
        }
    }

    /**
     * Batch timeout commit thread
     */
    class BatchTimeoutCommitThread extends Thread {
        private final String batchKey;
        private final String durationKey;
        private final String failedCacheKey;

        public BatchTimeoutCommitThread(String batchKey, String durationKey, String failedCacheKey) {
            this.batchKey = batchKey;
            this.durationKey = durationKey;
            this.failedCacheKey = failedCacheKey;
            this.setName("batch-thread-" + batchKey);
        }

        @Override
        public void run() {
            try {
                Thread.sleep(batchTimeout);
            } catch (InterruptedException e) {
                log.error("[DB] Internal error, submitting directly: " + e.getMessage());
            }
            if (cacheService.exists(durationKey)) {
                // The batch timed out before filling up: write it to the database
                dataStorage(durationKey, batchKey, failedCacheKey);
            }
        }
    }
}
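One point worth checking in a service like this is that saveRealTimeData can be called from many threads, while batchCount and batchNo are plain fields. The sketch below shows one possible way to make the "assign to a batch, roll over when full" step atomic. BatchRouter and Slot are hypothetical names, and the code is not taken from the original service.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Assigns each record to a batch key and reports when that batch is full (hypothetical helper). */
class BatchRouter {
    /** Result of one assignment: the batch key and whether this record filled the batch. */
    static final class Slot {
        final String batchKey;
        final boolean full;

        Slot(String batchKey, boolean full) {
            this.batchKey = batchKey;
            this.full = full;
        }
    }

    private final int maxBatchCount;
    private long batchNo = 0;
    private int batchCount = 0;

    BatchRouter(int maxBatchCount) {
        this.maxBatchCount = maxBatchCount;
    }

    /** Counter update and rollover happen under one lock, so no record is counted twice or lost. */
    synchronized Slot next() {
        String batchKey = "device:real_time:batch" + batchNo;
        boolean full = ++batchCount >= maxBatchCount;
        if (full) {
            batchNo = (batchNo == Long.MAX_VALUE) ? 0 : batchNo + 1;
            batchCount = 0;
        }
        return new Slot(batchKey, full);
    }
}

class BatchRouterDemo {
    /** Runs the task on several threads and waits for all of them. */
    static void runConcurrently(int threads, Runnable task) {
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(task);
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
    }

    public static void main(String[] args) {
        BatchRouter router = new BatchRouter(100);
        AtomicInteger fullBatches = new AtomicInteger();
        // 4 threads x 1000 records with a batch size of 100
        runConcurrently(4, () -> {
            for (int i = 0; i < 1000; i++) {
                if (router.next().full) {
                    fullBatches.incrementAndGet();
                }
            }
        });
        System.out.println("full batches: " + fullBatches.get());
    }
}
```

With the lock in place, 4000 records and a batch size of 100 always yield exactly 40 full batches. The caller that receives a Slot with full set to true is the one that submits the batch to the thread pool.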
- CacheService
// The package declaration is elided in the source.
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;

import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Cache service class, providing a simple caching mechanism
 */
@Component
public class CacheService implements InitializingBean {

    /** In-memory cache, used to store data */
    private Map<String, Object> objectCache = new ConcurrentHashMap<>();

    /** Statistics cache, used for counters */
    private Map<String, AtomicLong> statCache = new ConcurrentHashMap<>();

    /** Initialize the statistics cache */
    @Override
    public void afterPropertiesSet() {
        statCache.put("terminals", new AtomicLong(0));
        statCache.put("connections", new AtomicLong(0));
    }

    /**
     * Increment the specified counter
     *
     * @param statName counter name
     * @return the incremented value
     */
    public long incr(String statName) {
        statCache.putIfAbsent(statName, new AtomicLong(0));
        return statCache.get(statName).incrementAndGet();
    }

    /**
     * Decrement the specified counter
     *
     * @param statName counter name
     * @return the decremented value
     */
    public long decr(String statName) {
        statCache.putIfAbsent(statName, new AtomicLong(0));
        return statCache.get(statName).decrementAndGet();
    }

    /**
     * Get the current value of the specified counter
     *
     * @param statName counter name
     * @return current value
     */
    public long stat(String statName) {
        statCache.putIfAbsent(statName, new AtomicLong(0));
        return statCache.get(statName).get();
    }

    /**
     * Store data
     *
     * @param key    cache key
     * @param object data to cache
     */
    public <T> void put(String key, T object) {
        objectCache.put(key, object);
    }

    /**
     * Get data
     *
     * @param key cache key
     * @return cached data
     */
    @SuppressWarnings("unchecked")
    public <T> T get(String key) {
        return (T) objectCache.get(key);
    }

    /**
     * Delete data
     *
     * @param key cache key
     */
    public void remove(String key) {
        objectCache.remove(key);
    }

    /**
     * Store a hash table entry
     *
     * @param key    hash table key
     * @param subkey hash table subkey
     * @param value  hash table value
     */
    @SuppressWarnings("unchecked")
    public void hSet(String key, String subkey, Object value) {
        synchronized (objectCache) {
            Map<String, Object> submap =
                    (Map<String, Object>) objectCache.computeIfAbsent(key, k -> new ConcurrentHashMap<>());
            submap.put(subkey, value);
        }
    }

    /**
     * Get a hash table entry
     *
     * @param key    hash table key
     * @param subkey hash table subkey
     * @return hash table value
     */
    @SuppressWarnings("unchecked")
    public <T> T hGet(String key, String subkey) {
        synchronized (objectCache) {
            Map<String, Object> submap = (Map<String, Object>) objectCache.get(key);
            return submap != null ? (T) submap.get(subkey) : null;
        }
    }

    /**
     * Check whether a hash table subkey exists
     *
     * @param key    hash table key
     * @param subkey hash table subkey
     * @return whether it exists
     */
    @SuppressWarnings("unchecked")
    public boolean hExists(String key, String subkey) {
        synchronized (objectCache) {
            Map<String, Object> submap = (Map<String, Object>) objectCache.get(key);
            return submap != null && submap.containsKey(subkey);
        }
    }

    /**
     * Push data onto a list
     *
     * @param key   list key
     * @param value list value
     */
    @SuppressWarnings("unchecked")
    public void lPush(String key, Object value) {
        synchronized (objectCache) {
            LinkedList<Object> queue =
                    (LinkedList<Object>) objectCache.computeIfAbsent(key, k -> new LinkedList<>());
            // The exact LinkedList calls are elided in the source; addLast/removeFirst (FIFO) is assumed
            queue.addLast(value);
        }
    }

    /**
     * Pop data from a list
     *
     * @param key list key
     * @return list value, or null if the list is empty or missing
     */
    @SuppressWarnings("unchecked")
    public <T> T lPop(String key) {
        synchronized (objectCache) {
            LinkedList<Object> queue = (LinkedList<Object>) objectCache.get(key);
            return queue != null && !queue.isEmpty() ? (T) queue.removeFirst() : null;
        }
    }

    /**
     * Delete cached data
     *
     * @param key cache key
     */
    public void del(String key) {
        objectCache.remove(key);
    }

    /**
     * Check whether a cache key exists
     *
     * @param key cache key
     * @return whether it exists
     */
    public boolean exists(String key) {
        return objectCache.containsKey(key);
    }
}
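The batching relies on the list operations: records are pushed under a batch key and later drained until lPop returns null. As a small self-contained illustration of that push/drain pattern, the sketch below keeps one ConcurrentLinkedQueue per key instead of taking a global lock. This is a swapped-in alternative, and ListCache and drain are hypothetical names that are not part of the original class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Keyed in-memory lists backed by lock-free queues (hypothetical helper). */
class ListCache {
    private final Map<String, Queue<Object>> lists = new ConcurrentHashMap<>();

    /** Appends a value to the list stored under the key, creating the list on first use. */
    void lPush(String key, Object value) {
        lists.computeIfAbsent(key, k -> new ConcurrentLinkedQueue<>()).add(value);
    }

    /** Removes and returns the oldest value, or null if the list is empty or missing. */
    @SuppressWarnings("unchecked")
    <T> T lPop(String key) {
        Queue<Object> queue = lists.get(key);
        return queue != null ? (T) queue.poll() : null;
    }

    /**
     * Pops until the list is empty, then drops the key.
     * Assumes nothing is pushed to this key any more, as is the case for a closed batch.
     */
    <T> List<T> drain(String key) {
        List<T> batch = new ArrayList<>();
        T item = lPop(key);
        while (item != null) {
            batch.add(item);
            item = lPop(key);
        }
        lists.remove(key);
        return batch;
    }
}

class ListCacheDemo {
    public static void main(String[] args) {
        ListCache cache = new ListCache();
        cache.lPush("device:real_time:batch0", "a");
        cache.lPush("device:real_time:batch0", "b");
        cache.lPush("device:real_time:batch0", "c");
        List<String> batch = cache.drain("device:real_time:batch0");
        System.out.println(batch);
    }
}
```

Note that the order here is first-in, first-out, whereas Redis LPUSH followed by LPOP behaves like a stack. For batch writes the order within a batch usually does not matter.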
Detailed explanation
BatchDataStorageService
Fields and Initialization:
- maxBatchCount: The maximum batch size, set in the configuration file; controls how many records are processed per batch.
- maxBatchThreads: The maximum number of threads in the thread pool; determines how many batches can be processed concurrently.
- batchTimeout: The batch timeout, that is, the longest a record waits before its batch is processed.
- batchCount: The number of records in the current batch, used to decide whether the batch should be submitted.
- batchNo: The batch number, used to tell batches apart.
- executorService: The thread pool that runs the batch-processing tasks.
- cacheService, deviceRealTimeService, redisUtils: Used for cache operations, data storage, and Redis operations respectively.
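maxBatchThreads limits the number of worker threads, but a fixed-size pool such as the one created by Executors.newFixedThreadPool queues waiting tasks without limit. If that is a concern, one option is a pool with a bounded queue and a caller-runs policy, so that producers slow down instead of memory growing. The sketch below is illustrative only: BatchPools, bounded, runAll, and the queue capacity are assumptions, not part of the original service.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Factory for a bounded batch-worker pool (hypothetical helper). */
class BatchPools {
    /** Fixed number of threads, bounded queue, and back-pressure on the submitting thread. */
    static ThreadPoolExecutor bounded(int maxBatchThreads, int queueCapacity) {
        return new ThreadPoolExecutor(
                maxBatchThreads, maxBatchThreads,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                // When the queue is full, the caller runs the task itself instead of dropping it
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /** Submits the given number of short tasks, waits for completion, and returns how many ran. */
    static int runAll(ThreadPoolExecutor pool, int tasks) {
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(done::incrementAndGet);
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return done.get();
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = bounded(2, 4);
        System.out.println("completed: " + runAll(pool, 100));
    }
}
```

Because rejected tasks run on the submitting thread, all 100 tasks complete even though only 2 threads and 4 queue slots are available.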
Method details:
- afterPropertiesSet: Initializes the thread pool used by the later batch-processing tasks.
- saveRealTimeData: Pushes real-time data into the cache and checks whether the batch needs to be submitted. When the batch reaches the size threshold, or when the timeout thread started for each new batch fires, dataStorage is called to process the data.
- updateRealTimeData: Writes the latest data to Redis so that real-time data stays available.
- dataStorage: Closes the current batch by advancing the batch number and resetting the counter, then submits a worker task to the thread pool.
- BatchWorker: Reads the batch from the cache and writes it to the database, logs the result, and puts failed records into the failed-record cache.
- BatchTimeoutCommitThread: Handles the batch timeout. Data is submitted even if the batch is not full, so that records are processed in a timely manner.
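The timeout logic starts one thread per batch and sleeps in it. A possible alternative, sketched here under stated assumptions, is to schedule the timeout check on a shared ScheduledExecutorService. TimeoutCommitter, open, commit, and commitByTimeout are hypothetical names, and the set of open batch keys stands in for the duration key kept in the cache.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

/** Commits each batch exactly once: when it is full or when its timeout fires (hypothetical helper). */
class TimeoutCommitter {
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final Set<String> openBatches = ConcurrentHashMap.newKeySet();
    private final long batchTimeoutMillis;
    private final Consumer<String> commitAction;   // stands in for submitting the batch worker

    TimeoutCommitter(long batchTimeoutMillis, Consumer<String> commitAction) {
        this.batchTimeoutMillis = batchTimeoutMillis;
        this.commitAction = commitAction;
    }

    /** Marks the batch as open and schedules its timeout commit. */
    void open(String batchKey) {
        if (openBatches.add(batchKey)) {
            timer.schedule(() -> commit(batchKey), batchTimeoutMillis, TimeUnit.MILLISECONDS);
        }
    }

    /** Commits the batch if it is still open; returns false if it was already committed. */
    boolean commit(String batchKey) {
        if (!openBatches.remove(batchKey)) {
            return false;
        }
        commitAction.accept(batchKey);
        return true;
    }

    void shutdown() {
        timer.shutdownNow();
    }
}

class TimeoutCommitterDemo {
    /** Opens one batch and waits until the timer commits it; returns the committed key or null. */
    static String commitByTimeout(long batchTimeoutMillis) {
        CompletableFuture<String> committed = new CompletableFuture<>();
        TimeoutCommitter committer = new TimeoutCommitter(batchTimeoutMillis, committed::complete);
        committer.open("device:real_time:batch0");
        try {
            return committed.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException | TimeoutException e) {
            return null;
        } finally {
            committer.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("committed by timeout: " + commitByTimeout(100));
    }
}
```

The atomic remove on the set decides which path wins, so a batch that fills up just as its timeout fires is still committed only once.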
CacheService
Fields:
- objectCache: Stores ordinary cached data.
- statCache: Stores statistics such as counters.
Method details:
- put/get/remove: Basic cache operations for storing, retrieving, and deleting data.
- hSet/hGet/hExists: Hash table operations for setting, getting, and checking key-value pairs.
- lPush/lPop: List operations for pushing and popping data.
- incr/decr/stat: Counter operations for incrementing, decrementing, and reading the current value.
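The counter methods follow a common pattern: make sure an AtomicLong exists for the name, then update it atomically. A compact way to express that with ConcurrentHashMap.computeIfAbsent is sketched below. StatCounters is a hypothetical name used only for this example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Named counters that are created on first use (hypothetical helper). */
class StatCounters {
    private final Map<String, AtomicLong> statCache = new ConcurrentHashMap<>();

    /** Returns the counter for the name, creating it atomically if it does not exist yet. */
    private AtomicLong counter(String statName) {
        return statCache.computeIfAbsent(statName, k -> new AtomicLong(0));
    }

    long incr(String statName) {
        return counter(statName).incrementAndGet();
    }

    long decr(String statName) {
        return counter(statName).decrementAndGet();
    }

    long stat(String statName) {
        return counter(statName).get();
    }

    public static void main(String[] args) {
        StatCounters stats = new StatCounters();
        stats.incr("connections");
        stats.incr("connections");
        stats.decr("connections");
        System.out.println(stats.stat("connections")); // prints 1
    }
}
```

Compared with calling putIfAbsent followed by get, computeIfAbsent needs a single map lookup and only creates a new AtomicLong when the name is actually missing.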
Summary
This article showed how to use a thread pool and Redis to write data to a database efficiently in a high-concurrency environment. Storing data in the cache first and letting a thread pool write it to the database in batches reduces database pressure and improves the performance and stability of the system. The code is built around the core BatchDataStorageService class, the CacheService cache class, and the RedisUtils utility class, with comments and analysis to help readers understand and implement similar high-concurrency data processing systems.