Coverage for ion/core/process/process : 83.51%
Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
|
#!/usr/bin/env python
@file ion/core/process/process.py @author Michael Meisinger @brief base classes for processes within a capability container """
# despite being from services.dm this is safe - events should be moved to core sometime soon! @TODO
# Conversation Context object - used to manage workbench GC and identity...
# @todo CHANGE: Dict of "name" to process (service) declaration
# @todo CHANGE: Static store (kvs) to register process instances with names
""" Interface for all capability container application processes """
""" An exception class for errors that occur in Process """
# @todo do NOT fill the process namespace with response codes; remove mixin! """ This is the base class for all processes. Processes can be spawned and have a unique identifier. Each process has one main process receiver and can define additional receivers as needed. This base class provides a lot of mechanics for processes, such as sending and receiving messages, RPC style calls, spawning and terminating child processes. Subclasses may use the plc-* process life cycle events. """
""" Define some constants used in messaging: """
""" Only used by process.py for uncaught exceptions. Do not catch generic exceptions in an ION service - only those which are expected. The Error status and reply error are intended only to deal with fatal - unexpected errors. """ """ Generic OK message added """
""" Initialize process using an optional receiver and optional spawn args @param receiver instance of a Receiver for process control (unused) @param spawnargs standard and additional spawn arguments """
# An Id with the process ID (fully qualified)
# Name (human readable label) of this process.
# The system unique name; propagates from root supv to all child procs
# An Id with the process ID of the parent (supervisor) process
# Name (human readable label) of this process.
# Set the container
# Ignore supplied receiver for consistency purposes # Create main receiver; used for incoming process interactions label=self.proc_name, name=self.id.full, group=self.proc_group, process=self, handler=self.receive, error_handler=self.receive_error)
# Create a backend receiver for outgoing RPC process interactions. # Needed to avoid deadlock when processing incoming messages # because only one message can be consumed before ACK. label=self.proc_name, name=self.backend_id.full, group=self.proc_group, process=self, handler=self.receive, error_handler=self.receive_error)
# Dict of all receivers of this process. Key is the name
# Delegate class to manage all conversations of this process
# Dict of message publishers and subscribers
# List of ProcessDesc instances of defined and spawned child processes
#The data object Workbench for all object repositories used by this process
# Create a message Client
# A list of other kinds of life cycle objects which are tied to the process
# TCP Connectors and Listening Ports
# publisher for lifecycle change notifications
# Callbacks before and after ops (handy for unittests)
# Context default dictionary
self.proc_name, self.id, self.proc_supid, self.sys_name))
""" Register a callback for before this operation is invoked. Supports "op_thing" and "thing" syntax. """ self.op_cbs_before.setdefault(self._sanitize_opname(opname), []).append(cb)
""" Register a callback for after this operation is invoked. Supports "op_thing" and "thing" syntax. """
self.op_cbs_before.setdefault(self._sanitize_opname(opname), []).remove(cb)
""" Get a deferred that will callback when the given op completes the next time (once). """
# --- Life cycle management # Categories: # op_XXX Message incoming interface # spawn, init: Boilerplate API # initialize, activate, deactivate, terminate: (Super class) State management API # on_XXX: State management API action callbacks # plc_XXX: Callback hooks for subclass processes
""" A debug method - used in the shell to print the foot print of the workbench """ return "Cached Structure Elements - %d, Cached Repositories - %d, Working Repositories - %d, Memory - %d kb" % \ (len(self.workbench._workbench_cache), len(self.workbench._repo_cache),len(self.workbench._repos), self.workbench._repo_cache.total_size/1000)
def spawn(self): """ Manually (instead of through the container) spawns this process and activate it in the same call. Spawn can only be called once. Equivalent to calling initialize() and activate() @retval Deferred for the Id of the process (self.id) """
def on_initialize(self, *args, **kwargs): """ LifeCycleObject callback for the initialization "spawn" of the process. @retval Deferred for the Id of the process (self.id) """
# Create queue only for process receiver
# Create queue and consumer for backend receiver
# advance the registered objects as if this process has already transitioned to the READY state, # which it will do after this on_initialize method completes
# get length of all registered lcos
# Callback to subclasses #import pdb; pdb.set_trace() except Exception, ex: log.exception('----- Process %s INIT ERROR -----' % (self.id)) raise ex
# publish initialize -> ready transition
""" Process life cycle event: on initialization of process (once) """
def op_activate(self, content, headers, msg): """ Activate operation, on receive of the activate system message @note PROBLEM: Cannot receive activate if receiver not active. Activation has to go through the container (agent) or backend. """ try: yield self.activate(content, headers, msg) if msg != None: yield self.reply_ok(msg) except Exception, ex: if msg != None: yield self.reply_uncaught_err(msg, content=None, exception=ex, response_code="Process %s ACTIVATE ERROR" % (self.id))
def on_activate(self, *args, **kwargs): """ LifeCycleObject callback for activate @retval Deferred """
# Create consumer for process receiver
# advance the registered objects as if this process has already transitioned to the ACTIVE state, # which it will do after this on_activate method completes
# get length of all registered lcos
# Callback to subclasses except Exception, ex: log.exception('----- Process %s ACTIVATE ERROR -----' % (self.id)) raise ex
# publish ready -> active transition
# last step in activation - cleanup!
""" Process life cycle event: on activate of process. Subclasses override. """
def op_terminate(self, content, headers, msg): """ Shutdown operation, on receive of the init message """ except Exception, ex:
log.error('Error during op_terminate: ' + str(ex)) raise ProcessError("Process %s TERMINATE ERROR" % (self.id)) ### Let the mesg dispatcher catch the error #if msg != None: # yield self.reply_err(msg, content=None, exception=ex, response_code = "Process %s TERMINATE ERROR" % (self.id))
def on_terminate_active(self, *args, **kwargs): # This is temporary while there is no deactivate for a process (don't # want to do this right now).
else:
# @todo There should be nothing after the terminate call
""" @retval Deferred """ # publish active -> terminate transition
# Clean up all TCP connections and listening ports # XXX What is the best way to unit test this? connector.disconnect() yield port.stopListening()
# advance the registered objects as if this process has already transitioned to the TERMINATED state, # which it will do after this on_terminate method completes
# Terminate all child processes
""" Process life cycle event: on termination of process (once) """
# publish whatever -> error transition - hopefully this works!
# attempt to move all registered lcos to the terminated state cleanly! except Exception: log.debug("Error terminating registered LCOs, ignoring...")
pass else: raise RuntimeError("Illegal process state change")
# --- Non-lifecycle process interactions
def op_ping(self, content, headers, msg): """ Service operation: ping reply """ yield self.reply_ok(msg, {'pong':'pong'}, {'quiet':True})
# @defer.inlineCallbacks """ Called when a child process has exited without being terminated. A supervisor may process this event and restart the child. """ pass
# --- Process resource management
""" Adds a publisher to this process.
@param publisher A Publisher instance, preferably tied to this process. """ self.publishers[publisher.name] = publisher
""" Adds a subscriber to this process, preferably tied to this process. """ self.subscribers[subscriber.name] = subscriber
connector = reactor.connectTCP(host, port, factory, timeout, bindAddress) self.connectors.append(connector) return connector
port = reactor.listenTCP(port, factory, backlog, interface) self.listeners.append(port) return port
""" Add a life cycle object to the process during init. This method can only be used during init to add lco's which are also in the INIT state. """ 'Can not add an instance that does not inherit from BasicLifecycleObject'
def register_life_cycle_object(self, lco): """ Register a life cycle object with the process and automatically advance its state to the current state of the process. """ 'Can not register an instance that does not inherit from BasicLifecycleObject'
""" Helper method to move all registered lifecycle objects in this process to the state of either the process or the state passed into this method.
You can explicitly define the state you want to transition everything to. This is used by on_initialize and friends to advance LCOs with a state that is just about to occur. """
def helper(idx, lco): """ This inline helper methods takes an index and LCO and advances that LCO to match the state in curstate. It has an inlineCallbacks decorator to give back a deferred when called so we can wrap that in a deferredList to operate on all LCOs in "parallel" - aka not cause an error in one's transitioning to stop transitioning LCOs that happen to be later in the registered list. """
# @TODO: should not be catching this exception. # This should cause the deferred gen'd by inlineCallbacks to errback, which then gets wrapped # nicely by the deferred list. It should not throw an exception in the state object?!?
# build deferred list out of calling helper method on all registered LCOs
########################################################################## # --- Incoming message handling
def receive(self, payload, msg): """ This is the first and MAIN entry point for received messages. Using the conv-id header, an ongoing conversation is retrieved and checked for a blocking condition (eg. RPC reply). Messages are separated whether blocking or not. """ # Establish security context for request processing # Check if there is a user id in the header, stash if so else: log.debug('[%s] receive(): payload anonymous request' % (self.proc_name)) if self.context.get('user_id', 'Not set') == 'Not set': self.context.user_id = 'ANONYMOUS' _action = 'set ANONYMOUS user_id' else: _action = "keep stashed user_id='%s'" % self.context.get('user_id')
# User session expiry. else: if self.context.get('expiry', 'Not set') == 'Not set': self.context.expiry = '0' _action = _action + '/set 0 expiry' else: _action = _action + "/keep stashed expiry='%s'" % self.context.get('expiry')
self.proc_name, _pre_uid, _pre_exp, _action, _post_uid, _post_exp))
# Extract some headers and make log statement. self.proc_name, fromname))
#CONVID is already added to the process.request
# Conversation handling. # Compose in memory message object for callbacks performative=payload.get('performative','request'), operation=payload.get('op',None), headers=payload, content=payload.get('content',None), process=self, msg=msg, conversation=None)
# Retrieve ongoing Conversation instance by conv-id header. # If not existing, create new instance. # @todo How do we find out we are the participant or initiator role? message, initiator=initiator)
# Check some state conditions if payload.get('op',None) == 'terminate': yield self.reply_ok(msg) defer.returnValue()
text = "[%s] Process TERMINATED. Message refused!" % (self.proc_name) log.error(text) raise ProcessError(text)
text = "[%s] Process in ERROR state. Message refused!" % (self.proc_name) log.error(text) raise ProcessError(text)
# Detect and handle request to terminate process. # This does not yet work properly with fail fast... yield self.terminate() defer.returnValue()
# Non-request messages (e.g. RPC reply are OK before ACTIVE) text = "[%s] Process in invalid state: '%s'" % (self.proc_name, self._get_state()) log.error(text)
# @todo: Requeue would be ok, but does not work (Rabbit or client limitation) #d = msg.requeue()
# Let the error back handle the exception raise ProcessError(text)
# Regular message handling in expected state # pu.log_message(msg)
# Delegate further message processing to conversation specific # implementation. Trigger conversation FSM. # @see ion.interact.rpc, ion.interact.request
# Problem: what if FSM produces an error (it does not currently) # Problem: log message here after reply/failure have been sent
# Case of one-off messages # pu.log_message(msg)
else: # Legacy case of no conv-id set or one-off messages (events?) log.warn("No conversation id in message") # pu.log_message(msg) res = yield self._dispatch_message_op(payload, msg, None)
except ApplicationError, ex: # In case of an application error - do not terminate the process! if log.getEffectiveLevel() <= logging.INFO: # only output all this stuff when debugging log.exception("*****Non Conversation Application error in message processing*****") log.error('*** Message Payload which cause the error: \n%s' % pu.pprint_to_string(payload)) log.error('*** Message Content: \n%s' % str(payload.get('content', '## No Content! ##'))) log.error("*****End Non Conversation Application error in message processing*****")
# @todo Should we send an err or rather reject the msg? # @note We can only send a reply_err to an RPC if msg and msg.payload['reply-to'] and msg.payload.get('performative',None)=='request': yield self.reply_err(msg, exception = ex)
except Exception, ex: # *** PROBLEM. Here the conversation is in ERROR state
log.exception("*****Non Conversation Application error in message processing*****") log.error('*** Message Payload which cause the error: \n%s' % pu.pprint_to_string(payload)) if log.getEffectiveLevel() <= logging.WARN: log.error('*** Message Content: \n%s' % str(payload.get('content', '## No Content! ##'))) log.error("*****End Non Conversation Application error in message processing*****")
# @todo Should we send an err or rather reject the msg? # @note We can only send a reply_err to an RPC if msg and msg.payload['reply-to'] and msg.payload.get('performative',None)=='request': yield self.reply_err(msg, exception = ex)
#@Todo How do we know if the message was ack'ed here? # The supervisor will also call shutdown child procs. This causes a recursive error when using fail fast! #if CF_fail_fast: # yield self.terminate() # Send exit message to supervisor finally: # @todo This is late here (potentially after a reply_err before) # Only if msg has not been ack/reject/requeued before
else: log.error("Invalid message. No 'op' in header", payload)
def _dispatch_message_call(self, payload, msg, conv, opname): """ Dispatch of messages to handler callback functions within this Process instance. If handler is not present, use op_none. @retval Deferred """ # dynamically invoke the operation in the given class
cb(cb_opname, content, payload, msg)
else: # Change to Raise? assert False, "Cannot dispatch to operation"
""" The method called if operation callback handler is not existing """
# --- Standard conversation type support: RPC, Request
""" @brief Starts a simple RPC style conversation. """
self.proc_name, rpc_conv.protocol, recv))
content=content, headers=headers, conv=rpc_conv, **kwargs)
""" @brief Sends a request message to the recipient. Instantiates the FIPA request interaction pattern. The recipient needs to responde with either "refuse" or "agree". In case of "agree", a second "failure", "inform-done", "inform-result" message must follow. This function checks for timeouts. Synchronous call. @exception Various exceptions for different failures """
self.proc_name, req_conv.protocol, receiver))
content=content, headers=headers, conv=req_conv, **kwargs)
""" @brief Sends a request message to the recipient. Asynchronous call. """ pass
# --- Outgoing message handling
""" @brief Sends a message and waits for conversation message reply. @retval a Deferred with the message value on receipt """
# Create a new deferred that the caller can yield on to wait for RPC # Timeout handling
# Remove RPC. Delayed result will go to catch operation
# Call to send() operation=operation, headers=headers, content=content, conv=conv) # d is a deferred. The actual send of the request message will happen # after this method returns. This is OK, because functions are chained # to call back the caller on the rpc_deferred when the receipt is done.
""" @brief Send a message via the process receiver to destination. Starts a new conversation. @retval Deferred for send of message """
#log.debug("****SEND, headers %s" % str(msgheaders)) #log.debug("Send conversation %r from %r" % (conv, self.conv_manager.conversations)) else: # Not a new and not a reply to a conversation #conv = self.conv_manager.new_conversation(GenericType.CONV_TYPE_GENERIC) #msgheaders['conv-id'] = conv.conv_id # One-off send message. Do not create a conversation instance.
else: log.debug('[%s] send(): using user id from msgheaders [%s]' % (self.proc_name, msgheaders['user-id'])) else: log.debug('[%s] send(): using expiry from msgheaders [%s]' % (self.proc_name, msgheaders['expiry']))
# Now allow headers to override any of the precomputed headers
# Assemble message content
# Assemble the standard in-memory message object content=content, headers=msgheaders, performative=msgheaders.get('performative','request'), process=self, conversation=conv)
# Put the message through the conversation FSM
# PROBLEM: FSM does not raise any exception.
# FSM processed successfully # The default case is to send out via the backend receiver else: # Use given receiver (e.g. primary receiver for RPC replies)
except Exception, ex: log.exception("ERROR [%s] send() in FSM - Message not sent" % self.proc_name) raise ex
""" @brief Replies to a given message, continuing the ongoing conversation @retval Deferred for message send """ # The causing message we reply to
# Assemble the reply message
log.error('No reply-to given for message '+str(msg)) else:
elif 'performative' not in msgheaders: msgheaders['performative'] = 'inform_result'
# Now allow headers to override any of the precomputed headers
operation = self.MSG_RESULT
operation=operation, headers=msgheaders, content=content, send_receiver=self.receiver, quiet=quiet)
""" @brief Internal boilerplate method that replies to a given message with an agree performative @retval Deferred for send of reply """ performative='agree', operation=self.MSG_RESULT, headers=headers, content=content, quiet=True)
""" @brief Internal boilerplate method that replies to a given message with a refuse performative @retval Deferred for send of reply """ return self.reply(msg, performative='refuse', operation=self.MSG_RESULT, headers=headers, content=content)
""" @brief Internal boilerplate method that replies to a given message with an inform-result performative @retval Deferred for send of reply """ performative='inform_result', operation=self.MSG_RESULT, headers=headers, content=content, quiet=(content is None))
""" @brief Internal boilerplate method that replies to a given message with a failure performative. Will result at the receiver end in an application level error. The result can include content, a caught exception and an application level error_code as an indication of the error. @content Message content object @exception an instance of Exception @response_code an ION application level defined error code for a handled exception @retval Deferred for send of reply """
else:
msgheaders.update(headers)
performative='failure', operation=self.MSG_RESULT, headers=msgheaders, content=content)
""" This is the entry point for handling messaging errors. As appropriate, this method will attempt to respond with a meaningful error code to the sender. """
convid = headers.get('conv-id', None) return self.conv_manager.get_conversation(convid)
# --- Process and child process management
# Proposed modificaiton to get_scoped_name to only add the scope if needed? else:
# OTP style functions for working with processes and modules/apps
""" Spawns a process described by the ProcessDesc instance as child of this process instance. An init message is sent depending on flag. @param childproc ProcessDesc instance with attributes @param init flag determining whether an init message should be sent @retval process id of the child process """
pass
pass
def shutdown_child_procs(self):
""" @retval the ProcessDesc instance of a child process by name """
""" @retval the process id a child process by name """
assert invocation.path == Invocation.PATH_IN defer.maybeDeferred(self.before, invocation) return invocation
# ============================================================================
""" This is the base class for a process client. A process client is code that executes in the process space of a calling process. If no calling process is given, a local one is created on the fly. """ """ Initializes a process client base @param proc a IProcess instance as originator of messages """
def _check_init(self): """ Called in client methods to ensure that there exists a spawned process to send messages from """
def attach(self): yield self._check_init()
""" This specific derivation adds some glue to interact with a specific targer process. """ """ Initializes a process client @param proc a IProcess instance as originator of messages @param target global scoped (process id or name) to send to @param targetname system scoped exchange name to send messages to """
""" Sends an RPC message to the specified target via originator process """
""" Sends an RPC message to the specified target via originator process """
# Validate expiry value
except ValueError, ex: assert False, 'Expiry must be string representation of int time value'
""" Sends a message to the specified target via originator process """ return self.proc.send(self.target, *args, **kwargs)
""" Replies to a message via the originator process """ return self.proc.reply(*args, **kwargs)
# ============================================================================
""" Class that encapsulates attributes about a spawnable process; can spawn and init processes. """ """ Initializes ProcessDesc instance with process attributes. Also acts as a weak proxy object for use by the parent process. @param name name label of process @param module module name of process module @param class or procclass is class name in process module (optional) @param node ID of container to spawn process on (optional) @param spawnargs dict of additional spawn arguments (optional) """
# Life cycle
""" Boilerplate for initialize() @param parent the process instance that should be set as supervisor """ #log.info('Spawning name=%s on node=%s' % # (self.proc_name, self.proc_node))
""" Spawns this process description with the initialized attributes. @retval Deferred -> Id with process id """ # Note: If this fails, an exception will occur and be passed through procdesc=self, parent=self.sup_process, node=self.proc_node, activate=activate)
def on_activate(self, *args, **kwargs): """ @retval Deferred """ else: headers = yield self.container.activate_process(parent=self.sup_process, pid=self.proc_id)
def on_terminate(self, *args, **kwargs): """ @retval Deferred """
pass else:
""" """
""" This protocol factory returns receiver instances used to spawn processes from a module. This implementation creates process class instances together with the receiver. This is a standard implementation that can be used in the code of every module containing a process. This factory also collects process declarations alongside. """
raise RuntimeError("Class does not implement IProcess")
# Collecting the declare static class variable in a process class raise RuntimeError('Process already declared: '+str(procname))
""" Factory method to return a process instance from given arguments. """
#log.debug("ProcessFactory.build(name=%s, args=%s)" % (self.name,spawnargs))
# Create a process receiver
# Instantiate the IProcess class
""" Instantiator for IProcess instances. Relies on an IProcessFactory to instantiate IProcess instances from modules. """
else: # Purely for tests to avoid a _start_container() in setUp() containerid = "TEST-CONTAINER-ID"
""" @brief Factory method to spawn a Process instance from a Python module. By default, spawn includes an activate @param module A module (<type 'module'>) with a ProcessFactory factory @param space MessageSpace instance @param spawnargs argument dict given to the factory on spawn @retval Deferred which fires with the IProcess instance """
raise RuntimeError("Must define factory in process module to spawn")
raise RuntimeError("Process model factory must provide IProcessFactory")
raise RuntimeError("ProcessFactory returned non-IProcess instance")
# Give a callback to the process to initialize and activate (if desired) except Exception, ex: log.exception("Error spawning process from module") raise ex
# Spawn of the process using the module name |