SoFunction
Updated on 2025-04-18

Implement an efficient thread pool using C++

In multithreaded programming, thread pooling is a common and efficient design pattern. It handles tasks by pre-creating a certain number of threads, thereby avoiding the performance overhead of frequent thread creation and destruction. This article will introduce in detail how to implement a thread pool using C++ and parse relevant code implementation details.

Introduction to thread pool

Thread Pool is a mechanism for managing and reusing threads. It maintains a collection of threads, and when there is a task that needs to be executed, it allocates an idle thread from the thread pool to process the task. After the task is completed, the thread is returned to the pool. This can significantly reduce the overhead of thread creation and destruction, and improve the overall performance and response speed of the system.

Design ideas

The thread pool implemented in this article mainly contains two core classes:

  • Thread class: encapsulates the creation, startup and management of a single thread.
  • ThreadPool class: manages multiple threads, maintains task queues, and schedules tasks to threads for execution.

Thread pool supports two modes:

MODE_CACHED: Cache mode, dynamically adjusts the number of threads according to the task volume, and is suitable for scenarios where the task volume is not fixed.

MODE_FIXED: Fixed mode, fixed number of threads, suitable for scenarios with stable task volume.

Thread class implementation

The Thread class is responsible for encapsulating the creation and management of single threads. The following is the implementation of the and.

#include <functional>
#include <atomic>
#include <cstdint>
#include <thread>

class Thread {
public:
    using ThreadFunc = std::function<void(std::uint32_t)>;

public:
    explicit Thread(ThreadFunc func);

    void join();

    ~Thread();

    void start();

    [[nodiscard]] std::uint32_t getID() const;

    [[nodiscard]] static std::uint32_t getNumCreated();

    Thread(const Thread &) = delete;

    Thread &operator=(const Thread &) = delete;

private:
    ThreadFunc m_func;
    uint32_t m_threadID;
    std::thread m_thread;
    static std::atomic<uint32_t> m_numCreateThread;
};

#include ""

std::atomic<uint32_t> Thread::m_numCreateThread(0);

Thread::Thread(Thread::ThreadFunc func) : m_func(std::move(func)), m_threadID(m_numCreateThread.load()) {
    m_numCreateThread++;
}

void Thread::start() {
    m_thread = std::thread([this]() {
        m_func(m_threadID);
    });
    m_thread.detach();
}

uint32_t Thread::getID() const {
    return m_threadID;
}

uint32_t Thread::getNumCreated() {
    return Thread::m_numCreateThread.load();
}

Thread::~Thread() {
    join();
}

void Thread::join() {
    if (m_thread.joinable()) {
        m_thread.join();
    }
}

Analysis

Member variables:

  • m_func: A function executed by a thread.
  • m_threadID: The unique identifier of the thread.
  • m_thread:std::thread object.
  • m_numCreateThread: Static atomic variable used to record the number of created threads.

Constructor:

Accept a function as an argument and assign a unique thread ID.

Start method:

Start the thread, execute the passed function, and set the thread to a separate state so that resources are automatically recycled when the thread ends.

Join method and destructor:

If the thread is connectable, perform the join operation to ensure the correct recycling of thread resources.

ThreadPool class implementation

The ThreadPool class is responsible for managing multiple threads, maintaining task queues, and scheduling tasks to threads for execution. The following is the implementation of the and.

#include &lt;mutex&gt;
#include &lt;unordered_map&gt;
#include &lt;memory&gt;
#include &lt;functional&gt;
#include &lt;queue&gt;
#include &lt;iostream&gt;
#include &lt;condition_variable&gt;
#include &lt;future&gt;
#include &lt;cstdint&gt;
#include ""

enum class THREAD_MODE {
    MODE_CACHED,
    MODE_FIXED,
};

class ThreadPool {
public:
    explicit ThreadPool(THREAD_MODE mode = THREAD_MODE::MODE_CACHED, std::uint32_t maxThreadSize = 1024,
                        std::uint32_t initThreadSize = 4, std::uint32_t maxTaskSize = 1024);

    ~ThreadPool();

    void setThreadMaxSize(uint32_t maxSize);
    void setMode(THREAD_MODE mode);
    void setTaskMaxSize(uint32_t maxSize);
    void start(uint32_t taskSize = std::thread::hardware_concurrency());

    ThreadPool(const ThreadPool &amp;) = delete;
    ThreadPool &amp;operator=(const ThreadPool &amp;) = delete;

    template&lt;typename Func, typename ...Args&gt;
    auto submitTask(Func &amp;&amp;func, Args &amp;&amp;...args) -&gt; std::future&lt;typename std::invoke_result&lt;Func, Args...&gt;::type&gt;;

protected:
    [[nodiscard]] bool checkState() const;
    void ThreadFun(uint32_t threadID);

private:
    using Task = std::function&lt;void()&gt;;

    std::unordered_map&lt;uint32_t, std::unique_ptr&lt;Thread&gt;&gt; m_threads;
    uint32_t m_initThreadSize; // Initial number of threads    std::atomic&lt;std::uint32_t&gt; m_spareThreadSize; // Number of free threads    uint32_t m_maxThreadSize; // Maximum number of threads    std::atomic&lt;bool&gt; m_isRunning; // Thread pool running flag    THREAD_MODE m_mode; // Thread pool running mode
    std::deque&lt;Task&gt; m_tasks;
    std::atomic&lt;uint32_t&gt; m_taskSize;
    uint32_t m_maxTaskSize;

    uint32_t m_thread_maxSpareTime;

    mutable std::mutex m_mutex; // Thread pool mutex    std::condition_variable m_notEmpty;
    std::condition_variable m_notFull;
    std::condition_variable m_isExit;
};

#include ""
#include <thread>

ThreadPool::ThreadPool(THREAD_MODE mode, uint32_t maxThreadSize, uint32_t initThreadSize,
                       uint32_t maxTaskSize) : m_initThreadSize(initThreadSize), m_spareThreadSize(0),
                                               m_maxThreadSize(maxThreadSize), m_isRunning(false), 
                                               m_mode(mode), m_taskSize(0), m_maxTaskSize(maxTaskSize), 
                                               m_thread_maxSpareTime(60) {
}

bool ThreadPool::checkState() const {
    return m_isRunning;
}

void ThreadPool::setThreadMaxSize(uint32_t maxSize) {
    if (checkState())
        std::cerr << "threadPool is running, cannot change!" << std::endl;
    else
        this->m_maxThreadSize = maxSize;
}

void ThreadPool::setMode(THREAD_MODE mode) {
    if (checkState())
        std::cerr << "threadPool is running, cannot change!" << std::endl;
    else
        this->m_mode = mode;
}

void ThreadPool::setTaskMaxSize(uint32_t maxSize) {
    if (checkState())
        std::cerr << "threadPool is running, cannot change!" << std::endl;
    else
        this->m_maxTaskSize = maxSize;
}

void ThreadPool::ThreadFun(uint32_t threadID) {
    auto last_time = std::chrono::high_resolution_clock::now();
    for (;;) {
        Task task;
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            std::cout << "threadID: " << threadID << " trying to get a task" << std::endl;
            while (m_tasks.empty() && m_isRunning) {
                if (m_mode == THREAD_MODE::MODE_CACHED && m_threads.size() > m_initThreadSize) {
                    if (m_notEmpty.wait_for(lock, std::chrono::seconds(3)) == std::cv_status::timeout) {
                        auto now_time = std::chrono::high_resolution_clock::now();
                        auto dur_time = std::chrono::duration_cast<std::chrono::seconds>(now_time - last_time);
                        if (dur_time.count() > m_thread_maxSpareTime && m_threads.size() > m_initThreadSize) {
                            m_threads.erase(threadID);
                            m_spareThreadSize--;
                            std::cout << "threadID: " << threadID << " exiting due to inactivity!" << std::endl;
                            return;
                        }
                    }
                } else {
                    m_notEmpty.wait(lock);
                }
            }
            if (!m_isRunning && m_tasks.empty()) {
                m_threads.erase(threadID);
                std::cout << "threadID: " << threadID << " exiting!" << std::endl;
                m_isExit.notify_all();
                return;
            }

            if (!m_tasks.empty()) {
                m_spareThreadSize--;
                task = std::move(m_tasks.front());
                m_tasks.pop_front();
                std::cout << "threadID: " << threadID << " retrieved a task!" << std::endl;
                if (!m_tasks.empty())
                    m_notEmpty.notify_all();
                m_notFull.notify_all();
            }
        }
        if (task) {
            try {
                task();
            } catch (const std::exception &e) {
                std::cerr << "Exception in task: " << () << std::endl;
            } catch (...) {
                std::cerr << "Unknown exception in task." << std::endl;
            }
            std::cout << "threadID: " << threadID << " completed a task." << std::endl;
            m_spareThreadSize++;
            last_time = std::chrono::high_resolution_clock::now();
        }
    }
}

void ThreadPool::start(std::uint32_t taskSize) {
    m_isRunning = true;
    for (std::uint32_t i = 0; i < taskSize; ++i) {
        auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::ThreadFun, this, std::placeholders::_1));
        auto threadID = ptr->getID();
        m_threads.emplace(threadID, std::move(ptr));
    }
    for (auto &it: m_threads) {
        ->start();
        m_spareThreadSize++;
    }
}

ThreadPool::~ThreadPool() {
    m_isRunning = false;
    std::unique_lock<std::mutex> lock(m_mutex);
    m_notEmpty.notify_all();
    m_notFull.notify_all();
    m_isExit.wait(lock, [&]() -> bool { return m_threads.empty(); });
}

Implementation of submitTask template method

template<typename Func, typename ...Args>
auto ThreadPool::submitTask(Func &&func, Args &&...args) -> std::future<typename std::invoke_result<Func, Args...>::type> {
    using Rtype = typename std::invoke_result<Func, Args...>::type;
    auto task = std::make_shared<std::packaged_task<Rtype()>>(
            std::bind(std::forward<Func>(func), std::forward<Args>(args)...));
    std::future<Rtype> result = task->get_future();
    std::unique_lock lock(m_mutex);
    if (!m_notFull.wait_for(lock, std::chrono::seconds(3),
                            [&]() -> bool { return m_tasks.size() < m_maxTaskSize; })) {
        std::cerr << "Task queue is full, submit task failed!" << std::endl;
        throw std::runtime_error("Task queue is full");
    }
    m_tasks.emplace_back([task] { (*task)(); });
    m_notEmpty.notify_all();

    if (m_mode == THREAD_MODE::MODE_CACHED && m_tasks.size() > m_spareThreadSize) {
        if (m_threads.size() >= m_maxThreadSize) {
            std::cerr << "Thread pool has reached max size, cannot create new thread!" << std::endl;
        } else {
            std::cout << "Creating a new thread!" << std::endl;
            auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::ThreadFun, this, std::placeholders::_1));
            u_int64_t threadID = ptr->getID();
            m_threads.emplace(threadID, std::move(ptr));
            m_threads[threadID]->start();
            ++m_spareThreadSize;
        }
    }
    return result;
}

Analysis

Member variables:

  • m_threads: A collection of all threads stored.
  • m_tasks: Task queue, storing tasks to be executed.
  • m_mutex, m_notEmpty, m_notFull, m_isExit: Mutex and conditional variables for thread synchronization and task scheduling.
  • Other variables are used to control the state of the thread pool, such as the maximum number of threads, the initial number of threads, the maximum length of the task queue, etc.

Constructor:

Initialize the various parameters of the thread pool, such as mode, maximum number of threads, initial number of threads, maximum number of tasks, etc.

Start method:

Start the thread pool, create the initial number of threads, and start it.

SubmitTask template method:

  • Submit tasks to thread pool, supporting any callable objects.
  • Use std::packaged_task and std::future to implement asynchronous execution and result acquisition of tasks.
  • If the task queue is full, wait for the specified time, and if it is still full, an exception will be thrown.
  • In cache mode, create new threads dynamically based on the amount of tasks.

ThreadFun method:

  • A work function of a thread that obtains tasks from a task queue and executes them.
  • In cache mode, threads will automatically exit after a certain period of idle time, reducing resource usage.

Destructor:

Close the thread pool, notify all threads to exit, and wait for all threads to end.

Use of thread pool

Here is a simple example showing how to use the thread pool implemented above.

#include ""
#include &lt;iostream&gt;
#include &lt;chrono&gt;

// Sample task functionvoid exampleTask(int n) {
    std::cout &lt;&lt; "Task " &lt;&lt; n &lt;&lt; " is starting." &lt;&lt; std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(2));
    std::cout &lt;&lt; "Task " &lt;&lt; n &lt;&lt; " is completed." &lt;&lt; std::endl;
}

int main() {
    // Create a thread pool, use cache mode, the maximum number of threads is 8, the initial number of threads is 4, and the maximum number of tasks is 16    ThreadPool pool(THREAD_MODE::MODE_CACHED, 8, 4, 16);
    ();

    // Submit multiple tasks    std::vector&lt;std::future&lt;void&gt;&gt; futures;
    for (int i = 0; i &lt; 10; ++i) {
        futures.emplace_back((exampleTask, i));
    }

    // Wait for all tasks to complete    for (auto &amp;fut : futures) {
        ();
    }

    std::cout &lt;&lt; "All tasks have been completed." &lt;&lt; std::endl;
    return 0;
}

Running results

threadID: 0 trying to get a task
threadID: 1 trying to get a task
threadID: 2 trying to get a task
threadID: 3 trying to get a task
Task 0 is starting.
Task 1 is starting.
Task 2 is starting.
Task 3 is starting.
threadID: 0 completed a task.
threadID: 0 trying to get a task
Task 4 is starting.
threadID: 1 completed a task.
threadID: 1 trying to get a task
Task 5 is starting.
...
All tasks have been completed.

The above is the detailed content of using C++ to implement an efficient thread pool. For more information about C++ thread pool, please pay attention to my other related articles!