Source code for lazyflow.request.threadPool

#   lazyflow: data flow based lazy parallel computation framework
#       Copyright (C) 2011-2014, the ilastik developers
#                                <>
# This program is free software; you can redistribute it and/or
# modify it under the terms of the Lesser GNU General Public License
# as published by the Free Software Foundation; either version 2.1
# of the License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
# See the files LICENSE.lgpl2 and LICENSE.lgpl3 for full text of the
# GNU Lesser General Public License version 2.1 and 3 respectively.
# This information is also available on the ilastik web site at:
# Built-in
import atexit
import collections
import heapq
import threading
import platform
import time
import os
import ctypes

import psutil

from lazyflow.utility.priorityQueue import PriorityQueue

class ThreadPool(object):
    """
    Owns a fixed set of worker threads and hands tasks out to them.

    Tasks that are not yet bound to a worker sit in ``unassigned_tasks``
    until some worker claims them; tasks that already carry an
    ``assigned_worker`` are forwarded straight to that worker.
    """

    # Other queue types that have been used here: FifoQueue, LifoQueue
    _DefaultQueueType = PriorityQueue

    def __init__(self, num_workers, queue_type=_DefaultQueueType):
        """
        Constructor.  Starts all workers.

        :param num_workers: The number of worker threads to create.
        :param queue_type: The type of queue to use for prioritizing tasks.
                           Possible queue types include
                           :py:class:`PriorityQueue`, :py:class:`FifoQueue`,
                           and :py:class:`LifoQueue`, or any class with
                           ``push()``, ``pop()``, and ``__len__()`` methods.
        """
        self.job_condition = threading.Condition()
        # Tasks that no worker has claimed yet.
        self.unassigned_tasks = queue_type()

        # NOTE: a MemoryWatcher hook used to be created and started here;
        #       it is currently disabled.

        self.num_workers = num_workers
        self.workers = self._start_workers(num_workers, queue_type)

        # ThreadPools automatically stop upon program exit
        atexit.register(self.stop)

    def wake_up(self, task):
        """
        Schedule the given task on the worker that is assigned to it.
        If it has no assigned worker yet, put it in the shared queue so the
        first worker that becomes available can claim it.
        """
        owner = getattr(task, 'assigned_worker', None)
        if owner is None:
            self.unassigned_tasks.push(task)
            # Let every waiting worker know that there is new work
            self._notify_all_workers()
            return

        # Once a task has been assigned, it must always be processed
        # in the same worker
        owner.wake_up(task)

    def stop(self):
        """
        Stop all threads in the pool, and block for them to complete.
        Postcondition: All worker threads have stopped.
                       Unfinished tasks are simply dropped.
        """
        # First ask every worker to stop (non-blocking) ...
        for worker in self.workers:
            worker.stop()

        # ... and only then wait for each of them to finish.
        for worker in self.workers:
            worker.join()

    def get_states(self):
        """
        Return a list with the current ``state`` string of every worker.
        """
        states = []
        for worker in self.workers:
            states.append(worker.state)
        return states

    def _start_workers(self, num_workers, queue_type):
        """
        Create and start ``num_workers`` workers; return them as a set.
        """
        started = set()
        for index in range(num_workers):
            worker = _Worker(self, index, queue_type=queue_type)
            started.add(worker)
            worker.start()
        return started

    def _notify_all_workers(self):
        """
        Wake up all worker threads that are currently waiting for work.
        """
        for worker in self.workers:
            condition = worker.job_queue_condition
            with condition:
                condition.notify()

    def _wait_for_idle(self):
        """
        Useful for testing only.
        Wait until there are no tasks left in the threadpool.
        """
        while True:
            # First pass: poll until the shared queue and every
            # per-worker queue have drained.
            while self.unassigned_tasks:
                time.sleep(0.1)
            for worker in self.workers:
                while worker.job_queue:
                    time.sleep(0.1)

            # Second pass: did any of those completing tasks launch new tasks?
            worker_busy = any(len(worker.job_queue) > 0
                              for worker in self.workers)
            if not worker_busy and not self.unassigned_tasks:
                return
class _Worker(threading.Thread):
    """
    Runs in a loop until stopped.
    The loop pops one task from the threadpool and executes it.

    Each worker has its own ``job_queue`` (tasks already assigned to this
    worker) guarded by ``job_queue_condition``.  When that queue is empty the
    worker tries to claim a task from ``thread_pool.unassigned_tasks``.
    """

    def __init__(self, thread_pool, index, queue_type):
        """
        :param thread_pool: The owning pool; only its ``unassigned_tasks``
                            queue is accessed by the worker.
        :param index: Number used to build the thread name ("Worker #<index>").
        :param queue_type: Class used for this worker's private job queue.
                           It must provide ``push()``, ``pop()`` and
                           ``__len__()``.
        """
        name = "Worker #{}".format(index)
        super(_Worker, self).__init__(name=name)
        self.daemon = True  # kill automatically on application exit!
        self.thread_pool = thread_pool
        self.stopped = False
        self.job_queue_condition = threading.Condition()
        self.job_queue = queue_type()
        # Human-readable progress marker, exposed for debugging.
        self.state = 'initialized'

    def run(self):
        """
        Keep executing available tasks until we're stopped.
        """
        # Try to get some work.
        self.state = 'waiting'
        next_task = self._get_next_job()

        while not self.stopped:
            # Start (or resume) the work by calling the task
            self.state = 'running task'
            next_task()

            # We're done with this request.
            # Free it immediately for garbage collection.
            self.state = 'freeing task'
            next_task = None

            # Now try to get some work (wait if necessary).
            self.state = 'waiting'
            next_task = self._get_next_job()

    def stop(self):
        """
        Tell this worker to stop running.
        Does not block for thread completion.
        """
        self.stopped = True
        # Wake up the thread if it's waiting for work
        with self.job_queue_condition:
            self.job_queue_condition.notify()

    def wake_up(self, task):
        """
        Add this task to the queue of tasks that are ready to be processed.
        The task may or may not be started already.
        """
        assert task.assigned_worker is self
        with self.job_queue_condition:
            self.job_queue.push(task)
            self.job_queue_condition.notify()

    def _get_next_job(self):
        """
        Get the next available job to perform.
        If necessary, block until:
            - a task is available (return it) OR
            - the worker has been stopped (might return None)
        """
        # Keep trying until we get a job
        with self.job_queue_condition:
            if self.stopped:
                return None
            next_task = self._pop_job()

            while next_task is None and not self.stopped:
                # Wait for work to become available
                self.job_queue_condition.wait()
                if self.stopped:
                    return None
                next_task = self._pop_job()

        if not self.stopped:
            assert next_task is not None
            assert next_task.assigned_worker is self

        return next_task

    def _pop_job(self):
        """
        Non-blocking.
        If possible, get a job from our own job queue.
        Otherwise, get one from the global job queue.
        Return None if neither queue has work to do.
        """
        # Try our own queue first
        if len(self.job_queue) > 0:
            return self.job_queue.pop()

        # Otherwise, try to claim a job from the global unassigned list
        try:
            task = self.thread_pool.unassigned_tasks.pop()
        except IndexError:
            return None
        else:
            # If this fails, then your callable is some built-in that doesn't
            # allow arbitrary members (e.g. .assigned_worker) to be
            # "monkey-patched" onto it.  You may have to wrap it in a custom
            # class first.
            task.assigned_worker = self
            return task

    def raise_exc(self, excobj):
        """
        Debugging method (marked as untested by its original author).
        Asynchronously raise an exception in this thread.

        See docstring for _async_raise() for more details.

        :param excobj: The exception to raise inside this worker's thread.
        """
        # is_alive() replaces the camelCase isAlive() alias, which no longer
        # exists on current Python versions.
        assert self.is_alive(), "thread must be started"

        # A started thread exposes its id via the public ``ident`` attribute,
        # so there is no need to scan the private threading._active dict.
        tid = self.ident
        if tid is None:
            # Only reachable when asserts are disabled and the thread was
            # never started: nothing to raise into.
            return
        _Worker._async_raise(tid, excobj)

    @staticmethod
    def _async_raise(tid, excobj):
        """
        Raise an exception in the thread with the given id.

        :param tid: Thread identifier (as reported by ``Thread.ident``).
        :param excobj: The exception to raise in that thread.
        :raises ValueError: if no thread with that id exists.
        :raises SystemError: if more than one thread state was modified
                             (the request is reverted first).
        """
        # Wrap the id explicitly: ctypes marshals a bare Python int as a
        # C ``int``, which would truncate thread ids wider than 32 bits.
        c_tid = ctypes.c_ulong(tid)
        res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
            c_tid, ctypes.py_object(excobj))
        if res == 0:
            raise ValueError("nonexistent thread id")
        elif res > 1:
            # "if it returns a number greater than one, you're in trouble,
            #  and you should call it again with exc=NULL to revert the
            #  effect" -- None is marshalled by ctypes as a NULL pointer.
            ctypes.pythonapi.PyThreadState_SetAsyncExc(c_tid, None)
            raise SystemError("PyThreadState_SetAsyncExc failed")