
Introduction to the implementation of the Python threadpool thread pool

First, let's introduce the terminology used:

Worker thread (worker): when the thread pool is created, worker threads are created according to the specified thread count; each waits to fetch tasks from the task queue;

Task (request): the work handled by a worker thread; there may be thousands of tasks but only a few worker threads. Tasks are created by makeRequests();

Task queue (requests_queue): a queue storing tasks, implemented with Python's Queue module. Worker threads fetch tasks from this queue for processing;

Task processing function (callable): after a worker thread fetches a task, it calls the task's processing function, i.e. request.callable, to actually process the task and return the result;

Task result queue (results_queue): after a task finishes, the returned result (or exception) is put into the task result queue;

Task exception callback (exc_callback): when a result is taken from the task result queue and the exception flag is set, this callback is called to handle the exception;

Task result callback (callback): called on a result taken from the task result queue, for further processing of the result.

The previous section introduced the installation and basic use of the threadpool module; this section focuses on the main steps of how threadpool works (a minimal end-to-end sketch follows the list below):

(1) Creating the thread pool
(2) Starting the worker threads
(3) Creating tasks
(4) Pushing tasks into the thread pool
(5) Worker threads processing tasks
(6) Task completion handling
(7) Worker thread exit
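
Before walking through the internals, here is a minimal end-to-end sketch of the seven steps above (the do_work and on_result functions are made up for illustration):

import threadpool

def do_work(n):                      # hypothetical task handler
    return n * n

def on_result(request, result):      # hypothetical result callback
    print("task %s -> %s" % (request.requestID, result))

pool = threadpool.ThreadPool(3)                         # (1)(2) create the pool and start 3 workers
requests = threadpool.makeRequests(do_work, range(10),  # (3) one request per argument
                                   callback=on_result)
for req in requests:
    pool.putRequest(req)                                # (4) push into the task queue
pool.wait()                                             # (5)(6) workers run tasks, callbacks fire
pool.dismissWorkers(3, do_join=True)                    # (7) shut the workers down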

Here is the definition of the ThreadPool class:

class ThreadPool: 
  """A thread pool, distributing work requests and collecting results. 
 
  See the module docstring for more information. 
 
  """ 
  def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5): 
    pass 
  def createWorkers(self, num_workers, poll_timeout=5): 
    pass 
  def dismissWorkers(self, num_workers, do_join=False): 
    pass 
  def joinAllDismissedWorkers(self): 
    pass 
  def putRequest(self, request, block=True, timeout=None): 
    pass 
  def poll(self, block=False): 
    pass 
  def wait(self): 
    pass 

1. Creation of the thread pool (threadpool.ThreadPool(args))

task_pool = threadpool.ThreadPool(num_works)

The constructor __init__() is defined as follows:
  def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5): 
    """Set up the thread pool and start num_workers worker threads. 
 
    ``num_workers`` is the number of worker threads to start initially. 
 
    If ``q_size > 0`` the size of the work *request queue* is limited and 
    the thread pool blocks when the queue is full and it tries to put 
    more work requests in it (see ``putRequest`` method), unless you also 
    use a positive ``timeout`` value for ``putRequest``. 
 
    If ``resq_size > 0`` the size of the *results queue* is limited and the 
    worker threads will block when the queue is full and they try to put 
    new results in it. 
 
    .. warning: 
      If you set both ``q_size`` and ``resq_size`` to ``!= 0`` there is 
      the possibility of a deadlock, when the results queue is not pulled 
      regularly and too many jobs are put in the work requests queue. 
      To prevent this, always set ``timeout > 0`` when calling 
      ``ThreadPool.putRequest()`` and catch ``Queue.Full`` exceptions. 
 
    """ 
    self._requests_queue = Queue.Queue(q_size)  # task queue; tasks created by makeRequests(args) are put into this queue
    self._results_queue = Queue.Queue(resq_size)  # task result queue; (request, result) pairs are put into it
    self.workers = []  # list of worker threads; threads created in createWorkers() are appended to it
    self.dismissedWorkers = []  # worker threads whose dismiss event has been set but which have not been joined yet
    self.workRequests = {}  # dictionary of tasks pushed into the pool, structured as requestID: request
    self.createWorkers(num_workers, poll_timeout) 

The initialization parameters are:

num_workers: the number of worker threads in the pool;

q_size: the size limit of the task queue. If the queue is bounded and full, putRequest() will block while trying to add a task, unless a positive timeout (or non-blocking mode) is passed to putRequest();

resq_size: the size limit of the task result queue;

poll_timeout: if a worker thread cannot fetch a request from the request queue, it blocks for at most poll_timeout seconds; if there is still no request, it loops around (giving it a chance to check whether it has been dismissed).

The member variables are:

self._requests_queue: the task queue; tasks created by makeRequests(args) are put into this queue;
self._results_queue: the task result queue; (request, result) pairs, including exceptions, are put into it;
self.workers: the list of worker threads; threads created in createWorkers() are appended to it;
self.dismissedWorkers: worker threads whose dismiss event has been set but which have not been joined yet;
self.workRequests: the dictionary of tasks pushed into the thread pool, structured as requestID: request, where requestID is the task's unique identifier (described later).

A small sketch of creating a pool with these parameters follows.
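
As a sketch of the constructor parameters and of the deadlock caveat from the docstring (the queue sizes, timeout value and submit() helper are illustrative assumptions, not part of the library):

import Queue          # named `queue` on Python 3
import threadpool

# 4 workers; both queues bounded; idle workers re-check for dismissal every 2 seconds
pool = threadpool.ThreadPool(4, q_size=32, resq_size=32, poll_timeout=2)

def submit(req):
    """Try to enqueue one request without risking the documented deadlock."""
    try:
        pool.putRequest(req, timeout=1)   # give up after 1 second instead of blocking forever
        return True
    except Queue.Full:
        pool.poll()                       # drain finished results, then the caller can retry
        return False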

2. Starting the worker threads (createWorkers(args))

Function Definition:

def createWorkers(self, num_workers, poll_timeout=5): 
   """Add num_workers worker threads to the pool. 
 
   ``poll_timeout`` sets the interval in seconds (int or float) for how 
   often threads should check whether they are dismissed, while waiting for 
   requests. 
 
   """ 
   for i in range(num_workers): 
     self.workers.append(WorkerThread(self._requests_queue, 
       self._results_queue, poll_timeout=poll_timeout)) 

WorkerThread inherits from threading.Thread, Python's built-in thread class, and each created WorkerThread object is appended to the self.workers list. The definition of the WorkerThread class, starting with its __init__(), is shown below:

class WorkerThread(threading.Thread): 
  """Background thread connected to the requests/results queues. 
 
  A worker thread sits in the background and picks up work requests from 
  one queue and puts the results in another until it is dismissed. 
 
  """ 
 
  def __init__(self, requests_queue, results_queue, poll_timeout=5, **kwds): 
    """Set up thread in daemonic mode and start it immediately. 
 
    ``requests_queue`` and ``results_queue`` are instances of 
    ``Queue.Queue`` passed by the ``ThreadPool`` class when it creates a 
    new worker thread. 
 
    """ 
    threading.Thread.__init__(self, **kwds) 
    self.setDaemon(1)  # run as a daemon thread
    self._requests_queue = requests_queue  # task queue
    self._results_queue = results_queue  # task result queue
    self._poll_timeout = poll_timeout  # timeout when getting a task from the task queue in run(); on timeout, the while loop simply continues
    self._dismissed = threading.Event()  # thread event; if set, run() breaks out of its loop and the worker thread exits
    self.start() 
 
  def run(self): 
    """Repeatedly process the job queue until told to exit.""" 
    while True: 
      if self._dismissed.isSet():  # exit the worker thread if self._dismissed is set
        # we are dismissed, break out of loop 
        break 
      # get next work request. If we don't get a new request from the 
      # queue after self._poll_timeout seconds, we jump to the start of 
      # the while loop again, to give the thread a chance to exit. 
      try: 
        request = self._requests_queue.get(True, self._poll_timeout) 
      except Queue.Empty:  # the task queue is empty, so continue the loop
        continue 
      else: 
        if self._dismissed.isSet():  # if the dismiss event was set meanwhile, put the fetched task back into the queue and exit
          # we are dismissed, put back request in queue and exit loop 
          self._requests_queue.put(request) 
          break 
        try:  # otherwise execute the task handler and put the returned result into the task result queue
          result = request.callable(*request.args, **request.kwds) 
          self._results_queue.put((request, result)) 
        except: 
          request.exception = True 
          self._results_queue.put((request, sys.exc_info()))  # if the task handler raises, put the exception info into the result queue
 
  def dismiss(self): 
    """Sets a flag to tell the thread to exit when done with current job. 
    """ 
    self._dismissed.set() 

The variables initialized here are:

self._requests_queue: the task queue;
self._results_queue: the task result queue;
self._poll_timeout: the timeout used when fetching a task from the task queue in run(); on timeout, the while loop simply continues;
self._dismissed: a thread event; if it is set, run() breaks out of its loop and the worker thread exits.

Finally, self.start() is called to start the thread; the run() function is defined above.

The while loop in run() executes the following steps:

(1) If self._dismissed is set, exit the worker thread; otherwise go to step 2.
(2) Try to get a task from the task queue self._requests_queue; if the queue is empty, continue with the next iteration of the while loop; otherwise go to step 3.
(3) Check whether the dismiss event has been set. If it has, the thread should terminate, so put the fetched task back into the task queue and exit the thread. If it has not, execute the task handler and put the returned result into the task result queue; if the handler raises an exception, put the exception info into the queue instead. Then go to step 4.
(4) Continue the loop and return to step 1.

At this point the worker threads have been created, one for each thread slot configured for the pool. Each worker thread fetches tasks from the task queue, processes them, and puts the results into the task result queue. A small illustration follows.
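
As a small illustration (the thread count here is arbitrary), the workers are daemon threads that start immediately and sit blocked on the empty task queue:

import threading
import threadpool

pool = threadpool.ThreadPool(3)   # createWorkers() starts 3 WorkerThread objects right away

# the three workers are alive and idle, waiting on the empty request queue
workers = [t for t in threading.enumerate() if isinstance(t, threadpool.WorkerThread)]
print(len(workers))               # -> 3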

3. Creation of tasks (makeRequests)

The task creation function is makeRequests(callable_, args_list, callback=None).

# utility functions 
def makeRequests(callable_, args_list, callback=None, 
    exc_callback=_handle_thread_exception): 
  """Create several work requests for same callable with different arguments. 
 
  Convenience function for creating several work requests for the same 
  callable where each invocation of the callable receives different values 
  for its arguments. 
 
  ``args_list`` contains the parameters for each invocation of callable. 
  Each item in ``args_list`` should be either a 2-item tuple of the list of 
  positional arguments and a dictionary of keyword arguments or a single, 
  non-tuple argument. 
 
  See docstring for ``WorkRequest`` for info on ``callback`` and 
  ``exc_callback``. 
 
  """ 
  requests = [] 
  for item in args_list: 
    if isinstance(item, tuple): 
      requests.append( 
        WorkRequest(callable_, item[0], item[1], callback=callback, 
          exc_callback=exc_callback) 
      ) 
    else: 
      requests.append( 
        WorkRequest(callable_, [item], None, callback=callback, 
          exc_callback=exc_callback) 
      ) 
  return requests 

The parameters of this task creation function have the following meanings:

callable_: the registered task handler. When the task is put into the task queue, the worker thread that fetches it executes this callable_.

args_list: a list whose elements are typically tuples with two items, item[0] and item[1], where item[0] is the list of positional arguments and item[1] is the dictionary of keyword arguments. The number of items in the list determines the number of tasks created; often a single item is used, i.e. one makeRequests() call creates one task.

callback: the result callback, invoked inside poll() (explained later). After callable_ finishes, the task result is put into the task result queue (self._results_queue); when poll() fetches a result from this queue it executes callback(request, result), where result is the value returned by the request's task.

exc_callback: the exception callback. In poll(), if a request's execution raised an exception, this exception callback is called instead.

The function returns the list of created tasks.

The caller keeps this list of tasks for later submission; a usage sketch follows.
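
A minimal sketch of both args_list forms (the download function and on_result callback are made up for illustration):

import threadpool

def download(url, retries=1):        # hypothetical task handler
    return "fetched %s (retries=%d)" % (url, retries)

def on_result(request, result):      # hypothetical result callback
    print(result)

# tuple items: ([positional args], {keyword args})
args_list = [(["http://a.example"], {"retries": 3}),
             (["http://b.example"], {"retries": 1})]
requests = threadpool.makeRequests(download, args_list, callback=on_result)

# non-tuple items: each item becomes the single positional argument
more = threadpool.makeRequests(download, ["http://c.example"], callback=on_result)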

Above is the function that creates tasks; below is the structure of the task object (WorkRequest):

class WorkRequest: 
  """A request to execute a callable for putting in the request queue later. 
 
  See the module function ``makeRequests`` for the common case 
  where you want to build several ``WorkRequest`` objects for the same 
  callable but with different arguments for each call. 
 
  """ 
 
  def __init__(self, callable_, args=None, kwds=None, requestID=None, 
      callback=None, exc_callback=_handle_thread_exception): 
    """Create a work request for a callable and attach callbacks. 
 
    A work request consists of a callable to be executed by a 
    worker thread, a list of positional arguments, a dictionary 
    of keyword arguments. 
 
    A ``callback`` function can be specified, that is called when the 
    results of the request are picked up from the result queue. It must 
    accept two anonymous arguments, the ``WorkRequest`` object and the 
    results of the callable, in that order. If you want to pass additional 
    information to the callback, just stick it on the request object. 
 
    You can also give custom callback for when an exception occurs with 
    the ``exc_callback`` keyword parameter. It should also accept two 
    anonymous arguments, the ``WorkRequest`` and a tuple with the exception 
    details as returned by ``sys.exc_info()``. The default implementation 
    of this callback just prints the exception info via 
    ``traceback.print_exception``. If you want no exception handler 
    callback, just pass in ``None``. 
 
    ``requestID``, if given, must be hashable since it is used by 
    ``ThreadPool`` object to store the results of that work request in a 
    dictionary. It defaults to the return value of ``id(self)``. 
 
    """ 
    if requestID is None: 
      self.requestID = id(self) 
    else: 
      try: 
        self.requestID = hash(requestID) 
      except TypeError: 
        raise TypeError("requestID must be hashable.") 
    self.exception = False 
    self.callback = callback 
    self.exc_callback = exc_callback 
    self.callable = callable_ 
    self.args = args or [] 
    self.kwds = kwds or {} 
 
  def __str__(self): 
    return "<WorkRequest id=%s args=%r kwargs=%r exception=%s>" % \ 
      (self.requestID, self.args, self.kwds, self.exception) 

self.callback, self.exc_callback, self.callable, self.args and self.kwds have already been explained above, so they are not repeated here.
self.requestID is the globally unique identifier of the task; by default it is id(self), i.e. the object's own memory address.
self.exception is initialized to False; if an exception occurs while executing request.callable(), it is set to True.

At this point the tasks have been created, and the caller of makeRequests() holds the task list (request_list). A sketch of constructing a WorkRequest directly follows.
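
For completeness, a WorkRequest can also be built directly instead of via makeRequests(); the resize handler and requestID below are made up for illustration:

import threadpool

def resize(path, width=100):          # hypothetical task handler
    return "%s -> %dpx" % (path, width)

req = threadpool.WorkRequest(resize, args=["img.png"], kwds={"width": 64},
                             requestID="resize-img.png")   # requestID must be hashable
print(req)   # <WorkRequest id=... args=['img.png'] kwargs={'width': 64} exception=False>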

4. Pushing tasks into the thread pool (putRequest)

The previous subsection described the creation of tasks. There may be hundreds of tasks, but the number of threads that process them is fixed when the thread pool is created, and it is usually much smaller than the number of tasks; therefore each thread must process multiple tasks.

This subsection describes how a created task is pushed into the thread pool so that a blocked worker thread can wake up, fetch the task, and start working on it.

Tasks are pushed using putRequest(self, request, block, timeout) of the ThreadPool class:

def putRequest(self, request, block=True, timeout=None): 
  """Put work request into work queue and save its id for later.""" 
  assert isinstance(request, WorkRequest) 
  # don't reuse old work requests 
  assert not getattr(request, 'exception', None) 
  self._requests_queue.put(request, block, timeout) 
  self.workRequests[request.requestID] = request 

The main job of this function is to put the request created in the previous subsection into the thread pool's task queue (self._requests_queue). Each pushed task is also recorded in the pool's workRequests dictionary, with the structure requestID: request.

At this point the tasks have been created and pushed into the thread pool, as sketched below.
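
A minimal submission sketch (the pool and requests objects are those from the earlier sketches):

for req in requests:
    pool.putRequest(req)   # goes into self._requests_queue and is recorded in workRequests

# with a bounded request queue, pass a timeout so a full queue raises Queue.Full
# instead of blocking forever:
#   pool.putRequest(req, timeout=1)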

5. Worker threads processing tasks

In the previous subsection, tasks were pushed into the thread pool. Before any task is pushed, the threads in the pool are blocked inside the run() function, at this point:

try: 
  request = self._requests_queue.get(True, self._poll_timeout) 
except Queue.Empty:  # the task queue is empty, so continue the loop
  continue 

Now that tasks have been pushed into the thread pool, the get() call returns normally and the following steps are executed:

def run(self): 
    """Repeatedly process the job queue until told to exit.""" 
    while True: 
      if self._dismissed.isSet():# Exit the worker thread if self._dismissed is set
 
        # we are dismissed, break out of loop 
        break 
      # get next work request. If we don't get a new request from the 
      # queue after self._poll_timout seconds, we jump to the start of 
      # the while loop again, to give the thread a chance to exit. 
      try: 
        request = self._requests_queue.get(True, self._poll_timeout) 
      except Queue.Empty:  # the task queue is empty, so continue the loop
        continue 
      else: 
        if self._dismissed.isSet():# Check if this worker thread event is set, if it is set, it means that the worker thread should be terminated, then the fetched task should be returned to the task queue and the thread should be exited.
          # we are dismissed, put back request in queue and exit loop 
          self._requests_queue.put(request) 
          break 
        try:# If the thread event is not set, then the task handler is executed and the returned result is pressed into the task result queue.
          result = request.callable(*request.args, **request.kwds) 
          self._results_queue.put((request, result)) 
        except: 
           request.exception = True 
          self._results_queue.put((request, sys.exc_info()))# If an exception occurs in the task handler function, the exception is pressed into the queue

Get a task ---> call the task's handler request.callable() to process it ---> put the request and the returned result into self._results_queue ---> if the handler raises an exception, set the request's exception flag to True and put the request and the exception info into self._results_queue instead ---> loop back to get the next task.

If, during the while loop, the dismiss event is set externally, i.e. self._dismissed.isSet() returns True, the thread stops processing: it puts the task it just fetched back into the task queue and exits.

6. Task completion handling

The previous subsection described how the thread pool keeps fetching and processing tasks. To handle task completion, the thread pool provides the wait() and poll() functions.

After submitting tasks to the thread pool, we usually call wait() to wait for all task processing to finish; once wait() returns we can perform the next operation, such as creating more tasks and pushing them into the pool, or shutting the pool down. Shutting the pool down is covered in the next subsection; this subsection covers the wait() and poll() operations.

Let's start with the wait() operation:

def wait(self): 
  """Wait for results, blocking until all have arrived.""" 
  while 1: 
    try: 
      self.poll(True) 
    except NoResultsPending: 
      break 

wait() blocks until all task processing is complete: it keeps calling poll(True), and when poll() raises a NoResultsPending exception, wait() returns and all tasks have been handled.

Take a look at the poll function below:

def poll(self, block=False): 
  """Process any new results in the queue.""" 
  while True: 
    # still results pending? 
    if not self.workRequests: 
      raise NoResultsPending 
    # are there still workers to process remaining requests? 
    elif block and not self.workers: 
      raise NoWorkersAvailable 
    try: 
      # get back next results 
      request, result = self._results_queue.get(block=block) 
      # has an exception occured? 
      if request.exception and request.exc_callback: 
        request.exc_callback(request, result) 
      # hand results to callback, if any 
      if request.callback and not \ 
          (request.exception and request.exc_callback): 
        request.callback(request, result) 
      del self.workRequests[request.requestID] 
    except Queue.Empty: 
      break 

(1) First, check whether the task dictionary (self.workRequests, structured as {requestID: request}) is empty; if it is, raise NoResultsPending and finish, otherwise go to step 2;

(2) If blocking, check whether the worker list is empty (when a thread's dismiss event is set, that worker exits and is popped from the list); if it is empty, raise NoWorkersAvailable and finish, otherwise go to step 3;

(3) Get a task result from the task result queue; if the queue is empty (Queue.Empty is raised), break and return, otherwise go to step 4;

(4) If an exception occurred during task processing, i.e. request.exception is set, and an exception callback request.exc_callback is set, execute the exception callback to handle the exception, then remove the task from the task dictionary, continue fetching results, and return to step 1. Otherwise, go to step 5;

(5) If the task result callback request.callback is set, execute it, i.e. request.callback(request, result), then remove the task from the task dictionary, continue fetching results, and return to step 1.

(6) Repeat the above steps until an exception is raised or the result queue is empty, at which point poll() returns.

When poll() raises NoResultsPending, the wait() operation catches this exception and wait() returns. A small sketch of driving poll() manually, as an alternative to wait(), follows.
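
As a sketch (reusing the pool from the earlier examples), poll() can also be driven manually when the main thread wants to keep doing other work while results trickle in:

import time
import threadpool

while True:
    try:
        pool.poll()          # non-blocking: run callbacks for any results already available
    except threadpool.NoResultsPending:
        break                # every submitted request has been handled
    # ... do other main-thread work between polls ...
    time.sleep(0.5)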

7. Worker thread exit

The operations provided by threadpool to exit worker threads are dismissWorkers() and joinAllDismissedWorkers():

def dismissWorkers(self, num_workers, do_join=False): 
  """Tell num_workers worker threads to quit after their current task.""" 
  dismiss_list = [] 
  for i in range(min(num_workers, len(self.workers))): 
    worker = self.workers.pop() 
    worker.dismiss() 
    dismiss_list.append(worker) 
 
  if do_join: 
    for worker in dismiss_list: 
      worker.join() 
  else: 
    self.dismissedWorkers.extend(dismiss_list) 
 
def joinAllDismissedWorkers(self): 
  """Perform Thread.join() on all worker threads that have been dismissed. 
  """ 
  for worker in self.dismissedWorkers: 
    worker.join() 
  self.dismissedWorkers = [] 

As can be seen from dismissWorkers(), its main job is to pop the specified number of threads from the worker list and set each popped thread's dismiss event; once the event is set, that thread's run() function detects it and the thread exits.

If do_join is set, the exiting threads are joined inside this function. Otherwise the popped threads are placed into self.dismissedWorkers, where they wait for joinAllDismissedWorkers() to join them. A small shutdown sketch follows.
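
As a minimal shutdown sketch (assuming the 3-worker pool from the earlier examples):

# dismiss all workers and wait for them to finish their current task
pool.dismissWorkers(3, do_join=True)

# or: dismiss now, join later
pool.dismissWorkers(3)
pool.joinAllDismissedWorkers()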

8. Summary

At this point, all of the operations of the threadpool thread pool have been introduced, along with the details of their implementation. As you can see, threadpool is not complicated; there are only a few simple operations, and the main thing is to understand the overall flow.

This is the entire content of this article.