SoFunction
Updated on 2025-03-06

A brief analysis of the use of concurrent tool classes in Java

Several very useful concurrency tool classes are provided in the JDK concurrency package. CountDownLatch, CyclicBarrier and Semaphore tools provide a means of concurrent process control, and Exchange tools provide a method of exchanging data between threads.

They're all inPack it. First, let’s summarize what tool classes are there and what their functions are, and then introduce their main usage methods and principles respectively.

kind effect
CountDownLatch Thread waits until the counter decreases to 0 to start working
CyclicBarrier The function is similar to CountDownLatch, but it can be reused
Semaphore Limit the number of threads
Exchanger Two threads exchange data

The following are the categories.

CountDownLatch

Overview

CountDownLatchOne or more threads can be made to wait for other threads to execute before executing.

CountDownLatchDefine a counter, and aBlocking queue, Before the counter value is decremented to 0, the threads in the blocking queue are in a suspended state. When the counter is decremented to 0, all threads in the blocking queue will be awakened. The counter here is a flag, which can represent a task and a thread, or aCountdown timer

Case

When playing the chicken-eating game, before officially starting the game, some pre-scenes will definitely be loaded, such as: "Loading Map", "Loading Character Model", "Loading Background Music", etc.

public class CountDownLatchDemo {
    // Define pre-task thread    static class PreTaskThread implements Runnable {

        private String task;
        private CountDownLatch countDownLatch;

        public PreTaskThread(String task, CountDownLatch countDownLatch) {
             = task;
             = countDownLatch;
        }

        @Override
        public void run() {
            try {
                Random random = new Random();
                ((1000));
                (task + " - Mission completed");
                ();
            } catch (InterruptedException e) {
                ();
            }
        }
    }

    public static void main(String[] args) {
        // Suppose there are three modules that need to be loaded        CountDownLatch countDownLatch = new CountDownLatch(3);

        // Main mission        new Thread(() -> {
            try {
                ("Waiting for data to load...");
                (("There are %d pre-tasks", ()));
                ();
                ("The data is loaded and the game is officially started!");
            } catch (InterruptedException e) {
                ();
            }
        }).start();

        // Pre-task        new Thread(new PreTaskThread("Loading map data", countDownLatch)).start();
        new Thread(new PreTaskThread("Loading the character model", countDownLatch)).start();
        new Thread(new PreTaskThread("Loading background music", countDownLatch)).start();
    }
}

Output:

Waiting for data to load...
There are 3 pre-tasks
Loading map data - task completion
Loading character model - task completion
Loading background music - task completion
The data is loaded and the game is officially started!

principle

The CountDownLatch method is very simple, as follows:

//Construction method:public CountDownLatch(int count)

public void await() // waitpublic boolean await(long timeout, TimeUnit unit) // Timeout waitingpublic void countDown() // count - 1
public long getCount() // Get how many counts are there currently

CountDownLatchIn the constructorThe count value is actually the number of threads to wait for when locking. This value can only be set once, and CountDownLatchNo mechanism is provided to reset this count value

The first interaction with CountDownLatch is when the main thread waits for other threads. The main thread must be called immediately after starting other threads()method. In this way, the operation of the main thread will block on this method until the other threads complete their respective tasks.

The other N threads must reference the locked object because they need to notify the CountDownLatch object that they have completed their respective tasks. This notification mechanism is()Method is completed; every time this method is called, it is initialized in the constructorcountThe value is reduced by 1. So when N threads call this method,countThe value is equal to 0, and then the main thread can passawait()Method, resume execution of your own tasks.

Source code analysis

CountDownLatch has an internal class called Sync, which inherits the AbstractQueuedSynchronizer class, which maintains an integer.state, and ensures modificationstateThe visibility and atomicity of the source code are as follows:

private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c - 1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

When creating a CountDownLatch instance, a Sync instance will also be created, and the counter value is passed to the Sync instance. The source code is as follows:

public CountDownLatch(int count) {
  if (count < 0) throw new IllegalArgumentException("count < 0");
   = new Sync(count);
}

existcountDownIn the method, only the Sync instance is calledreleaseSharedThe method, the source code is as follows:

public void countDown() {
    (1);
}

Among themreleaseSharedMethod: First, decrement the counter. If the counter after decrement is 0, wake upawait method blocksThe source code of all threads is as follows:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) { //Decrement the counter by one        doReleaseShared();//If the counter is 0, wake up all threads blocked by the await method        return true;
    }
    return false;
}

Among themtryReleaseSharedMethod, first get the value of the current counter. If the counter is 0, it will be returned directly; if it is not 0, use the CAS method to decrement the counter by 1, the source code is as follows:

protected boolean tryReleaseShared(int releases) {
    for (;;) {//The dead loop, if the CAS operation fails, it will continue to try.        int c = getState();//Get the value of the current counter.        if (c == 0)// When the counter is 0, it will return directly.            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))// Use CAS method to reduce the counter by 1            return nextc == 0;//If the operation is successful, return whether the counter is 0    }
}

existawaitIn the method, only the Sync instance is calledacquireSharedInterruptiblyThe method, the source code is as follows:

public void await() throws InterruptedException {
    (1);
}

inacquireSharedInterruptiblyMethod, determine whether the counter is 0. If it is not 0, the current thread will be blocked. The source code is as follows:

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (())
        throw new InterruptedException();
    if (tryAcquireShared(arg) &lt; 0)//Discern whether the counter is 0        doAcquireSharedInterruptibly(arg);//If it is not 0, the current thread will be blocked}

intryAcquireSharedThe method is a template method in AbstractQueuedSynchronizer. Its specific implementation is in the Sync class. It mainly determines whether the counter is zero. If it is zero, it returns 1. If it is not zero, it returns -1. The source code is as follows:

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

CyclicBarrier

Overview

CyclicBarrier is translated as Cyclic fence (Barrier), and its roughly meaning is to realize a recyclable barrier.

The function of CyclicBarrier is to make a group of threads wait for each other. When a common point is reached, all threads that have been waiting for will continue to execute. The CyclicBarrier function can be reused and usedreset()Method to reset the barrier.

Case

Use the same example of playing games. If you have multiple "levels" to play a game, then using CountDownLatch is obviously not suitable, and you need to create an instance for each level. Then we can use CyclicBarrier to implement the data loading and waiting function for each level.

public class CyclicBarrierDemo {
    static class PreTaskThread implements Runnable {

        private String task;
        private CyclicBarrier cyclicBarrier;

        public PreTaskThread(String task, CyclicBarrier cyclicBarrier) {
             = task;
             = cyclicBarrier;
        }

        @Override
        public void run() {
            // Assume that there are three levels in total            for (int i = 1; i &lt; 4; i++) {
                try {
                    Random random = new Random();
                    ((1000));
                    (("The task of level %d is completed", i, task));
                    ();
                } catch (InterruptedException | BrokenBarrierException e) {
                    ();
                }
            }
        }
    }

    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -&gt; {
            ("All pre-tasks in this level are completed, start the game...");
        });

        new Thread(new PreTaskThread("Loading map data", cyclicBarrier)).start();
        new Thread(new PreTaskThread("Loading the character model", cyclicBarrier)).start();
        new Thread(new PreTaskThread("Loading background music", cyclicBarrier)).start();
    }
}

Output:

Level 1 task loading background music completed
Level 1 task loading map data is completed
Level 1 task loading character model completed
All pre-tasks in this level are completed, and the game starts...
Level 2 task loading character model completed
Level 2 task loading background music completed
Level 2 task loading map data is completed
All pre-tasks in this level are completed, and the game starts...
Level 3 task loading background music completed
Level 3 task loading map data is completed
Level 3 task loading character model complete
All pre-tasks in this level are completed, and the game starts...

There are some differences from CountDownLatch. CyclicBarrier is not divided intoawait()andcountDown(), but only oneawait()method.

Once calledawait()The number of threads of the method is equal to the total number of tasks passed in the construction method, which means that the barrier has been reached. CyclicBarrier allows us to perform a task when we reach the barrier, and we can pass an object of type Runnable in the constructor.

Source code analysis

Constructor:

public CyclicBarrier(int parties, Runnable barrierAction) {
  if (parties <= 0) throw new IllegalArgumentException();
   = parties;
   = parties;
   = barrierAction;
}

public CyclicBarrier(int parties) {
  this(parties, null);
}

defaultbarrierActionIt is null, this parameter is the Runnable parameter. The task executed when the last thread reaches it. The above case is when the barrier is reached, "All the pre-tasks of this level are completed, and the game starts..." is output.partiesis the number of threads involved.

Let's take a lookawaitThere are two overloads in the method, the difference is whether there is a waiting timeout, the source code is as follows:

 public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

public int await(long timeout, TimeUnit unit)
    throws InterruptedException,
           BrokenBarrierException,
           TimeoutException {
    return dowait(true, (timeout));
}

Focus ondowait(), the core logic is this method, the source code is as follows:

private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {       
        final ReentrantLock lock = ;
        ();
        try {
            // An instance will be generated every time the barrier is used            final Generation g = generation;

            // If it is destroyed, throw an exception            if ()
                throw new BrokenBarrierException();

            // Thread interrupt detection            if (()) {
                breakBarrier();
                throw new InterruptedException();
            }

            // The remaining number of waiting threads            int index = --count;
            // When the last thread arrives            if (index == 0) {  // tripped
                // Mark whether the task is executed (that is, the runable parameter passed in)                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    // Execute tasks                    if (command != null)
                        ();
                    ranAction = true;
                    // After completion, proceed to the next set of initialization generation Initialize count and wake up all waiting threads                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // When index is not 0, enters spin            for (;;) {
                try {
                    // First judge the timeout. If you don't have timeout, continue to wait                    if (!timed)
                        ();
                        // If the specified time is exceeded, the call to awaitNanos timeout releases the lock                    else if (nanos &gt; 0L)
                        nanos = (nanos);
                        // Interrupt exception capture                } catch (InterruptedException ie) {
                    // Determine whether it is damaged                    if (g == generation &amp;&amp; ! ) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // Otherwise, the current thread will be interrupted                        ().interrupt();
                    }
                }

                // Abnormal thrown by being destroyed                if ()
                    throw new BrokenBarrierException();

                // Return if you call normally                if (g != generation)
                    return index;

                // When the timeout is awakened, call breakBarrier()                if (timed &amp;&amp; nanos &lt;= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            ();
        }
    }

To summarizedowait()The logic of the method:

  • After the thread is called, the barrier status and thread status will be checked, and the exception status will be interrupted.
  • When initializing CyclicBarrier, the resource value count set will be performed--count
  • When the first 9 threads of 10 threads are executeddowait()After that, becausecount!=0, so it willfor(;;), the Condition will be executed internally()Method, block.
  • Conditions for ending blocking include: timeout, wake-up, and thread interruption.
  • When the 10th thread is executeddowait()After that, becausecount==0, the command content will be checked and executed first.
  • Final executionnextGeneration(), called internally()Wake up all()thread.

How to recover if it is damaged? Let's take a lookreset()The method, the source code is as follows:

public void reset() {
    final ReentrantLock lock = ;
    ();
    try {
        breakBarrier();   // break the current generation
        nextGeneration(); // start a new generation
    } finally {
        ();
    }
}

The source code is very simple. After break, a new instance will be regenerated, and the corresponding count will be re-initialized.dowaitinsideindex==0Called alsonextGeneration, so it can be recycled.

Differences from CountDonwLatch

CountDownLatch minus count, CyclicBarrier add count.

CountDownLatch is one-time and CyclicBarrier can be reused.

CountDownLatch and CyclicBarrier both have the meaning of letting multiple threads wait for synchronization before starting the next action, but the next action implementer of CountDownLatch is the main thread, which is not repeatable; while the next action implementer of CyclicBarrier is still the "other threads" itself, which has the characteristics of implementing actions repeatedly.

Semaphore

Overview

Semaphore is generally translated as semaphore. It is also a thread synchronization tool, mainly used by multiple threads to perform parallel operations on shared resources. It represents a concept of permission, whether multiple threads allow permission to operate on the same resource, and using Semaphore can control the number of threads to access resources concurrently.

Use scenarios

Semaphore usage scenarios are mainly usedFlow control

For example, for database connections, there will be a limit on the number of database connections used at the same time, and the database connections cannot exceed a certain number. When the connection reaches the limited number, the subsequent threads can only queue up and wait for the previous thread to release the database connection before obtaining the database connection.

For example, in the parking lot scene, a parking lot has a limited number of parking spaces and can accommodate how many cars it can accommodate. After the parking space is full, you can only enter when the cars inside leave the cars outside the parking lot.

Case

Simulate the business scenario of a parking lot:

Before entering the parking lot, there will be a sign showing how many parking spaces are left. When the parking space is 0, the parking lot cannot be entered. When the parking space is not 0, vehicles will be allowed to enter the parking lot. So there are several key factors in the parking lot: the total capacity of the parking lot space, when a car enters, the total capacity of the parking lot space - 1, when a car leaves, the total capacity + 1, when the parking lot space is insufficient, the vehicle can only wait outside the parking lot.

public class SemaphoreDemo {

    private static Semaphore semaphore = new Semaphore(10);

    public static void main(String[] args) {
        for (int i = 0; i &lt; 100; i++) {
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    ("welcome " + ().getName() + "Come to the parking lot");
                    // Determine whether parking is allowed                    if (() == 0) {
                        ("There are insufficient parking spaces, please wait patiently");
                    }
                    try {
                        // Try to get                        ();
                        (().getName() + "Enter the parking lot");
                        (new Random().nextInt(10000));// Simulate the time when a vehicle stays in the parking lot                        (().getName() + "Take out of the parking lot");
                        ();
                    } catch (InterruptedException e) {
                        ();
                    }
                }
            }, i + "Car No. 1");
            ();
        }
    }
}

The initial capacity of Semaphore is that there are only 10 parking spaces. We use these 10 parking spaces to control the flow of 100 cars, so the result is very similar to what we expected, that is, most cars are waiting. But at the same time, some cars are still allowed to enter the parking lot. If the vehicles entering the parking lot, they willTake up a parking space and drive out of the parking lot,Give up a parking space and let the car behind enter again.

principle

There is a synchronizer that inherits AQS inside Semaphore, which has rewritten ittryAcquireSharedmethod. In this method, you will try to obtain resources.

If the acquisition fails (the desired number of resources is less than the number of resources currently available), a negative number will be returned (indicating that the attempt to obtain the resource has failed). Then the current thread will enter the AQS waiting queue.

Exchanger

Overview

The Exchanger class is used to exchange data between two threads. It supports generics, which means you can transfer any data between two threads. After completing a certain transaction, a thread wants to exchange data with another thread, the first thread that takes out the data first will wait for the second thread until the second thread arrives with the data.

Case

Case 1: Student A and Student B exchanged their own collection of blockbusters.

public class ExchangerDemo {

    public static void main(String[] args) throws InterruptedException {
        Exchanger&lt;String&gt; stringExchanger = new Exchanger&lt;&gt;();

        Thread studentA = new Thread(() -&gt; {
            try {
                String dataA = "Captain A has collected blockbusters for many years";
                String dataB = (dataA);
                ("Student A got it" + dataB);
            } catch (InterruptedException e) {
                ();
            }
        });

        ("A student A is blocked at this time, waiting for a blockbuster film from student B.");
        (1000);

        Thread studentB = new Thread(() -&gt; {
            try {
                String dataB = "Big movies that have been collected by classmate B for many years";
                String dataA = (dataB);
                ("Student B got it" + dataA);
            } catch (InterruptedException e) {
                ();
            }
        });

        ();
        ();
    }
}

Output:

At this time, classmate A was blocked and was waiting for classmate B's blockbuster movie
Classmate A got a blockbuster film that Classmate B collected for many years
Classmate B got a blockbuster film that Classmate A has collected for many years

You can see that when a thread callsexchangeAfter the method, it is in a blocking state, only if another thread also calls itexchangemethod, it will continue to execute downward.

The Exchanger class also has a method with timeout parameters. If no other thread calls exchange within the specified time, a timeout exception will be thrown.

public V exchange(V x, long timeout, TimeUnit unit)

Case 2: Classmate A was released from pigeons and the transaction failed.

public class ExchangerDemo {

    public static void main(String[] args) {
        Exchanger&lt;String&gt; stringExchanger = new Exchanger&lt;&gt;();
        Thread studentA = new Thread(() -&gt; {
            String dataB = null;
            try {
                String dataA = "Captain A has collected blockbusters for many years";
                dataB = (dataA,5, );
                ("Student A got it" + dataB);
            } catch (InterruptedException e) {
                ();
            } catch (TimeoutException e) {
                ("Waiting for timeout-TimeoutException");
            }
            ("Student A got it:"+dataB);
        });

        ();
    }
}

Output:

Waiting for timeout-TimeoutException
Classmate A got: null

principle

The key technologies at the bottom of the Exchanger class are:

  • Use CAS spin instructions to complete data exchange;
  • Using LockSupportparkMethod to make the switch thread enter sleep waiting, using LockSupport'sunparkMethod wakes up the waiting thread.
  • In addition, a Node object is declared for storing exchange data.

Exchanger is generally used to exchange data in memory more conveniently between two threads. Because it supports generics, we can transmit any data, such as IO streams or IO caches. According to the comments in JDK, it can be summarized as follows:

  • This type of external operations are synchronized;
  • Used to exchange data between threads that occur in pairs;
  • It can be regarded as a two-way synchronization queue;
  • It can be applied to scenarios such as genetic algorithms, assembly line design, etc.

It should be noted that exchange can be reused. That is to say, two threads can use Exchanger to continuously exchange data in memory.

summary

This article introduces several concurrent tool classes provided in JDK in conjunction with some application scenarios, and briefly analyzes the usage principles and business scenarios. When working, once there are corresponding business scenarios, you can try these tool classes.

The above is a detailed analysis of the use of concurrent tool classes in Java. For more information about Java concurrent tool classes, please pay attention to my other related articles!