Several very useful concurrency tool classes are provided in the JDK concurrency package. CountDownLatch, CyclicBarrier and Semaphore tools provide a means of concurrent process control, and Exchange tools provide a method of exchanging data between threads.
They're all inPack it. First, let’s summarize what tool classes are there and what their functions are, and then introduce their main usage methods and principles respectively.
kind | effect |
---|---|
CountDownLatch | Thread waits until the counter decreases to 0 to start working |
CyclicBarrier | The function is similar to CountDownLatch, but it can be reused |
Semaphore | Limit the number of threads |
Exchanger | Two threads exchange data |
The following are the categories.
CountDownLatch
Overview
CountDownLatch
One or more threads can be made to wait for other threads to execute before executing.
CountDownLatch
Define a counter, and aBlocking queue, Before the counter value is decremented to 0, the threads in the blocking queue are in a suspended state. When the counter is decremented to 0, all threads in the blocking queue will be awakened. The counter here is a flag, which can represent a task and a thread, or aCountdown timer。
Case
When playing the chicken-eating game, before officially starting the game, some pre-scenes will definitely be loaded, such as: "Loading Map", "Loading Character Model", "Loading Background Music", etc.
public class CountDownLatchDemo { // Define pre-task thread static class PreTaskThread implements Runnable { private String task; private CountDownLatch countDownLatch; public PreTaskThread(String task, CountDownLatch countDownLatch) { = task; = countDownLatch; } @Override public void run() { try { Random random = new Random(); ((1000)); (task + " - Mission completed"); (); } catch (InterruptedException e) { (); } } } public static void main(String[] args) { // Suppose there are three modules that need to be loaded CountDownLatch countDownLatch = new CountDownLatch(3); // Main mission new Thread(() -> { try { ("Waiting for data to load..."); (("There are %d pre-tasks", ())); (); ("The data is loaded and the game is officially started!"); } catch (InterruptedException e) { (); } }).start(); // Pre-task new Thread(new PreTaskThread("Loading map data", countDownLatch)).start(); new Thread(new PreTaskThread("Loading the character model", countDownLatch)).start(); new Thread(new PreTaskThread("Loading background music", countDownLatch)).start(); } }
Output:
Waiting for data to load...
There are 3 pre-tasks
Loading map data - task completion
Loading character model - task completion
Loading background music - task completion
The data is loaded and the game is officially started!
principle
The CountDownLatch method is very simple, as follows:
//Construction method:public CountDownLatch(int count) public void await() // waitpublic boolean await(long timeout, TimeUnit unit) // Timeout waitingpublic void countDown() // count - 1 public long getCount() // Get how many counts are there currently
CountDownLatch
In the constructorThe count value is actually the number of threads to wait for when locking. This value can only be set once, and CountDownLatchNo mechanism is provided to reset this count value。
The first interaction with CountDownLatch is when the main thread waits for other threads. The main thread must be called immediately after starting other threads()
method. In this way, the operation of the main thread will block on this method until the other threads complete their respective tasks.
The other N threads must reference the locked object because they need to notify the CountDownLatch object that they have completed their respective tasks. This notification mechanism is()
Method is completed; every time this method is called, it is initialized in the constructorcount
The value is reduced by 1. So when N threads call this method,count
The value is equal to 0, and then the main thread can passawait()
Method, resume execution of your own tasks.
Source code analysis
CountDownLatch has an internal class called Sync, which inherits the AbstractQueuedSynchronizer class, which maintains an integer.state
, and ensures modificationstate
The visibility and atomicity of the source code are as follows:
private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c - 1; if (compareAndSetState(c, nextc)) return nextc == 0; } } }
When creating a CountDownLatch instance, a Sync instance will also be created, and the counter value is passed to the Sync instance. The source code is as follows:
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); = new Sync(count); }
existcountDown
In the method, only the Sync instance is calledreleaseShared
The method, the source code is as follows:
public void countDown() { (1); }
Among themreleaseShared
Method: First, decrement the counter. If the counter after decrement is 0, wake upawait method blocksThe source code of all threads is as follows:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { //Decrement the counter by one doReleaseShared();//If the counter is 0, wake up all threads blocked by the await method return true; } return false; }
Among themtryReleaseShared
Method, first get the value of the current counter. If the counter is 0, it will be returned directly; if it is not 0, use the CAS method to decrement the counter by 1, the source code is as follows:
protected boolean tryReleaseShared(int releases) { for (;;) {//The dead loop, if the CAS operation fails, it will continue to try. int c = getState();//Get the value of the current counter. if (c == 0)// When the counter is 0, it will return directly. return false; int nextc = c-1; if (compareAndSetState(c, nextc))// Use CAS method to reduce the counter by 1 return nextc == 0;//If the operation is successful, return whether the counter is 0 } }
existawait
In the method, only the Sync instance is calledacquireSharedInterruptibly
The method, the source code is as follows:
public void await() throws InterruptedException { (1); }
inacquireSharedInterruptibly
Method, determine whether the counter is 0. If it is not 0, the current thread will be blocked. The source code is as follows:
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0)//Discern whether the counter is 0 doAcquireSharedInterruptibly(arg);//If it is not 0, the current thread will be blocked}
intryAcquireShared
The method is a template method in AbstractQueuedSynchronizer. Its specific implementation is in the Sync class. It mainly determines whether the counter is zero. If it is zero, it returns 1. If it is not zero, it returns -1. The source code is as follows:
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
CyclicBarrier
Overview
CyclicBarrier is translated as Cyclic fence (Barrier), and its roughly meaning is to realize a recyclable barrier.
The function of CyclicBarrier is to make a group of threads wait for each other. When a common point is reached, all threads that have been waiting for will continue to execute. The CyclicBarrier function can be reused and usedreset()
Method to reset the barrier.
Case
Use the same example of playing games. If you have multiple "levels" to play a game, then using CountDownLatch is obviously not suitable, and you need to create an instance for each level. Then we can use CyclicBarrier to implement the data loading and waiting function for each level.
public class CyclicBarrierDemo { static class PreTaskThread implements Runnable { private String task; private CyclicBarrier cyclicBarrier; public PreTaskThread(String task, CyclicBarrier cyclicBarrier) { = task; = cyclicBarrier; } @Override public void run() { // Assume that there are three levels in total for (int i = 1; i < 4; i++) { try { Random random = new Random(); ((1000)); (("The task of level %d is completed", i, task)); (); } catch (InterruptedException | BrokenBarrierException e) { (); } } } } public static void main(String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> { ("All pre-tasks in this level are completed, start the game..."); }); new Thread(new PreTaskThread("Loading map data", cyclicBarrier)).start(); new Thread(new PreTaskThread("Loading the character model", cyclicBarrier)).start(); new Thread(new PreTaskThread("Loading background music", cyclicBarrier)).start(); } }
Output:
Level 1 task loading background music completed
Level 1 task loading map data is completed
Level 1 task loading character model completed
All pre-tasks in this level are completed, and the game starts...
Level 2 task loading character model completed
Level 2 task loading background music completed
Level 2 task loading map data is completed
All pre-tasks in this level are completed, and the game starts...
Level 3 task loading background music completed
Level 3 task loading map data is completed
Level 3 task loading character model complete
All pre-tasks in this level are completed, and the game starts...
There are some differences from CountDownLatch. CyclicBarrier is not divided intoawait()
andcountDown()
, but only oneawait()
method.
Once calledawait()
The number of threads of the method is equal to the total number of tasks passed in the construction method, which means that the barrier has been reached. CyclicBarrier allows us to perform a task when we reach the barrier, and we can pass an object of type Runnable in the constructor.
Source code analysis
Constructor:
public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); = parties; = parties; = barrierAction; } public CyclicBarrier(int parties) { this(parties, null); }
defaultbarrierAction
It is null, this parameter is the Runnable parameter. The task executed when the last thread reaches it. The above case is when the barrier is reached, "All the pre-tasks of this level are completed, and the game starts..." is output.parties
is the number of threads involved.
Let's take a lookawait
There are two overloads in the method, the difference is whether there is a waiting timeout, the source code is as follows:
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, (timeout)); }
Focus ondowait()
, the core logic is this method, the source code is as follows:
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = ; (); try { // An instance will be generated every time the barrier is used final Generation g = generation; // If it is destroyed, throw an exception if () throw new BrokenBarrierException(); // Thread interrupt detection if (()) { breakBarrier(); throw new InterruptedException(); } // The remaining number of waiting threads int index = --count; // When the last thread arrives if (index == 0) { // tripped // Mark whether the task is executed (that is, the runable parameter passed in) boolean ranAction = false; try { final Runnable command = barrierCommand; // Execute tasks if (command != null) (); ranAction = true; // After completion, proceed to the next set of initialization generation Initialize count and wake up all waiting threads nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // When index is not 0, enters spin for (;;) { try { // First judge the timeout. If you don't have timeout, continue to wait if (!timed) (); // If the specified time is exceeded, the call to awaitNanos timeout releases the lock else if (nanos > 0L) nanos = (nanos); // Interrupt exception capture } catch (InterruptedException ie) { // Determine whether it is damaged if (g == generation && ! ) { breakBarrier(); throw ie; } else { // Otherwise, the current thread will be interrupted ().interrupt(); } } // Abnormal thrown by being destroyed if () throw new BrokenBarrierException(); // Return if you call normally if (g != generation) return index; // When the timeout is awakened, call breakBarrier() if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { (); } }
To summarizedowait()
The logic of the method:
- After the thread is called, the barrier status and thread status will be checked, and the exception status will be interrupted.
- When initializing CyclicBarrier, the resource value count set will be performed
--count
。 - When the first 9 threads of 10 threads are executed
dowait()
After that, becausecount!=0
, so it willfor(;;)
, the Condition will be executed internally()
Method, block. - Conditions for ending blocking include: timeout, wake-up, and thread interruption.
- When the 10th thread is executed
dowait()
After that, becausecount==0
, the command content will be checked and executed first. - Final execution
nextGeneration()
, called internally()
Wake up all()
thread.
How to recover if it is damaged? Let's take a lookreset()
The method, the source code is as follows:
public void reset() { final ReentrantLock lock = ; (); try { breakBarrier(); // break the current generation nextGeneration(); // start a new generation } finally { (); } }
The source code is very simple. After break, a new instance will be regenerated, and the corresponding count will be re-initialized.dowait
insideindex==0
Called alsonextGeneration
, so it can be recycled.
Differences from CountDonwLatch
CountDownLatch minus count, CyclicBarrier add count.
CountDownLatch is one-time and CyclicBarrier can be reused.
CountDownLatch and CyclicBarrier both have the meaning of letting multiple threads wait for synchronization before starting the next action, but the next action implementer of CountDownLatch is the main thread, which is not repeatable; while the next action implementer of CyclicBarrier is still the "other threads" itself, which has the characteristics of implementing actions repeatedly.
Semaphore
Overview
Semaphore is generally translated as semaphore. It is also a thread synchronization tool, mainly used by multiple threads to perform parallel operations on shared resources. It represents a concept of permission, whether multiple threads allow permission to operate on the same resource, and using Semaphore can control the number of threads to access resources concurrently.
Use scenarios
Semaphore usage scenarios are mainly usedFlow control。
For example, for database connections, there will be a limit on the number of database connections used at the same time, and the database connections cannot exceed a certain number. When the connection reaches the limited number, the subsequent threads can only queue up and wait for the previous thread to release the database connection before obtaining the database connection.
For example, in the parking lot scene, a parking lot has a limited number of parking spaces and can accommodate how many cars it can accommodate. After the parking space is full, you can only enter when the cars inside leave the cars outside the parking lot.
Case
Simulate the business scenario of a parking lot:
Before entering the parking lot, there will be a sign showing how many parking spaces are left. When the parking space is 0, the parking lot cannot be entered. When the parking space is not 0, vehicles will be allowed to enter the parking lot. So there are several key factors in the parking lot: the total capacity of the parking lot space, when a car enters, the total capacity of the parking lot space - 1, when a car leaves, the total capacity + 1, when the parking lot space is insufficient, the vehicle can only wait outside the parking lot.
public class SemaphoreDemo { private static Semaphore semaphore = new Semaphore(10); public static void main(String[] args) { for (int i = 0; i < 100; i++) { Thread thread = new Thread(new Runnable() { @Override public void run() { ("welcome " + ().getName() + "Come to the parking lot"); // Determine whether parking is allowed if (() == 0) { ("There are insufficient parking spaces, please wait patiently"); } try { // Try to get (); (().getName() + "Enter the parking lot"); (new Random().nextInt(10000));// Simulate the time when a vehicle stays in the parking lot (().getName() + "Take out of the parking lot"); (); } catch (InterruptedException e) { (); } } }, i + "Car No. 1"); (); } } }
The initial capacity of Semaphore is that there are only 10 parking spaces. We use these 10 parking spaces to control the flow of 100 cars, so the result is very similar to what we expected, that is, most cars are waiting. But at the same time, some cars are still allowed to enter the parking lot. If the vehicles entering the parking lot, they willTake up a parking space and drive out of the parking lot,
Give up a parking space and let the car behind enter again.
principle
There is a synchronizer that inherits AQS inside Semaphore, which has rewritten ittryAcquireShared
method. In this method, you will try to obtain resources.
If the acquisition fails (the desired number of resources is less than the number of resources currently available), a negative number will be returned (indicating that the attempt to obtain the resource has failed). Then the current thread will enter the AQS waiting queue.
Exchanger
Overview
The Exchanger class is used to exchange data between two threads. It supports generics, which means you can transfer any data between two threads. After completing a certain transaction, a thread wants to exchange data with another thread, the first thread that takes out the data first will wait for the second thread until the second thread arrives with the data.
Case
Case 1: Student A and Student B exchanged their own collection of blockbusters.
public class ExchangerDemo { public static void main(String[] args) throws InterruptedException { Exchanger<String> stringExchanger = new Exchanger<>(); Thread studentA = new Thread(() -> { try { String dataA = "Captain A has collected blockbusters for many years"; String dataB = (dataA); ("Student A got it" + dataB); } catch (InterruptedException e) { (); } }); ("A student A is blocked at this time, waiting for a blockbuster film from student B."); (1000); Thread studentB = new Thread(() -> { try { String dataB = "Big movies that have been collected by classmate B for many years"; String dataA = (dataB); ("Student B got it" + dataA); } catch (InterruptedException e) { (); } }); (); (); } }
Output:
At this time, classmate A was blocked and was waiting for classmate B's blockbuster movie
Classmate A got a blockbuster film that Classmate B collected for many years
Classmate B got a blockbuster film that Classmate A has collected for many years
You can see that when a thread callsexchange
After the method, it is in a blocking state, only if another thread also calls itexchange
method, it will continue to execute downward.
The Exchanger class also has a method with timeout parameters. If no other thread calls exchange within the specified time, a timeout exception will be thrown.
public V exchange(V x, long timeout, TimeUnit unit)
Case 2: Classmate A was released from pigeons and the transaction failed.
public class ExchangerDemo { public static void main(String[] args) { Exchanger<String> stringExchanger = new Exchanger<>(); Thread studentA = new Thread(() -> { String dataB = null; try { String dataA = "Captain A has collected blockbusters for many years"; dataB = (dataA,5, ); ("Student A got it" + dataB); } catch (InterruptedException e) { (); } catch (TimeoutException e) { ("Waiting for timeout-TimeoutException"); } ("Student A got it:"+dataB); }); (); } }
Output:
Waiting for timeout-TimeoutException
Classmate A got: null
principle
The key technologies at the bottom of the Exchanger class are:
- Use CAS spin instructions to complete data exchange;
- Using LockSupport
park
Method to make the switch thread enter sleep waiting, using LockSupport'sunpark
Method wakes up the waiting thread. - In addition, a Node object is declared for storing exchange data.
Exchanger is generally used to exchange data in memory more conveniently between two threads. Because it supports generics, we can transmit any data, such as IO streams or IO caches. According to the comments in JDK, it can be summarized as follows:
- This type of external operations are synchronized;
- Used to exchange data between threads that occur in pairs;
- It can be regarded as a two-way synchronization queue;
- It can be applied to scenarios such as genetic algorithms, assembly line design, etc.
It should be noted that exchange can be reused. That is to say, two threads can use Exchanger to continuously exchange data in memory.
summary
This article introduces several concurrent tool classes provided in JDK in conjunction with some application scenarios, and briefly analyzes the usage principles and business scenarios. When working, once there are corresponding business scenarios, you can try these tool classes.
The above is a detailed analysis of the use of concurrent tool classes in Java. For more information about Java concurrent tool classes, please pay attention to my other related articles!