Coverage for ion/util/task_chain : 80.49%
Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
|
#!/usr/bin/env python
@file ion/util/task_chain.py @author Dave Foster <dfoster@asascience.com> @brief TaskChain class for sequential execution of callables (deferreds and non-deferreds) """
# Disabling MutableSequence for 2.5 compat -> deriving from list for now. # search for MUTABLESEQUENCE to see what needs to be uncommented/fixed. #class TaskChain(MutableSequence): """ Used to set up a chain of tasks that run one after another.
A task chain can be used to script a sequence of actions and have the Twisted reactor manage it all. The run method returns a deferred that is called back when all tasks in the chain complete. If any task errors, the chain is aborted and the errback is raised. The tasks are executed in order.
The tasks should be callables that can either return deferreds or execute synchronously (and TaskChain will wrap them in deferreds). If any of them error, the chain is aborted and the errback is raised.
A TaskChain is a list of either callables or tuples of 2 or 3 length, that consist of a callable, a list of arguments to be passed, and an optional dict of keyword arguments to be passed.
Due to not having MutableSequence available in Python 2.5, type checking is done only at time of execution. """
""" Constructor.
@param tasks Takes a list of tasks that will be executed in order. """ #self._list = [] # implementation backend for MutableSequence methods to use # MUTABLESEQUENCE
""" Returns a string representation of a TaskChain and its status. """
""" Internal safety mechanism that append, insert, and extend all flow through. It makes sure the types being added to the list are expected. """ raise ValueError("First item of tuple not a callable: (callback, list of args, optional dict of kwargs) expected.") else:
# MUTABLESEQUENCE #def __getitem__(self, index): # return self._list.__getitem__(index)
#def __setitem__(self, index, value): # self._check_type(value) # self._list.__setitem(index, value)
#def __delitem__(self, index): # self._list.__delitem__(index)
#def insert(self, index, value): # self._check_type(value) # self._list.insert(index, value)
#def __len__(self): # return len(self._list) # END MUTABLESEQUENCE
""" Starts running the chain of tasks.
@returns A deferred which will callback when the tasks complete. """
""" Runs the next task. """
# if we have no more tasks to run, or we shouldn't be running anymore, # fire our callback
# make sure this is legit - we have no way of checking on insert right now due to not being a MutableSequence
# possibly not a deferred at all!
""" Callback on single task success. """
""" Errback on single failure. """ failure.trap(StandardError) failure.printBriefTraceback()
log.debug(self.__str__() + ":task ERROR")
self._results.append(failure.value) self._fire(False)
""" Calls the task chain's callback or errback as per the success parameter. This method builds the task/result list to pass back through either mechanism. """
# we're no longer running, indicate as such
else: self._deferred.errback(StandardError(res))
""" Shuts down the current chain of tasks. The current executing task will have its cancel method called on its deferred. It is the responsibility of the deferred creator to set up the canceller argument when the deferred is constructed. """
log.debug(self.__str__() + ":close")
if not self._running: # someone could call close after we've already fired, so don't fire again if not self._deferred.called: self._fire(True) return self._deferred
self._running = False
if self._curtask_def: self._curtask_def.cancel()
return self._deferred
|