How do you use Spring Boot multithreading (declarative style)?
1. Spring Boot provides the @Async annotation for running methods on a thread pool. It is used as follows:
(1) Add @EnableAsync to the startup class (or a configuration class) to enable asynchronous execution.
(2) Add @Async to each method that should run in a child thread.
In practice you should configure a custom thread pool rather than rely on the default executor, as follows:
    // Imports and setter calls were lost in the source listing and are reconstructed from the comments.
    import java.util.concurrent.ThreadPoolExecutor;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.EnableAsync;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

    @Configuration
    @EnableAsync
    public class ThreadPoolTaskConfig {

        @Bean("threadPoolTaskExecutor") // custom thread pool name
        public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            // Core pool size: the minimum number of threads the pool keeps alive,
            // even when there is no task to execute.
            executor.setCorePoolSize(16);
            // If allowCoreThreadTimeOut=true (default false), idle core threads also time out and exit.
            // executor.setAllowCoreThreadTimeOut(true);
            // Queue capacity: once all core threads are busy, new tasks wait in this blocking queue.
            executor.setQueueCapacity(124);
            // Max pool size: when the thread count >= corePoolSize and the queue is full,
            // the pool creates additional threads up to this limit. When the queue is full
            // and the thread count == maxPoolSize, new tasks go to the rejection policy.
            executor.setMaxPoolSize(64);
            // Keep-alive: threads above corePoolSize exit after 30 seconds of idleness.
            // With allowCoreThreadTimeOut=true the pool can shrink all the way to 0 threads.
            executor.setKeepAliveSeconds(30);
            // Spring's ThreadPoolTaskExecutor has setThreadNamePrefix();
            // the JDK's ThreadPoolExecutor does not.
            executor.setThreadNamePrefix("Custom thread pool-");
            // Rejection policy: how new tasks are handled once the queue is full and maxPoolSize is reached.
            // CallerRunsPolicy: the submitting thread (e.g. the main thread) runs the task itself
            //   instead of waiting for a pool thread (the author's recommendation).
            // AbortPolicy: the default; the task is rejected and a RejectedExecutionException is thrown.
            // DiscardPolicy: the task is silently dropped, with no exception.
            // DiscardOldestPolicy: the oldest queued task is dropped to make room, then the new task is retried.
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            // On shutdown, wait for submitted tasks to finish before other beans are destroyed,
            // so that asynchronous tasks complete before, for example, a Redis connection pool goes away.
            executor.setWaitForTasksToCompleteOnShutdown(true);
            // Maximum time in seconds to wait on shutdown. After that the pool is destroyed anyway,
            // so the application can always exit instead of blocking.
            executor.setAwaitTerminationSeconds(60);
            executor.initialize();
            return executor;
        }
    }
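The interaction between core size, queue capacity, maximum size and the rejection policy described in the comments above can be observed with the plain JDK ThreadPoolExecutor. The following is a minimal sketch, not part of the original article: the class name PoolSizingDemo and the tiny sizes (core 1, queue 1, max 2) are chosen only for illustration, and AbortPolicy is used instead of CallerRunsPolicy so that the rejection is visible as an exception.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: shows the order core thread -> queue -> extra thread -> rejection.
class PoolSizingDemo {

    static String run() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 30, TimeUnit.SECONDS,          // core = 1, max = 2, keep-alive = 30 s
                new ArrayBlockingQueue<>(1),          // queue capacity = 1
                new ThreadPoolExecutor.AbortPolicy());
        // Each task blocks until released, so the pool state stays fixed while we inspect it.
        Runnable blocking = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        pool.execute(blocking); // task 1: runs on the core thread
        pool.execute(blocking); // task 2: core thread busy, so it waits in the queue
        pool.execute(blocking); // task 3: queue full, so a second (non-core) thread is created
        int threads = pool.getPoolSize();
        int queued = pool.getQueue().size();
        boolean rejected = false;
        try {
            pool.execute(blocking); // task 4: queue full and max size reached
        } catch (RejectedExecutionException e) {
            rejected = true;
        }
        release.countDown(); // let the blocked tasks finish
        pool.shutdown();
        return "threads=" + threads + " queued=" + queued + " rejected=" + rejected;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "threads=2 queued=1 rejected=true"
    }
}
```

With CallerRunsPolicy, the fourth task would run on the submitting thread instead of being rejected with an exception.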
To run a method in a child thread, annotate it with @Async("threadPoolTaskExecutor"). The annotation parameter is the bean name of the custom thread pool.
2. Multi-threaded transaction control with custom annotations
1. Custom annotations
This article uses two annotations that work together. The main thread acts as the coordinator and the child threads act as participants.
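    // File: MainTransaction.java (the package declaration was lost in the source listing)
    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;

    /**
     * Multi-threaded transaction annotation: main transaction.
     *
     * @author zlj
     * @since 2022/11/3
     */
    @Target({ElementType.METHOD})
    @Retention(RetentionPolicy.RUNTIME)
    public @interface MainTransaction {
        int value(); // number of child threads
    }

    // File: SonTransaction.java (same imports as above)
    /**
     * Multi-threaded transaction annotation: sub-transaction.
     *
     * @author zlj
     * @since 2022/11/3
     */
    @Target({ElementType.METHOD})
    @Retention(RetentionPolicy.RUNTIME)
    public @interface SonTransaction {
        String value() default "";
    }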
Explanation:
Both annotations are applied to methods and must be combined with @Transactional(rollbackFor = ...).
- @MainTransaction is placed on the caller. Its parameter is required and must equal the number of child threads started by the methods that the caller invokes. For example, if the caller invokes two methods that each start a child thread with @Async, use @MainTransaction(2). If @MainTransaction is absent, no multi-thread transaction coordination takes place; the ordinary single-thread transaction of each method is unaffected.
- @SonTransaction is placed on the callee (a method that runs in a child thread). It needs no parameter.
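To make the role of the annotation parameter concrete, here is a minimal sketch of how a value such as @MainTransaction(2) can be read at run time through reflection, which is also why the annotations need runtime retention. The nested annotation types mirror the article's two annotations, but the class AnnotationLookupDemo, the Services class and the helper declaredChildThreads are hypothetical names used only for this illustration; the article's aspect receives the annotation from Spring AOP rather than looking it up by hand.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical demo class: reads the declared child-thread count from an annotated method.
class AnnotationLookupDemo {

    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME) // RUNTIME retention makes the annotation visible to reflection
    @interface MainTransaction {
        int value();
    }

    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    @interface SonTransaction {
        String value() default "";
    }

    static class Services {
        @MainTransaction(2) // the caller declares that two child threads will be started
        public void caller() { }

        @SonTransaction     // the callee needs no parameter
        public void child() { }
    }

    /** Returns the declared child-thread count, or -1 if the method has no @MainTransaction. */
    static int declaredChildThreads(String methodName) {
        try {
            Method method = Services.class.getMethod(methodName);
            MainTransaction main = method.getAnnotation(MainTransaction.class);
            return main == null ? -1 : main.value();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("no such method: " + methodName, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(declaredChildThreads("caller")); // prints 2
        System.out.println(declaredChildThreads("child"));  // prints -1
    }
}
```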
2. AOP implementation
The code is as follows:
    // The package declaration, imports and method calls were lost in the source listing; they are reconstructed here.
    import java.util.Map;
    import java.util.Vector;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.atomic.AtomicBoolean;
    import javax.annotation.Resource; // jakarta.annotation.Resource on Spring Boot 3+
    import org.aspectj.lang.ProceedingJoinPoint;
    import org.aspectj.lang.annotation.Around;
    import org.aspectj.lang.annotation.Aspect;
    import org.springframework.stereotype.Component;
    import org.springframework.transaction.PlatformTransactionManager;
    import org.springframework.transaction.TransactionDefinition;
    import org.springframework.transaction.TransactionStatus;
    import org.springframework.transaction.support.DefaultTransactionDefinition;

    /**
     * Multi-threaded transactions.
     *
     * @author zlj
     * @since 2022/11/3
     */
    @Aspect
    @Component
    public class TransactionAop {

        // Shared state per main thread, keyed by thread name; removed after each execution.
        // ConcurrentHashMap replaces the original HashMap because several threads access it.
        private static final Map<String, Object> map = new ConcurrentHashMap<>();

        @Resource
        private PlatformTransactionManager transactionManager;

        @Around("@annotation(mainTransaction)")
        public void mainIntercept(ProceedingJoinPoint joinPoint, MainTransaction mainTransaction) throws Throwable {
            String threadName = Thread.currentThread().getName();
            CountDownLatch mainDownLatch = new CountDownLatch(1);
            // The @MainTransaction parameter is the number of child threads.
            CountDownLatch sonDownLatch = new CountDownLatch(mainTransaction.value());
            // Records whether anything failed; true means every participant must roll back.
            AtomicBoolean rollBackFlag = new AtomicBoolean(false);
            // Collects exceptions from all threads; Vector is used because it is thread-safe.
            Vector<Throwable> exceptionVector = new Vector<>();
            map.put(threadName + "mainDownLatch", mainDownLatch);
            map.put(threadName + "sonDownLatch", sonDownLatch);
            map.put(threadName + "rollBackFlag", rollBackFlag);
            map.put(threadName + "exceptionVector", exceptionVector);
            try {
                try {
                    joinPoint.proceed(); // run the main method
                } catch (Throwable e) {
                    exceptionVector.add(0, e);
                    rollBackFlag.set(true);    // child threads must roll back
                    mainDownLatch.countDown(); // release all child threads
                }
                if (!rollBackFlag.get()) {
                    try {
                        // Wait until every child thread has done its work (not yet committed).
                        sonDownLatch.await();
                        // Release the children; each commits or rolls back according to rollBackFlag.
                        mainDownLatch.countDown();
                    } catch (Exception e) {
                        rollBackFlag.set(true);
                        exceptionVector.add(0, e);
                        mainDownLatch.countDown(); // do not leave the children blocked
                    }
                }
            } finally {
                // Always clean up. Limitation: a child that starts after a failed main method
                // has been cleaned up finds no shared state and runs uncoordinated.
                map.remove(threadName + "mainDownLatch");
                map.remove(threadName + "sonDownLatch");
                map.remove(threadName + "rollBackFlag");
                map.remove(threadName + "exceptionVector");
            }
            if (!exceptionVector.isEmpty()) {
                throw exceptionVector.get(0);
            }
        }

        // Bound by parameter because the annotation's fully qualified name was lost in the source.
        @Around("@annotation(sonTransaction)")
        @SuppressWarnings("unchecked")
        public void sonIntercept(ProceedingJoinPoint joinPoint, SonTransaction sonTransaction) throws Throwable {
            Object[] args = joinPoint.getArgs();
            Thread thread = (Thread) args[args.length - 1]; // the caller's thread is the last argument
            String threadName = thread.getName();
            CountDownLatch mainDownLatch = (CountDownLatch) map.get(threadName + "mainDownLatch");
            if (mainDownLatch == null) {
                // The caller has no @MainTransaction, so the method runs without coordination.
                // Ideally the calling thread would run this method itself, but I found no API
                // for that. Suggestions are welcome.
                joinPoint.proceed();
                return;
            }
            CountDownLatch sonDownLatch = (CountDownLatch) map.get(threadName + "sonDownLatch");
            AtomicBoolean rollBackFlag = (AtomicBoolean) map.get(threadName + "rollBackFlag");
            Vector<Throwable> exceptionVector = (Vector<Throwable>) map.get(threadName + "exceptionVector");
            // If another thread has already failed, this one does not need to run.
            if (rollBackFlag.get()) {
                sonDownLatch.countDown();
                return;
            }
            DefaultTransactionDefinition def = new DefaultTransactionDefinition();
            // Propagation behavior (not isolation level): always start a new transaction.
            def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
            TransactionStatus status = transactionManager.getTransaction(def);
            try {
                joinPoint.proceed();       // run the child method
                sonDownLatch.countDown();  // report completion to the main thread
                mainDownLatch.await();     // block until the main thread opens the gate
                // All children have finished: roll back if anything failed, otherwise commit.
                if (rollBackFlag.get()) {
                    transactionManager.rollback(status);
                } else {
                    transactionManager.commit(status);
                }
            } catch (Throwable e) {
                exceptionVector.add(0, e);
                transactionManager.rollback(status);
                rollBackFlag.set(true);
                mainDownLatch.countDown();
                sonDownLatch.countDown();
            }
        }
    }
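The coordination protocol in the aspect above can be tried out without Spring or a database. The following is a simplified simulation under stated assumptions, not the author's implementation: the class TwoPhaseLatchDemo is hypothetical, "commit" and "rollback" are recorded as strings instead of calling a transaction manager, and a worker fails when its flag is true.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class: simulates coordinator (main thread) and participants (workers).
class TwoPhaseLatchDemo {

    /** Runs one worker per flag (true = that worker fails) and returns the sorted outcomes. */
    static String run(boolean... failFlags) {
        CountDownLatch mainLatch = new CountDownLatch(1);               // gate opened by the coordinator
        CountDownLatch sonLatch = new CountDownLatch(failFlags.length); // one count per participant
        AtomicBoolean rollBackFlag = new AtomicBoolean(false);
        List<String> outcomes = new CopyOnWriteArrayList<>();
        // One thread per participant: a participant stuck in a queue could never report in.
        ExecutorService pool = Executors.newFixedThreadPool(failFlags.length);
        for (boolean fail : failFlags) {
            pool.execute(() -> {
                try {
                    if (fail) {
                        throw new IllegalStateException("simulated failure");
                    }
                    sonLatch.countDown(); // work done, nothing committed yet
                    mainLatch.await();    // wait for the coordinator's decision
                    outcomes.add(rollBackFlag.get() ? "rollback" : "commit");
                } catch (Exception e) {
                    outcomes.add("rollback");
                    rollBackFlag.set(true);
                    mainLatch.countDown(); // release the other participants
                    sonLatch.countDown();  // still report in, so the coordinator can proceed
                }
            });
        }
        try {
            sonLatch.await();      // every participant has reported in
            mainLatch.countDown(); // open the gate
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
        }
        List<String> sorted = new ArrayList<>(outcomes);
        Collections.sort(sorted);
        return String.join(",", sorted);
    }

    public static void main(String[] args) {
        System.out.println(run(false, false, false)); // prints "commit,commit,commit"
        System.out.println(run(false, true, false));  // prints "rollback,rollback,rollback"
    }
}
```

The sketch sizes the pool to the number of participants on purpose. If a participant has to wait in the queue while the running ones block on the gate, it can never count down and the coordinator waits forever, so the declared number of child threads should not exceed the threads the pool can run at once.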
Extended note: what is CountDownLatch?
It is a synchronization aid from java.util.concurrent.
- Constructor: initializes the CountDownLatch with the given count.
- countDown(): decrements the count by 1.
- await(): blocks the current thread until the count reaches zero.
In this article:
mainDownLatch, initialized with a count of 1, serves as a simple on/off latch, or gate: all threads that call await() wait at the gate until a thread opens it by calling countDown().
sonDownLatch, initialized with the number of child threads, makes one thread wait until N threads have completed an action, or until an action has been completed N times.
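Both uses of the latch can be seen in a few lines of plain Java. This is a minimal sketch with hypothetical names (LatchGateDemo, gate, done): the gate latch plays the role of mainDownLatch and the done latch plays the role of sonDownLatch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: a count-1 latch as a gate, a count-N latch as a completion signal.
class LatchGateDemo {

    /** Returns "<workers past the gate before it opened>-><workers past the gate at the end>". */
    static String run(int workers) {
        CountDownLatch gate = new CountDownLatch(1);       // on/off gate
        CountDownLatch done = new CountDownLatch(workers); // waits for N completions
        AtomicInteger passed = new AtomicInteger();
        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                try {
                    gate.await();             // every worker waits at the gate
                    passed.incrementAndGet(); // only reachable once the gate is open
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();         // report completion
                }
            }).start();
        }
        int before = passed.get(); // still 0: nobody can pass a closed gate
        gate.countDown();          // open the gate
        try {
            done.await();          // wait until all workers have finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return before + "->" + passed.get();
    }

    public static void main(String[] args) {
        System.out.println(run(3)); // prints "0->3"
    }
}
```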
3. Annotation usage demo
Task method:
    // The package declaration and imports were lost in the source listing and are reconstructed.
    // rollbackFor = Exception.class is an assumption; the value is missing in the source.
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.stereotype.Service;
    import org.springframework.transaction.annotation.Transactional;

    /**
     * @author zlj
     * @since 2022/11/14
     */
    @Service
    public class SonService {

        /**
         * Parameter description (the same convention applies to all methods below).
         *
         * @param args   parameters required by the business logic
         * @param thread the caller's thread, which the aspect uses to look up the shared state.
         *               Do not hide it behind an overloaded method for brevity. In the caller,
         *               counting the calls that pass this parameter gives the number of child
         *               threads for the annotation parameter, which helps avoid miscounts that
         *               would leave tables locked.
         *               Always pass Thread.currentThread().
         */
        @Transactional(rollbackFor = Exception.class)
        @Async("threadPoolTaskExecutor")
        @SonTransaction
        public void sonMethod1(String args, Thread thread) {
            System.out.println(args + " started a thread");
        }

        @Transactional(rollbackFor = Exception.class)
        @Async("threadPoolTaskExecutor")
        @SonTransaction
        public void sonMethod2(String args1, String args2, Thread thread) {
            System.out.println(args1 + " and " + args2 + " started a thread");
        }

        @Transactional(rollbackFor = Exception.class)
        @Async("threadPoolTaskExecutor")
        @SonTransaction
        public void sonMethod3(String args, Thread thread) {
            System.out.println(args + " started a thread");
        }

        // sonMethod4 does not use the thread pool.
        @Transactional(rollbackFor = Exception.class)
        public void sonMethod4(String args) {
            System.out.println(args + " did not start a thread");
        }
    }
Caller:
    // The package declaration and imports were lost in the source listing and are reconstructed.
    // rollbackFor = Exception.class is an assumption; the value is missing in the source.
    import javax.annotation.Resource; // jakarta.annotation.Resource on Spring Boot 3+
    import org.springframework.stereotype.Service;
    import org.springframework.transaction.annotation.Transactional;

    /**
     * @author zlj
     * @since 2022/11/14
     */
    @Service
    public class MainService {

        @Resource
        private SonService sonService;

        // Three of the called methods start a thread with @Async, so the parameter is 3.
        @MainTransaction(3)
        @Transactional(rollbackFor = Exception.class)
        public void test1() {
            sonService.sonMethod1("Luffy", Thread.currentThread());
            sonService.sonMethod2("Zoro", "Sanji", Thread.currentThread());
            sonService.sonMethod3("Nami", Thread.currentThread());
            sonService.sonMethod4("Robin");
        }

        /*
         * Some business methods branch with if statements, so the number of thread-starting
         * calls varies. In that case you can leave out @MainTransaction to avoid locked tables.
         * The trade-off is that the threads can no longer roll back together when an exception
         * occurs. Weigh this against your business requirements.
         */
        @Transactional(rollbackFor = Exception.class)
        public void test2() {
            sonService.sonMethod1("Luffy", Thread.currentThread());
            sonService.sonMethod2("Zoro", "Sanji", Thread.currentThread());
            sonService.sonMethod3("Nami", Thread.currentThread());
            sonService.sonMethod4("Robin");
        }
    }
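The warning about a mismatched thread count comes down to latch behavior: if fewer child threads report in than the annotation declares, a plain await() never returns, and the children that did run keep their transactions open. One possible safeguard, which is an assumption of this sketch and not something the article's aspect does, is the timed variant await(timeout, unit), which returns false when the count has not reached zero in time. The class LatchTimeoutDemo and the method allReported are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: detects a declared/actual child-thread mismatch with a timed await.
class LatchTimeoutDemo {

    /**
     * Starts `actual` child threads against a latch sized for `declared` children.
     * Returns true if all declared children reported in before the timeout expired.
     */
    static boolean allReported(int declared, int actual, long timeoutMillis) {
        CountDownLatch sonLatch = new CountDownLatch(declared);
        for (int i = 0; i < actual; i++) {
            new Thread(sonLatch::countDown).start(); // each child reports completion once
        }
        try {
            // Unlike await(), this gives up after the timeout instead of blocking forever.
            return sonLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(allReported(3, 3, 2000)); // prints true: counts match
        System.out.println(allReported(3, 2, 200));  // prints false: one child never reports
    }
}
```

In a coordinator, a false result could be treated like a failure: set the rollback flag and open the gate so that the waiting children roll back instead of holding locks.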
Summary
The above is based on my personal experience. I hope it serves as a useful reference.