SoFunction
Updated on 2025-04-06

C++ thread pool implementation

1. Introduction to thread pools

Thread pooling is a concurrency technique that avoids the overhead of repeatedly creating and destroying threads: a set of threads is created up front and reused to execute many tasks. It is particularly suitable for workloads with a large number of short-lived tasks (such as server requests or parallel computations).

Core components of a thread pool

1. Task queue
Stores the tasks waiting to be executed (usually function objects or other callable objects).

2. Worker threads
A set of pre-created threads that repeatedly take tasks from the queue and execute them.

3. Synchronization mechanism
Mutex: protects the task queue so that threads can access it safely.
Condition variable: notifies worker threads that a task has arrived or that the pool is shutting down.
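To see the three components working together in isolation, here is a minimal sketch of a blocking task queue. The class name BlockingQueue and its push/pop/close methods are hypothetical and not part of the pool below, which embeds the same logic directly in its own members.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical helper: task queue + mutex + condition variable, factored out
// so the synchronization can be read on its own.
template <class T>
class BlockingQueue
{
public:
    // Producer side: add an item, then wake one waiting consumer.
    void push(T item)
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push(std::move(item));
        }
        cv_.notify_one();
    }

    // Consumer side: sleep until an item arrives or the queue is closed.
    // Returns false only when the queue is closed AND empty.
    bool pop(T& out)
    {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return closed_ || !items_.empty(); });
        if (items_.empty())
        {
            return false; // closed and drained
        }
        out = std::move(items_.front());
        items_.pop();
        return true;
    }

    // Termination signal: wake every waiting consumer.
    void close()
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            closed_ = true;
        }
        cv_.notify_all();
    }

private:
    std::queue<T> items_;
    std::mutex mutex_;
    std::condition_variable cv_;
    bool closed_ = false;
};
```

Items pushed before close() are still returned by pop(); only a closed and empty queue reports false, which is the same stop-and-drain rule the pool's workers follow.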

Implementation steps

1. Initialize the thread pool: create a fixed number of threads, each of which loops waiting for tasks.

2. Submit a task: wrap the task in a function object and add it to the task queue.

3. Execute tasks: a worker thread takes a task from the queue and executes it.

4. Terminate the thread pool: send a stop signal, let the workers finish the tasks still in the queue, and wait for all threads to exit.

2. Thread pool implementation in C++11

Source code

#include <vector>
#include <queue>
#include <memory>
#include <future>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <stdexcept>

class ThreadPool 
{
public:
    //Constructor: Create worker threads based on the requested number of threads (default: hardware concurrency).
    //Each worker thread executes a loop, constantly extracting and executing tasks from the task queue.
    //The explicit keyword prevents implicit type conversion.
    explicit ThreadPool(size_t threads = std::thread::hardware_concurrency())
        : stop(false) 
    {
        //hardware_concurrency() may return 0 when the value cannot be determined
        if (threads == 0) 
        {
            threads = 1;
        }

        for (size_t i = 0; i < threads; ++i) 
        {
            workers.emplace_back([this] 
            {
                for (;;) 
                {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(this->queue_mutex);
                        
                        //Waiting condition: The thread waits for a task or the stop signal through the condition variable.
                        //(CPU usage is close to 0% while sleeping; the thread wakes only when notified.)
                        //The lambda expression is used as a predicate: wait() returns once the condition
                        //(stop signal is true or the task queue is not empty) holds.
                        this->condition.wait(lock, [this] {
                            return (this->stop || !this->tasks.empty());
                            });

                        /* Traditional busy waiting: while (!(stop || !tasks.empty())) {} // Empty loop consumes CPU */

                        if (this->stop && this->tasks.empty())
                        {
                            //If the thread pool needs to be terminated and the task queue is empty, return directly
                            return;
                        }

                        //Task extraction: Take the task out of the queue, using std::move to avoid copy overhead.
                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    }
                    //Execute the task outside the lock
                    task();
                }
            });
        }
    }

    //Task Submission (enqueue method)
    //Note: std::result_of is deprecated in C++17 and removed in C++20 (use std::invoke_result there).
    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args)
        -> std::future<typename std::result_of<F(Args...)>::type> 
    {
        using return_type = typename std::result_of<F(Args...)>::type;

        //Task encapsulation: std::packaged_task wraps the user task and supports asynchronous return of results.
        //Smart pointer management: shared_ptr keeps the task object alive until execution completes.
        //Perfect forwarding: std::forward preserves the lvalue/rvalue category of each argument.
        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
            );

        std::future<return_type> res = task->get_future();
        {
            std::unique_lock<std::mutex> lock(queue_mutex);

            if (stop)
            {
                throw std::runtime_error("enqueue on stopped ThreadPool");
            }  

            tasks.emplace([task]() { (*task)(); });
            /* push() takes an already-constructed std::function and copies or moves it into the container;
                emplace() constructs the element in place from the given arguments (here, the lambda),
                which avoids creating a temporary std::function first.  */
        }
        condition.notify_one();
        return res;
    }

    ~ThreadPool() 
    {
        //Set the stop flag, wake up all threads, and wait for them to drain the task queue and exit.
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            stop = true;
        }
        condition.notify_all();
        for (std::thread& worker : workers)
        {
            worker.join();
        }
    }

private:
    std::vector<std::thread> workers;        //Worker thread objects
    std::queue<std::function<void()>> tasks; //Task queue, storing tasks to be executed
    std::mutex queue_mutex;                  //Mutex protecting the task queue
    std::condition_variable condition;       //Condition variable for synchronization between threads
    bool stop;                               //Whether the thread pool is stopping
};

3. Thread pool source code analysis

1. Member variables

std::vector<std::thread> workers;        // Worker thread container
std::queue<std::function<void()>> tasks; // Task queue
std::mutex queue_mutex;                  // Queue mutex
std::condition_variable condition;       // Condition variable
bool stop;                               // Stop flag

Design Points:

  • Follows the producer-consumer model, with the task queue as the shared resource

  • Combines a mutex with a condition_variable to synchronize threads

  • Stores the thread objects in a vector so their lifetimes are managed in one place

2. Constructor

2.1 Thread Initialization

explicit ThreadPool(size_t threads = std::thread::hardware_concurrency())
    : stop(false)
{
    if (threads == 0) 
    {
        threads = 1;
    }
        
    for (size_t i = 0; i < threads; ++i) 
    {
        workers.emplace_back([this] { /* Worker thread logic */ });
    }
}

Design Points:

  • explicit prevents implicit type conversion (e.g. ThreadPool pool = 4; does not compile)

  • Uses the number of hardware threads by default (via hardware_concurrency())

  • Creates at least 1 thread, because hardware_concurrency() may return 0 and a pool with no threads would never run its tasks

  • Uses emplace_back to construct the thread objects directly inside the vector
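The first three points can be checked at compile time and run time with a stand-in type. PoolLike is a hypothetical struct that only copies the shape of the ThreadPool constructor; it creates no threads.

```cpp
#include <cassert>
#include <cstddef>
#include <thread>
#include <type_traits>

// Hypothetical stand-in with the same constructor shape as ThreadPool.
struct PoolLike
{
    explicit PoolLike(std::size_t threads = std::thread::hardware_concurrency())
        : count(threads == 0 ? 1 : threads) // hardware_concurrency() may return 0
    {
    }
    std::size_t count;
};

// `explicit` removes the implicit conversion, so `PoolLike p = 4;` is rejected...
static_assert(!std::is_convertible<std::size_t, PoolLike>::value,
              "copy-initialization from size_t is blocked");
// ...while direct initialization `PoolLike p(4);` still works.
static_assert(std::is_constructible<PoolLike, std::size_t>::value,
              "direct initialization is allowed");
```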

2.2 Worker thread logic

for (;;)
{
    std::function<void()> task;
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        condition.wait(lock, [this] {
            return stop || !tasks.empty();
        });

        if (stop && tasks.empty()) 
        {
            return; 
        }

        task = std::move(tasks.front());
        tasks.pop();
    }
    task();
}

Core mechanism:

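  • std::unique_lock manages the lock automatically and is the lock type that condition_variable::wait() requires

  • Double status check: the predicate wakes the thread on either the stop flag or a non-empty queue, and the thread exits only when stop is set and the queue is empty

  • Task extraction uses move semantics to avoid copying

  • The task is executed outside the lock scope, so other workers can access the queue while it runs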
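The predicate form of wait() used by the workers is shorthand for a loop that re-checks the condition after every wake-up, which is what makes spurious wake-ups harmless. A small sketch of that equivalence; wait_until_true and demo_wait are hypothetical helper names.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Equivalent to cv.wait(lock, pred): re-check the predicate after every wake-up.
// Unlike a busy-wait loop, the thread sleeps inside cv.wait() while pred() is false.
template <class Predicate>
void wait_until_true(std::condition_variable& cv,
                     std::unique_lock<std::mutex>& lock, Predicate pred)
{
    while (!pred())
    {
        cv.wait(lock); // releases the lock while sleeping, re-acquires it on wake-up
    }
}

// One thread waits for a flag that another thread sets under the same mutex.
inline bool demo_wait()
{
    std::mutex m;
    std::condition_variable cv;
    bool ready = false;

    std::thread setter([&] {
        {
            std::lock_guard<std::mutex> lock(m);
            ready = true;
        }
        cv.notify_one();
    });

    {
        std::unique_lock<std::mutex> lock(m);
        wait_until_true(cv, lock, [&] { return ready; });
    }
    setter.join();
    return ready;
}
```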

3. Task submission (enqueue method)

3.1 Method Signature

template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)
    -> std::future<typename std::result_of<F(Args...)>::type>

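Type deduction:

  • Uses a trailing return type declaration
  • std::result_of deduces the return type of the callable object (it is deprecated in C++17 and removed in C++20, where std::invoke_result replaces it)
  • Perfect forwarding of parameters (F&& + Args&&...)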
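On C++17 and later, the same return type can be computed with std::invoke_result_t. The sketch below assumes C++17; future_of and run_now are hypothetical names, and run_now executes the task immediately instead of queuing it, so only the type computation mirrors enqueue.

```cpp
#include <cassert>
#include <functional>
#include <future>
#include <string>
#include <type_traits>
#include <utility>

// C++17 replacement for typename std::result_of<F(Args...)>::type.
template <class F, class... Args>
using future_of = std::future<std::invoke_result_t<F, Args...>>;

// Wraps f(args...) in a packaged_task, runs it right away and returns its future.
template <class F, class... Args>
future_of<F, Args...> run_now(F&& f, Args&&... args)
{
    using return_type = std::invoke_result_t<F, Args...>;
    std::packaged_task<return_type()> task(
        std::bind(std::forward<F>(f), std::forward<Args>(args)...));
    future_of<F, Args...> result = task.get_future();
    task();
    return result;
}

inline int add(int a, int b) { return a + b; }

struct Greeter
{
    std::string greet(const std::string& name) const { return "hi " + name; }
};

// invoke_result also understands pointers to member functions (object passed first).
static_assert(std::is_same<std::invoke_result_t<decltype(&Greeter::greet), Greeter&, std::string>,
                           std::string>::value,
              "member function result type");
```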

3.2 Task encapsulation

auto task = std::make_shared<std::packaged_task<return_type()>>
    (std::bind(std::forward<F>(f), std::forward<Args>(args)...));

Encapsulation strategy:

  • packaged_task wraps the task so its result can be retrieved asynchronously
  • shared_ptr manages the lifetime of the task object
  • std::bind binds the arguments (note the C++11 restrictions on argument forwarding)

3.3 Enqueuing the task

tasks.emplace([task]() { (*task)(); });

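Optimization points:

  • emplace constructs the queue element directly
  • The lambda captures the shared_ptr, keeping the task valid until it runs; this also makes the element storable at all, because std::function requires a copyable callable while packaged_task is move-only
  • The packaged_task is invoked through an explicit dereference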
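A single-threaded model of one enqueue followed by one worker iteration makes both points visible: the copyability requirement and the lifetime extension. enqueue_and_run_once is a hypothetical helper, not part of the pool.

```cpp
#include <cassert>
#include <functional>
#include <future>
#include <memory>
#include <queue>
#include <type_traits>
#include <utility>

// packaged_task is move-only, and std::function (before C++23's
// std::move_only_function) needs a copyable target, so the task itself
// cannot be stored in std::queue<std::function<void()>> directly.
static_assert(!std::is_copy_constructible<std::packaged_task<int()>>::value,
              "packaged_task cannot be copied");

inline int enqueue_and_run_once(int x)
{
    std::queue<std::function<void()>> tasks;
    std::future<int> result;
    {
        auto task = std::make_shared<std::packaged_task<int()>>([x] { return x * 2; });
        result = task->get_future();
        tasks.emplace([task]() { (*task)(); }); // copying this lambda only copies the shared_ptr
    } // the submitter's shared_ptr is gone; the queued lambda still owns the task

    std::function<void()> job = std::move(tasks.front()); // what a worker does
    tasks.pop();
    job(); // runs the packaged_task, which fulfils the future
    return result.get();
}
```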

4. Destructor

4.1 Shutdown control

~ThreadPool() 
{
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        stop = true;
    }
    condition.notify_all();
    for (auto& worker : workers)
    {
        worker.join();
    }  
}

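Shutdown protocol:

  • Set the stop flag while holding the mutex (stop is a plain bool, not an atomic, so the lock is what makes the update safe)
  • Broadcast (notify_all) to wake every waiting thread
  • Workers keep running until the queue is empty, so tasks that were already queued are still executed
  • Wait (join) for all worker threads to exit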
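The drain-before-exit behavior can be observed with a stripped-down pool that keeps the same shutdown protocol. MiniPool and run_then_destroy are hypothetical names; MiniPool accepts only void() tasks and omits futures.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class MiniPool
{
public:
    explicit MiniPool(std::size_t n)
    {
        for (std::size_t i = 0; i < n; ++i)
        {
            workers_.emplace_back([this] {
                for (;;)
                {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(m_);
                        cv_.wait(lock, [this] { return stop_ || !tasks_.empty(); });
                        if (stop_ && tasks_.empty())
                            return; // exit only once the queue is drained
                        task = std::move(tasks_.front());
                        tasks_.pop();
                    }
                    task();
                }
            });
        }
    }

    void submit(std::function<void()> f)
    {
        {
            std::lock_guard<std::mutex> lock(m_);
            tasks_.push(std::move(f));
        }
        cv_.notify_one();
    }

    ~MiniPool()
    {
        {
            std::lock_guard<std::mutex> lock(m_); // set the flag under the lock
            stop_ = true;
        }
        cv_.notify_all(); // wake every sleeping worker
        for (auto& w : workers_)
            w.join(); // wait for each worker to drain the queue and exit
    }

private:
    std::vector<std::thread> workers_;
    std::queue<std::function<void()>> tasks_;
    std::mutex m_;
    std::condition_variable cv_;
    bool stop_ = false;
};

// Submits `count` tasks, destroys the pool immediately and reports how many ran.
inline int run_then_destroy(int count)
{
    std::atomic<int> done{0};
    {
        MiniPool pool(2);
        for (int i = 0; i < count; ++i)
            pool.submit([&done] { ++done; });
    } // the destructor returns only after every queued task has run
    return done.load();
}
```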

5. Analysis of key technical points

5.1 Perfect forwarding implementation

std::bind(std::forward<F>(f), std::forward<Args>(args)...)
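  • Each argument is copied (lvalue) or moved (rvalue) into the bind object according to its value category
  • Rvalue arguments are therefore moved rather than copied
  • C++11 limitation: std::bind stores decayed copies of the arguments and passes them to the function as lvalues, so reference parameters need std::ref and a function that takes a move-only argument by value cannot be called this way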
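Both halves of the limitation are easy to reproduce. The sketch assumes C++14 for the init-capture alternative; increment, consume, bind_copy_demo and move_only_demo are hypothetical names.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <utility>

inline void increment(int& value) { ++value; }

// std::bind stores a decayed copy of `counter`, so the first call increments
// that private copy; std::ref stores a reference_wrapper and reaches the original.
inline int bind_copy_demo()
{
    int counter = 0;
    auto copied = std::bind(increment, counter);
    copied(); // increments the stored copy only
    auto referenced = std::bind(increment, std::ref(counter));
    referenced(); // increments counter itself
    return counter;
}

inline int consume(std::unique_ptr<int> p) { return *p; }

// consume() takes a move-only argument by value, which std::bind cannot forward
// (the stored unique_ptr is passed as an lvalue). A C++14 init-capture moves the
// argument into the closure and moves it out again on the call.
inline int move_only_demo()
{
    auto job = [p = std::make_unique<int>(7)]() mutable { return consume(std::move(p)); };
    return job();
}
```

A lambda like this could be passed to enqueue in place of the bind form when the task needs a move-only argument.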

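5.2 Exception propagation mechanism

  • An exception thrown by a task is propagated through its future object
  • packaged_task automatically catches the exception and stores it in the shared state
  • The caller receives it when calling future.get(), which rethrows it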
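The same mechanism works with a bare packaged_task and no pool at all. exception_through_future is a hypothetical helper name.

```cpp
#include <cassert>
#include <future>
#include <stdexcept>
#include <string>

inline std::string exception_through_future()
{
    std::packaged_task<int()> task([]() -> int { throw std::runtime_error("example error"); });
    std::future<int> fut = task.get_future();
    task(); // does not throw here: the exception is stored in the shared state
    try
    {
        fut.get(); // rethrows the stored exception in the caller's thread
    }
    catch (const std::runtime_error& e)
    {
        return e.what();
    }
    return "no exception";
}
```

Because the exception never escapes task(), a worker thread that runs a throwing task keeps looping instead of terminating the program.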

5.3 Memory Management Model

         [Task Submitter]
             |
             v
       [packaged_task] <---- shared_ptr ---- [Task Queue]
             |
             v
         [future]
  • Triple lifetime guarantee:

    The submitter holds the future

    The queue holds the task wrapper (a lambda holding a shared_ptr to the packaged_task)

    The worker thread executes the task

4. Performance characteristics analysis

1. Time complexity

Operation                   Time complexity
Task submission (enqueue)   O(1), plus locking overhead
Task extraction             O(1)
Thread wake-up              Depends on system scheduling

2. Space complexity

Component                   Space usage
Thread stacks               On the order of megabytes per thread (the platform's default stack size)
Task queue                  Proportional to the number of queued tasks
Synchronization primitives  Fixed size

5. Directions for extension and optimization

1. Work stealing

  • Give each worker thread its own task queue
  • Idle threads steal tasks from other threads' queues
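A minimal sketch of the queue side of work stealing, assuming one mutex-protected deque per worker. StealingQueues is a hypothetical name, and production work-stealing schedulers usually use lock-free deques instead of a mutex per queue; this only shows the owner-pops-back, thief-steals-front policy.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

class StealingQueues
{
public:
    explicit StealingQueues(std::size_t workers) : slots_(workers) {}

    void push(std::size_t owner, std::function<void()> task)
    {
        Slot& s = slots_[owner];
        std::lock_guard<std::mutex> lock(s.mutex);
        s.tasks.push_back(std::move(task));
    }

    // Own deque first (newest task, from the back), then the other deques
    // in turn (oldest task, from the front). False if all were empty.
    bool try_pop(std::size_t self, std::function<void()>& out)
    {
        if (take(slots_[self], out, /*from_back=*/true))
            return true;
        for (std::size_t i = 1; i < slots_.size(); ++i)
        {
            if (take(slots_[(self + i) % slots_.size()], out, /*from_back=*/false))
                return true;
        }
        return false;
    }

private:
    struct Slot
    {
        std::mutex mutex;
        std::deque<std::function<void()>> tasks;
    };

    static bool take(Slot& s, std::function<void()>& out, bool from_back)
    {
        std::lock_guard<std::mutex> lock(s.mutex);
        if (s.tasks.empty())
            return false;
        if (from_back)
        {
            out = std::move(s.tasks.back());
            s.tasks.pop_back();
        }
        else
        {
            out = std::move(s.tasks.front());
            s.tasks.pop_front();
        }
        return true;
    }

    std::vector<Slot> slots_; // sized once in the constructor; Slot is not movable
};
```

Owner and thief normally work at opposite ends of a deque, which keeps contention low when most workers have their own work.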

2. Dynamic thread pool

void adjust_workers(size_t new_size) 
{
    if (new_size > workers.size()) 
    {
        // Grow: start additional worker threads
    } 
    else 
    {
        // Shrink: signal surplus workers to exit, then join them
    }
}

3. Priority queue

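using Task = std::pair<int, std::function<void()>>; // Priority + task
struct ByPriority { bool operator()(const Task& a, const Task& b) const { return a.first < b.first; } }; // std::function has no operator<
std::priority_queue<Task, std::vector<Task>, ByPriority> tasks;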
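A single-threaded drain of such a queue shows the order in which a worker would pick tasks up. PriorityTask, ByPriority and run_by_priority are hypothetical names; the comparator is required because the default std::less on the pair would try to compare two std::function objects.

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <string>
#include <utility>
#include <vector>

using PriorityTask = std::pair<int, std::function<void()>>; // priority + task

// Compare on the priority only; the queue is a max-heap, so larger numbers run first.
struct ByPriority
{
    bool operator()(const PriorityTask& a, const PriorityTask& b) const
    {
        return a.first < b.first;
    }
};

inline std::string run_by_priority()
{
    std::string log;
    std::priority_queue<PriorityTask, std::vector<PriorityTask>, ByPriority> tasks;
    tasks.emplace(1, [&log] { log += "low "; });
    tasks.emplace(5, [&log] { log += "high "; });
    tasks.emplace(3, [&log] { log += "mid "; });

    while (!tasks.empty())
    {
        // top() returns a const reference, so the task is copied out before pop()
        std::function<void()> task = tasks.top().second;
        tasks.pop();
        task();
    }
    return log;
}
```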

4. Lock-free queue (for example, the third-party moodycamel::ConcurrentQueue library)

moodycamel::ConcurrentQueue<std::function<void()>> tasks;

6. Guide to troubleshooting typical problems

Symptom                        Possible cause                                 Solution
Task not executed              Thread pool destroyed too early                Extend the thread pool's lifetime
future.get() blocks forever    Task not submitted / exception not handled     Check the task submission path
CPU usage at 100%              Busy waiting or lock contention                Optimize task granularity / use lock-free structures
Memory keeps growing           Task objects not released correctly            Check the use of smart pointers
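When future.get() appears to block forever, a timed wait can confirm that the result is never produced without hanging the caller. A minimal diagnostic sketch; ready_within is a hypothetical helper built on the standard std::future::wait_for.

```cpp
#include <cassert>
#include <chrono>
#include <future>

// Returns true if the future's result became available within `timeout`,
// false if it is still pending (or the task was deferred).
template <class T>
bool ready_within(const std::future<T>& fut, std::chrono::milliseconds timeout)
{
    return fut.wait_for(timeout) == std::future_status::ready;
}
```

Logging the tasks whose futures are still not ready after a generous timeout narrows the search to the submission path that never ran them.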

This implementation demonstrates the core design of a modern C++ thread pool, and developers can extend and optimize it for their specific needs. Understanding this code structure is the basis for mastering more advanced concurrency patterns.

7. Test cases

Example of usage (C++11 compatible):

#include <iostream>
#include <stdexcept>
#include <functional>

int main() 
{
    ThreadPool pool(4);
    
    // Submit an ordinary callable (a lambda with arguments)
    auto future1 = pool.enqueue([](int a, int b) {
        return a + b;
    }, 2, 3);
    
    // Submit a member function
    struct Calculator 
    {
        int multiply(int a, int b) 
        { 
            return a * b; 
        }
    } calc;
    auto future2 = pool.enqueue(std::bind(&Calculator::multiply, &calc, 
                                          std::placeholders::_1, 
                                          std::placeholders::_2), 4, 5);
    
    // Exception handling example
    auto future3 = pool.enqueue([]() -> int {
        throw std::runtime_error("example error");
        return 1;
    });
    
    std::cout << "2+3=" << future1.get() << std::endl;
    std::cout << "4*5=" << future2.get() << std::endl;
    
    try 
    {
        future3.get();
    } 
    catch(const std::exception& e)
    {
        std::cout << "Caught exception: " << e.what() << std::endl;
    }
    
    return 0;
}
