###############################################################################
# lazyflow: data flow based lazy parallel computation framework
#
# Copyright (C) 2011-2014, the ilastik developers
# <team@ilastik.org>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the Lesser GNU General Public License
# as published by the Free Software Foundation; either version 2.1
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# See the files LICENSE.lgpl2 and LICENSE.lgpl3 for full text of the
# GNU Lesser General Public License version 2.1 and 3 respectively.
# This information is also available on the ilastik web site at:
# http://ilastik.org/license/
###############################################################################
#Python
import sys
import logging
import collections
import itertools
import threading
import functools
import warnings
#SciPy
import numpy
import vigra
#lazyflow
from lazyflow import rtype
from lazyflow.roi import TinyVector
from lazyflow.request import Request
from lazyflow.stype import ArrayLike
from lazyflow.metaDict import MetaDict
from lazyflow.utility import slicingtools, OrderedSignal
class ValueRequest(object):
    """Pseudo request that behaves like a request.Request object.

    It wraps a value that is already available, which avoids the heavy
    construction of a complete Request object in simple cases where one
    is not needed.  All scheduling-related methods are no-ops.
    """

    def __init__(self, value):
        # The already-computed result handed out by wait()/notify_finished().
        self.result = value
        self.started = False

    def wait(self):
        """Return the wrapped value immediately."""
        return self.result

    def block(self):
        """No-op: the value is already available."""
        pass

    def submit(self):
        """No-op: there is nothing to schedule."""
        pass

    def notify_finished(self, callback):
        """Call ``callback(result)`` right away."""
        callback(self.result)

    def notify_failed(self, callback):
        """No-op: the callback is never invoked."""
        pass

    def notify_cancelled(self, callback):
        """No-op: the callback is never invoked."""
        pass

    def clean(self):
        """Drop the reference to the wrapped value."""
        self.result = None

    def writeInto(self, destination):
        """Copy the wrapped value into ``destination`` in place.

        Masked-array destinations get data, mask and (if the source is a
        masked array as well) fill_value copied separately.  Mutable
        sequences are copied with ``[:]``, everything else with ``[...]``.

        :returns: self
        """
        # collections.MutableSequence was removed in Python 3.10; the ABCs
        # live in collections.abc since 3.3.  Fall back for Python 2.
        try:
            from collections.abc import MutableSequence
        except ImportError:
            from collections import MutableSequence

        # Unfortunately, there appears to be a bug when copying masked arrays
        # ( https://github.com/numpy/numpy/issues/5558 ).
        # So, this must be used in the interim.
        if isinstance(destination, numpy.ma.masked_array):
            destination.data[...] = numpy.ma.getdata(self.result)
            destination.mask[...] = numpy.ma.getmaskarray(self.result)
            if isinstance(self.result, numpy.ma.masked_array):
                destination.fill_value = self.result.fill_value
        elif isinstance(destination, MutableSequence) or \
                isinstance(self.result, MutableSequence):
            destination[:] = self.result[:]
        else:
            destination[...] = self.result[...]
        return self
def is_setup_fn(func):
    """
    Decorator. Marks the function as a 'setup' function,
    which means it affects the state of the graph connections.
    All Slot methods that will result in any operator setupOutputs()
    calls should be marked as setup functions using this decorator.

    When the object has a (truthy) ``graph``, the wrapped function runs
    inside ``graph.SetupDepthContext(graph)``; otherwise it is called
    directly.  See graph.py for details of the setup-depth counter.
    """
    def call_in_setup_context(self, *args, **kwargs):
        if self.graph:
            with self.graph.SetupDepthContext(self.graph):
                return func(self, *args, **kwargs)
        # No graph available: nothing to bracket, just run the function.
        return func(self, *args, **kwargs)

    wrapper = functools.wraps(func)(call_in_setup_context)
    wrapper.__wrapped__ = func  # Emulate python 3 behavior of @wraps
    return wrapper
class Slot(object):
"""
Base class for InputSlot, OutputSlot
"""
loggerName = __name__ + '.Slot'
logger = logging.getLogger(loggerName)
traceLogger = logging.getLogger('TRACE.' + loggerName)
# Allow slots to be sorted by their order of creation for debug
# output and diagramming purposes.
_global_counter = itertools.count()
class SlotNotReadyError(Exception):
    """Raised when data is requested from a slot that is not ready."""
@property
def graph(self):
return (self.operator or None) and self.operator.graph
def __init__(self, name="", operator=None, stype=ArrayLike,
             rtype=rtype.SubRegion, value=None, optional=False,
             level=0, nonlane=False, allow_mask=False):
    """Constructor of the Slot class.

    :param name: user readable name of the slot, is normally
      assigned automatically by the Operator
    :param operator: the parent operator of a slot
    :param stype: the slot type (see stype.py); a plain string is
      replaced by ArrayLike
    :param rtype: the region of interest type (see rtype.py)
    :param value: the default value of the slot (InputSlots only)
    :param optional: if True, the slot does NOT need a value or
      connection for its parent operator to be functional
    :param level: defines the dimensionality of the slot. 0 for
      single element (e.g. single numpy.ndarray), 1 for list of
      elements (e.g. list of strings), 2 for list of list of
      elements.
    :param nonlane: For multislot, this flag protects it from
      being considered lane-indexed
    :param allow_mask: if False, masked arrays are rejected as
      default value
    """
    # This assertion is here for a reason: default values do NOT work on OutputSlots.
    # (We should probably change that at some point...)
    assert value is None or isinstance(self, InputSlot), "Only InputSlots can have default values.  OutputSlots cannot."

    # If we do not support masked arrays, ensure that we are not being passed one.
    # The message is built from the constructor arguments: self.operator and
    # self.name are not assigned yet at this point, and operator may be None.
    assert allow_mask or not isinstance(value, numpy.ma.masked_array), \
        "The operator, \"%s\", is being setup to receive a masked array as input to slot, \"%s\"." \
        " This is currently not supported." \
        % (getattr(operator, 'name', None), name)

    # Check for simple mistakes in parameter order...
    assert isinstance(name, str)
    assert isinstance(optional, bool)

    # Subclasses may already have set _type before calling this constructor.
    if not hasattr(self, "_type"):
        self._type = None
    if type(stype) is str:
        stype = ArrayLike

    self.partners = []
    self.name = name
    self._optional = optional
    self.operator = operator
    self.allow_mask = allow_mask
    self._real_operator = None  # Memoized in getRealOperator()

    # in the case of an InputSlot this is the slot to which it is
    # connected
    self.partner = None
    self.level = level

    # in the case of an InputSlot one can directly assign a value
    # to a slot instead of connecting it to a partner, this
    # attribute holds the value
    self._value = None
    self._defaultValue = value

    # Causes calls to setValue to be propagated backwards to the
    # partner slot. Used by the OperatorWrapper.
    self._backpropagate_values = False

    self.rtype = rtype

    # the MetaDict that holds the slots meta information
    self.meta = MetaDict()

    # if level > 0, this holds the sub-Input/Output slots
    self._subSlots = []
    self._stypeType = stype

    # the slot type instance
    self.stype = stype(self)
    self.nonlane = nonlane

    self._sig_changed = OrderedSignal(hide_cancellation_exceptions=True)
    self._sig_value_changed = OrderedSignal(hide_cancellation_exceptions=True)
    self._sig_ready = OrderedSignal(hide_cancellation_exceptions=True)
    self._sig_unready = OrderedSignal(hide_cancellation_exceptions=True)
    self._sig_dirty = OrderedSignal(hide_cancellation_exceptions=True)
    self._sig_connect = OrderedSignal(hide_cancellation_exceptions=True)
    self._sig_disconnect = OrderedSignal(hide_cancellation_exceptions=True)
    self._sig_resize = OrderedSignal(hide_cancellation_exceptions=True)
    self._sig_resized = OrderedSignal(hide_cancellation_exceptions=True)
    self._sig_remove = OrderedSignal(hide_cancellation_exceptions=True)
    self._sig_removed = OrderedSignal(hide_cancellation_exceptions=True)
    self._sig_insert = OrderedSignal(hide_cancellation_exceptions=True)
    self._sig_inserted = OrderedSignal(hide_cancellation_exceptions=True)

    self._resizing = False
    self._executionCount = 0
    self._settingUp = False
    self._condition = threading.Condition()

    # Allow slots to be sorted by their order of creation for
    # debug output and diagramming purposes.
    # next() works on both Python 2.6+ and Python 3 (iterator.next() is Python 2 only).
    self._global_slot_id = next(Slot._global_counter)
###########################
# A p i M e t h o d s #
###########################
def notifyDirty(self, function, **kwargs):
"""
calls the corresponding function when the slot gets dirty
first argument of the function is the slot, second argument the roi
the keyword arguments follow
"""
self._sig_dirty.subscribe(function, **kwargs)
def notifyMetaChanged(self, function, **kwargs):
"""calls the corresponding function when the slot meta
information is changed
first argument of the function is the slot
the keyword arguments follow
"""
self._sig_changed.subscribe(function, **kwargs)
def notifyValueChanged(self, function, **kwargs):
"""Used by slots with cached values to notify when the cache
has changed, even if the output is not dirty.
"""
self._sig_value_changed.subscribe(function, **kwargs)
def notifyReady(self, function, **kwargs):
"""Calls the corresponding function when the slot is "ready",
meaning it is connected and will produce data when called.
This is implemented by manipulating and monitoring a flag in
the slot metadata.
first argument of the function is the slot
the keyword arguments follow
"""
self._sig_ready.subscribe(function, **kwargs)
def notifyUnready(self, function, **kwargs):
"""
Subscribe to "unready" callbacks. See notifyReady for details.
"""
self._sig_unready.subscribe(function, **kwargs)
def _notifyConnect(self, function, **kwargs):
"""
calls the corresponding function when the slot is connected
first argument of the function is the slot
the keyword arguments follow
"""
self._sig_connect.subscribe(function, **kwargs)
def notifyDisconnect(self, function, **kwargs):
"""
calls the corresponding function when the slot is disconnected
first argument of the function is the slot
the keyword arguments follow
"""
self._sig_disconnect.subscribe(function, **kwargs)
def notifyResize(self, function, **kwargs):
"""
calls the corresponding function before the slot is resized
first argument of the function is the slot
second argument is the old size and the third
argument is the new size
the keyword arguments follow
"""
self._sig_resize.subscribe(function, **kwargs)
def notifyResized(self, function, **kwargs):
"""
calls the corresponding function after the slot is resized
first argument of the function is the slot
second argument is the old size and the third
argument is the new size
the keyword arguments follow
"""
self._sig_resized.subscribe(function, **kwargs)
def notifyRemove(self, function, **kwargs):
"""
calls the corresponding function BEFORE a slot is removed
first argument of the function is the slot
second argument is the old size and the third
argument is the new size
the keyword arguments follow
"""
self._sig_remove.subscribe(function, **kwargs)
def notifyRemoved(self, function, **kwargs):
"""
calls the corresponding function AFTER a slot is removed
first argument of the function is the slot
second argument is the old size and the third
argument is the new size
the keyword arguments follow
"""
self._sig_removed.subscribe(function, **kwargs)
def notifyInsert(self, function, **kwargs):
"""
calls the corresponding function BEFORE a slot has been added
first argument of the function is the slot
second argument is the old size and the third
argument is the new size
the keyword arguments follow
"""
self._sig_insert.subscribe(function, **kwargs)
def notifyInserted(self, function, **kwargs):
"""
calls the corresponding function AFTER a slot has been added
first argument of the function is the slot
second argument is the old size and the third
argument is the new size
the keyword arguments follow
"""
self._sig_inserted.subscribe(function, **kwargs)
def unregisterDirty(self, function):
"""
unregister a dirty callback
"""
self._sig_dirty.unsubscribe(function)
def _unregisterConnect(self, function):
"""
unregister a connect callback
"""
self._sig_connect.unsubscribe(function)
def unregisterDisconnect(self, function):
"""
unregister a disconnect callback
"""
self._sig_disconnect.unsubscribe(function)
def unregisterMetaChanged(self, function):
"""
unregister a changed callback
"""
self._sig_changed.unsubscribe(function)
def unregisterValueChanged(self, function):
"""
unregister a value changed callback
"""
self._sig_value_changed.unsubscribe(function)
def unregisterReady(self, function):
"""
unregister a ready callback
"""
self._sig_ready.unsubscribe(function)
def unregisterUnready(self, function):
"""
unregister an unready callback
"""
self._sig_unready.unsubscribe(function)
def unregisterResize(self, function):
"""
unregister a resize callback
"""
self._sig_resize.unsubscribe(function)
def unregisterResized(self, function):
"""
unregister a resized callback
"""
self._sig_resized.unsubscribe(function)
def unregisterRemove(self, function):
"""
unregister a remove callback
"""
self._sig_remove.unsubscribe(function)
def unregisterRemoved(self, function):
"""
unregister a removed callback
"""
self._sig_removed.unsubscribe(function)
def unregisterInsert(self, function):
"""
unregister a insert callback
"""
self._sig_insert.unsubscribe(function)
def unregisterInserted(self, function):
"""
unregister a inserted callback
"""
self._sig_inserted.unsubscribe(function)
def _handleUpstreamUnready(self, slot):
"""
This handler ensures that UNready status propagates quickly
through the graph (before the normal _changed path)
"""
if self.meta._ready:
self.meta._ready = False
self._sig_unready(self)
@is_setup_fn
def connect(self, partner, notify=True, permit_distant_connection=False):
    """
    Connect a slot to another slot

    Arguments:
      partner   : the slot to which this slot is connected.
                  None is equivalent to calling disconnect().
      notify    : unused here; kept for interface compatibility
      permit_distant_connection : if True, allow connecting slots whose
                  operators are neither siblings nor parent and child

    Raises:
      Exception    : for a forbidden "distant" connection
      RuntimeError : if the partner has a higher level than this slot

    If anything below raises, the exception propagates unchanged and the
    slot is deliberately NOT cleaned up with disconnect(): a failure in
    that cleanup could hide the original exception.
    """
    if partner is None:
        self.disconnect()
        return

    assert isinstance(partner, Slot), ("Slot.connect() can only be used to"
                                       " connect other Slots.  Did you mean"
                                       " to use Slot.setValue()?")
    # getattr(): either operator may be None, and the message must not
    # itself fail while reporting the problem.
    assert self.allow_mask or (not partner.meta.has_mask), \
        "The operator, \"%s\", is being setup to receive a masked array as input to slot, \"%s\"," \
        " from the output slot, \"%s\", on operator, \"%s\". This is currently not supported." \
        % (getattr(self.operator, 'name', None), self.name,
           partner.name, getattr(partner.operator, 'name', None))

    my_op = self.getRealOperator()
    partner_op = partner.getRealOperator()
    if partner_op and not (partner_op.parent is my_op.parent or
                           (self._type == "output" and partner_op.parent is my_op) or
                           (self._type == "input" and my_op.parent is partner_op) or
                           my_op is partner_op):
        if not permit_distant_connection:
            msg = "It is forbidden to connect slots of operators that are not siblings "\
                  "or not directly related as parent and child."
            if partner_op.parent is None or my_op.parent is None:
                msg += "\n(For one of your operators, parent=None.  Was it already cleaned up?)"
            raise Exception(msg)

    # Already connected to this very partner: nothing to do.
    if self.partner is partner and partner.level == self.level:
        return
    if self.level == 0:
        self.disconnect()

    partner._sig_unready.subscribe(self._handleUpstreamUnready)
    self._value = None

    if partner.level == self.level:
        assert isinstance(partner.stype, type(self.stype)), \
            "Can't connect slots of non-matching stypes!" \
            " Attempting to connect '{}' (stype: {}) to '{}' (stype: {})".format(self.name, self.stype, partner.name, partner.stype)
        self.partner = partner
        notifyReady = (self.partner.meta._ready and
                       not self.meta._ready)
        self.meta = self.partner.meta.copy()

        # the slot with more sub-slots determines
        # the number of subslots
        if len(self) < len(partner):
            self.resize(len(partner))
        elif len(self) > len(partner):
            partner.resize(len(self))

        partner.partners.append(self)
        for i in range(len(self.partner)):
            self[i].connect(self.partner[i])

        # call slot type connect function
        self.stype.connect(partner)

        if self.level > 0 or self.stype.isConfigured():
            self._changed()

        # call connect callbacks
        self._sig_connect(self)

        # Notify readiness after partner is updated
        if notifyReady:
            self._sig_ready(self)

    elif partner.level < self.level:
        # A lower-level partner is connected to every subslot.
        self.partner = partner
        notifyReady = (self.partner.meta._ready and not
                       self.meta._ready)
        self.meta = self.partner.meta.copy()
        for slot in self._subSlots:
            slot.connect(partner)

        if notifyReady:
            self._sig_ready(self)

        self._changed()

        # call connect callbacks
        self._sig_connect(self)

    elif partner.level > self.level:
        msg = str("Can't connect slots:"
                  " {}.{}.level={}, but"
                  " {}.{}.level={}"
                  " (Implicit OpearatorWrapper creation"
                  " is no longer supported.)").format(
                      self.getRealOperator().name,
                      self.name, self.level,
                      partner.getRealOperator().name,
                      partner.name, partner.level)
        raise RuntimeError(msg)

    # propagate value changed signals from inner to outer
    # operators.
    if self._type == partner._type == "output":
        partner.notifyValueChanged(self._sig_value_changed)
@is_setup_fn
def disconnect(self):
    """
    Disconnect a InputSlot from its partner.

    With value backpropagation enabled (and the real operator not being
    cleaned up) the call is forwarded to the partner instead.  Otherwise
    all subslots are disconnected, partner/value/meta are reset and the
    disconnect, changed and unready notifications are emitted.
    """
    if self.backpropagate_values and self.getRealOperator() and not self.getRealOperator()._cleaningUp:
        upstream = self.partner
        if upstream is not None:
            upstream.disconnect()
        return

    for sub in self._subSlots:
        sub.disconnect()

    had_partner = self.partner is not None
    if had_partner:
        # safe to unsubscribe, even if not subscribed
        self.partner._sig_unready.unsubscribe(self._handleUpstreamUnready)
        try:
            self.partner.partners.remove(self)
        except ValueError:
            # We were not registered with the partner; nothing to remove.
            pass
    self.partner = None

    had_value = self._value is not None
    self._value = None

    # Remember readiness before the metadata is thrown away.
    was_ready = self.meta._ready
    self.meta = MetaDict()

    if len(self._subSlots) > 0 and self.getRealOperator() and not self.getRealOperator()._cleaningUp:
        self.resize(0)

    # call callbacks
    if had_partner or had_value:
        self._sig_disconnect(self)

    # Notify our partners that we changed.
    self._changed()

    # If we were ready before, signal that we aren't any more
    if was_ready:
        self._sig_unready(self)
@is_setup_fn
def resize(self, size):
"""
Resizes a slot to the desired length
Arguments:
size : the desired number of subslots
"""
assert numpy.issubdtype(type(size), numpy.integer), \
"Bug: 'size' must be int, not {}".format( type(size) )
if self._resizing:
return
if self.level == 0:
raise RuntimeError("Can't resize a level-0 slot!")
oldsize = len(self)
if size == oldsize:
return
self._resizing = True
if self.operator is not None:
self.logger.debug("Resizing slot {} of operator {} to size {}".format(
self.name, self.operator.name, size))
# call before resize callbacks
self._sig_resize(self, oldsize, size)
new_subslots = []
while size > len(self):
self.insertSlot(len(self), len(self)+1, propagate=False)
new_subslots.append( len(self) - 1 )
while size < len(self):
self.removeSlot(len(self)-1, len(self)-1, propagate=False)
# propagate size change downward
for c in self.partners:
if c.level == self.level:
c.resize(size)
# propagate size change upward
if (self.partner and len(self.partner) < size and self.partner.level == self.level):
self.partner.resize(size)
# connect newly added slots
# We must connect these subslots here, AFTER all resizes have propagated up and down through the graph.
# Otherwise, our new subslots may lose downstream partners (happens in "diamond" shaped graphs.)
for i in new_subslots:
self._connectSubSlot(i)
# call after resize callbacks
self._sig_resized(self, oldsize, size)
self._resizing = False
@is_setup_fn
def insertSlot(self, position, finalsize, propagate=True):
"""
Insert a new slot at the specified position
finalsize indicates the final destination size
"""
if len(self) >= finalsize:
return self[position]
# call after insert callbacks
self._sig_insert(self, position, finalsize)
slot = self._insertNew(position)
# New slot inherits our settings
slot.backpropagate_values = self.backpropagate_values
operator_name = '<NO OPERATOR>'
if self.operator:
operator_name = self.operator.name
self.logger.debug("Inserting slot {} into slot {} of operator {} to size {}".format(
position, self.name, operator_name, finalsize))
if propagate:
if self.partner is not None and self.partner.level == self.level:
self.partner.insertSlot(position, finalsize)
for p in self.partners:
if p.level == self.level:
p.insertSlot(position, finalsize)
self._connectSubSlot(position)
# call after insert callbacks
self._sig_inserted(self, position, finalsize)
return slot
@is_setup_fn
def removeSlot(self, position, finalsize, propagate=True):
"""
Remove the slot at position
finalsize indicates the final size of all subslots
"""
if len(self) <= finalsize:
return None
assert position < len(self)
if self.operator is not None:
self.logger.debug("Removing slot {} into slot {} of operator {} to size {}".format(
position, self.name, self.operator.name, finalsize))
# call before-remove callbacks
self._sig_remove(self, position, finalsize)
slot = self._subSlots.pop(position)
slot.disconnect()
slot.operator = None
slot._real_operator = None
if propagate:
if self.partner is not None and self.partner.level == self.level:
self.partner.removeSlot(position, finalsize)
for p in self.partners:
if p.level == self.level:
p.removeSlot(position, finalsize)
# call after-remove callbacks
self._sig_removed(self, position, finalsize)
def get(self, roi):
    """This method is used to retrieve the actual content of a Slot.

    :param roi: the region of interest, e.g. a subregion in the
      case of an ArrayLike stype

    Returns:
      a request.Request object, or a lightweight ValueRequest if the
      slot holds a directly assigned value.

    Raises:
      Slot.SlotNotReadyError : if the slot has neither value nor
      partner and is not ready
    """
    if self._value is not None:
        # this handles the case of an inputslot
        # having a ._value
        # --> construct cheaper request object for this case
        result = self.stype.writeIntoDestination(None, self._value, roi)
        return ValueRequest(result)

    if self.partner is not None:
        # this handles the case of an inputslot
        # --> just relay the request
        return self.partner.get(roi)

    if not self.ready():
        # Something is wrong.  Are we cancelled?
        Request.raise_if_cancelled()
        problem_slot = Slot._findUpstreamProblemSlot(self)
        problem_str = str(problem_slot)
        if isinstance(problem_slot, Slot):
            problem_op = problem_slot.getRealOperator()
            # The problem slot may not belong to an operator; don't let
            # the diagnostics crash with an AttributeError in that case.
            if problem_op is not None:
                problem_str = problem_op.name + '/' + str(problem_slot)
            else:
                problem_str = '<NO OPERATOR> /' + str(problem_slot)
        msg = "Can't get data from slot {}.{} yet."\
              " It isn't ready."\
              "First upstream problem slot is: {}"
        msg = msg.format(self.getRealOperator() and self.getRealOperator().__class__, self.name, problem_str)
        raise Slot.SlotNotReadyError(msg)

    # If someone is asking for data from an inputslot that has
    # no value and no partner, then something is wrong.
    if self._type == "input":
        # Something is wrong.  Are we cancelled?
        Request.raise_if_cancelled()
    assert self._type != "input", "This inputSlot has no value and no partner.  You can't ask for its data yet!"

    # normal (outputslot) case
    # --> construct heavy request object..
    execWrapper = Slot.RequestExecutionWrapper(self, roi)
    request = Request(execWrapper)

    # We must decrement the execution count even if the
    # request is cancelled
    request.notify_cancelled(execWrapper.handleCancel)
    return request
@staticmethod
def _findUpstreamProblemSlot(slot):
if slot.partner is not None:
return Slot._findUpstreamProblemSlot( slot.partner )
if slot.getRealOperator() is not None:
for inputSlot in slot.getRealOperator().inputs.values():
if not inputSlot._optional and not inputSlot.ready():
return inputSlot
return "Couldn't find an upstream problem slot."
class RequestExecutionWrapper(object):
    """Callable workload of a Request: executes the slot's operator for a roi.

    It also maintains the operator's execution count, which protects
    against setupOutputs() running while the operator is executing.
    """

    def __init__(self, slot, roi):
        self.started = False
        self.finished = False
        self.slot = slot
        self.operator = slot.operator
        # Guards started/finished against concurrent decrement attempts.
        self.lock = threading.Lock()
        self.roi = roi

    def __call__(self, destination=None):
        """Run ``operator.execute`` and return the result.

        :param destination: optional area to write the result into; if
          omitted it is allocated by the slot's stype.
        """
        # store whether the user wants the results in a given
        # destination area
        destination_given = destination is not None

        if destination is None:
            destination = self.slot.stype.allocateDestination(self.roi)
        else:
            if self.slot.meta.dtype is not None and hasattr(destination, 'dtype'):
                assert self.slot.meta.dtype == destination.dtype, \
                    "Can't provide a destination array of the wrong dtype. "\
                    "Slot generates {}, but you gave {}".format(self.slot.meta.dtype, destination.dtype)

        # We are executing the operator. Incremement the execution
        # count to protect against simultaneous setupOutputs()
        # calls.
        self._incrementOperatorExecutionCount()

        try:
            # Execute the workload, which might not ever return
            # (if we get cancelled).
            result_op = self.operator.execute(self.slot, (), self.roi, destination)

            # copy data from result_op to destination, if
            # destination was actually given by the user, and the
            # returned result_op is different from destination.
            # (but don't copy if result_op is None, this means
            # legacy op which wrote into destination anyway)
            if destination_given and result_op is not None and result_op is not destination:
                # check that the returned value is compatible with the requested roi
                self.slot.stype.check_result_valid(self.roi, result_op)
                self.slot.stype.copy_data(dst=destination, src=result_op)
            elif result_op is not None:
                # FIXME: this should be moved to a isCompatible
                # check in stypes.py
                if hasattr(result_op, "shape"):
                    assert result_op.shape == destination.shape, \
                        ("ERROR: Operator {} has failed to provide a"
                         " result of correct shape. result shape is"
                         " {} vs {}. roi was {}".format(
                             self.operator, result_op.shape,
                             destination.shape, str(self.roi)))
                destination = result_op

            # check that the returned value is compatible with the requested roi
            self.slot.stype.check_result_valid(self.roi, destination)
            return destination
        finally:
            # Decrement the execution count on every exit path:
            # success, failure and cancellation alike.
            self._decrementOperatorExecutionCount()

    def _incrementOperatorExecutionCount(self):
        self.started = True
        assert self.operator._executionCount >= 0, \
            "BUG: How did the execution count get negative?"
        # We can't execute while the operator is in the middle of
        # setupOutputs
        with self.operator._condition:
            while self.operator._settingUp:
                self.operator._condition.wait()
            self.operator._executionCount += 1

    def handleCancel(self, *args):
        # The new request api does clean up by handling an
        # exception, not in this callback.  Only clean up if we are
        # using the old request api
        using_old_api = len(args) > 0 and not hasattr(args[0], 'notify_cancelled')
        if using_old_api:
            self._decrementOperatorExecutionCount()

    def _decrementOperatorExecutionCount(self):
        # Must lock here because cancel callbacks are
        # asynchronous.  (Perhaps it would be better if they were
        # called from the worker thread instead...)
        with self.lock:
            # Only do this once per execution.  If we were cancelled
            # after we finished working, don't do anything
            if self.started and not self.finished:
                assert self.operator._executionCount > 0, \
                    "BUG: Can't decrement the execution count below zero!"
                self.finished = True
                with self.operator._condition:
                    self.operator._executionCount -= 1
                    # notify_all (Python 2.6+) replaces the deprecated notifyAll alias.
                    self.operator._condition.notify_all()
@is_setup_fn
def setDirty(self, *args, **kwargs):
    """This method is called by a partnering OutputSlot when its
    content changes.

    The arguments are either a ready-made rtype.Roi as first positional
    argument, or whatever this slot's rtype accepts to build one
    (e.g. a 'key' identifying the changed region of an numpy.ndarray).
    Nothing happens while the stype is not configured.
    """
    assert self.operator is not None, ("Slot '{}' cannot be set dirty,"
                                       " slot not belonging to any"
                                       " actual operator instance".format(self.name))

    if not self.stype.isConfigured():
        return

    if args and isinstance(args[0], rtype.Roi):
        roi = args[0]
    else:
        roi = self.rtype(self, *args, **kwargs)

    # Forward the dirty region downstream first.
    for downstream in self.partners:
        downstream.setDirty(roi)

    # call callbacks
    self._sig_dirty(self, roi)

    if self._type == "input" and self.operator.configured():
        self.operator.propagateDirty(self, (), roi)
def __iter__(self):
assert self.level >= 1
return self._subSlots.__iter__()
def __getitem__(self, key):
"""If level=0, emulate __call__ but with a slicing instead of
a roi.
If level>0, return the subslot corresponding to the key, which
may be a tuple
"""
if self.level > 0:
if isinstance(key, tuple):
assert len(key) > 0
assert len(key) <= self.level
if len(key) == 1:
return self._subSlots[key[0]]
else:
return self._subSlots[key[0]][key[1:]]
return self._subSlots[key]
else:
if self.meta.shape is None:
# Something is wrong. Are we cancelled?
Request.raise_if_cancelled()
if not self.ready():
#msg = "This slot ({}.{}) isn't ready yet, which means " \
# "you can't ask for its data. Is it connected?".format(self.getRealOperator() and self.getRealOperator().name, self.name)
#self.logger.error(msg)
problem_slot = Slot._findUpstreamProblemSlot(self)
problem_str = str( problem_slot )
if isinstance( problem_slot, Slot ):
problem_op = problem_slot.getRealOperator()
if problem_op is not None:
problem_str = problem_op.name + '/' + str( problem_slot )
else:
problem_str = '<NO OPERATOR> /' + str( problem_slot )
slotInfoMsg = "Can't get data from slot {}.{} yet."\
" It isn't ready."\
"First upstream problem slot is: {}"\
"".format( self.getRealOperator() and self.getRealOperator().__class__, self.name, problem_str )
#self.logger.error(slotInfoMsg)
raise Slot.SlotNotReadyError(slotInfoMsg)
assert self.meta.shape is not None, \
("Can't ask for slices of this slot yet:"
" self.meta.shape is None!"
" (operator {} [self={}] slot: {}, key={}".format(
self.operator.name, self.operator, self.name, key))
return self(pslice=key)
def __setitem__(self, key, value):
    """Write ``value`` into the region ``key`` of a level-0 slot.

    If the slot holds a directly assigned value, it is updated in place
    and the region is marked dirty.  For input slots the operator's
    setInSlot() is called, and the assignment is forwarded to all
    downstream partners.

    ``key`` must be a bounded slicing (no ``:`` or ``...``); slots
    cannot be connected this way, use connect() instead.
    """
    assert not isinstance(value, Slot), \
        "Can't use setitem to connect slots.  Use connect()"
    assert self.level == 0, \
        ("setitem can only be used with slots of level 0."
         " Did you forget to append a key?")
    # The placeholder was previously never filled in.
    assert self.operator is not None, \
        "cannot do __setitem__ on Slot '{}' -> no operator !!".format(self.name)
    assert slicingtools.is_bounded(key), \
        "Can't use Slot.__setitem__ with keys that include : or ..."

    # If we do not support masked arrays, ensure that we are not being passed one.
    assert self.allow_mask or not (self.meta.has_mask or isinstance(value, numpy.ma.masked_array)), \
        "The operator, \"%s\", is being setup to receive a masked array as input to slot, \"%s\"." \
        " This is currently not supported." \
        % (self.operator.name, self.name)

    roi = self.rtype(self, pslice=key)
    if self._value is not None:
        self._value[key] = value

        # only propagate the dirty key at the very beginning of
        # the chain
        self.setDirty(roi)
    if self._type == "input":
        self.operator.setInSlot(self, (), roi, value)

    # Forward to partners
    for p in self.partners:
        p[key] = value
def index(self, slot):
return self._subSlots.index(slot)
@is_setup_fn
def setInSlot(self, slot, subindex, roi, value):
"""For now, Slots of level > 0 pretend to be operators (as far
as their subslots are concerned). That's why they have to have
this setInSlot() method.
"""
# If we do not support masked arrays, ensure that we are not being passed one.
assert self.allow_mask or not (self.meta.has_mask or isinstance(value, numpy.ma.masked_array)), \
"The operator, \"%s\", is being setup to receive a masked array as input to slot, \"%s\"." \
" This is currently not supported." \
% (self.operator.name, self.name)
# Determine which subslot this is and prepend it to the totalIndex
totalIndex = (self._subSlots.index(slot),) + subindex
# Forward the call to our operator
self.operator.setInSlot(self, totalIndex, roi, value)
def __len__(self):
"""In the case of a MultiSlot this returns the number of
subslots, i.e. the length of the list
"""
return len(self._subSlots)
@property
def value(self):
    """Return the complete content of this slot.

    Intended for slots where a region of interest makes little sense,
    e.g. slots that hold a single integer or float.

    * A slot without a partner that holds a value returns that value
      unchanged.
    * Otherwise the whole slot is requested via ``self[:].wait()``.
      An ndarray result of shape ``(1,)`` and a list result are both
      unwrapped to their first element; any other ndarray is returned
      as is; anything else triggers a warning and is returned as is.
    """
    if self.partner is None and self._value is not None:
        # The slot stores its own value: hand it out untouched.
        return self._value

    # Either we are connected to a partner, or we have no stored value:
    # in both situations the content has to be requested.
    result = self[:].wait()

    if isinstance(result, numpy.ndarray):
        return result[0] if result.shape == (1,) else result
    if isinstance(result, list):
        return result[0]

    warnings.warn("FIXME: Slot.value for slot {} is {},"
                  " which should be wrapped in an ndarray."
                  .format(self.name, result))
    return result
@is_setup_fn
def setValue(self, value, notify=True, check_changed=True, extra_meta=None):
    """Directly assign ``value`` to this slot.

    A slot either retrieves its content from a connected partner or holds
    a value itself; this method stores such a value.

    :param value: The new value.  Must not be a Slot (use connect() for
        that) and must not be a masked array unless the slot allows masks.
    :param notify: Must be a bool.  It is only validated here and handed
        on when the call is back-propagated to the partner.
    :param check_changed: If True, the new value is compared with the
        current one (identity first, then ``==``) and nothing happens when
        they are considered equal.  If False the update is always applied.
    :param extra_meta: Optional mapping of additional metadata.  Every
        item is set as an attribute on ``self.meta`` after
        ``self.stype.setupMetaForValue(value)`` has run.  ``None`` (the
        default) means "no extra metadata".

    If ``backpropagate_values`` is set and the slot has a partner, the
    call is forwarded to the partner and nothing else happens here.

    Any exception raised along the way propagates to the caller unchanged;
    no cleanup (such as disconnect()) is attempted, because a failing
    cleanup could hide the original exception.
    """
    assert isinstance(notify, bool)
    assert isinstance(check_changed, bool)

    # This assertion is here to prevent accidental use of setValue
    # when connect should be used. If your use case requires
    # passing slots as values, then this assertion can be refined.
    assert not isinstance(value, Slot), \
        "When using setValue, value cannot be a slot.  Use connect instead."

    # If we do not support masked arrays, ensure that we are not being passed one.
    assert self.allow_mask or not (self.meta.has_mask or isinstance(value, numpy.ma.masked_array)), \
        "The operator, \"%s\", is being setup to receive a masked array as input to slot, \"%s\"." \
        " This is currently not supported." \
        % (self.operator.name, self.name)

    # Avoid a shared mutable default argument.
    if extra_meta is None:
        extra_meta = {}

    if not self.backpropagate_values:
        assert self.partner is None, \
            ("Cannot call setValue on this slot."
             " It is already connected to a partner."
             " Call disconnect first if that's what you really wanted.")
    elif self.partner is not None:
        # NOTE(review): extra_meta is not forwarded to the partner here;
        # confirm whether that is intended.
        self.partner.setValue(value, notify, check_changed)
        return

    if check_changed and not self._differsFromCurrentValue(value):
        # Equivalent value: nothing to update, nothing to signal.
        return

    # call disconnect callbacks
    self._sig_disconnect(self)
    self._value = value
    self.stype.setupMetaForValue(value)
    for k, v in extra_meta.items():
        setattr(self.meta, k, v)
    self.meta._dirty = True

    for s in self._subSlots:
        s.setValue(self._value)

    # a slot with a value is ready unless the value is None.
    if self._value is not None:
        if self.meta._ready != True:
            self.meta._ready = True
            self._sig_ready(self)
    else:
        if self.meta._ready != False:
            self.meta._ready = False
            self._sig_unready(self)

    # call connect callbacks
    self._sig_connect(self)
    self._changed()

    # Propagate dirtyness
    if self.rtype == rtype.List:
        self.setDirty(())
    else:
        self.setDirty(slice(None))

def _differsFromCurrentValue(self, value):
    """Return True if ``value`` is considered different from self._value."""
    current = self._value

    # Fast path checks for array types: a differing type, shape, fill
    # value or axistags settles the question without comparing contents.
    if isinstance(value, numpy.ndarray) or isinstance(current, numpy.ndarray):
        if type(value) != type(current) or value.shape != current.shape:
            return True
    if isinstance(value, numpy.ma.masked_array) and isinstance(current, numpy.ma.masked_array):
        # NAN does not compare equal so it has to be checked separately.
        if (value.fill_value != current.fill_value) and \
           not (numpy.isnan(value.fill_value) and numpy.isnan(current.fill_value)):
            return True
    if isinstance(value, vigra.VigraArray) or isinstance(current, vigra.VigraArray):
        if type(value) != type(current) or value.axistags != current.axistags:
            return True

    # Slow path checks.
    # We use == here instead of 'is' alone to avoid subtle bugs that can
    # occur if an equivalent value is supplied that 'is not' the original.
    # For example: x=numpy.uint8(3); y=numpy.int64(3); assert x == y; assert x is not y
    if value is current:
        return False
    try:
        same = (value == current)
    except ValueError:
        # Some values can't be compared with __eq__,
        # in which case we assume the values are different
        return True
    if isinstance(same, (numpy.ndarray, TinyVector)):
        same = same.all()
    return not same
@is_setup_fn
def setValues(self, values):
"""Set values of subslots with arraylike object. Resizes the
multinputslot with the length of the values array
"""
try:
# call disconnect callbacks
self._sig_disconnect(self)
self.resize(len(values))
for i, s in enumerate(self._subSlots):
s.setValue(values[i])
# call connect callbacks
self._changed()
self._sig_connect(self)
except:
try:
exc_info = sys.exc_info()
self.disconnect()
except:
# Well, this is bad. We caused an exception while handling an exception.
# We're more interested in the FIRST excpetion, so print this one out and
# continue unwinding the stack with the first one.
self.logger.error("Error: Caught a secondary exception while handling a different exception.")
import traceback
traceback.print_exc()
raise exc_info[0], exc_info[1], exc_info[2]
raise
@property
def backpropagate_values(self):
    """The back-propagation flag stored on this slot."""
    flag = self._backpropagate_values
    return flag

@backpropagate_values.setter
def backpropagate_values(self, backprop):
    # Store the flag here, then assign the same flag to every subslot.
    self._backpropagate_values = backprop
    for subslot in self._subSlots:
        subslot.backpropagate_values = backprop
def connected(self):
    """Tell whether this slot has an input.

    Returns True if the slot has a partner or holds a value.  Failing
    that, a slot with subslots counts as connected when none of its
    subslots reports ``False`` from connected(); a slot with neither
    input nor subslots is not connected.
    """
    if self._value is not None or self.partner is not None:
        return True

    if len(self._subSlots) == 0:
        return False

    for subslot in self._subSlots:
        if subslot.connected() is False:
            return False
    return True
def configured(self):
    """Report whether this slot counts as "configured".

    Slots of level >= 1 implement parts of the operator interface,
    including this function.  The slot is configured if it is optional;
    otherwise the answer is whatever ready() returns.
    """
    optional = self._optional
    if optional:
        return optional
    return self.ready()
def ready(self):
    """Return the readiness of this slot.

    A level-0 slot reports its own ``meta._ready`` flag.  A multi-slot is
    ready only if it has at least one subslot and every subslot is ready;
    operators that can properly handle an empty multi-input slot should
    mark that input as optional.
    """
    if self.level == 0:
        return self.meta._ready

    subslots = self._subSlots
    if len(subslots) == 0:
        # No subslots at all: NOT ready.
        return False
    for subslot in subslots:
        if not subslot.ready():
            return False
    return True
def _setReady(self):
    """Mark this slot and, recursively, all of its subslots as ready.

    A level-0 slot always becomes ready; a multi-slot becomes ready only
    if it has at least one subslot.  If the slot was not ready before and
    is ready now, _changed() is invoked and the ready signal is emitted.
    """
    ready_before = self.ready()

    for subslot in self._subSlots:
        subslot._setReady()

    if self.level == 0:
        self.meta._ready = True
    else:
        self.meta._ready = len(self._subSlots) > 0

    # If we just became ready...
    if self.meta._ready and not ready_before:
        # Notify partners of changed readystatus
        self._changed()
        self._sig_ready(self)
def __call__(self, *args, **kwargs):
    """Build a roi from the given arguments and request it.

    All arguments are relayed to the constructor of this slot's roi type
    (``self.rtype``), so lazyflow can support different kinds of rois
    without knowing anything about them.  The result of ``self.get(roi)``
    is returned.
    """
    return self.get(self.rtype(self, *args, **kwargs))
def getRealOperator(self):
    """Return the actual operator this slot ultimately belongs to.

    When a slot is owned by a higher-level slot, ``self.operator`` is
    itself a Slot; in that case the owner is asked for its real operator.
    The answer is stored in ``self._real_operator`` and reused on later
    calls as long as it is not None.
    """
    cached = self._real_operator
    if cached is None:
        owner = self.operator
        if isinstance(owner, Slot):
            # Keep climbing the ownership hierarchy.
            cached = owner.getRealOperator()
        else:
            cached = owner
        self._real_operator = cached
    return cached
#####################
# Private Methods #
#####################
def _getInstance(self, operator, **init_kwarg_overrides):
    """Construct a copy of this slot that belongs to ``operator``.

    Used when creating an instance of an Operator.  The slot parameters
    (stype, rtype, default value, level, nonlane, allow_mask and, for
    input slots, optional) are copied, but any of them can be overridden
    through ``init_kwarg_overrides``.
    """
    is_input = (self._type == "input")

    init_kwargs = {
        'stype': self._stypeType,
        'rtype': self.rtype,
        'value': self._defaultValue,
        'level': self.level,
        'nonlane': self.nonlane,
        'allow_mask': self.allow_mask,
    }
    if is_input:
        # Only input slots carry the 'optional' setting.
        init_kwargs['optional'] = self._optional
    init_kwargs.update(init_kwarg_overrides)

    if is_input:
        s = InputSlot(self.name, operator, **init_kwargs)
    elif self._type == "output":
        s = OutputSlot(self.name, operator, **init_kwargs)
    return s
def _changed(self):
    """Bring this slot up to date after a change and pass the change on.

    In order, this method: copies the partner's meta if it differs from
    ours, lets the subslots of an output slot update themselves, emits
    the ready/unready signal if readiness flipped, forwards the change to
    all partners if our meta is dirty (clearing the dirty flag
    afterwards), asks the owner to reconfigure (non-output slots only),
    and finally emits the changed signal if the meta was dirty.
    """
    # NOTE: oldMeta is not used anywhere else in this method.
    oldMeta = self.meta
    old_ready = self.ready()
    # Take over the partner's metadata (as a copy) when it differs from ours.
    if self.partner is not None and self.meta != self.partner.meta:
        self.meta = self.partner.meta.copy()

    # Output slots pass the update down to their subslots.
    if self._type == "output":
        for o in self._subSlots:
            o._changed()

    # Notify readiness after subslots are updated
    if self.ready() != old_ready:
        if self.ready():
            self._sig_ready(self)
        else:
            self._sig_unready(self)

    # Remember the dirty state now: the flag is cleared below, but it is
    # still needed to decide whether the changed signal has to be emitted.
    wasdirty = self.meta._dirty
    if self.meta._dirty:
        # A slot that does not allow masks must not end up with masked metadata.
        assert self.allow_mask or (not self.meta.has_mask), \
            "The operator, \"%s\", is being setup to receive a masked array as input to slot, \"%s\"." \
            " This is currently not supported." \
            % (self.operator.name, self.name)

        # Forward the change to every slot in self.partners.
        for c in self.partners:
            c._changed()

        self.meta._dirty = False

    # Non-output slots ask their owner to reconfigure, unless the real
    # operator is missing or is currently cleaning up.
    if self._type != "output":
        op = self.getRealOperator()
        if op is not None and not op._cleaningUp:
            self._configureOperator(self)

    if wasdirty:
        # call changed callbacks
        self._sig_changed(self)
def _configureOperator(self, slot, oldSize=0, newSize=0, notify=True):
    """Trigger the owner's _setupOutputs() once the owner is configured.

    Nothing happens if this slot has no owner (``self.operator`` is None)
    or if the owner's configured() returns a false value.  The arguments
    are accepted but not used by this method.
    """
    owner = self.operator
    if owner is None:
        return
    # check whether all slots are connected and notify operator
    if owner.configured():
        owner._setupOutputs()
def _setupOutputs(self):
    """Operator-interface hook: simply runs this slot's _changed()."""
    self._changed()
def _connectSubSlot(self, slot, notify=True):
    """Give a subslot its input: the partner and/or our stored value.

    ``slot`` may be the subslot itself or its integer index.  If this
    slot has a partner of the same level, the subslot is connected to the
    partner's subslot at the same index (provided the partner has that
    many subslots); with a partner of a different level the subslot is
    connected to the partner itself.  Afterwards, if this slot holds a
    value, that value is assigned to the subslot.
    """
    if type(slot) is int:
        index, slot = slot, self._subSlots[slot]
    else:
        index = self._subSlots.index(slot)

    partner = self.partner
    if partner is not None:
        if partner.level != self.level:
            slot.connect(partner)
        elif len(partner) > index:
            slot.connect(partner[index])

    if self._value is not None:
        slot.setValue(self._value, notify=notify)
def _insertNew(self, position):
    """Create a subslot one level below this slot and insert it.

    The new subslot is built with _getInstance() using this slot as its
    owner, inserted into the list of subslots at ``position``, named
    after this slot and, if this slot holds a value, given that value.
    The new subslot is returned.
    """
    assert 0 <= position <= len(self._subSlots)

    new_slot = self._getInstance(self, level=self.level - 1)
    self._subSlots.insert(position, new_slot)
    new_slot.name = self.name

    if self._value is not None:
        new_slot.setValue(self._value)
    return new_slot
def pop(self, index=-1, event=None):
    """Remove the subslot at ``index`` (by default the last one).

    A negative ``index`` is counted from the end.  ``event`` is accepted
    but not used.  Nothing is returned.
    """
    if index < 0:
        # Translate the negative index into an absolute position.
        index += len(self)
    self._subSlots.pop(index)
def propagateDirty(self, slot, subindex, roi):
    """Pass a subslot's dirty notification on to the owner of this slot.

    Slots with level > 0 implement part of the operator interface so that
    they look like an operator to their subslots.  The position of
    ``slot`` among our subslots is prepended to ``subindex`` before the
    call is handed to ``self.operator``.
    """
    position = self._subSlots.index(slot)
    self.operator.propagateDirty(self, (position,) + subindex, roi)
######################################
# methods aimed to enhance usability #
######################################
def setShapeAtAxisTo(self, axis, size):
    """Replace one entry of ``self.meta.shape`` with ``size``.

    The entry is the one at the position that ``self.meta.axistags.index``
    reports for ``axis``.  The updated shape is stored back as a tuple.
    """
    dims = list(self.meta.shape)
    position = self.meta.axistags.index(axis)
    dims[position] = size
    self.meta.shape = tuple(dims)
def __str__(self):
    """Describe the slot as ``<operator name>.<slot name> <info>: <meta>``.

    The bracketed info part only appears for multi-slots and for slots
    owned by another slot.  It lists the index within the owning slot,
    the number of subslots (level > 0) and the level (level > 1).
    """
    owned_by_slot = isinstance(self.operator, Slot)

    pieces = []
    if self.level > 0 or owned_by_slot:
        pieces.append("[")
        if owned_by_slot:
            if self in self.operator._subSlots:
                pieces.append(" index={}".format(self.operator.index(self)))
            else:
                pieces.append(" index=NOTFOUND")
        if self.level > 0:
            pieces.append(" len={}".format(len(self)))
        if self.level > 1:
            pieces.append(" level={}".format(self.level))
        pieces.append(" ] ")
    mslot_info = "".join(pieces)

    return '{}.{} {}: \t{}\n'.format(self.getRealOperator().name, self.name, mslot_info, self.meta)
def __repr__(self):
    """Use the same text as ``__str__`` for the representation."""
    text = self.__str__()
    return text
class InputSlot(Slot):
    """Base class for input slots.

    An input slot obtains its content either from a partner slot (see
    ``.connect(partner)``) or from a value that is provided directly
    (see ``.setValue(value)``).
    """

    def __init__(self, *args, **kwargs):
        """Initialise the slot and mark it as an input.

        All arguments are passed through to ``Slot.__init__``.
        """
        super(InputSlot, self).__init__(*args, **kwargs)
        self._type = "input"

        # configure operator in case of slot change
        on_resize = self._configureOperator
        self.notifyResized(on_resize)
class OutputSlot(Slot):
    """Base class for output slots.

    An output slot can be connected to the input slot of another operator
    (see ``.connect(partner)``).  Its content, e.g. the result of the
    operator it belongs to, can be requested with the usual python array
    slicing syntax, for example::

        outputslot[3,:,14:32]

    Such a call returns a request object.
    """

    def __init__(self, *args, **kwargs):
        """Initialise the slot and mark it as an output.

        All arguments are passed through to ``Slot.__init__``; the
        ``optional`` keyword is not permitted for output slots.
        """
        super(OutputSlot, self).__init__(*args, **kwargs)
        self._type = "output"
        assert 'optional' not in kwargs, '"optional" init arg cannot be used with OutputSlot'

    def execute(self, slot, subindex, roi, result):
        """Relay a subslot's execute() call to the owner of this slot.

        For now, OutputSlots with level > 0 must pretend to be operators.
        The position of ``slot`` among our subslots is prepended to
        ``subindex`` and the owner's result is returned.
        """
        position = self._subSlots.index(slot)
        return self.operator.execute(self, (position,) + subindex, roi, result)