Lazyflow Request Framework

Introduction

The request framework is a general-purpose, coroutine-based task scheduling system built on the greenlet Python library. It does not depend in any way on the graph framework, so it could be used to schedule the execution of generic Python callables. It is similar in spirit to other greenlet-based frameworks like eventlet and gevent, which provide a similar interface for highly concurrent IO applications.

Using the lazyflow request framework, it is easy to perform concurrent, asynchronous workloads, without requiring the developer to write complicated state machines or messy callback handlers. The request framework hides this complexity, allowing you to write in a clean, blocking style without the performance drag of actually blocking the OS thread(s) your workload is executing on. Internally, requests are scheduled for execution in a fixed-size threadpool. When a request has to wait for subcomputations to complete in other requests, it is temporarily suspended so that its OS thread can be used to perform other work.

This dependency diagram shows how user-defined workloads depend on the parts of the request system.

request framework dependencies

Note

The request framework is written to allow easy parallelization of computations. In Python, the Global Interpreter Lock (GIL) prevents the interpreter from executing two Python statements at once. This restriction does not apply to C or C++ extensions (as long as they release the GIL). Therefore, the Request framework is most useful for workloads that do most of their “heavy lifting” within C/C++ extensions. For pure-Python workloads, the Request framework doesn’t provide performance benefits, but may still be useful for the abstractions it provides.

Quick Start

Let’s start with an example computation. Suppose you want to smooth an image at two different scales, then subtract the two resulting images. A naive implementation of this computation might look like this:

from some_img_lib import smooth

def f(image, sigmaA, sigmaB):
    smoothedA = smooth(image, sigmaA)
    smoothedB = smooth(image, sigmaB)

    result = smoothedA - smoothedB
    return result

diff_of_smoothed = f(my_image, 1.0, 3.0)

The above single-threaded approach leaves much room for improvement. Using the request framework, we can parallelize the workload:

from some_img_lib import smooth
from functools import partial
from lazyflow.request import Request

def f(image, sigmaA, sigmaB):
    r2 = Request( partial(smooth, image, sigmaA) )
    r3 = Request( partial(smooth, image, sigmaB) )

    # Start executing r3
    r3.submit()

    # Wait until both requests are complete
    smoothedA = r2.wait() # (Auto-submits)
    smoothedB = r3.wait()

    result = smoothedA - smoothedB
    return result

r1 = Request( partial(f, my_image, 1.0, 3.0) )
diff_of_smoothed = r1.wait()

To understand the example, we make a few key observations:

  • Request objects are constructed with a single callable object, which the request executes
  • Requests do not execute their callable until they have been submitted via Request.submit().
  • Request.wait() automatically calls submit() if the request hasn’t been submitted yet.
  • The callable’s return value is given as the result of Request.wait().
  • functools.partial (from the Python standard library) is a convenient way of creating a new callable object from a function and a set of arguments.

In cases where we are creating multiple requests and waiting until they are all complete, we can use a RequestPool, which eliminates some boilerplate.

from some_img_lib import smooth
from functools import partial
from lazyflow.request import Request, RequestPool

def f(image, sigmaA, sigmaB):
    r2 = Request( partial(smooth, image, sigmaA) )
    r3 = Request( partial(smooth, image, sigmaB) )

    pool = RequestPool()
    pool.add( r2 )
    pool.add( r3 )
    pool.wait()

    return r2.result - r3.result

r1 = Request( partial(f, my_image, 1.0, 3.0) )
diff_of_smoothed = r1.wait()

Admittedly, in our example only two requests can execute in parallel, so the RequestPool didn’t save much code in this case. Still, there are more observations to make note of:

  • RequestPool.wait() will block until all requests in the pool have completed.
  • All Request objects save their callable’s return value as an attribute: Request.result

Dependencies

Here’s a visualization of the dependencies between the requests from the quickstart example:

request dependency diagram

As you can see, r1 depends on BOTH r2 and r3. In a typical use case, request dependencies form a tree, but this isn’t always true. Let’s tweak our example even further. In the new version, we don’t already have the input image. Instead, we compute it in a separate request.

from some_img_lib import smooth, compute_sample_image
from functools import partial
from lazyflow.request import Request, RequestPool

def waitAndSmooth( imageRequest, sigma ):
    image = imageRequest.wait()
    return smooth(image, sigma)

def f(imageRequest, sigmaA, sigmaB):
    r2 = Request( partial(waitAndSmooth, imageRequest, sigmaA) )
    r3 = Request( partial(waitAndSmooth, imageRequest, sigmaB) )

    pool = RequestPool()
    pool.add( r2 )
    pool.add( r3 )
    pool.wait()

    return r2.result - r3.result

r4 = Request( compute_sample_image )
r1 = Request( partial(f, r4, 1.0, 3.0) )
diff_of_smoothed = r1.wait()

Now our example is getting a little contrived for such a simple computation, but bear with us. The request dependencies are visualized in the following diagram:

request dependency diagram

Cancellation

The request framework is designed to support interactive GUIs, in which the computational workload may need to be altered on the fly. In such an environment, it may be necessary to cancel a request that has already been submitted.

To cancel a request, simply call Request.cancel()

r1 = Request( some_work )
r1.submit()

# ...

r1.cancel()

If the request was waiting for any other requests, those requests will be cancelled, too.

def some_work():
    r2 = Request( some_more_work )
    return r2.wait()

r1 = Request( some_work )
r1.submit()

# ...

r1.cancel() # Cancels r1 AND r2.

But a request will not be cancelled unless ALL of the requests that were waiting for it have already been cancelled. For example, suppose the dependency graph for some group of requests looked like this:

request dependency diagram

Now suppose that we call r1.cancel(). The following diagram shows all cancelled requests in red.

request dependency diagram

Notice that r3 and the requests below it were not cancelled, because a non-cancelled request (r2) is still waiting for r3.

Handling Cancellation

Within the context of a request, cancellation produces an exception. When a request has been cancelled, nothing happens at first. As soon as the request cedes control to the Request framework by calling Request.wait() on a child request, a cancellation exception is raised. In a typical application, requests are used to execute pure functional callables. For pure-functional requests, there’s no need to handle the cancellation exception. However, in some applications, you may want to use requests to modify some external state. In that case, you’ll need to handle the cancellation exception that might be raised any time your request calls wait().

import functools
from lazyflow.request import Request

global_list = [1,2,3]

def add_items_to_global_list( num_items ):
    initial_size = len(global_list)
    try:
        for n in range(num_items):
            req = Request( get_next_item )
            next_item = req.wait() # Might raise Request.CancellationException
            global_list.append( next_item )
    except Request.CancellationException:
        # Restore the previous global state.
        # (Deleting in-place avoids rebinding the global name.)
        del global_list[initial_size:]
        raise

r1 = Request( functools.partial(add_items_to_global_list, 10) )
r1.submit()

# ...

r1.cancel() # Cancels r1 AND any requests it is waiting for.

In the example above, we catch the Request.CancellationException that might be raised within req.wait(). Note that we re-raise the exception after we clean up. Re-raising the cancellation exception isn’t strictly required by the current Request framework implementation, but it is considered best practice nonetheless.

Note

There is a special corner case that can occur if your request attempts to wait for a request that has already been cancelled from some other thread or request. If you attempt to wait for a request that is already cancelled, a Request.InvalidRequestException is raised.

Failed Requests

If any exception is raised within a request (other than a cancellation exception), the request fails. The exception that caused the failure is propagated to the request(s) or thread(s) that are waiting for it.

import sys
from lazyflow.request import Request

def some_work():
    raise RuntimeError("Something went wrong.")

r1 = Request( some_work )
try:
    r1.wait()
except RuntimeError:
    sys.stderr.write("Request failed.\n")

Note

Request failure handling and exception propagation is relatively heavy-weight. You can and should rely on it to catch occasional or unexpected failures, but do not rely on it as though it were as cheap as a simple if/else statement. If your requests are repeatedly raising and catching exceptions, your performance may suffer.

Exception Propagation

As mentioned above, exceptions raised in a request are propagated backwards to waiting requests. There is an interesting consequence of this behavior: For the special case where a request is being waited on by multiple requests, a single exception may propagate through multiple callstacks.

Consider this request dependency graph:

request dependency diagram

Suppose an exception is raised in r1. The following series of diagrams highlights the requests in which the exception will be seen.

request dependency diagrams (one per step, showing the exception propagating through each waiting request)

Note

We are missing an optimization opportunity here. In the example above, one of the right-most requests never sees the exception. That’s expected behavior, since that request did not fail. But after the exception is propagated through the request “call” stacks, the request is not needed any more. To save CPU cycles, we could cancel the non-failed children of failed requests.

If we decide to implement this optimization, we should take care not to interfere with the status of the children that have already failed. Only the non-failed requests should be cancelled, to avoid downgrading the “failed” status of some requests into a “cancelled” status.

Request Notifications

For some use-cases, you may want to be notified when a request completes. Request objects allow you to subscribe callbacks to three notifications:

  • Use Request.notify_finished() to be notified when a request completes successfully.
  • Use Request.notify_failed() to be notified when a request has failed (due to an uncaught exception).
  • Use Request.notify_cancelled() to be notified when a request has been cancelled.

Here’s an example:

def some_work():
    """Do some work."""

def handle_result(result):
    print("The result was:", result)

def handle_failure(ex, exc_info):
    print("The request failed due to a {} exception".format( type(ex) ))

def handle_cancelled():
    print("The request was cancelled")

req = Request( some_work )
req.notify_finished( handle_result )
req.notify_failed( handle_failure )
req.notify_cancelled( handle_cancelled )

try:
    req.wait()
finally:
    print "Request is no longer executing."

Callback Timing Guarantee

If you’re paying very close attention, you might be thinking of a question:

Does Request.wait() return before or after the callbacks are notified? In other words, after I wait() for a request, is it guaranteed that my callbacks have finished executing?

Answer:

  • Callbacks that were subscribed (via notify_finished, notify_failed, notify_cancelled) before the call to Request.wait() are guaranteed to be called before Request.wait() returns.
  • Callbacks that are subscribed after you call Request.wait() will eventually be called, but the timing of the notification is not guaranteed to be before Request.wait() returns.
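
To illustrate the first guarantee, here is a minimal sketch (the trivial workload is hypothetical):

from lazyflow.request import Request

results = []

req = Request( lambda: 42 )
req.notify_finished( results.append ) # Subscribed BEFORE wait()

req.wait()

# Guaranteed by the first rule above: the callback already ran.
assert results == [42]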

Synchronization Primitives

Concurrent requests share the same pool of OS threads. The usual Lock and RLock objects from the Python standard threading module will not function as intended within the context of a Request.** The Request Framework provides an alternative lock, which can be used within a Request. The RequestLock class has the same API as threading.Lock, and can be used as a drop-in replacement. See the RequestLock documentation for further details. Also see the SimpleRequestCondition documentation for a threading.Condition-like class.

Note

**Actually, threading.Lock can be used within a Request if used very carefully. As long as wait() is not called while the lock is held, there is no increased risk of deadlock or unexpected race conditions. The RequestLock class relieves the developer of this constraint, so it should be favored over threading.Lock.
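
For example, here is a minimal sketch of RequestLock as a drop-in replacement (the accumulation workload is hypothetical, and the with-statement usage assumes the context-manager portion of the threading.Lock interface):

from functools import partial
from lazyflow.request import Request, RequestPool, RequestLock

lock = RequestLock()
totals = { "sum" : 0 }

def accumulate( value ):
    # Same interface as threading.Lock
    with lock:
        totals["sum"] += value

pool = RequestPool()
for i in range(10):
    pool.add( Request( partial(accumulate, i) ) )
pool.wait()

assert totals["sum"] == sum(range(10))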

Debugging Features

Synchronous Requests

If you’re using an interactive debugger like the one in PyDev/Eclipse, it can be hard to figure out where a particular request came from. Often, the current thread’s stack is truncated because it’s executing inside a worker thread. The original request which “spawned” the problematic one is probably suspended or running in a separate thread.

For debugging purposes, the lazyflow request system can be forced into a special single-threaded mode, in which all requests execute synchronously within the calling thread. (Specifically, they are executed when my_request.submit() is called.) If you’re wondering “how did I end up in this function?”, try re-running your test in single-threaded mode, and just use your debugger to follow the stack frames down to the root cause.

To activate this mode in a unit test, set the size of the lazyflow worker thread pool to 0 threads:

from lazyflow.request import Request
Request.reset_thread_pool(num_workers=0)

Or when debugging in ilastik, change the [lazyflow]/threads config file setting:

[ilastik]
debug: true
plugin_directories: ~/Documents/workspace/object_feature_plugins

[lazyflow]
total_ram_mb: 8000
threads: 0

Alternatively, use this environment variable when launching ilastik:

LAZYFLOW_THREADS=0 python ilastik.py

Implementation Details

This section is of interest to developers who need to maintain or experiment with the implementation of the Request Framework.

ThreadPool

As indicated in the dependency diagram in the introduction, the ThreadPool class is an independent module. In fact, since it does not depend on the rest of the Request Framework in any way, it could be useful as a general thread pool utility for other applications. Tasks are added to the ThreadPool via ThreadPool.wake_up(). At first, they sit in a queue of tasks that is shared by all Worker threads. Each Worker thread keeps its own queue of tasks to execute. When a Worker’s task queue becomes empty, it pulls a task from the shared queue.
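
As a rough sketch, a plain callable can be scheduled directly (assuming, as the text above implies, that a simple function needs no special attributes; the threading.Event is just a way to observe completion from the main thread):

import threading
from lazyflow.request.threadPool import ThreadPool

done = threading.Event()

def simple_task():
    # Executes on one of the pool's Worker threads.
    done.set()

pool = ThreadPool( num_workers=2 )
pool.wake_up( simple_task ) # Lands in the shared queue first
done.wait()
pool.stop()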

Thread Context Consistency Guarantee

For simple tasks (e.g. plain functions), that’s the end of the story. For more complicated cases (e.g. requests, generators, etc.) that may be woken up multiple times, the ThreadPool provides an important guarantee: a given task will always execute on the SAME Worker thread, every time it is woken up. The Worker thread chosen for a particular task is arbitrary for the first time it is woken up, but it will return to the same Worker thread for each subsequent call to wake_up(). This guarantee is essential for coroutine based tasks based on greenlets (e.g. all Requests).

Request Lifetime

We’ll use the following diagram to track the state of a request throughout its lifetime.

empty request lifetime diagram

Let’s consider the first example we used in the Quick Start section from above:

from some_img_lib import smooth
from functools import partial
from lazyflow.request import Request

def f(image, sigmaA, sigmaB):
    r2 = Request( partial(smooth, image, sigmaA) )
    r3 = Request( partial(smooth, image, sigmaB) )

    # Start executing r3
    r3.submit()

    # Wait until both requests are complete
    smoothedA = r2.wait() # (Auto-submits)
    smoothedB = r3.wait()

    result = smoothedA - smoothedB
    return result

r1 = Request( partial(f, my_image, 1.0, 3.0) )
diff_of_smoothed = r1.wait()

The first request is created on this line:

r1 = Request( partial(f, my_image, 1.0, 3.0) )

Since it hasn’t been submitted yet, it isn’t yet known to the ThreadPool:

r1 not yet submitted

The next line (implicitly) submits the request and immediately blocks for it.

diff_of_smoothed = r1.wait()

When the request is submitted, it is given to the ThreadPool. Since the ThreadPool hasn’t seen this request previously, it ends up in the shared task queue.

r1 in the shared queue

Next, it is picked up by one of the ThreadPool’s worker threads:

r1 executing

When r1 starts executing, it creates two new requests:

r2 = Request( partial(smooth, image, sigmaA) )
r3 = Request( partial(smooth, image, sigmaB) )
r2 and r3 not yet submitted

First, it submits r3:

# Start executing r3
r3.submit()
r3 submitted

...which is eventually picked up by a ThreadPool Worker thread:

r3 executing

For the sake of illustration, let’s suppose that some other part of our app has also just submitted some requests:

other requests submitted

Back in r1, we submit and wait for r2.

smoothedA = r2.wait() # (Auto-submits)

This happens in two steps. First, r2 is submitted:

r2 submitted

Next, r1 is suspended (since it is now waiting for r2).

r1 suspended

This next step exhibits the advantage of the Request Framework over a simple ThreadPool. Since r1 has been suspended, it no longer ties up a Thread. The newly available worker now picks up a request from the shared queue:

r1 suspended

Eventually, each request either completes or is suspended, and r2 makes it to the front of the shared queue:

r1 suspended

...and gets picked up by a free worker:

r1 suspended

Meanwhile, r3 finishes execution:

r1 suspended

After a while, suppose other requests (from other parts of the app) continue to be submitted:

r2 executing, other requests woken

Eventually, r2 finishes execution:

r2 finished

Since r2 and r3 are both complete, r1 can finally be woken up again:

r1 woken up

The last figure shows something important. Did you catch it? When r1 was initially submitted to the ThreadPool, it didn’t matter which Worker was chosen to execute it. But now that it is being re-awoken, it must execute on the same Worker it used previously: it is not added to the ThreadPool’s shared queue, and it does not execute on the second worker thread, even though (in our example) that thread happens to be unoccupied at the moment. Instead, it is added to the first worker’s queue. This is a constraint imposed by the greenlet package, which is used to implement Request coroutines. See also: Thread Context Consistency Guarantee.

When the first worker becomes free, r1 can finally resume execution:

r1 executing again

...and eventually r1 finishes execution.

r1 finished

Optimization: Direct Execution

From the user’s perspective, calling req.wait() is equivalent to:

req.submit()
req.wait()

But under the hood, the Request framework uses an optimization for the case where req.wait() is called on a request that hasn’t been submitted yet. Instead of submitting the request to the ThreadPool, the request is simply executed synchronously. There is no need to incur the overhead of creating a new greenlet, queueing the request, and so on. With this optimization, we don’t have to pay a significant penalty for using requests in cases where no parallelism was needed in the first place.
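
A minimal sketch of the behavior (the trivial workload is hypothetical):

from lazyflow.request import Request

req = Request( lambda: 6 * 7 )

# Never submitted: with the optimization described above, the callable
# executes synchronously, right here in the calling context.
result = req.wait()
assert result == 42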

Note

This optimization avoids some overhead of starting new requests in their own greenlets, but it has a side-effect worth noting: When a subrequest is directly executed, it “skips in line”. It does not sit in the ThreadPool shared queue. It is executed immediately, even if higher priority requests are waiting in the shared queue. Before experimenting with alternative request prioritization schemes, it might be worth disabling this optimization.

Foreign Thread Context vs. Request Context

Internally, the Request Framework distinguishes between two types of execution contexts: request.RequestGreenlet and “normal” (a.k.a. “foreign”) threads. If a Request is waited upon from within a foreign thread, we don’t attempt to suspend the foreign thread. Instead, we simply use a regular threading.Event to wait for the Request to complete. The current context is obtained by calling the classmethod Request.current_request(), which returns None if the current context is a “foreign” thread.
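
A small sketch of the distinction (the helper function is hypothetical):

from lazyflow.request import Request

def which_context():
    if Request.current_request() is None:
        return "foreign thread"
    return "request context"

print( which_context() )  # "foreign thread" (e.g. when called from the main thread)

req = Request( which_context )
req.submit()              # Force execution on the ThreadPool
print( req.wait() )       # "request context"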

Request Priority

The queue class used by the ThreadPool can be easily configured. One of the options is a priority queue, in which tasks are ordered according to their implementation of __lt__. Requests are prioritized according to a simple rule: whichever request has the oldest ancestor has higher priority (a request’s ancestors are the requests that spawned it, directly or indirectly). If two requests share the same oldest ancestor, then their next-oldest ancestors are compared, and so on. This way, we hope to avoid the cache and RAM thrashing that might be encountered if newer requests were to “cut in line” in front of older requests, preventing the older requests from finishing as quickly as possible.

Note

This prioritization scheme is simple, and could maybe be improved. Fortunately, the ThreadPool class is written to allow easy experimentation with different queueing schemes.

Old API Backwards Compatibility

As a temporary convenience for migration to the latest version of the Request Framework, a few methods from the old API have been provided:

  • Request.getResult() (now replaced with Request.result)
  • Request.writeInto() (This member is specific to the Lazyflow Graph Framework. It will soon be implemented there, in a special subclass of Request.)

Note

Backwards-compatibility support will be removed soon. If you are depending on the old API, please upgrade your code.

Class Reference

Request

class lazyflow.request.Request(fn, root_priority=[0])[source]
__init__(fn, root_priority=[0])[source]

Constructor. Postconditions: The request has the same cancelled status as its parent (the request that is creating this one).

exception CancellationException[source]

This is raised when the whole request has been cancelled. If you catch this exception from within a request, clean up and return immediately. If you have nothing to clean up, you are not required to handle this exception.

Implementation details: This exception is raised when the cancel flag is checked in the wait() function:

  • immediately before the request is suspended, OR
  • immediately after the request is woken up from suspension

exception Request.CircularWaitException[source]

This exception is raised if a request calls wait() on itself. Currently, this only catches the most basic case. No attempt is made to detect indirect cycles (e.g. if req.wait() is called from within a req’s own child.), so don’t rely on it to catch tricky deadlocks due to indirect self-waiting.
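
A sketch of the basic (detected) case, using Request.current_request() to obtain the currently-executing request:

from lazyflow.request import Request

def wait_for_self():
    # Wait on the very request that is executing this function.
    return Request.current_request().wait() # Raises Request.CircularWaitException

req = Request( wait_for_self )
req.submit()
try:
    req.wait()
except Request.CircularWaitException:
    print( "Detected a direct self-wait." )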

exception Request.InternalError[source]

This is raised if an error is detected in the Request framework itself. If this exception is raised, it implies a bug in this file (request.py).

exception Request.InvalidRequestException[source]

This is raised when calling wait on a request that has already been cancelled, which can only happen if the request you’re waiting for was spawned elsewhere (i.e. you are waiting for someone else’s request to avoid duplicate work). When this occurs, you will typically want to restart the request yourself.

exception Request.TimeoutException[source]

This is raised if a call to wait() times out in the context of a foreign thread. See Request.wait() for details.

Request.assigned_worker

This member is accessed by the ThreadPool to determine which Worker thread this request belongs to.

Request.block(timeout=None)[source]

Like wait, but does not return a result. Can be used even if the request has already been cleaned.

Request.cancel()[source]

Attempt to cancel this request and all requests that it spawned. No request will be cancelled if other non-cancelled requests are waiting for its results.

Request.clean(_fullClean=True)[source]

Delete all state from the request, for cleanup purposes. Removes references to callbacks, children, and the result.

Parameters:_fullClean – Internal use only. If False, only clean internal bookkeeping members. Otherwise, delete everything, including the result.
classmethod Request.current_request_is_cancelled()[source]

Return True if called from within the context of a cancelled request.

Request.notify_cancelled(fn)[source]

Register a callback function to be called when this request is finished due to cancellation. If we’re already finished and cancelled, call it now.

Parameters:fn – The callback to call if the request is cancelled. Signature: fn()
Request.notify_failed(fn)[source]

Register a callback function to be called when this request is finished due to failure (an exception was raised). If we’re already failed, call it now.

Parameters:fn – The callback to call if the request fails. Signature: fn(exception, exception_info). exception_info is a tuple of (type, value, traceback). See the Python documentation for sys.exc_info() for more details.
Request.notify_finished(fn)[source]

Register a callback function to be called when this request is finished. If we’re already finished, call it now.

Parameters:fn – The callback to be notified. Signature: fn(result)
classmethod Request.raise_if_cancelled()[source]

If called from the context of a cancelled request, raise a CancellationException immediately.

classmethod Request.reset_thread_pool(num_workers=8)[source]

Change the number of threads allocated to the request system.

As a special case, you may set num_workers to 0. In that case, the normal thread pool is not used at all. Instead, all requests will execute synchronously, from within the submitting thread. Utilities like RequestLock, SimpleRequestCondition will use alternate implementations based on equivalent classes in the builtin threading module.

Note

It is only valid to call this function during startup. Any existing requests will be dropped from the pool!

Request.submit()[source]

If this request isn’t started yet, schedule it to be started.

Request.wait(timeout=None)[source]

Start this request if necessary, then wait for it to complete. Return the request’s result.

Parameters:timeout – If running within a request, this parameter must be None. If running within the context of a foreign (non-request) thread, a timeout may be specified in seconds (floating-point). If the request does not complete within the timeout period, then a Request.TimeoutException is raised.
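
For example, from a foreign thread (the slow workload is hypothetical):

import time
from lazyflow.request import Request

def slow_work():
    time.sleep(10.0) # Hypothetical long-running workload
    return "done"

req = Request( slow_work )
req.submit()
try:
    result = req.wait( timeout=5.0 ) # Timeouts are only valid in foreign threads
except Request.TimeoutException:
    print( "Request did not complete within 5 seconds." )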

RequestLock

class lazyflow.request.RequestLock[source]

Request-aware lock. Implements the same interface as threading.Lock. If acquire() is called from a normal thread, the lock blocks the thread as usual. If acquire() is called from a Request, then the request is suspended so that another Request can be resumed on the thread.

Requests and normal threads can share access to a RequestLock. That is, they compete equally for access to the lock.

Implementation detail: Depends on the ability to call two private Request methods: _suspend() and _wake_up().

class RequestLockQueue[source]

This data structure is a pseudo-priority queue. If you’re not ready to process the highest-priority item, you can simply push it back. It will be placed in a secondary queue while you continue to process other items.

Two priority queues are maintained: one for pushing, one for popping. Items are popped from the ‘popping queue’ until it is empty, and then the two queues are swapped.

Suppose you pop an item (the highest priority item), but you discover you’re not able to use it immediately for some reason (e.g. it’s a request that is still waiting for a lock). Hence, you simply ‘push’ it back into this data structure.

If there were only one queue, it would end up at the front of the queue again (it was the highest priority item, after all).

That is, you would never make any progress on the queue because you would just pop and push the same item over and over!

But since this data structure uses TWO queues, the pushed item will be put on the ‘pushing queue’ instead, and it won’t be popped again until the popping queue is depleted (at which point the two queues are swapped).

With this scheme, high-priority requests can opt not to monopolize access to a lock if they need to wait for lower-priority requests to complete before continuing. This is important for code involving condition variables, for instance.

RequestLock.acquire(blocking=True)[source]

Acquire the lock. If blocking is True, block until the lock is available. If blocking is False, don’t wait and return False if the lock couldn’t be acquired immediately.

Parameters:blocking – Same as in threading.Lock
RequestLock.locked()[source]

Return True if lock is currently held by some thread or request.

RequestLock.release()[source]

Release the lock so that another request or thread can acquire it.

SimpleRequestCondition

class lazyflow.request.SimpleRequestCondition[source]

A Request-compatible condition variable that supports a limited subset of the features implemented by the standard threading.Condition.

Limitations:

  • Only one request may call wait() at a time.
  • Likewise, notify() doesn’t accept the n arg.
  • Likewise, there is no notify_all() method.
  • wait() doesn’t support the timeout arg.

Note

It would be nice if we could simply use threading.Condition( RequestLock() ) instead of rolling our own custom condition variable class, but that doesn’t quite work in cases where we need to call wait() from a worker thread (a non-foreign thread). (threading.Condition uses threading.Lock() as its ‘waiter’ lock, which blocks the entire worker.)

Example:

cond = SimpleRequestCondition()

def process_all_data():
    all_finished = False # Initialize before the loop reads it
    with cond:
        while not all_finished:
            while not is_data_chunk_ready():
                cond.wait()
            all_finished = process_available_data()

def retrieve_some_data():
    get_some_data()
    with cond:
        cond.notify()

req1 = Request( retrieve_some_data )
req2 = Request( retrieve_some_data )
req3 = Request( retrieve_some_data )

req1.submit()
req2.submit()
req3.submit()

# Wait for them all to finish...
process_all_data()
notify()[source]

Notify the condition that it can stop wait()-ing. The caller must own (acquire) the condition before calling this method. Also, the waiting request cannot return from wait() until the condition is released, so the caller should generally release the condition shortly after calling this method.

Note

It is okay to call this from more than one request in parallel.

wait()[source]

Wait for another request to call notify(). The caller must own (acquire) the condition before calling this method. The condition is automatically released while this method waits for notify() to be called, and automatically acquired again before returning.

Note

Unlike threading.Condition, it is NOT valid to call wait() from multiple requests in parallel. That is, this class supports only one ‘consumer’ thread.

Note

Unlike threading.Condition, no timeout parameter is accepted here.

RequestPool

class lazyflow.request.RequestPool(max_active=None)[source]

Class for submitting a batch of requests and waiting until they are all complete. Requests cannot be added to the pool after it has already started. Not threadsafe:

  • don’t call add() from more than one thread
  • don’t call wait() from more than one thread, but you CAN add requests that are already executing in a different thread to a RequestPool (see the sketch below)
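
Example (a sketch; the chunk workload is hypothetical, and max_active is the constructor parameter shown above):

from functools import partial
from lazyflow.request import Request, RequestPool

def process_chunk( i ):
    return i * i # Hypothetical workload

reqs = [ Request( partial(process_chunk, i) ) for i in range(100) ]

pool = RequestPool( max_active=4 ) # At most 4 requests run at once
for req in reqs:
    pool.add( req )
pool.wait()

# We kept our own references, so the results remain accessible.
results = [ req.result for req in reqs ]
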
exception RequestPoolError[source]

Raised if you attempt to use the Pool in a manner that it isn’t designed for.

RequestPool.add(req)[source]

Add a request to the pool. The pool must not be submitted yet. Otherwise, an exception is raised.

RequestPool.cancel()[source]

Cancel all requests in the pool.

RequestPool.clean()[source]

Release our handles to all requests in the pool, for cleanup purposes. There is no need to call this yourself.

RequestPool.request(func)[source]

Deprecated method. Convenience function to construct a request for the given callable and add it to the pool.

RequestPool.wait()[source]

Launch all requests and return after they have all completed, including their callback handlers.

First, N requests are launched (N=_max_active). As each one completes, launch a new request to replace it until there are no unsubmitted requests remaining.

The block() function is called on every request after it has completed, to ensure that any exceptions from those requests are re-raised by the RequestPool.

(If we didn’t block for ‘finishing’ requests at all, we’d be violating the Request ‘Callback Timing Guarantee’, which must hold for both Requests and RequestPools. See Request docs for details.)

After that, each request is discarded, so that its reference dies immediately and any memory it consumed is reclaimed.

So, requests fall into four categories:

  1. unsubmitted

    Not started yet.

  2. active

    Currently running their main workload

  3. finishing

    Main workload complete, but might still be running callback handlers. We will call block() on these to ensure they have finished with callbacks, and then discard them.

  4. discarded

    After each request is processed in _clear_finishing_requests(), it is removed from the ‘finishing’ set and the RequestPool retains no references to it any more.

ThreadPool

class lazyflow.request.threadPool.ThreadPool(num_workers, queue_type=<class 'lazyflow.utility.priorityQueue.PriorityQueue'>)[source]

Manages a set of worker threads and dispatches tasks to them.

__init__(num_workers, queue_type=<class 'lazyflow.utility.priorityQueue.PriorityQueue'>)[source]

Constructor. Starts all workers.

Parameters:
  • num_workers – The number of worker threads to create.
  • queue_type – The type of queue to use for prioritizing tasks. Possible queue types include PriorityQueue, FifoQueue, and LifoQueue, or any class with push(), pop(), and __len__() methods.
stop()[source]

Stop all threads in the pool, and block for them to complete. Postcondition: All worker threads have stopped. Unfinished tasks are simply dropped.

wake_up(task)[source]

Schedule the given task on the worker that is assigned to it. If it has no assigned worker yet, assign it to the first worker that becomes available.

class lazyflow.request.threadPool.PriorityQueue[source]

Threadsafe priority queue based on the python heapq module.

Ties are resolved by popping the element that was added first. If the elements are tuples of the form (p1, p2, ..., item), the trailing item is never considered for comparison!
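
A small sketch of those rules (the payload strings are hypothetical, and we assume pop() returns the pushed element unchanged):

from lazyflow.request.threadPool import PriorityQueue

q = PriorityQueue()
q.push( (2, "low-priority item") )
q.push( (1, "added first") )
q.push( (1, "added second") ) # Ties with the previous element

print( q.pop() ) # (1, "added first")   -- smallest priority pops first
print( q.pop() ) # (1, "added second")  -- tie resolved by insertion order
print( q.pop() ) # (2, "low-priority item")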