SoFunction
Updated on 2025-03-08

Analysis of the Java concurrent container ConcurrentLinkedQueue

Introduction to ConcurrentLinkedQueue

Collection classes commonly used in single-threaded programming, such as ArrayList and HashMap, are not thread-safe. To guarantee thread safety, Vector can be used as an alternative, but it synchronizes at the method level with an exclusive lock (synchronized), so multi-threaded execution becomes serialized and inefficient. Alternatively, Collections.synchronizedList(List<T> list) can wrap an ArrayList in a thread-safe view, but it is likewise implemented with synchronized methods and is still not an efficient approach.

For the queue, one of the most commonly used data structures, Doug Lea provided ConcurrentLinkedQueue as a thread-safe solution. As the class name suggests, it implements the queue data structure on top of a linked list.

Node

A Node is the basic unit of the linked list that makes up a ConcurrentLinkedQueue. A node contains two fields: item, which stores the element data, and next, which points to the following node, thereby forming the linked structure.

In ConcurrentLinkedQueue, since concurrent operations must be supported, the item and next fields of a node are declared with the volatile keyword. volatile guarantees visibility in a multi-threaded environment, avoiding thread-safety issues such as stale reads.

In the ConcurrentLinkedQueue constructor, the head and tail pointers are initialized to the same sentinel node. The item of this node is null, indicating that the queue contains no elements yet.

By continually adding and removing nodes, the queue operations are realized; thanks to the lock-free algorithm (CAS operations), ConcurrentLinkedQueue is both thread-safe and high-throughput.
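The structure described above can be sketched as follows. The field names mirror the JDK 8 source; the class name and the isEmpty helper are illustrative additions, not JDK code:

```java
public class ConcurrentLinkedQueueSketch<E> {
    // Simplified Node: volatile fields guarantee cross-thread visibility
    static final class Node<E> {
        volatile E item;       // element data; null for the sentinel node
        volatile Node<E> next; // link to the following node
        Node(E item) { this.item = item; }
    }

    private volatile Node<E> head;
    private volatile Node<E> tail;

    // head and tail both start out pointing at one sentinel node whose
    // item is null, i.e. the queue contains no elements yet
    public ConcurrentLinkedQueueSketch() {
        head = tail = new Node<>(null);
    }

    // Illustrative helper: the queue is empty while the sentinel stands alone
    public boolean isEmpty() {
        return head.next == null;
    }
}
```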

Several CAS operations for operating Node

In ConcurrentLinkedQueue, the Node class provides several methods that perform CAS operations on a node:

  • casItem(E cmp, E val): compare-and-swap for the node's data field item. It compares the node's item with cmp and, if they are equal, replaces it with val. Returns true if the exchange succeeded and false if it failed.
  • lazySetNext(Node<E> val): lazily sets the next pointer. It writes val into the node's next field without guaranteeing immediate visibility: for a short time, other threads may still read the old next value. This kind of ordered (rather than fully volatile) write usually performs better than a write with forced visibility.
  • casNext(Node<E> cmp, Node<E> val): compare-and-swap for the node's next pointer. It compares the node's next with cmp and, if they are equal, replaces it with val. Returns true if the exchange succeeded and false if it failed.
// Compare-and-swap the data field item in Node
boolean casItem(E cmp, E val) {
    return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
// Lazily set the pointer field next in Node
void lazySetNext(Node<E> val) {
    UNSAFE.putOrderedObject(this, nextOffset, val);
}
// Compare-and-swap the pointer field next in Node
boolean casNext(Node<E> cmp, Node<E> val) {
    return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}

Note that all of these methods implement CAS by calling methods of the Unsafe class. Unsafe is a low-level class provided by the JDK that supports direct memory access and concurrency primitives. It provides atomic methods such as compareAndSwapObject(), which are implemented with the processor's atomic compare-and-exchange instruction (CMPXCHG on x86).

CAS operations are very important in concurrent programming: they achieve thread safety without locks and thereby improve concurrency performance. However, CAS is not applicable to every situation; issues such as the ABA problem can arise and need to be accounted for.
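The same compare-and-swap primitive is available through the public java.util.concurrent.atomic API. A minimal demonstration (the class and method names here are illustrative):

```java
import java.util.concurrent.atomic.AtomicReference;

public class CasDemo {
    // Returns {firstAttempt, secondAttempt}: the first CAS succeeds because
    // the expected value matches; the second fails because the value changed.
    static boolean[] run() {
        AtomicReference<String> ref = new AtomicReference<>("A");
        boolean first = ref.compareAndSet("A", "B");   // "A" -> "B": succeeds
        boolean second = ref.compareAndSet("A", "C");  // expects "A", sees "B": fails
        return new boolean[] { first, second };
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("first CAS: " + r[0] + ", second CAS: " + r[1]); // true, false
    }
}
```

Note that compareAndSet on an AtomicReference compares object references (==), not equals(); the demo works because string literals are interned.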

Offer method

1. Single thread offer: When only one thread performs the offer operation, it inserts the element to the end of the queue. Since no other threads perform poll operations, the length of the queue will continue to grow.

2. Multiple threads offer: When multiple threads perform offer operations at the same time, they insert elements at the end of the queue. Because the offer operation is always performed at the end of the queue, and the poll operation is always performed at the head of the queue, these two types of threads do not affect each other.

3. Some threads offer, some threads poll:

  • Offer faster than poll: if offers outpace polls, the queue keeps growing. Since offer always inserts at the tail and poll always removes at the head, the two kinds of threads never touch the same node, and the situation can be treated like a "single-thread offer".
  • Offer slower than poll: if polls outpace offers, the queue keeps shrinking. Eventually an offer thread and a poll thread will operate on the same node at the same moment; this node is called the critical point, and the processing logic in the offer method must handle it correctly.

In a multi-threaded environment, the offer method of ConcurrentLinkedQueue can ensure concurrency security without additional synchronization measures. It can efficiently support concurrent insertion operations and follows the FIFO (first in, first out) feature.
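The enqueue and dequeue logic described above can be sketched with the CAS loops of the Michael–Scott queue algorithm that ConcurrentLinkedQueue is based on. This is an illustrative simplification using AtomicReference, not the JDK implementation (it omits the HOPS optimization, node self-linking, and item nulling):

```java
import java.util.concurrent.atomic.AtomicReference;

public class LockFreeQueueSketch<E> {
    private static final class Node<E> {
        final E item;
        final AtomicReference<Node<E>> next = new AtomicReference<>(null);
        Node(E item) { this.item = item; }
    }

    private final AtomicReference<Node<E>> head;
    private final AtomicReference<Node<E>> tail;

    public LockFreeQueueSketch() {
        Node<E> sentinel = new Node<>(null);
        head = new AtomicReference<>(sentinel);
        tail = new AtomicReference<>(sentinel);
    }

    public boolean offer(E e) {
        Node<E> newNode = new Node<>(e);
        while (true) {
            Node<E> last = tail.get();
            Node<E> next = last.next.get();
            if (next == null) {
                // last really is the tail: try to link the new node after it
                if (last.next.compareAndSet(null, newNode)) {
                    // ok if this CAS fails: another thread helps advance tail
                    tail.compareAndSet(last, newNode);
                    return true;
                }
            } else {
                // tail is lagging: help advance it, then retry
                tail.compareAndSet(last, next);
            }
        }
    }

    public E poll() {
        while (true) {
            Node<E> first = head.get();
            Node<E> next = first.next.get();
            if (next == null) return null;          // empty queue
            if (head.compareAndSet(first, next)) {  // next becomes the new sentinel
                return next.item;
            }
        }
    }
}
```

Note how the offer loop never blocks: a thread that finds the tail lagging simply helps move it forward and retries, which is exactly why offer threads and poll threads interfere so little.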

import java.util.concurrent.ConcurrentLinkedQueue;
 
public class Main {
    public static void main(String[] args) {
        // Create a ConcurrentLinkedQueue object
        ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
 
        // Single thread offer
        Thread singleThreadOffer = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                queue.offer(i);
                System.out.println("Single thread offer element: " + i);
            }
        });
 
        // Multiple threads offer
        Thread multipleThreadsOffer = new Thread(() -> {
            for (int i = 5; i < 10; i++) {
                queue.offer(i);
                System.out.println("Multiple threads offer element: " + i);
            }
        });
 
        // Some threads offer, some threads poll
        Thread mixedThreads = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                if (i % 2 == 0) {
                    queue.offer(i);
                    System.out.println("Mixed thread offer element: " + i);
                } else {
                    Integer element = queue.poll();
                    System.out.println("Mixed thread poll element: " + element);
                }
            }
        });
 
        // Start the threads
        singleThreadOffer.start();
        multipleThreadsOffer.start();
        mixedThreads.start();
 
        // Wait for the threads to complete
        try {
            singleThreadOffer.join();
            multipleThreadsOffer.join();
            mixedThreads.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

The above code demonstrates three scenarios:

  • A single thread performs an offer operation, inserting elements into the queue one by one;
  • Multiple threads perform offer operations at the same time, inserting elements into the queue one by one;
  • Some threads perform offer operations, and some threads perform poll operations to implement the insertion and removal of elements.

In each thread, elements are inserted into the queue with the offer(i) method and removed with the poll() method. Each operation prints a message so that the results can be observed.

The offer and poll methods of ConcurrentLinkedQueue can be used safely in multithreaded environments and follow the FIFO (first in, first out) feature.
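As a quick sanity check of this claim, the following sketch (thread and element counts are arbitrary) has several threads offer concurrently and verifies that no element is lost:

```java
import java.util.concurrent.ConcurrentLinkedQueue;

public class ConcurrentOfferCheck {
    // Each of `producers` threads offers `perProducer` distinct integers;
    // we then drain the queue and count how many distinct values arrived.
    static int run(int producers, int perProducer) throws InterruptedException {
        ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();

        Thread[] threads = new Thread[producers];
        for (int p = 0; p < producers; p++) {
            final int base = p * perProducer;
            threads[p] = new Thread(() -> {
                for (int i = 0; i < perProducer; i++) {
                    queue.offer(base + i); // lock-free CAS append at the tail
                }
            });
            threads[p].start();
        }
        for (Thread t : threads) t.join();

        // Drain the queue and count distinct values received
        boolean[] seen = new boolean[producers * perProducer];
        Integer v;
        while ((v = queue.poll()) != null) {
            seen[v] = true;
        }
        int count = 0;
        for (boolean s : seen) if (s) count++;
        return count;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("distinct elements received: " + run(4, 1000)); // expect 4000
    }
}
```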

poll method

  • In a single thread, poll first checks whether the item of the head node is non-null. If it is, poll returns that item directly and dequeues the node; otherwise it traverses the queue to find a node whose item is non-null and updates head.
  • Under multi-threading, the interleaving of concurrent poll and offer calls must be handled to keep the operation correct. Also, when checking whether the queue is empty, do not rely solely on poll returning null; use the isEmpty method instead.
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Main {
    // Create a ConcurrentLinkedQueue object
    private static Queue<Integer> queue = new ConcurrentLinkedQueue<>();

    public static void main(String[] args) throws InterruptedException {
        // Put numbers into the queue
        queue.offer(1);
        queue.offer(2);
        queue.offer(3);
        // Take an element out of the queue and print it
        Integer value = queue.poll();
        System.out.println("The value taken out is: " + value);
        System.out.println("The elements in the current queue are: " + queue);
        // Create a thread pool
        ExecutorService executor = Executors.newCachedThreadPool();
        // Loop 10 times, submitting one task to the thread pool each time
        for (int i = 0; i < 10; i++) {
            executor.submit(() -> {
                // Put a random number into the queue
                queue.offer((int) (Math.random() * 100));
                // Take an element out of the queue and print it
                Integer result = queue.poll();
                System.out.println(Thread.currentThread().getName() + " took out: " + result);
                System.out.println("The elements in the current queue are: " + queue);
            });
        }
        // Shut down the thread pool
        executor.shutdown();
        // Wait for the submitted tasks to complete
        executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
    }
}

In this comprehensive example, we first work in a single-threaded context: numbers are put into the queue, an element is removed, and the current state of the queue is printed. Then, in a multi-threaded context, we create a thread pool and submit 10 tasks; each task puts a randomly generated number into the queue, takes an element back out, prints it, and prints the current state of the queue. In both the single-threaded and multi-threaded cases, the poll method of ConcurrentLinkedQueue keeps queue operations thread-safe while improving the program's concurrency performance.
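The dequeue semantics discussed above can also be seen directly through the public API. This small, illustrative demo shows that poll returns null on an empty queue rather than blocking or throwing:

```java
import java.util.concurrent.ConcurrentLinkedQueue;

public class PollDemo {
    public static void main(String[] args) {
        ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();

        // poll() on an empty queue returns null instead of blocking or throwing
        System.out.println(queue.poll());    // null
        System.out.println(queue.isEmpty()); // true

        queue.offer(42);
        System.out.println(queue.peek());    // 42: inspect the head without removing it
        System.out.println(queue.poll());    // 42: remove and return the head
        System.out.println(queue.isEmpty()); // true again

        // Note: size() traverses the whole list (O(n)) and is only weakly
        // consistent under concurrent modification, so prefer isEmpty()
        // when an emptiness check is all you need.
    }
}
```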

HOPS design

Through the above analysis of the offer and poll methods, we can see that tail and head are updated lazily; the conditions that trigger each update are as follows:

The HOPS design reduces the frequency of CAS operations through delayed updates, thereby improving enqueue performance. Specifically, the update of tail is deferred: only when the next field of the node tail points to is not null is the real tail node located and tail updated. Similarly, the update of head is deferred until a node is removed: head is relocated and updated only when the item of the node head points to is null.

This delayed-update strategy is designed to reduce the performance impact of CAS operations. If tail or head were updated immediately on every operation, every enqueue or dequeue would require a CAS to move the pointer, putting a significant burden on performance. Delaying the updates reduces the frequency of CAS operations and improves the efficiency of enqueuing.

Although delayed updates add the work of locating the real tail node in a loop, volatile reads are much cheaper than CAS writes overall, so the extra cost of the locating loop is comparatively small.

In summary, the strategy of delayed update in HOPS design improves the performance of enqueuing operations by reducing the frequency of CAS operations, and at the same time achieves a good balance between the trade-offs between read operation performance and write operation performance.
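To make the hop idea concrete, here is a single-threaded illustration (not the JDK code) that counts tail updates when tail is only advanced two nodes at a time:

```java
// Single-threaded illustration of the HOPS idea: instead of updating tail
// after every insertion, let it lag behind and only move it once it has
// fallen two nodes behind the real last node. hopsTailUpdates(n) returns
// how many tail updates n insertions need under this strategy, versus
// n updates for an eager update-on-every-offer strategy.
public class HopsSketch {
    static final class Node {
        final int item;
        Node next;
        Node(int item) { this.item = item; }
    }

    static int hopsTailUpdates(int n) {
        Node tail = new Node(-1); // sentinel
        int tailUpdates = 0;
        for (int i = 0; i < n; i++) {
            Node p = tail;
            while (p.next != null) p = p.next; // locate the real last node
            p.next = new Node(i);              // link the new node
            if (p != tail) {                   // tail lagged: hop it forward
                tail = p.next;
                tailUpdates++;
            }
        }
        return tailUpdates;
    }

    public static void main(String[] args) {
        // 10 insertions: eager updating would move tail 10 times,
        // the hopping strategy only 5 times
        System.out.println("tail updates for 10 offers: " + hopsTailUpdates(10));
    }
}
```

The trade-off is visible in the locating loop: tail updates are halved at the cost of occasionally walking one extra node, which is exactly the read-over-write bargain described above.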

Extended knowledge

tail and head are two important pointers in the ConcurrentLinkedQueue queue. They point to the tail and head nodes in the queue, respectively.

When inserting an element into the queue, the node pointed to by tail is used as the starting point for finding the predecessor of the new node, and the new node is linked behind it through a CAS (compare-and-swap) operation. If the new node is successfully appended at the tail of the queue, another CAS operation may then swing the tail pointer to the new tail node.

When deleting an element from the queue, the node pointed to by head is taken as the node to remove, and a CAS operation advances past it to the next node. If the removal succeeds, the updateHead method is used to point the head pointer at the real head node.

Note that updates to the tail and head pointers are delayed and only happen under specific conditions. Specifically, when the next field of the node tail points to is not null, the real tail node of the queue is located and tail is updated through a CAS operation; when the item field of the node head points to is null, the real head node is located and head is updated through the updateHead method. This delayed-update strategy effectively reduces the frequency of CAS operations and improves queue performance.

This is the end of this article analyzing the Java concurrent container ConcurrentLinkedQueue. For more content related to Java's ConcurrentLinkedQueue, see the related articles below.