Source code for lazyflow.utility.bigRequestStreamer

###############################################################################
#   lazyflow: data flow based lazy parallel computation framework
#
#       Copyright (C) 2011-2014, the ilastik developers
#                                <team@ilastik.org>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the Lesser GNU General Public License
# as published by the Free Software Foundation; either version 2.1
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# See the files LICENSE.lgpl2 and LICENSE.lgpl3 for full text of the
# GNU Lesser General Public License version 2.1 and 3 respectively.
# This information is also available on the ilastik web site at:
#		   http://ilastik.org/license/
###############################################################################
import numpy
from lazyflow.request import Request
from lazyflow.utility import RoiRequestBatch
from lazyflow.roi import ( getIntersectingBlocks, getBlockBounds, getIntersection,
                           determine_optimal_request_blockshape, determineBlockShape )

import logging
import warnings
from memory import Memory
logger = logging.getLogger(__name__)

class BigRequestStreamer(object):
    """
    Execute a big request by breaking it up into smaller requests.

    This class encapsulates the logic for dividing big rois into smaller ones to be executed separately.

    It relies on a :py:class:`RoiRequestBatch<lazyflow.utility.roiRequestBatch.RoiRequestBatch>` object,
    which is responsible for creating and scheduling the request for each roi.

    Example:

    >>> import sys
    >>> import vigra
    >>> from lazyflow.graph import Graph
    >>> from lazyflow.operators.operators import OpArrayCache

    >>> # Example data
    >>> data = numpy.indices( (100,100) ).sum(0)
    >>> data = vigra.taggedView( data, vigra.defaultAxistags('xy') )

    >>> op = OpArrayCache( graph=Graph() )
    >>> op.Input.setValue( data )

    >>> total_roi = [(25, 65), (45, 95)]

    >>> # Init with our output slot and roi to request.
    >>> # batchSize indicates the number of requests to spawn in parallel.
    >>> streamer = BigRequestStreamer( op.Output, total_roi, (10,10), batchSize=2, blockAlignment='relative' )

    >>> # Use a callback to handle sub-results one at a time.
    >>> result_count = [0]
    >>> result_total_sum = [0]
    >>> def handle_block_result(roi, result):
    ...     # No need for locking here, because allowParallelResults defaults to False.
    ...     result_count[0] += 1
    ...     result_total_sum[0] += result.sum()
    >>> streamer.resultSignal.subscribe( handle_block_result )

    >>> # Optional: Subscribe to progress updates
    >>> def handle_progress(progress):
    ...     if progress == 0:
    ...         sys.stdout.write("Progress: ")
    ...     sys.stdout.write( "{} ".format( progress ) )
    >>> streamer.progressSignal.subscribe( handle_progress )

    >>> # Execute the batch of requests, and block for the result.
    >>> streamer.execute()
    Progress: 0 16 33 50 66 83 100 100
    >>> print "Processed {} result blocks with a total sum of: {}".format( result_count[0], result_total_sum[0] )
    Processed 6 result blocks with a total sum of: 68400
    """
    def __init__(self, outputSlot, roi, blockshape=None, batchSize=None,
                 blockAlignment='absolute', allowParallelResults=False):
        """
        Constructor.

        :param outputSlot: The slot to request data from.
        :param roi: The roi `(start, stop)` of interest.  Will be broken up and requested via smaller requests.
        :param blockshape: The amount of data to request in each request.  If omitted, a default blockshape
                           is chosen by inspecting the metadata of the given slot.
        :param batchSize: The maximum number of requests to launch in parallel.  This should not be necessary
                          if the blockshape is small enough that you won't run out of RAM.
        :param blockAlignment: Determines how to block the requests.  Choices are 'absolute' or 'relative'.
        :param allowParallelResults: If False, the resultSignal will not be called in parallel.
                                     In that case, your handler function has no need for locks.
        """
        self._outputSlot = outputSlot
        self._bigRoi = roi
        self._num_threads = max(1, Request.global_thread_pool.num_workers)
        totalVolume = numpy.prod( numpy.subtract(roi[1], roi[0]) )

        if batchSize is None:
            batchSize = self._num_threads

        if blockshape is None:
            blockshape = self._determine_blockshape(outputSlot)

        assert blockAlignment in ['relative', 'absolute']
        if blockAlignment == 'relative':
            # Align the blocking with the start of the roi
            offsetRoi = ([0] * len(roi[0]), numpy.subtract(roi[1], roi[0]))
            block_starts = getIntersectingBlocks(blockshape, offsetRoi)
            block_starts += roi[0] # Un-offset

            # For now, simply iterate over the min blocks
            # TODO: Auto-dilate block sizes based on CPU/RAM usage.
            def roiGen():
                block_iter = block_starts.__iter__()
                while True:
                    block_start = block_iter.next()

                    # Use offset blocking
                    offset_block_start = block_start - self._bigRoi[0]
                    offset_data_shape = numpy.subtract(self._bigRoi[1], self._bigRoi[0])
                    offset_block_bounds = getBlockBounds( offset_data_shape, blockshape, offset_block_start )

                    # Un-offset
                    block_bounds = ( offset_block_bounds[0] + self._bigRoi[0],
                                     offset_block_bounds[1] + self._bigRoi[0] )
                    logger.debug( "Requesting Roi: {}".format( block_bounds ) )
                    yield block_bounds
        else:
            # Absolute blocking.
            # Blocks are simply relative to (0,0,0,...),
            # but we still clip the requests to the overall roi bounds.
            block_starts = getIntersectingBlocks(blockshape, roi)

            def roiGen():
                block_iter = block_starts.__iter__()
                while True:
                    block_start = block_iter.next()
                    block_bounds = getBlockBounds( outputSlot.meta.shape, blockshape, block_start )
                    block_intersecting_portion = getIntersection( block_bounds, roi )

                    logger.debug( "Requesting Roi: {}".format( block_bounds ) )
                    yield block_intersecting_portion

        self._requestBatch = RoiRequestBatch( self._outputSlot, roiGen(), totalVolume, batchSize, allowParallelResults )
    def _determine_blockshape(self, outputSlot):
        """
        Choose a blockshape using the slot metadata (if available) or an arbitrary guess otherwise.
        """
        input_shape = outputSlot.meta.shape
        ideal_blockshape = outputSlot.meta.ideal_blockshape
        ram_usage_per_requested_pixel = outputSlot.meta.ram_usage_per_requested_pixel
        max_blockshape = outputSlot.meta.max_blockshape or input_shape

        num_channels = 1
        tagged_shape = outputSlot.meta.getTaggedShape()

        # Generally, we don't want to split requests across channels.
        if 'c' in tagged_shape.keys():
            num_channels = tagged_shape['c']
            channel_index = tagged_shape.keys().index('c')
            input_shape = input_shape[:channel_index] + input_shape[channel_index+1:]
            max_blockshape = max_blockshape[:channel_index] + max_blockshape[channel_index+1:]
            if ideal_blockshape:
                # Never enlarge 'ideal' in the channel dimension.
                num_channels = ideal_blockshape[channel_index]
                ideal_blockshape = ideal_blockshape[:channel_index] + ideal_blockshape[channel_index+1:]

        available_ram = Memory.getAvailableRamComputation()

        if ram_usage_per_requested_pixel is None:
            # Make a conservative guess: 2*(bytes for dtype) * (num channels) + (fudge factor=4)
            ram_usage_per_requested_pixel = 2*outputSlot.meta.dtype().nbytes*num_channels + 4
            warnings.warn( "Unknown per-pixel RAM requirement.  Making a guess." )

        # Safety factor (fudge factor): Double the estimated RAM usage per pixel
        safety_factor = 2.0
        logger.info("Estimated RAM usage per pixel is {} * safety factor ({})"
                    .format( Memory.format(ram_usage_per_requested_pixel), safety_factor ))
        ram_usage_per_requested_pixel *= safety_factor

        if ideal_blockshape is None:
            blockshape = determineBlockShape( input_shape,
                                              available_ram/(self._num_threads*ram_usage_per_requested_pixel) )
            blockshape = tuple(numpy.minimum(max_blockshape, blockshape))
            if 'c' in outputSlot.meta.getAxisKeys():
                blockshape = blockshape[:channel_index] + (num_channels,) + blockshape[channel_index:]
            warnings.warn( "Chose an arbitrary request blockshape {}".format( blockshape ) )
        else:
            logger.info("determining blockshape assuming available_ram is {}"
                        ", split between {} threads"
                        .format(Memory.format(available_ram), self._num_threads))

            # By convention, ram_usage_per_requested_pixel refers to the ram used when requesting ALL channels of a 'pixel'.
            # Therefore, we do not include the channel dimension in the blockshapes here.
            blockshape = determine_optimal_request_blockshape( max_blockshape,
                                                               ideal_blockshape,
                                                               ram_usage_per_requested_pixel,
                                                               self._num_threads,
                                                               available_ram )
            if 'c' in outputSlot.meta.getAxisKeys():
                blockshape = blockshape[:channel_index] + (num_channels,) + blockshape[channel_index:]

        logger.info( "Chose blockshape: {}".format( blockshape ) )
        fmt = Memory.format(ram_usage_per_requested_pixel * numpy.prod(blockshape[:-1]))
        logger.info("Estimated RAM usage per block is {}".format(fmt))
        return blockshape

    @property
    def resultSignal(self):
        """
        Results signal. Signature: ``f(roi, result)``.
        Guaranteed not to be called from multiple threads in parallel.
        """
        return self._requestBatch.resultSignal

    @property
    def progressSignal(self):
        """
        Progress signal. Signature: ``f(progress_percent)``
        """
        return self._requestBatch.progressSignal
    def execute(self):
        """
        Request the data for the entire roi by breaking it up into many smaller requests,
        and wait for all of them to complete.

        A batch of N requests is launched, and subsequent requests are launched one-by-one as
        the earlier requests complete.  Thus, there will be N requests executing in parallel at all times.

        This method returns ``None``.  All results must be handled via the :py:obj:`resultSignal`.
        """
        self._requestBatch.execute()
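
# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original module): the fallback RAM
# arithmetic used by BigRequestStreamer._determine_blockshape() when the slot
# provides no ram_usage_per_requested_pixel metadata.  The concrete numbers
# (float32 data, 2 channels, 4 worker threads, 8 GiB of available RAM) are
# assumptions chosen purely for illustration, as is the helper's name.
def _example_fallback_ram_budget():
    dtype_nbytes = numpy.float32().nbytes       # 4 bytes per value
    num_channels = 2
    num_threads = 4
    available_ram = 8 * 1024**3                 # 8 GiB (hypothetical)

    # Conservative guess: 2*(bytes for dtype) * (num channels) + (fudge factor=4)
    ram_per_pixel = 2 * dtype_nbytes * num_channels + 4

    # Safety factor: double the estimate, exactly as the method above does.
    ram_per_pixel *= 2.0

    # Each parallel request gets an equal share of the available RAM; this
    # budget (in pixels) is what determineBlockShape() is asked to fit into.
    pixels_per_request = available_ram / (num_threads * ram_per_pixel)
    return ram_per_pixel, pixels_per_request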
if __name__ == "__main__":
    import doctest
    doctest.testmod()
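
# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original module): the same setup as the
# class docstring example, but with blockAlignment='absolute', in which blocks
# are aligned to the dataset origin and clipped to the requested roi.  The
# operator and data mirror the doctest above; the function name is hypothetical.
def _example_absolute_alignment():
    import vigra
    from lazyflow.graph import Graph
    from lazyflow.operators.operators import OpArrayCache

    data = numpy.indices( (100,100) ).sum(0)
    data = vigra.taggedView( data, vigra.defaultAxistags('xy') )

    op = OpArrayCache( graph=Graph() )
    op.Input.setValue( data )

    block_sums = []
    streamer = BigRequestStreamer( op.Output, [(25, 65), (45, 95)], (10,10),
                                   blockAlignment='absolute' )
    # allowParallelResults defaults to False, so no locking is needed here.
    streamer.resultSignal.subscribe( lambda roi, result: block_sums.append(result.sum()) )
    streamer.execute()
    return sum(block_sums)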