SoFunction
Updated on 2025-03-02

Example C++ implementation of a thread pool and a task pool

The code is as follows (shared timing constants):

// Seconds between two sweeps of the idle-task list by the TaskPool monitor thread
// (used as the timeout of the timed wait in TaskPool::CheckIdleWait).
const int CHECK_IDLE_TASK_INTERVAL = 300;
// Seconds a Task may stay on the idle list before TaskPool::RemoveIdleTask destroys it.
const int TASK_DESTROY_INTERVAL = 60;

// Microseconds ThreadPool::TerminalCheck sleeps (usleep) between two polls
// while it waits for the worker-thread count to reach zero.
const int IDLE_CHECK_POLL_EMPTY = 500;

// Seconds an idle worker thread waits for a new task before it exits on its own (5 minutes).
const int  THREAD_WAIT_TIME_OUT = 300;

The code is as follows (TaskPool implementation):

#include ""

#include <>

#include <>
#include <>

    TaskPool::TaskPool(const int & poolMaxSize)
    : m_poolSize(poolMaxSize)
      , m_taskListSize(0)
      , m_bStop(false)
{
    pthread_mutex_init(&m_lock, NULL);
    pthread_mutex_init(&m_idleMutex, NULL);
    pthread_cond_init(&m_idleCond, NULL);

    pthread_attr_t attr;
    pthread_attr_init( &attr );
pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_JOINABLE ); // Let the thread run independently
pthread_create(&m_idleId, &attr, CheckIdleTask, this); //Create a monitoring of idle task process
    pthread_attr_destroy(&attr);
}

TaskPool::~TaskPool()
{
    if(!m_bStop)
    {
        StopPool();
    }
    if(!m_taskList.empty())
    {
        std::list<Task*>::iterator it = m_taskList.begin();
        for(; it != m_taskList.end(); ++it)
        {
            if(*it != NULL)
            {
                delete *it;
                *it = NULL;
            }
        }
        m_taskList.clear();
        m_taskListSize = 0;
    }
    if(!m_idleList.empty())
    {
        std::list<Task*>::iterator it = m_idleList.begin();
        for(; it != m_idleList.end(); ++it)
        {
            if(*it != NULL)
            {
                delete *it;
                *it = NULL;
            }
        }
        m_idleList.clear();
    }


    pthread_mutex_destroy(&m_lock);
    pthread_mutex_destroy(&m_idleMutex);
    pthread_cond_destroy(&m_idleCond);
}

void * TaskPool::CheckIdleTask(void * arg)
{
    TaskPool * pool = (TaskPool*)arg;
    while(1)
    {
        pool->LockIdle();
        pool->RemoveIdleTask();
        if(pool->GetStop())
        {
            pool->UnlockIdle();
            break;
        }
        pool->CheckIdleWait();
        pool->UnlockIdle();
    }
}

void TaskPool::StopPool()
{
    m_bStop = true;
    LockIdle();
pthread_cond_signal(&m_idleCond); //Prevent the monitoring thread from waiting and causing problems that cannot exit
    UnlockIdle();
    pthread_join(m_idleId, NULL);
}

// Report whether the pool has been asked to stop (returns m_bStop, which StopPool() sets).
bool TaskPool::GetStop()
{
    return m_bStop;
}

void TaskPool::CheckIdleWait()
{
    struct timespec timeout;
    memset(&timeout, 0, sizeof(timeout));
    timeout.tv_sec = time(0) + CHECK_IDLE_TASK_INTERVAL;
    timeout.tv_nsec = 0;
    pthread_cond_timedwait(&m_idleCond, &m_idleMutex, &timeout);
}

int TaskPool::RemoveIdleTask()
{
    int iRet = 0;
    std::list<Task*>::iterator it, next;
    std::list<Task*>::reverse_iterator rit = m_idleList.rbegin();
    time_t curTime = time(0);
    for(; rit != m_idleList.rend(); )
    {
        it = --();
        if(difftime(curTime,((*it)->last_time)) >= TASK_DESTROY_INTERVAL)
        {
            iRet++;
            delete *it;
            *it = NULL;
            next = m_idleList.erase(it);
            rit = std::list<Task*>::reverse_iterator(next);
        }
        else
        {
            break;   
        }
    }
}

int TaskPool::AddTask(task_fun fun, void *arg)
{
    int iRet = 0;
    if(0 != fun)
    {
        pthread_mutex_lock(&m_lock);
        if(m_taskListSize >= m_poolSize)
        {
            pthread_mutex_unlock(&m_lock);
            iRet = -1; //task pool is full;
        }
        else
        {
            pthread_mutex_unlock(&m_lock);
            Task * task = GetIdleTask();
            if(NULL == task)
            {
                task = new Task;
            }
            if(NULL == task)
            {
                iRet = -2; // new failed
            }
            else
            {
                task->fun = fun;
                task->data = arg;
                pthread_mutex_lock(&m_lock);
                m_taskList.push_back(task);
                ++m_taskListSize;
                pthread_mutex_unlock(&m_lock);
            }
        }
    }
    return iRet;
}

/* pur @ Remove and return the task at the head of the pending queue
 * return @ the oldest pending Task, or NULL when the queue is empty
 */
Task* TaskPool::GetTask()
{
    pthread_mutex_lock(&m_lock);
    if(m_taskList.empty())
    {
        pthread_mutex_unlock(&m_lock);
        return NULL;
    }

    Task * const head = m_taskList.front();
    m_taskList.pop_front();
    --m_taskListSize;
    pthread_mutex_unlock(&m_lock);
    return head;
}

// Acquire the mutex that protects the idle-task list.
void TaskPool::LockIdle()
{
    pthread_mutex_lock(&m_idleMutex);
}

// Release the mutex that protects the idle-task list.
void TaskPool::UnlockIdle()
{
    pthread_mutex_unlock(&m_idleMutex);
}

/* pur @ Take the most recently recycled Task object off the idle list
 * return @ a reusable Task, or NULL when the idle list is empty
 */
Task * TaskPool::GetIdleTask()
{
    Task * recycled = NULL;

    LockIdle();
    const bool haveIdle = !m_idleList.empty();
    if(haveIdle)
    {
        recycled = m_idleList.front();
        m_idleList.pop_front();
    }
    UnlockIdle();

    return recycled;
}

/* pur @ Put a finished Task object on the idle list so AddTask() can reuse it
 * para @ task  the task that has been dispatched; NULL is ignored
 */
void TaskPool::SaveIdleTask(Task*task)
{
    if(NULL == task)
    {
        return;
    }

    // Clear the payload and stamp the time before the object becomes visible
    // to other threads; RemoveIdleTask() compares against this stamp.
    task->last_time = time(0);
    task->data = NULL;
    task->fun = 0;

    LockIdle();
    m_idleList.push_front(task);    // newest first, oldest at the back
    UnlockIdle();
}

The code is as follows (TaskPool header):

#ifndef TASKPOOL_H
#define TASKPOOL_H
/* purpose @ Task pool: buffers bursts of externally submitted tasks until a manager thread schedules them.
*            The task pool automatically destroys Task objects that have been idle for a long time.
*
*           TASK_DESTROY_INTERVAL sets the allowed idle time of a Task. A Task that has been idle longer than this is destroyed by the CheckIdleTask thread.
 * date    @ 2013.12.23
 * author  @
 */

#include <list>
#include <>
#include ""

// Every user operation is expressed as a task: a plain function taking one opaque argument.
typedef void (*task_fun)(void *);
// One unit of work plus the timestamp the task pool uses to recycle the object.
struct Task
{
task_fun fun; //Function that processes the task
void* data; //Argument passed to fun
time_t last_time; //Time the task joined the idle queue (set by TaskPool::SaveIdleTask); used for automatic destruction
};

//Task pool, all tasks will be delivered to the task pool, and the management thread is responsible for delivering tasks to the thread pool
class TaskPool
{
public:
/* pur @ Initialize the task pool, start the task pool idle queue to automatically destroy threads
* para @ maxSize Maximum number of tasks, greater than 0
    */
    TaskPool(const int & poolMaxSize);
    ~TaskPool();

/* pur @ Add task to the end of the task queue
* para @ task, specific tasks
* return @ 0 Added successfully, negative number Added failed
    */   
    int AddTask(task_fun fun, void* arg);

/* pur @ Get a task from the header of the task list
* return @  If there is a task in the list, it returns a Task pointer, otherwise it returns a NULL
    */   
    Task* GetTask();

/* pur @ Save idle tasks to idle queue
* para @ task The task that has been called to execute
     * return @
    */
    void SaveIdleTask(Task*task);

    void StopPool();
public:
    void LockIdle();
    void UnlockIdle();
    void CheckIdleWait();
    int RemoveIdleTask();
    bool GetStop();
private:
    static void * CheckIdleTask(void *);
/* pur @ Get an idle task
     * para @
     * para @
* return @ NULL indicates that there is no free, otherwise get one from m_idleList
    */
    Task* GetIdleTask();
    int GetTaskSize();
private:
int m_poolSize; //Task pool size
int m_taskListSize; // Statistics the size of the taskList, because when the size of the List increases with the increase in the number of
bool m_bStop; //Stop
std::list<Task*> m_taskList;//All pending tasks list
std::list<Task*> m_idleList;//All idle task list
pthread_mutex_t m_lock; //Lock the task list to ensure that only one task can be retrieved at a time
pthread_mutex_t m_idleMutex; //Idle task queue lock
pthread_cond_t m_idleCond; //Idle queue waiting conditions
    pthread_t m_idleId;;
};
#endif

The code is as follows (ThreadPool implementation):

/* purpose @ thread pool class, responsible for thread creation and destruction, and realizes the automatic exit function of thread timeout (semi-resident)
 * date    @ 2014.01.03
 * author  @
 */

#include ""
#include <>
#include <>

/*
#include <iostream>
#include <>
*/

/* pur @ Prepare the per-thread bookkeeping (attributes, mutex, condition
 *       variable, empty task slot). The OS thread itself is created later by
 *       whoever passes m_attr to pthread_create.
 * para @ detach  true: the thread will be created detached; false: joinable
 * para @ pool    the thread pool this thread belongs to
 */
Thread::Thread(bool detach, ThreadPool * pool)
    : m_pool(pool)
{
    pthread_attr_init(&m_attr);
    pthread_attr_setdetachstate(&m_attr,
            detach ? PTHREAD_CREATE_DETACHED : PTHREAD_CREATE_JOINABLE);

    pthread_mutex_init(&m_mutex, NULL); //Initialize the mutex
    pthread_cond_init(&m_cond, NULL);   //Initialize the condition variable

    // Start without a task. WapperFun only calls task.fun when it is non-zero.
    task.fun = 0;
    task.data = NULL;
    task.last_time = 0;
}

// Release the condition variable, mutex and attribute object initialised by the constructor.
Thread::~Thread()
{
    pthread_cond_destroy(&m_cond);
    pthread_mutex_destroy(&m_mutex);
    pthread_attr_destroy(&m_attr);
}

    // Create an empty pool with all counters at zero and initialise its mutexes
    // and condition variables. No threads are started here; see InitPool().
    ThreadPool::ThreadPool()
    : m_poolMax(0)
    , m_idleNum(0)
    , m_totalNum(0)
      , m_bStop(false)
{
    pthread_mutex_init(&m_mutex, NULL);
    pthread_mutex_init(&m_runMutex,NULL);
    pthread_mutex_init(&m_terminalMutex, NULL);
    pthread_cond_init(&m_terminalCond, NULL);
    pthread_cond_init(&m_emptyCond, NULL);
}

// Destroy the pool's mutexes and condition variables.
// Thread objects are not deleted here: each worker deletes its own Thread
// object at the end of WapperFun.
ThreadPool::~ThreadPool()
{
    pthread_mutex_destroy(&m_runMutex);
    pthread_mutex_destroy(&m_terminalMutex);
    pthread_mutex_destroy(&m_mutex);
    pthread_cond_destroy(&m_terminalCond);
    pthread_cond_destroy(&m_emptyCond);
}

int ThreadPool::InitPool(const int & poolMax, const int & poolPre)
{
    if(poolMax < poolPre
            || poolPre < 0
            || poolMax <= 0)
    {
        return -1;
    }
    m_poolMax = poolMax;

    int iRet = 0;
    for(int i=0; i<poolPre; ++i)
    {
        Thread * thread = CreateThread();
        if(NULL == thread)
        {
            iRet = -2;
        }
    }

    if(iRet < 0)
    { 
        std::list<Thread*>::iterator it = m_threads.begin();
        for(; it!= m_threads.end(); ++it)
        {
            if(NULL != (*it) )
            {
                delete *it;
                *it = NULL;
            }
        }
        m_threads.clear();
        m_totalNum = 0;
    }
    return iRet;
}

/* pur @ Take an idle worker from the pool and hand it a task, blocking
 *       until an idle worker is available
 * para @ fun  function the worker shall run
 * para @ arg  argument passed to fun
 */
void ThreadPool::GetThreadRun(task_fun fun, void* arg)
{
    //Get a thread from the thread pool
    pthread_mutex_lock( &m_mutex);
    // Loop, not a single check: a condition wait may return without the list
    // having become non-empty, and front() on an empty list is undefined.
    while(m_threads.empty())
    {
        pthread_cond_wait(&m_emptyCond,&m_mutex); //Block until a worker reports itself idle
    }

    Thread * thread = m_threads.front();
    m_threads.pop_front();
    pthread_mutex_unlock( &m_mutex);

    // The worker holds its own mutex from before it enters the idle list until
    // it waits on m_cond, so this lock is only obtained once it is waiting.
    pthread_mutex_lock( &thread->m_mutex );
    thread->task.fun = fun;
    thread->task.data = arg;
    pthread_cond_signal(&thread->m_cond); //Wake the worker's WapperFun loop to run the task
    pthread_mutex_unlock( &thread->m_mutex );
}

int ThreadPool::Run(task_fun fun, void * arg)
{
pthread_mutex_lock(&m_runMutex); //Certify that only one thread can execute at a time
    int iRet = 0;
    if(m_totalNum <m_poolMax) //
    {
        if(m_threads.empty() && (NULL == CreateThread()) )
        {
            iRet = -1;//can not create new thread!
        }
        else
        {
            GetThreadRun(fun, arg);
        }
    }
    else
    {
        GetThreadRun(fun, arg);
    }
    pthread_mutex_unlock(&m_runMutex);
    return iRet;
}

void ThreadPool::StopPool(bool bStop)
{
    m_bStop = bStop;
    if(bStop)
    {
//Start a thread that monitors whether all idle threads exit
        Thread thread(false, this);
pthread_create(&thread.m_threadId,&thread.m_attr, ThreadPool::TerminalCheck, &thread); //Start and monitor all threads to exit threads
//Blocking and waiting for all idle threads to exit
        pthread_join(thread.m_threadId, NULL);
    }
    /*if(bStop)
    {
        pthread_mutex_lock(&m_terminalMutex);
//Start a thread that monitors whether all idle threads exit
        Thread thread(true, this);
pthread_create(&thread.m_threadId,&thread.m_attr, ThreadPool::TerminalCheck, &thread); //Start and monitor all threads to exit threads
//Blocking and waiting for all idle threads to exit
        pthread_cond_wait(&m_terminalCond, & m_terminalMutex);
        pthread_mutex_unlock(&m_terminalMutex);
    }*/
}

// Return the stop flag last set through StopPool().
bool ThreadPool::GetStop()
{
    return m_bStop;
}

/* pur @ Allocate a Thread record and start a detached worker running WapperFun.
 *       The worker adds itself to the idle list once it is running.
 * return @ the new Thread on success, NULL when the thread could not be started
 */
Thread * ThreadPool::CreateThread()
{
    Thread * const worker = new Thread(true, this);
    if(NULL == worker)
    {
        return NULL;
    }

    const int rc = pthread_create(&worker->m_threadId, &worker->m_attr,
            ThreadPool::WapperFun, worker);
    if(0 == rc)
    {
        return worker;
    }

    // The thread never started, so the record is still owned here.
    delete worker;
    return NULL;
}

void * ThreadPool::WapperFun(void*arg)
{
    Thread * thread = (Thread*)arg;
    if(NULL == thread || NULL == thread->m_pool)
    {
        return NULL;
    }
    ThreadPool * pool = thread->m_pool;
    pool->IncreaseTotalNum();
    struct timespec abstime;
    memset(&abstime, 0, sizeof(abstime));
    while(1)
    {
        if(0 != thread->)
        {
            thread->(thread->);
        }

        if( true == pool->GetStop() ) 
        {
break; //After confirming the current task execution, determine whether to exit the thread
        }
        pthread_mutex_lock( &thread->m_mutex );
pool->SaveIdleThread(thread); //Add thread to the idle queue
        abstime.tv_sec = time(0) + THREAD_WAIT_TIME_OUT;
        abstime.tv_nsec = 0;
if(ETIMEDOUT  == pthread_cond_timedwait( &thread->m_cond, &thread->m_mutex, &abstime )) //Waiting for the thread to be woken up or timeout automatically exit
        {
            pthread_mutex_unlock( &thread->m_mutex );
            break;
        }
        pthread_mutex_unlock( &thread->m_mutex );
    }

    pool->LockMutex();
    pool->DecreaseTotalNum();
    if(thread != NULL)
    {
        pool->RemoveThread(thread);
        delete thread;
        thread = NULL;
    }
    pool->UnlockMutex();
    return 0;
}

/* pur @ Clear a worker's task slot and put the worker at the front of the
 *       idle list, waking callers blocked in GetThreadRun when the list was empty
 * para @ thread  the worker that became idle; NULL is ignored
 */
void ThreadPool::SaveIdleThread(Thread * thread )
{
    if(thread)
    {
        thread->task.fun = 0;
        thread->task.data = NULL;
        LockMutex();
        if(m_threads.empty())
        {
            pthread_cond_broadcast(&m_emptyCond); //Tell waiting dispatchers that the idle list is no longer empty
        }
        m_threads.push_front(thread);
        UnlockMutex();
    }
}

// Return the current worker-thread count (m_totalNum). The value is read without taking the pool mutex.
int ThreadPool::TotalThreads()
{
    return m_totalNum;
}


void ThreadPool::SendSignal()
{
    LockMutex();
    std::list<Thread*>::iterator it = m_threads.begin();
    for(; it!= m_threads.end(); ++it)
    {
        pthread_mutex_lock( &(*it)->m_mutex );
        pthread_cond_signal(&((*it)->m_cond));
        pthread_mutex_unlock( &(*it)->m_mutex );
    }
    UnlockMutex();
}

void * ThreadPool::TerminalCheck(void* arg)
{
    Thread * thread = (Thread*)arg;
    if(NULL == thread || NULL == thread->m_pool)
    {
        return NULL;
    }
    ThreadPool * pool = thread->m_pool;
    while((false == pool->GetStop()) || pool->TotalThreads() >0 )
    {
        pool->SendSignal();

        usleep(IDLE_CHECK_POLL_EMPTY);
    }
    //pool->TerminalCondSignal();
    return 0;
}

// Signal m_terminalCond. The only reference to this function in the implementation is a commented-out call in TerminalCheck.
void ThreadPool::TerminalCondSignal()
{
    pthread_cond_signal(&m_terminalCond);
}

// Remove a worker from the idle list. Takes no lock itself; WapperFun calls it with the pool mutex held.
void ThreadPool::RemoveThread(Thread* thread)
{
    m_threads.remove(thread);
}

// Acquire the mutex that protects the idle-thread list.
void ThreadPool::LockMutex()
{
    pthread_mutex_lock( &m_mutex);
}

// Release the mutex that protects the idle-thread list.
void ThreadPool::UnlockMutex()
{
    pthread_mutex_unlock( &m_mutex );
}

// Count one more worker thread; takes the pool mutex itself.
void ThreadPool::IncreaseTotalNum()
{
    LockMutex();
    m_totalNum++;
    UnlockMutex();
}
// Count one worker thread less. Unlike IncreaseTotalNum this does NOT lock; WapperFun calls it with the pool mutex already held.
void ThreadPool::DecreaseTotalNum()
{
    m_totalNum--;
}

The code is as follows (ThreadPool header):

#ifndef THREADPOOL_H
#define THREADPOOL_H
/* purpose @ Thread pool class: creates and destroys worker threads, and lets idle threads exit automatically after a timeout (semi-resident threads).
*         When the thread pool is stopped, a TerminalCheck thread is created; it monitors the exit of all threads in the thread pool.
 * date    @ 2013.12.23
 * author  @
 */

#include <list>
#include <string>
#include ""
//Control the task scheduling process through threadmanager
//The TerminalCheck thread of threadpool is responsible for monitoring the exit of all threads of the thread pool


class ThreadPool;
// Bookkeeping record for one pooled thread: its id, the primitives used to
// wake it, its creation attributes, the task slot and the owning pool.
class Thread
{
public:
    Thread(bool detach, ThreadPool * pool);
    ~Thread();
pthread_t  m_threadId; //Thread id, filled in by pthread_create
pthread_mutex_t m_mutex; //Mutex guarding the task slot and used with m_cond
pthread_cond_t m_cond; //Condition variable the thread waits on while idle
pthread_attr_t m_attr; //Thread attributes (detached or joinable)
 Task  task; //Task currently assigned to this thread; fun is 0 when there is none
ThreadPool * m_pool; //The thread pool to which it belongs
};

//Thread pool, responsible for creating thread processing tasks. After processing, the thread will be added to the idle queue and from the task pool
class ThreadPool
{
public:
    ThreadPool();
    ~ThreadPool();

/* pur @ Initialize thread pool
* para @ poolMax thread pool maximum number of threads
* para @ poolPre Number of pre-created threads
* return @ 0:Success
     *          -1: parameter error, must poolMax > poolPre >=0
*        -2: Failed to create thread
    */
    int InitPool(const int & poolMax, const int & poolPre);

/* pur @ Perform a task
* para @ task pointer
* return @ 0 Task allocation succeeds, negative value Task allocation failed, -1, creating a new thread failed
    */
    int Run(task_fun fun, void* arg);

/* pur @ Set whether to stop thread pool work
* para @ bStop true stops, false does not stop
    */
 void StopPool(bool bStop);

public: //This public function is mainly used for static function calls
/* pur @ Get the start and stop status of the process pool
     * return @
    */
    bool GetStop();   
 void SaveIdleThread(Thread * thread );
    void LockMutex();
    void UnlockMutex();
    void DecreaseTotalNum();
    void IncreaseTotalNum();
    void RemoveThread(Thread* thread);
    void TerminalCondSignal();
    int TotalThreads();
    void SendSignal();
private:
/* pur @ Create thread
* return @ non-empty success, NULL failed,
    */
 Thread * CreateThread();

/* pur @ Get thread running tasks from thread pool
* para @ fun function pointer
* para @ arg function parameters
     * return @
    */
    void GetThreadRun(task_fun fun, void* arg);

 static void * WapperFun(void*);
static void * TerminalCheck(void*);//Cycle monitoring whether all threads terminate threads

private:
int m_poolMax;//The maximum number of threads in thread pool
int m_idleNum; //Num of idle threads
int m_totalNum; //The total number of current threads is less than the maximum number of threads
bool m_bStop; // Whether to stop the thread pool
pthread_mutex_t m_mutex; //Thread list lock
pthread_mutex_t m_runMutex; //run function lock

pthread_mutex_t m_terminalMutex; // Terminate all thread mutexes
pthread_cond_t m_terminalCond; // Terminate all thread condition variables
pthread_cond_t m_emptyCond; //Idle threads are not empty condition variables

std::list<Thread*> m_threads; // Thread list
};
#endif

The code is as follows (ThreadPoolManager implementation):

#include ""
#include ""
#include ""

#include <>
#include <>

/*#include <>
#include <sys/>
#include <>*/
 //   struct timeval time_beg, time_end;
// Create a manager without pools (both pointers NULL) and initialise the
// mutex / condition variable used to wake the dispatcher thread.
// The pools and the dispatcher thread are created later by Init().
ThreadPoolManager::ThreadPoolManager()
    : m_threadPool(NULL)
    , m_taskPool(NULL)
    , m_bStop(false)
{
    pthread_mutex_init(&m_mutex_task,NULL);
    pthread_cond_init(&m_cond_task, NULL);

   // (timing instrumentation that used to live here is disabled)
}

/* pur @ Stop the dispatcher and both pools, then free the pools and the
 *       manager's own mutex and condition variable
 */
ThreadPoolManager::~ThreadPoolManager()
{
    StopAll();

    // delete on a NULL pointer is a no-op, so no explicit checks are needed.
    delete m_threadPool;
    m_threadPool = NULL;
    delete m_taskPool;
    m_taskPool = NULL;

    pthread_cond_destroy( &m_cond_task);
    pthread_mutex_destroy( &m_mutex_task );
}

int ThreadPoolManager::Init(
        const int &tastPoolSize,
        const int &threadPoolMax,
        const int &threadPoolPre)
{
    m_threadPool = new ThreadPool();
    if(NULL == m_threadPool)
    {
        return -1;
    }
    m_taskPool = new TaskPool(tastPoolSize);
    if(NULL == m_taskPool)
    {
        return -2;
    }

    if(0>m_threadPool->InitPool(threadPoolMax, threadPoolPre))
    {
        return -3;
    }
//Start the thread pool
//Start the task pool
//Start the task to obtain the thread, and continuously get the tasks from the task pool to the thread pool
    pthread_attr_t attr;
    pthread_attr_init( &attr );
    pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_JOINABLE );
pthread_create(&m_taskThreadId, &attr, TaskThread, this); //Create a process to get the task
    pthread_attr_destroy(&attr);
    return 0;
}

void ThreadPoolManager::StopAll()
{
    m_bStop = true;
    LockTask();
    pthread_cond_signal(&m_cond_task);
    UnlockTask();
    pthread_join(m_taskThreadId, NULL);
//Waiting for all current tasks to be completed
    m_taskPool->StopPool();
m_threadPool->StopPool(true); // Stop thread pool work
}

// Acquire the mutex paired with the dispatcher's condition variable.
void ThreadPoolManager::LockTask()
{
    pthread_mutex_lock(&m_mutex_task);
}

// Release the mutex paired with the dispatcher's condition variable.
void ThreadPoolManager::UnlockTask()
{
    pthread_mutex_unlock(&m_mutex_task);
}

void* ThreadPoolManager::TaskThread(void* arg)
{
    ThreadPoolManager * manager = (ThreadPoolManager*)arg;
    while(1)
    {
manager->LockTask(); //Prevent the task from being executed and sends a stop signal
while(1) //Exit the task in the task queue after executing it
        {
            Task * task = manager->GetTaskPool()->GetTask();
            if(NULL == task)
            {
                break;
            }
            else
            {
                manager->GetThreadPool()->Run(task->fun, task->data);
                manager->GetTaskPool()->SaveIdleTask(task);
            }
        }

        if(manager->GetStop())
        {
            manager->UnlockTask();
            break;
        }
manager->TaskCondWait(); //Waiting for execution when there is a task
        manager->UnlockTask();
    }
    return 0;
}

// Return the thread pool created by Init() (NULL before Init() has run).
ThreadPool * ThreadPoolManager::GetThreadPool()
{
    return m_threadPool;
}

// Return the task pool created by Init() (NULL before Init() has run).
TaskPool * ThreadPoolManager::GetTaskPool()
{
    return m_taskPool;
}

int  ThreadPoolManager::Run(task_fun fun,void* arg)
{
    if(0 == fun)
    {
        return 0;
    }
    if(!m_bStop)
    {  
        int iRet =  m_taskPool->AddTask(fun, arg);

        if(iRet == 0 && (0 == pthread_mutex_trylock(&m_mutex_task)) )
        {
            pthread_cond_signal(&m_cond_task);
            UnlockTask();
        }
        return iRet;
    }
    else
    {
        return -3;
    }
}

// Report whether the manager has been asked to stop (set by StopAll()).
bool ThreadPoolManager::GetStop()
{
    return m_bStop;
}

/* pur @ Wait on the task condition variable until it is signalled or 60
 *       seconds have passed. The wait is performed on m_mutex_task, so the
 *       caller must hold it (TaskThread does).
 */
void ThreadPoolManager::TaskCondWait()
{
    // Absolute deadline: now plus the 60-second timeout.
    struct timespec deadline;
    memset(&deadline, 0, sizeof deadline);
    deadline.tv_nsec = 0;
    deadline.tv_sec = time(0) + 60;

    pthread_cond_timedwait( &m_cond_task, &m_mutex_task, &deadline);
}

The code is as follows (ThreadPoolManager header):

#ifndef THREADPOOLMANAGER_H
#define THREADPOOLMANAGER_H
/* purpose @
*      Basic process:
*
*     Basic functions:
*
*         2. The task pool can automatically release resources that have not been used for a long time when the business is not busy (can be modified)
*          3. When the program exits, no tasks will be added to the task pool. When all tasks in the task pool are executed, the relevant program will be exited (to achieve the safe exit of the program)
*       Thread resources:
*
*
*          The maximum number of threads is: 1 TaskPool thread + 1 manager task dispatch thread + ThreadPool maximum number of threads + 1 manager exit monitoring thread + 1 thread pool all threads exit monitoring thread
*           The minimum number of threads is: 1 TaskPool to create idle task resources and destroy monitoring thread + 1 manager to create task scheduling thread
*     How to use:
 *          ThreadPoolManager manager;
*            manager.Init(100000, 50, 5);//Initialize a manager with a task pool of size 100000, at most 50 threads in the thread pool, and 5 pre-created threads
*            manager.Run(fun, data); //Submit a task to the manager; fun is a function pointer, data is the argument passed to fun and may be NULL
 *
 * date    @ 2013.12.23
 * author  @
 *
*  Detailed parameter control can modify the relevant variable values
 */

#include <>
// Signature of a user task: a plain function taking one opaque argument.
typedef void (*task_fun)(void *);

class ThreadPool;
class TaskPool;

// Front end that owns one TaskPool and one ThreadPool plus a dispatcher
// thread that moves queued tasks from the former to the latter.
class ThreadPoolManager
{
public:
    ThreadPoolManager();
    ~ThreadPoolManager();

    /* pur @ Initialize the thread pool and the task pool;
     *       requires threadPoolMax >= threadPoolPre >= 0 and threadPoolMax > 0
     * para @ tastPoolSize   task pool size
     * para @ threadPoolMax  maximum number of threads in the thread pool
     * para @ threadPoolPre  number of pre-created threads
     * return @ 0: initialization succeeded, negative number: initialization failed
     *         -1: failed to create the thread pool
     *         -2: failed to create the task pool
     *         -3: thread pool initialization failed
     */
    int Init(const int &tastPoolSize,
            const int &threadPoolMax,
            const int &threadPoolPre);

    /* pur @ Execute a task
     * para @ fun  function pointer to be executed
     * para @ arg  argument required by fun, NULL by default
     * return @ 0 task accepted, negative number: task rejected
     *         -1: the task pool is full
     *         -2: allocating the task failed
     *         -3: the manager has been asked to stop and no longer accepts new tasks
     */
    int Run(task_fun fun,void* arg=NULL);

public: // Public only so that the static thread function can call them
    bool GetStop();
    void TaskCondWait();
    TaskPool * GetTaskPool();
    ThreadPool * GetThreadPool();
    void LockTask();
    void UnlockTask();
    void LockFull();

private:
    static void * TaskThread(void*); // Task dispatcher thread
    void StopAll();

    // Not copyable: the destructor deletes both pools and destroys the mutex
    // and condition variable, so a copy would release them twice.
    // Declared private and intentionally left undefined.
    ThreadPoolManager(const ThreadPoolManager &);
    ThreadPoolManager & operator=(const ThreadPoolManager &);

private:
    ThreadPool *m_threadPool;       // Thread pool
    TaskPool * m_taskPool;          // Task pool
    bool m_bStop;                   // Whether the manager is terminating

    pthread_t m_taskThreadId;       // TaskThread thread id
    pthread_mutex_t m_mutex_task;   // Paired with m_cond_task
    pthread_cond_t m_cond_task;     // Wakes the dispatcher when tasks arrive or on stop
};
#endif

The code is as follows (test program):

#include <iostream>
#include <string>
#include ""
#include <sys/>
#include <>
#include <>
#include <>


using namespace std;
int seq = 0;      // Number of tasks executed so far; reset to 0 each time it reaches one billion
int billNum =0;   // How many times seq has reached one billion
int inter = 1;    // Print a progress line every `inter` tasks (set from the command line)
pthread_mutex_t m_mutex;  // Protects seq and billNum; initialised in main
void myFunc(void*arg)
{
    pthread_mutex_lock(&m_mutex);
    seq++;
    if(seq%inter == 0 )
    {
        cout << "fun 1=" << seq << endl;
    }
    if(seq>=1000000000)
    {
        cout << "billion" << endl;
        seq = 0;
        billNum++;
    }
    pthread_mutex_unlock(&m_mutex);
    //sleep();
}

int main(int argc, char** argv)
{
    if(argc != 6)
    {
cout << "There must be 5 parameters. Number of tasks executed. Task pool size. Thread pool size. Number of pre-created threads. Output interval" << endl;
        cout << "eg: ./test 999999 10000 100 10 20" << endl;
cout << "The above example represents the creation of a 20 task output interval, the task pool size is 10000, the thread pool size is 100, and 10 threads are pre-created, and the number of tasks is executed is: 999999" << endl;
        return 0;
    }
    double loopSize = atof(argv[1]);
    int taskSize = atoi(argv[2]);
    int threadPoolSize = atoi(argv[3]);
    int preSize = atoi(argv[4]);
    inter = atoi(argv[5]);

    pthread_mutex_init(&m_mutex,NULL);
    ThreadPoolManager manager;
    if(0>(taskSize,  threadPoolSize, preSize))
    {
cout << "Initialization failed" << endl;
        return 0;
    }
cout << "********************* Initialization is completed*********************************" << endl;
    struct timeval time_beg, time_end;
    memset(&time_beg, 0, sizeof(struct timeval));
    memset(&time_end, 0, sizeof(struct timeval));
    gettimeofday(&time_beg, NULL);
    double i=0;
    for(; i<loopSize; ++i)
    {
        while(0>(myFunc,NULL))
        {
            usleep(100);
        }
    }
    gettimeofday(&time_end, NULL);
    long total = (time_end.tv_sec - time_beg.tv_sec)*1000000 + (time_end.tv_usec - time_beg.tv_usec);
    cout << "total time =" << total << endl;
    cout << "total num =" << i  << " billion num=" << billNum<< endl;
cout << __FILE__ << "All threads will be closed" << endl;
    //pthread_mutex_destroy(&m_mutex);
    return 0;
}