Coverage for ion/agents/instrumentagents/instrument_agent : 67.37%
Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
|
#!/usr/bin/env python
@file ion/agents/instrumentagents/instrument_agent.py @author Steve Foley @author Edward Hunter @brief Instrument Agent and client classes. """
import BusinessStateModificationEventPublisher AgentConnectionState, AgentState, driver_client, \ DriverAnnouncement, InstErrorCode, DriverParameter, DriverChannel, \ ObservatoryState, DriverStatus, InstrumentCapability, DriverCapability, \ MetadataParameter, AgentCommand, Datatype, TimeSource, ConnectionMethod, \ AgentEvent, AgentStatus, ObservatoryCapability
""" Instrument agent observatory metadata. """
AgentParameter.EVENT_PUBLISHER_ORIGIN: {MetadataParameter.DATATYPE: Datatype.PUBSUB_ORIGIN, MetadataParameter.LAST_CHANGE_TIMESTAMP: (0, 0), MetadataParameter.FRIENDLY_NAME: 'Event Publisher Origin'}, AgentParameter.TIME_SOURCE: {MetadataParameter.DATATYPE: Datatype.ENUM, MetadataParameter.LAST_CHANGE_TIMESTAMP: (0, 0), MetadataParameter.VALID_VALUES: TimeSource, MetadataParameter.FRIENDLY_NAME: 'Time Source'}, AgentParameter.CONNECTION_METHOD: {MetadataParameter.DATATYPE: Datatype.ENUM, MetadataParameter.LAST_CHANGE_TIMESTAMP: (0, 0), MetadataParameter.VALID_VALUES: ConnectionMethod, MetadataParameter.FRIENDLY_NAME: 'Connection Method'}, AgentParameter.DEFAULT_EXP_TIMEOUT: {MetadataParameter.DATATYPE: Datatype.INT, MetadataParameter.LAST_CHANGE_TIMESTAMP: (0, 0), MetadataParameter.MINIMUM_VALUE: 0, MetadataParameter.UNITS: 'Seconds', MetadataParameter.FRIENDLY_NAME: \ 'Default Transaction Expire Timeout'}, AgentParameter.MAX_EXP_TIMEOUT: {MetadataParameter.DATATYPE: Datatype.INT, MetadataParameter.LAST_CHANGE_TIMESTAMP: (0, 0), MetadataParameter.MINIMUM_VALUE: 0, MetadataParameter.UNITS: 'Seconds', MetadataParameter.FRIENDLY_NAME: 'Max Transaction Expire Timeout'}, AgentParameter.MAX_ACQ_TIMEOUT: {MetadataParameter.DATATYPE: Datatype.INT, MetadataParameter.LAST_CHANGE_TIMESTAMP: (0, 0), MetadataParameter.MINIMUM_VALUE: 0, MetadataParameter.UNITS: 'Seconds', MetadataParameter.FRIENDLY_NAME: 'Max Transaction Acquire Timeout'}, }
""" A generic ion representation of an instrument as an ION resource. """
""" The software version of the instrument agent. """
def get_version(cls): """ Return the software version of the instrument agent. """
def plc_init(self): # Initialize base class.
# We need a yield in an inlineCallback.
""" The ID of the instrument this agent represents. """
""" Driver process and client descriptions. Parameter dictionaries used to launch driver processes, and dynamically construct driver client objects. """
""" The driver config dictionary. Default passed as a spawn arg. """
""" The driver process ID. """
""" List of old driver processes to be cleaned up. """
""" The driver client to communicate with the child driver. """
""" The PubSub origin for the event publisher that this instrument agent uses to distribute messages related to generic events that it handles. One queue sends all messages, each tagged with an event ID number and the "agent" keyword or channel name if applicable (delimited by "."). If "agent" keyword is used, the publication applies to the agent. If the channel is a "*", the event applies to the instrument as a whole or all channels on the instrument For example: 3003.chan1.machine_example_org_14491.357 @see ion/services/dm/distribution/events.py @see https://confluence.oceanobservatories.org/display/ syseng/CIAD+DM+SV+Notifications+and+Events """
""" The PubSub publisher for informational/log events. These include agent op errors, transaction events, driver state changes, driver and agent config changes. """ InfoLoggingEventPublisher(process=self, origin=self.event_publisher_origin)
""" The PubSub publisher for data events """ DataBlockEventPublisher(process=self, origin=self.event_publisher_origin)
""" The PubSub publisher for agent state change events. """ BusinessStateModificationEventPublisher(process=self, origin=self.event_publisher_origin)
""" The transducer of the last data received event. Used to publish left over buffer contents on end of a streaming session. """
""" A UUID specifying the current transaction. None indicates no current transaction. """
""" If a transaction expires during an op_ call, this flag is set so the transaction can be retired when finishing the call. It is handled there to keep the current operation protected until it completes. """
""" A twisted delayed function call that implements the transaction timeout. This object allows us to cancel the timeout when the transaction is ended before timeout. """
""" A queue of pending transactions. Start the top one on the list when the current transaction ends. """
""" An integer in seconds for the maximum allowable timeout to wait for a new transaction. """
""" An integer in seconds for the minimum time a transaction must be open. """
""" An integer in seconds for the default time a transaction may be open. """
""" An integer in seconds giving the maximum allowable time a transaction may be open. """
""" Upon transaction expire timeout, this flag indicates if the transaction can be immediately retired or should be flagged for retire upon completion of a protected operation. """
""" String indicating the source of time being used for the instrument. See time_sources list for available values. """
""" String describing how the device is connected to the observatory. See connection_methods list for available values. """
""" Buffer to hold instrument data for periodic transmission. """
""" The number of samples to keep in the data buffer before publication. """
""" A dict of device capabilities that is read from the driver upon driver construction. The dict persists whether we are connected to the driver or not. """
""" List of current alarm conditions. Tuple of (ID, description). """
""" Dictionary of time status values. """ 'Uncertainty': None, 'Peers': None }
""" Agent state handlers """ AgentState.POWERED_DOWN: self.state_handler_powered_down, AgentState.UNINITIALIZED: self.state_handler_uninitialized, AgentState.INACTIVE: self.state_handler_inactive, AgentState.IDLE: self.state_handler_idle, AgentState.STOPPED: self.state_handler_stopped, AgentState.OBSERVATORY_MODE: self.state_handler_observatory_mode, AgentState.DIRECT_ACCESS_MODE: \ self.state_handler_direct_access_mode }
""" A finite state machine to track and manage agent state according to the general instrument state model. """ AgentEvent.ENTER, AgentEvent.EXIT)
# Set initial state.
########################################################################### # State handlers. ###########################################################################
def state_handler_powered_down(self, event, params):
    """
    Handle an event received while the agent is in
    AgentState.POWERED_DOWN. This is a major state.
    TODO: Need to investigate use models of POWERED_DOWN.
    @param event The AgentEvent being dispatched to this state.
    @param params Event parameters; not used by this handler.
    @retval Delivered through defer.returnValue: a tuple
        (success, next_state, result). next_state and result are always
        None here; success is InstErrorCode.OK for ENTER and EXIT and
        InstErrorCode.INCORRECT_STATE for any other event.
    """
    # Bare yield kept from the original handler.
    yield

    outcome = InstErrorCode.OK
    self._debug_print(self._fsm.get_current_state(), event)

    if event == AgentEvent.ENTER:
        # Announce the state on entry.
        publish_origin = 'agent.%s' % self.event_publisher_origin
        yield self._state_publisher.create_and_publish_event(
            origin=publish_origin,
            description=AgentState.POWERED_DOWN)
    elif not (event == AgentEvent.EXIT):
        # EXIT needs no work; anything else is not valid in this state.
        outcome = InstErrorCode.INCORRECT_STATE

    defer.returnValue((outcome, None, None))
def state_handler_uninitialized(self, event, params): """ State handler for AgentState.UNINITIALIZED. Substate of major state AgentState.POWERED_UP. """
# Low level agent initialization beyond construction and plc. description=AgentState.UNINITIALIZED) pass
pass
# Initialize: start driver and client and switch to INACTIVE # if successful.
else: # Could not initialize error. Set error return value. success = InstErrorCode.AGENT_INIT_FAILED
pass
elif event == AgentEvent.RESET: next_state = AgentState.UNINITIALIZED
else: success = InstErrorCode.INCORRECT_STATE
def state_handler_inactive(self, event, params): """ State handler for AgentState.INACTIVE. Substate of major state AgentState.POWERED_UP. """
# Agent initialization beyond driver spawn. description=AgentState.INACTIVE)
# Read the device capabilities. [DriverCapability.DEVICE_ALL]) except: pass
else:
pass
else: success = InstErrorCode.AGENT_DEINIT_FAILED
next_state = AgentState.INACTIVE
# Attempt to configure driver.
# If successful, attempt to connect.
# Exception raised, reply error. except: success = InstErrorCode.DRIVER_CONNECT_FAILED
# Command returned, if successful switch state to IDLE. else:
else: success = InstErrorCode.INCORRECT_STATE
def state_handler_stopped(self, event, params):
    """
    State handler for AgentState.STOPPED.
    Substate of major state AgentState.ACTIVE.
    @param event The AgentEvent being dispatched to this state.
    @param params Event parameters; not used by this handler.
    @retval Delivered through defer.returnValue: a tuple
        (success, next_state, result). result is always None.
        next_state is None when no transition is requested. success is
        an InstErrorCode value: OK, the driver's own reply code,
        DRIVER_CONNECT_FAILED if the disconnect call raised,
        AGENT_DEINIT_FAILED if the driver process is still present
        after a RESET, or INCORRECT_STATE for an unhandled event.
    """
    # Bare yield kept from the original handler.
    yield
    success = InstErrorCode.OK
    next_state = None
    result = None
    self._debug_print(self._fsm.get_current_state(), event)

    if event == AgentEvent.ENTER:
        # Save agent and driver running state (nothing is saved yet;
        # only the state change is published).
        origin = 'agent.%s' % self.event_publisher_origin
        yield self._state_publisher.create_and_publish_event(
            origin=origin, description=AgentState.STOPPED)

    elif event == AgentEvent.EXIT:
        pass

    elif event == AgentEvent.CLEAR:
        next_state = AgentState.IDLE

    elif event == AgentEvent.RESUME:
        # Restore agent and driver running state (nothing is restored
        # yet; only the transition is requested).
        next_state = AgentState.OBSERVATORY_MODE

    elif event == AgentEvent.GO_INACTIVE:
        try:
            reply = yield self._driver_client.disconnect()
            success = reply['success']

        # Exception raised, reply error. Only Exception is caught so
        # that GeneratorExit, KeyboardInterrupt and SystemExit are not
        # swallowed and misreported as a driver failure.
        # NOTE(review): this is a disconnect call but the code reported
        # is DRIVER_CONNECT_FAILED - confirm whether
        # DRIVER_DISCONNECT_FAILED was intended.
        except Exception:
            success = InstErrorCode.DRIVER_CONNECT_FAILED

        # Command returned, if successful switch state to INACTIVE.
        else:
            if InstErrorCode.is_ok(success):
                next_state = AgentState.INACTIVE

    elif event == AgentEvent.RESET:
        try:
            reply = yield self._driver_client.disconnect()
            success = reply['success']

        # Exception raised, reply error (see note on GO_INACTIVE).
        except Exception:
            success = InstErrorCode.DRIVER_CONNECT_FAILED

        # Command returned, shut down driver.
        else:
            if InstErrorCode.is_ok(success):
                self._condemn_driver()

                # If successful, switch to UNINITIALIZED.
                if self._driver_pid is None:
                    next_state = AgentState.UNINITIALIZED

                # If unsuccessful, switch to inactive.
                else:
                    success = InstErrorCode.AGENT_DEINIT_FAILED
                    next_state = AgentState.INACTIVE

    else:
        success = InstErrorCode.INCORRECT_STATE

    defer.returnValue((success, next_state, result))
def state_handler_idle(self, event, params): """ State handler for AgentState.IDLE. Substate of major state AgentState.ACTIVE. """
# Clear agent and driver running state. description=AgentState.IDLE)
pass
try: reply = yield self._driver_client.disconnect() success = reply['success']
# Exception raised, reply error. except: success = InstErrorCode.DRIVER_CONNECT_FAILED
# Command returned, if successful switch state to INACTIVE. else: if InstErrorCode.is_ok(success): next_state = AgentState.INACTIVE
try: reply = yield self._driver_client.disconnect() success = reply['success']
# Exception raised, reply error. except: success = InstErrorCode.DRIVER_CONNECT_FAILED
# Command returned, shut down driver. else: if InstErrorCode.is_ok(success): self._condemn_driver()
# If successful, switch to UNINITIALIZED. if self._driver_pid == None: next_state = AgentState.UNINITIALIZED
# If unsuccessful, switch to inactive. else: success = InstErrorCode.AGENT_DEINIT_FAILED next_state = AgentState.INACTIVE
else: success = InstErrorCode.INCORRECT_STATE
def state_handler_observatory_mode(self, event, params): """ State handler for AgentState.OBSERVATORY_MODE. Substate of major state AgentState.ACTIVE.RUNNING. """
description=AgentState.OBSERVATORY_MODE) pass
pass
next_state = AgentState.IDLE
next_state = AgentState.STOPPED
next_state = AgentState.DIRECT_ACCESS_MODE
try: reply = yield self._driver_client.disconnect() success = reply['success']
# Exception raised, reply error. except: success = InstErrorCode.DRIVER_CONNECT_FAILED
# Command returned, if successful switch state to INACTIVE. else: if InstErrorCode.is_ok(success): next_state = AgentState.INACTIVE
# Exception raised, reply error. except: success = InstErrorCode.DRIVER_DISCONNECT_FAILED
# Command returned, shut down driver. else: #yield pu.asleep(5) # If successful, switch to UNINITIALIZED.
# If unsuccessful, switch to inactive. else: success = InstErrorCode.AGENT_DEINIT_FAILED
next_state = AgentState.INACTIVE
else: success = InstErrorCode.INCORRECT_STATE
def state_handler_direct_access_mode(self, event, params):
    """
    State handler for AgentState.DIRECT_ACCESS_MODE.
    Substate of major state AgentState.ACTIVE.RUNNING.
    @param event The AgentEvent being dispatched to this state.
    @param params Event parameters; not used by this handler.
    @retval Delivered through defer.returnValue: a tuple
        (success, next_state, result). result is always None.
        next_state is None when no transition is requested. success is
        an InstErrorCode value: OK, the driver's own reply code,
        DRIVER_CONNECT_FAILED if the disconnect call raised,
        AGENT_DEINIT_FAILED if the driver process is still present
        after a RESET, or INCORRECT_STATE for an unhandled event.
    """
    # Bare yield kept from the original handler.
    yield
    success = InstErrorCode.OK
    next_state = None
    result = None
    self._debug_print(self._fsm.get_current_state(), event)

    if event == AgentEvent.ENTER:
        # Publish the state change on entry.
        origin = 'agent.%s' % self.event_publisher_origin
        yield self._state_publisher.create_and_publish_event(
            origin=origin, description=AgentState.DIRECT_ACCESS_MODE)

    elif event == AgentEvent.EXIT:
        pass

    elif event == AgentEvent.GO_OBSERVATORY_MODE:
        next_state = AgentState.OBSERVATORY_MODE

    elif event == AgentEvent.GO_INACTIVE:
        try:
            reply = yield self._driver_client.disconnect()
            success = reply['success']

        # Exception raised, reply error. Only Exception is caught so
        # that GeneratorExit, KeyboardInterrupt and SystemExit are not
        # swallowed and misreported as a driver failure.
        # NOTE(review): this is a disconnect call but the code reported
        # is DRIVER_CONNECT_FAILED - confirm whether
        # DRIVER_DISCONNECT_FAILED was intended.
        except Exception:
            success = InstErrorCode.DRIVER_CONNECT_FAILED

        # Command returned, if successful switch state to INACTIVE.
        else:
            if InstErrorCode.is_ok(success):
                next_state = AgentState.INACTIVE

    elif event == AgentEvent.RESET:
        try:
            reply = yield self._driver_client.disconnect()
            success = reply['success']

        # Exception raised, reply error (see note on GO_INACTIVE).
        except Exception:
            success = InstErrorCode.DRIVER_CONNECT_FAILED

        # Command returned, shut down driver.
        else:
            if InstErrorCode.is_ok(success):
                self._condemn_driver()

                # If successful, switch to UNINITIALIZED.
                if self._driver_pid is None:
                    next_state = AgentState.UNINITIALIZED

                # If unsuccessful, switch to inactive.
                else:
                    success = InstErrorCode.AGENT_DEINIT_FAILED
                    next_state = AgentState.INACTIVE

    else:
        success = InstErrorCode.INCORRECT_STATE

    defer.returnValue((success, next_state, result))
########################################################################### # Transaction Management ###########################################################################
def op_start_transaction(self, content, headers, msg): """ Begin an exclusive transaction with the agent. @param content A dict with None or nonnegative integer values 'acq_timeout' and 'exp_timeout' for acquisition and expiration timeouts respectively. @retval A dict with 'success' success/fail string and 'transaction_id' transaction ID UUID string. """
(isinstance(acq_timeout, int) and acq_timeout >= 0)), \ 'Expected None or nonnegative int acquisition timeout' (isinstance(exp_timeout, int)) and exp_timeout >= 0), \ 'Expected None or nonnegative int expiration timeout'
exp_timeout, headers['sender'])
# Publish any errors. InstErrorCode.get_string(success) #origin = "agent.%s" % self.event_publisher_origin #yield self._log_publisher.create_and_publish_event(origin=origin, # description=desc_str)
else:
description=desc_str)
""" Begin an exclusive transaction with the agent. @param exp_timeout An integer in seconds giving the allowable time the transaction may be open. @retval A tuple containing (success/fail, transaction ID UUID string). """
(isinstance(exp_timeout, int)) and exp_timeout >= 0), \ 'Expected None or nonnegative int expiration timeout'
# Ensure the expiration timeout is in the valid range. exp_timeout = self._default_exp_timeout exp_timeout = self._max_exp_timeout exp_timeout = self._min_exp_timeout
# If the resource is free, issue a new transaction immediately.
# Create and queue up a transaction expiration callback. """ A callback to expire a transaction. Either retire the transaction directly (no protected call running), or set a flag for a protected call to do the cleanup when finishing. """
else:
transaction_expired)
# Otherwise return locked resource error. else:
""" @param acq_timeout An integer in seconds to wait to acquire a new transaction. @param exp_timeout An integer in seconds to allow the new transaction to remain open. @param requester A process ID for requester. @retval A deferred that will fire when the a new transaction has been constructed or timeout occurs. The deferred value is a tuple (success/fail, transaction_id). """
(isinstance(acq_timeout, int) and acq_timeout >= 0)), \ 'Expected None or nonnegative int acquisition timeout' (isinstance(exp_timeout, int)) and exp_timeout >= 0), \ 'Expected None or nonnegative int expiration timeout'
# Ensure the expiration timeout is in the valid range. exp_timeout = self._max_exp_timeout exp_timeout = self._min_exp_timeout
# Ensure the acquisition timeout is in the valid range. acq_timeout = self._max_acq_timeout
# If the resource is free, issue a new transaction immediately.
else:
# If resource not free and no acquisition timeout, return # locked error immediately.
# If resource not free and there is a valid acquisition timeout, # add the deferred return to the list of pending transactions and # start the acquisition timeout.
acquisition_timeout)
exp_timeout, requester))
def op_end_transaction(self, content, headers, msg): """ End the current transaction. @param content A uuid specifying the current transaction to end. @retval success/fail message. """
# Publish an end transaction message...mainly as a test for now # yield self._log_publisher.create_and_publish_event(\ # name="Transaction ended!")
# Publish any errors. InstErrorCode.get_string(success) #origin = "agent.%s" % self.event_publisher_origin #yield self._log_publisher.create_and_publish_event(origin=origin, # description=desc_str)
else:
description=desc_str)
""" End the current transaction and start the next pending transaction if one is waiting. @param tid A uuid specifying the current transaction to end. @retval success/fail message. """
# Remove the current transaction.
# Reset expiration flag and cancel expiration timeout.
# If there is a pending transaction, issue a new transaction # and cancel the acquisition timeout. self._pending_transactions.pop(0)
# Return success.
# If there is no transaction to end, return not locked error.
# If the tid does not match the current transaction, return # locked error. else: result['success'] = InstErrorCode.LOCKED_RESOURCE
def _verify_transaction(self, tid, optype): """ Verify the passed transaction ID is currently open, or open an implicit transaction. @param tid 'create' to create an implicit transaction, 'none' to perform the operation without a transaction, or a UUID to test against the current transaction ID. @param optype 'get' 'set' or 'execute' @retval True if the transaction is valid or if one was successfully created, False otherwise. """
success = InstErrorCode.INVALID_TRANSACTION_ID
# Try to start an implicit transaction if tid is 'create'
else:
# Allow only gets without a current or created transaction.
# Otherwise, the given ID must match the outstanding one
else:
# Publish any errors. InstErrorCode.get_string(success) description=desc_str)
########################################################################### # Observatory Facing Interface ###########################################################################
def op_hello(self, content, headers, msg):
    """
    Reply to a hello message with a greeting that echoes the content.
    @param content Message content; its str() form is appended to the
        greeting text.
    @param headers Message headers; not used.
    @param msg The received message, handed to reply_ok as the message
        being answered.
    @retval Generator that yields the value returned by self.reply_ok,
        which is called with the content {'value': 'Hello there, ' +
        str(content)} and empty headers.
    """
    greeting = 'Hello there, ' + str(content)
    # Shows how to reply to a message.
    yield self.reply_ok(msg, {'value': greeting}, {})
def op_execute_observatory(self, content, headers, msg): """ Execute infrastructure commands related to the Instrument Agent instance. This includes commands for messaging, resource management processes, etc. @param content A dict {'command': [command, arg, ..., arg], 'transaction_id': transaction_id} @retval ACK message containing a dict {'success': success, 'result': command-specific, 'transaction_id': transaction_id}. """
list or tuple.'
# Set up the transaction.
# TRANSITION command.
# Verify required parameter present. success = InstErrorCode.REQUIRED_PARAMETER
# Verify required parameter valid. success = InstErrorCode.INVALID_PARAM_VALUE
else:
# TRANSMIT DATA command. success = InstErrorCode.NOT_IMPLEMENTED
# SLEEP command. success = InstErrorCode.REQUIRED_PARAMETER
else: success = InstErrorCode.INVALID_PARAM_VALUE
success = InstErrorCode.INVALID_PARAM_VALUE else:
else: success = InstErrorCode.UNKNOWN_COMMAND
# Unknown error. except: success = InstErrorCode.UNKNOWN_ERROR raise
# Set reply values. else:
# Publish errors, clean up transaction. finally:
desc_str = 'Error in op_execute_observatory: ' + \ InstErrorCode.get_string(InstErrorCode.UNKNOWN_ERROR) origin = "agent.%s" % self.event_publisher_origin yield self._log_publisher.create_and_publish_event(origin=\ origin, description=desc_str)
desc_str = 'Error in op_execute_observatory: ' + \ InstErrorCode.get_string(success) origin = "agent.%s" % self.event_publisher_origin yield self._log_publisher.create_and_publish_event(origin=\ origin, description=desc_str)
def op_get_observatory(self, content, headers, msg): """ Get data from the cyberinfrastructure side of the agent (registry info, topic locations, messaging parameters, process parameters, etc.) @param content A dict {'params': [param_arg, ,param_arg], 'transaction_id': transaction_id}. @retval A reply message containing a dict {'success': success, 'result': {param_arg: (success, val), ..., param_arg: (success, val)}, 'transaction_id': transaction_id) """ list or tuple.' # Set up the transaction
# Add each observatory parameter given in params list. result[arg] = (InstErrorCode.INVALID_PARAMETER, None) get_errors = True continue
arg == AgentParameter.ALL: result[AgentParameter.EVENT_PUBLISHER_ORIGIN] = \ (InstErrorCode.OK, None) else: (InstErrorCode.OK, self.event_publisher_origin)
arg == AgentParameter.ALL: (InstErrorCode.OK, self._driver_desc)
arg == AgentParameter.ALL: (InstErrorCode.OK, self._client_desc)
arg == AgentParameter.ALL: (InstErrorCode.OK, self._driver_config)
arg == AgentParameter.ALL: (InstErrorCode.OK, self._time_source)
arg == AgentParameter.ALL: (InstErrorCode.OK, self._connection_method)
arg == AgentParameter.ALL: (InstErrorCode.OK, self._max_acq_timeout)
arg == AgentParameter.ALL: (InstErrorCode.OK, self._default_exp_timeout)
arg == AgentParameter.ALL: (InstErrorCode.OK, self._max_exp_timeout)
arg == AgentParameter.ALL: (InstErrorCode.OK, self._data_buffer_limit)
# Unknown error. except: success = InstErrorCode.UNKNOWN_ERROR raise
# Set reply values. else: success = InstErrorCode.GET_OBSERVATORY_ERR
else:
# Publish errors, clean up transaction. finally:
# Publish any errors. desc_str = 'Error in op_get_observatory: ' + \ InstErrorCode.get_string(success) origin = "agent.%s" % self.event_publisher_origin yield self._log_publisher.create_and_publish_event(origin=\ origin, description=desc_str)
# Transaction clean up. End implicit or expired transactions.
def op_set_observatory(self, content, headers, msg): """ Set parameters related to the infrastructure side of the agent (registration information, location, network addresses, etc.) @param content A dict {'params': {param_arg: val, ..., param_arg: val}, 'transaction_id': transaction_id}. @retval Reply message with dict {'success': success, 'result': {param_arg: success, ..., param_arg: success}, 'transaction_id': transaction_id}. """
# Set up the transaction
# Add each observatory parameter given in params list. # Note: it seems like all the current params should be read only by # general agent users.
if not isinstance(val, dict): # Better checking here. result[arg] = InstErrorCode.INVALID_PARAM_VALUE set_errors = True
else: self._driver_desc = val result[arg] = InstErrorCode.OK set_successes = True
if not isinstance(val, dict): # Better checking here. result[arg] = InstErrorCode.INVALID_PARAM_VALUE set_errors = True
else: self._client_desc = val result[arg] = InstErrorCode.OK set_successes = True
if not isinstance(val, dict): # Better checking here. result[arg] = InstErrorCode.INVALID_PARAM_VALUE set_errors = True
else: self._driver_config = val result[arg] = InstErrorCode.OK set_successes = True
# Logic here when new time source set. # And test for successful switch.
else: result[arg] = InstErrorCode.OK
else: set_errors = True result[arg] = InstErrorCode.INVALID_PARAM_VALUE
# Logic here when new connection method set. # And test for successful switch.
else: result[arg] = InstErrorCode.OK
else:
if isinstance(val, int) and val >= 0: self._max_acq_timeout = val result[arg] = InstErrorCode.OK set_successes = True
else: set_errors = True result[arg] = InstErrorCode.INVALID_PARAM_VALUE
and val <= self._max_exp_timeout:
else:
else: set_errors = True result[arg] = InstErrorCode.INVALID_PARAM_VALUE
elif arg == AgentParameter.BUFFER_SIZE: if isinstance(val, int) and val >= 0: self._data_buffer_limit = val result[arg] = InstErrorCode.OK set_successes = True
else: set_errors = True result[arg] = InstErrorCode.INVALID_PARAM_VALUE
# Unknown error. except: success = InstErrorCode.UNKNOWN_ERROR raise
# Set reply values. else:
else:
# Publish errors, clean up transaction. finally:
# Publish any errors. InstErrorCode.get_string(success) origin, description=desc_str)
# Publish the new agent configuration. # strval = self._get_data_string(config) origin, description=json_val)
def op_get_observatory_metadata(self, content, headers, msg): """ Retrieve metadata about the observatory configuration parameters. @param content A dict {'params': [(param_arg, meta_arg), ..., (param_arg, meta_arg)], 'transaction_id': transaction_id} @retval A reply message with a dict {'success': success, 'result': {(param_arg, meta_arg): (success, val), ..., (param_arg,meta_arg): (success, val)} 'transaction_id': transaction_id}. """
list or tuple.'
# Set up the transaction reply['success'] = success yield self.reply_ok(msg, reply) return
pass
# Unknown error. except: success = InstErrorCode.UNKNOWN_ERROR raise
# Set reply values. else:
# Publish errors, clean up transaction. finally:
# Publish any errors. InstErrorCode.get_string(success) origin, description=desc_str)
self._end_transaction(self.transaction_id)
def op_get_observatory_status(self, content, headers, msg): """ Retrieve the observatory status values, including lifecycle state and other dynamic observatory status values indexed by status keys. @param content A dict {'params': [status_arg, ..., status_arg], 'transaction_id': transaction_id}. @retval Reply message with a dict {'success': success, 'result': {status_arg: (success, val), ..., status_arg: (success, val)}, 'transaction_id': transaction_id} """
list or tuple.'
# Set up the transaction
# Set up the result message.
# If status key not recognized, report error.
# Agent state. (InstErrorCode.OK, self._fsm.get_current_state())
# Connection state. AgentStatus.ALL: (InstErrorCode.OK, self._get_connection_state())
# Alarm conditions. (InstErrorCode.OK, self._alarms)
# Time status. (InstErrorCode.OK, self._time_status)
# Data buffer size. (InstErrorCode.OK, self._get_buffer_size())
# Agent software version. (InstErrorCode.OK, self.get_version())
# Pending transactions. AgentStatus.ALL: [item[3] for item in self._pending_transactions] (InstErrorCode.OK, pending_transaction_pids)
# Unknown error. except: success = InstErrorCode.UNKNOWN_ERROR raise
# Set reply values. else: else:
# Publish errors, clean up transaction. finally:
# Publish any errors. InstErrorCode.get_string(success) origin, description=desc_str)
def op_get_capabilities(self, content, headers, msg): """ Retrieve the agent capabilities, including observatory and device values, both common and specific to the agent / device. @param content A dict {'params': [cap_arg, ..., cap_arg], 'transaction_id': transaction_id}. @retval Reply message with a dict {'success': success, 'result': {cap_arg: (success, [cap_val, ..., cap_val]), ..., cap_arg: (success, [cap_val, ..., cap_val])}, 'transaction_id': transaction_id} """
or tuple.'
# Set up the transaction
# Do the work here. # Set up the result message.
InstrumentCapability.ALL:
arg == InstrumentCapability.OBSERVATORY_ALL or \ arg == InstrumentCapability.ALL: (InstErrorCode.OK, AgentCommand.list())
arg == InstrumentCapability.OBSERVATORY_ALL or \ arg == InstrumentCapability.ALL: (InstErrorCode.OK, AgentParameter.list())
arg == InstrumentCapability.OBSERVATORY_ALL or \ arg == InstrumentCapability.ALL: (InstErrorCode.OK, AgentStatus.list())
arg == InstrumentCapability.OBSERVATORY_ALL or \ arg == InstrumentCapability.ALL: (InstErrorCode.OK, MetadataParameter.list())
InstrumentCapability.ALL:
arg == InstrumentCapability.DEVICE_ALL or \ arg == InstrumentCapability.ALL: get(InstrumentCapability.DEVICE_CHANNELS, None) (InstErrorCode.OK, val) else: (InstErrorCode.INVALID_CAPABILITY, None)
arg == InstrumentCapability.DEVICE_ALL or \ arg == InstrumentCapability.ALL: get(InstrumentCapability.DEVICE_COMMANDS, None) (InstErrorCode.OK, val) else: (InstErrorCode.INVALID_CAPABILITY, None)
arg == InstrumentCapability.DEVICE_ALL or \ arg == InstrumentCapability.ALL: get(InstrumentCapability.DEVICE_METADATA, None) (InstErrorCode.OK, val) else: (InstErrorCode.INVALID_CAPABILITY, None)
arg == InstrumentCapability.DEVICE_ALL or \ arg == InstrumentCapability.ALL: get(InstrumentCapability.DEVICE_PARAMS, None) (InstErrorCode.OK, val) else: (InstErrorCode.INVALID_CAPABILITY, None)
arg == InstrumentCapability.DEVICE_ALL or \ arg == InstrumentCapability.ALL: get(InstrumentCapability.DEVICE_STATUSES, None) (InstErrorCode.OK, val) else: (InstErrorCode.INVALID_CAPABILITY, None)
# Unknown error. except: success = InstErrorCode.UNKNOWN_ERROR raise
# Set reply values. else: else:
# Publish errors, clean up transaction. finally:
# Publish any errors. InstErrorCode.get_string(success) origin, description=desc_str)
self._end_transaction(self.transaction_id)
########################################################################### # Instrument Facing Interface ###########################################################################
def op_execute_device(self, content, headers, msg): """ Execute a command on the device fronted by the agent. Commands may be common or specific to the device, with specific commands known through knowledge of the device or a previous get_capabilities query. @param content A dict {'channels': [chan_arg, ..., chan_arg], 'command': [command, arg, ..., argN], 'transaction_id': transaction_id} @retval A reply message with a dict {'success': success, 'result': {chan_arg: (success, command_specific), ..., chan_arg: (success, command_specific_values)}, 'transaction_id': transaction_id}. """
list or tuple.' or tuple.'
# Set up the transaction reply['success'] = success yield self.reply_ok(msg, reply) return
timeout)
# Unknown error. except: success = InstErrorCode.UNKNOWN_ERROR raise
# Set reply values. else:
# Publish errors, clean up transaction. finally:
# Publish any errors. desc_str = 'Error in op_execute_device: ' + \ InstErrorCode.get_string(success) origin = "agent.%s" % self.event_publisher_origin yield self._log_publisher.create_and_publish_event(origin\ =origin, description=desc_str)
self._end_transaction(self.transaction_id)
def op_get_device(self, content, headers, msg): """ Get configuration parameters from the instrument. @param content A dict {'params': [(chan_arg, param_arg), ..., (chan_arg, param_arg)], 'transaction_id': transaction_id} @retval A reply message with a dict {'success': success, 'result': {(chan_arg, param_arg): (success, val), ..., (chan_arg, param_arg): (success, val)}, 'transaction_id': transaction_id} """
list or tuple.'
# Set up the transaction reply['success'] = success yield self.reply_ok(msg, reply) return
agent_state != AgentState.IDLE and \ agent_state != AgentState.STOPPED: reply['success'] = InstErrorCode.INCORRECT_STATE yield self.reply_ok(msg, reply) return
#pass
# Unknown error. except: success = InstErrorCode.UNKNOWN_ERROR raise
# Set reply values. else: #reply['success'] = InstErrorCode.OK #reply['result'] = {'parameter': (InstErrorCode.OK, 'value')}
# Publish errors, clean up transaction. finally:
desc_str = 'Error in op_get_device: ' + \ InstErrorCode.get_string(success) origin = "agent.%s" % self.event_publisher_origin yield self._log_publisher.create_and_publish_event(origin=\ origin, description=desc_str)
self._end_transaction(self.transaction_id)
def op_set_device(self, content, headers, msg): """ Set parameters to the instrument side of the agent. @param content A dict {'params': {(chan_arg, param_arg): val, ..., (chan_arg, param_arg): val}, 'transaction_id': transaction_id}. @retval Reply message with a dict {'success': success, 'result': {(chan_arg, param_arg): success, ..., (chan_arg, param_arg): success}, 'transaction_id': transaction_id}. """
# Set up the transaction reply['success'] = success yield self.reply_ok(msg, reply) return
reply['success'] = InstErrorCode.INCORRECT_STATE yield self.reply_ok(msg, reply) return
# Unknown error. except: success = InstErrorCode.UNKNOWN_ERROR raise
# Set reply values. else:
# Publish errors, clean up transaction. finally:
desc_str = 'Error in op_execute_device: ' + \ InstErrorCode.get_string(success) origin = "agent.%s" % self.event_publisher_origin yield self._log_publisher.create_and_publish_event(origin=\ origin, description=desc_str)
self._end_transaction(self.transaction_id)
def op_execute_device_direct(self, content, headers, msg):
    """
    Execute untranslated byte data commands on the device. Must be in
    direct access mode.
    @param content A dict {'bytes': bytes,
        'transaction_id': transaction_id}.
    @param headers Message headers (not used by this operation).
    @param msg The received message; replies are sent against it.
    @retval A reply message with a dict {'success': success,
        'result': result, 'transaction_id': transaction_id}.
    """

    self._in_protected_function = True

    assert(isinstance(content, dict)), 'Expected a dict content.'
    assert('bytes' in content), 'Expected bytes.'
    assert('transaction_id' in content), 'Expected a transaction_id.'

    # Local is named raw_bytes so the builtin ``bytes`` is not shadowed.
    raw_bytes = content['bytes']
    tid = content['transaction_id']

    assert(isinstance(raw_bytes, str)), 'Expected a bytes string.'
    assert(isinstance(tid, str)), 'Expected a transaction_id str.'

    reply = {'success': None, 'result': None, 'transaction_id': None}

    # Set up the transaction.
    # NOTE(review): both early-return paths below leave
    # self._in_protected_function set to True -- confirm this is intended.
    success = yield self._verify_transaction(tid, 'execute')
    if InstErrorCode.is_error(success):
        reply['success'] = success
        yield self.reply_ok(msg, reply)
        return

    reply['transaction_id'] = self.transaction_id

    # Raw byte commands are only accepted in direct access mode.
    agent_state = self._fsm.get_current_state()
    if agent_state != AgentState.DIRECT_ACCESS_MODE:
        reply['success'] = InstErrorCode.INCORRECT_STATE
        yield self.reply_ok(msg, reply)
        return

    timeout = 60
    success = None
    result = None

    try:
        dvr_result = yield self._driver_client.execute_direct(raw_bytes,
                                                              timeout)
        success = dvr_result.get('success', None)
        result = dvr_result.get('result', None)

    # Unknown error. Record it so the finally clause publishes an event,
    # then let the exception propagate (no reply is sent on this path).
    except Exception:
        # Was misspelled UNKOWN_ERROR, which raised AttributeError here
        # and hid the original failure.
        success = InstErrorCode.UNKNOWN_ERROR
        raise

    # Set reply values.
    else:
        reply['success'] = success
        reply['result'] = result

    # Publish errors, clean up transaction.
    finally:

        # Publish any errors.
        if InstErrorCode.is_error(success):
            desc_str = 'Error in op_execute_device_direct: ' + \
                       InstErrorCode.get_string(success)
            origin = "agent.%s" % self.event_publisher_origin
            yield self._log_publisher.create_and_publish_event(
                origin=origin, description=desc_str)

        # Only end the transaction here if it was requested with 'create'
        # or it timed out.
        if (tid == 'create') or (self._transaction_timed_out == True):
            self._end_transaction(self.transaction_id)
        self._in_protected_function = False

    yield self.reply_ok(msg, reply)
def op_get_device_metadata(self, content, headers, msg):
    """
    Retrieve metadata for the device, its transducers and parameters.
    @param content A dict
        {'params': [(chan_arg, param_arg, meta_arg), ...,
        (chan_arg, param_arg, meta_arg)],
        'transaction_id': transaction_id}
    @param headers Message headers (not used by this operation).
    @param msg The received message; replies are sent against it.
    @retval Reply message with a dict
        {'success': success,
        'result': {(chan_arg, param_arg, meta_arg): (success, val), ...,
        (chan_arg, param_arg, meta_arg): (success, val)},
        'transaction_id': transaction_id}.
    """

    self._in_protected_function = True

    assert(isinstance(content, dict)), 'Expected a dict content.'
    assert('params' in content), 'Expected params.'
    assert('transaction_id' in content), 'Expected a transaction_id.'

    params = content['params']
    tid = content['transaction_id']

    assert(isinstance(params, (tuple, list))), \
        'Expected a parameter list or tuple.'
    assert(isinstance(tid, str)), 'Expected a transaction_id str.'

    reply = {'success': None, 'result': None, 'transaction_id': None}

    # Set up the transaction.
    # NOTE(review): both early-return paths below leave
    # self._in_protected_function set to True -- confirm this is intended.
    success = yield self._verify_transaction(tid, 'get')
    if InstErrorCode.is_error(success):
        reply['success'] = success
        yield self.reply_ok(msg, reply)
        return

    reply['transaction_id'] = self.transaction_id

    # Metadata may only be read in these agent states.
    agent_state = self._fsm.get_current_state()
    if agent_state not in (AgentState.OBSERVATORY_MODE,
                           AgentState.IDLE,
                           AgentState.STOPPED):
        reply['success'] = InstErrorCode.INCORRECT_STATE
        yield self.reply_ok(msg, reply)
        return

    timeout = 60
    success = None
    result = None

    try:
        dvr_content = {'params': params}
        dvr_result = yield self._driver_client.get_metadata(dvr_content,
                                                            timeout)
        success = dvr_result.get('success', None)
        result = dvr_result.get('result', None)

    # Unknown error. The exception is deliberately not re-raised: the
    # failure is reported to the caller through the reply instead.
    except Exception:
        success = InstErrorCode.UNKNOWN_ERROR
        result = None

    # Publish errors, clean up transaction.
    finally:

        # Publish any errors.
        if InstErrorCode.is_error(success):
            desc_str = 'Error in op_get_device_metadata: ' + \
                       InstErrorCode.get_string(success)
            origin = "agent.%s" % self.event_publisher_origin
            yield self._log_publisher.create_and_publish_event(
                origin=origin, description=desc_str)

        # Only end the transaction here if it was requested with 'create'
        # or it timed out.
        if (tid == 'create') or (self._transaction_timed_out == True):
            self._end_transaction(self.transaction_id)
        self._in_protected_function = False

    # Set reply values on both the success and the failure path, so a
    # swallowed driver error is not reported as {'success': None}.
    reply['success'] = success
    reply['result'] = result

    yield self.reply_ok(msg, reply)
def op_get_device_status(self, content, headers, msg):
    """
    Obtain the status of an instrument. This includes non-parameter
    and non-lifecycle state of the instrument.
    @param content A dict
        {'params': [(chan_arg, status_arg), ...,
        (chan_arg, status_arg)],
        'transaction_id': transaction_id}.
    @param headers Message headers (not used by this operation).
    @param msg The received message; replies are sent against it.
    @retval A reply message with a dict
        {'success': success,
        'result': {(chan_arg, status_arg): (success, val), ...,
        (chan_arg, status_arg): (success, val)},
        'transaction_id': transaction_id}.
    """

    self._in_protected_function = True

    assert(isinstance(content, dict)), 'Expected a dict content.'
    assert('params' in content), 'Expected params.'
    assert('transaction_id' in content), 'Expected a transaction_id.'

    params = content['params']
    tid = content['transaction_id']

    assert(isinstance(params, (tuple, list))), \
        'Expected a parameter list or tuple.'
    assert(isinstance(tid, str)), 'Expected a transaction_id str.'

    reply = {'success': None, 'result': None, 'transaction_id': None}

    # Set up the transaction.
    # NOTE(review): both early-return paths below leave
    # self._in_protected_function set to True -- confirm this is intended.
    success = yield self._verify_transaction(tid, 'get')
    if InstErrorCode.is_error(success):
        reply['success'] = success
        yield self.reply_ok(msg, reply)
        return

    reply['transaction_id'] = self.transaction_id

    # Status may only be read in these agent states.
    agent_state = self._fsm.get_current_state()
    if agent_state not in (AgentState.OBSERVATORY_MODE,
                           AgentState.IDLE,
                           AgentState.STOPPED):
        reply['success'] = InstErrorCode.INCORRECT_STATE
        yield self.reply_ok(msg, reply)
        return

    timeout = 60
    success = None
    result = None

    try:
        dvr_content = {'params': params}
        dvr_result = yield self._driver_client.get_status(dvr_content,
                                                          timeout)
        success = dvr_result.get('success', None)
        result = dvr_result.get('result', None)

    # Unknown error. The exception is deliberately not re-raised: the
    # failure is reported to the caller through the reply instead.
    except Exception:
        success = InstErrorCode.UNKNOWN_ERROR
        result = None

    # Publish errors, clean up transaction.
    finally:

        # Publish any errors. The description previously named
        # op_get_device_metadata (copy/paste slip), mislabelling events.
        if InstErrorCode.is_error(success):
            desc_str = 'Error in op_get_device_status: ' + \
                       InstErrorCode.get_string(success)
            origin = "agent.%s" % self.event_publisher_origin
            yield self._log_publisher.create_and_publish_event(
                origin=origin, description=desc_str)

        # Only end the transaction here if it was requested with 'create'
        # or it timed out.
        if (tid == 'create') or (self._transaction_timed_out == True):
            self._end_transaction(self.transaction_id)
        self._in_protected_function = False

    # Set reply values on both the success and the failure path, so a
    # swallowed driver error is not reported as {'success': None}.
    reply['success'] = success
    reply['result'] = result

    yield self.reply_ok(msg, reply)
########################################################################### # Publishing Methods ###########################################################################
def op_driver_event_occurred(self, content, headers, msg): """ Called by the driver to announce the occurrence of an event. The agent takes appropriate action including state transitions, data formatting and publication. This method must be called by a child process of the agent. @param content a dict with 'type' and 'transducer' strings and 'value' object. """
yield self.reply_err(msg, 'driver event occured evoked from a non-child process') return
# If data received, coordinate buffering and publishing. # Remember the transducer in case we need to transmit at a time # other than these events.
# Get the driver observatory state.
# If in streaming mode, buffer data and publish at intervals. # strval = self._get_data_string(self._data_buffer)
# If not in streaming mode, always publish data upon receipt. else: #strval = self._get_data_string(value) json_val = json.dumps([value])
#if len(strval) > 0: origin=origin, data_block=json_val)
# Driver configuration changed, publish config.
DriverParameter.ALL)]) #strval = self._get_data_string(result) origin, description=json_val)
pass
# If the driver state changed, publish any buffered data remaining. #strval = self._get_data_string(self._data_buffer) json_val = json.dumps(self._data_buffer) #if len(strval) > 0: if json_val != None: origin = "%s.%s" % (self._prev_data_transducer, self.event_publisher_origin) yield self._log_publisher.create_and_publish_event(origin=\ origin, description=json_val)
elif type == DriverAnnouncement.EVENT_OCCURRED: pass
else: pass
########################################################################### # Driver lifecycle. ###########################################################################
def _start_driver(self): """ Spawn the driver and dynamically construct the client from the current description dictionaries. @retval True if both the client and driver were successfully created, False otherwise. """
'module' in self._client_desc and \ 'class' in self._client_desc and \ self._driver_desc and \ 'module' in self._driver_desc and \ 'class' in self._driver_desc and \ 'name' in self._driver_desc: ' import ' + self._client_desc['class']
# Spawn the driver process.
# If the process desc is bad, trap the error and proceed. # Do not construct client or set member objects. except ImportError: pass
# Process spawn successful, start client, set member objects. else:
# Dynamically construct the client object '(proc=self, target=self._driver_pid)'
# Client import is bad, shutdown driver and exit. except ImportError, NameError: log.info('Client import was bad, shutting down driver: %s' \ % NameError) self._stop_driver()
# Other error, shutdown driver and raise. except: self._stop_driver() raise
# Driver and client constructed. Set client object. else: str(self._driver_client))
""" Add current driver to a list to be shutdown at a convenient time. Destroy the client object. """
""" Shutdown any old driver processes. """
else: new_children.append(item)
""" Shutdown the driver and driver client processes. """
# Shutdown the driver process and remove its reference.
# Add code to correctly shut down the child proc. str(self._driver_pid))
########################################################################### # Other. ###########################################################################
""" @retval The current connection state of the agent, including connection to a remote-side agent, existence of a driver, and connection to instrument hardware. Should be extended to handle cases where there is a persistent shoreside and intermittant wetside agent component. """
return AgentConnectionState.POWERED_DOWN elif curstate == AgentState.INACTIVE: return AgentConnectionState.DISCONNECTED elif curstate == AgentState.IDLE: return AgentConnectionState.CONNECTED elif curstate == AgentState.STOPPED: return AgentConnectionState.CONNECTED elif curstate == AgentState.OBSERVATORY_MODE: return AgentConnectionState.CONNECTED elif curstate == AgentState.DIRECT_ACCESS_MODE: return AgentConnectionState.CONNECTED elif curstate == AgentState.UNKNOWN: return AgentConnectionState.UNKOWN else: return AgentConnectionState.UNKOWN
""" Determine if a process with the given name is a child process @param name The name to test for subprocess-ness @retval True if the name matches a child process name, False otherwise """ name, self.child_procs)
""" Return the total size in characters of the data buffer. Assumes the buffer is a list of string data lines. """
""" Convert a sample dictionary or list of sample dictionaries into a publishable string value. @param data A dictionary containing an instrument data sample in key-value pairs or a list of such dictionaries representing a buffered set of samples. @retval A string representation of the data to be published. """
assert(isinstance(data, (list, tuple, dict))), 'Expected a data dict, \ or a list or tuple of data dicts'
if isinstance(data, dict): return str(data) else: strval = '' for item in data: strval += str(item) + ','
strval = strval[:-1] return strval
""" Get a dictionary of agent parameter values. @retval A dict containing the agent parameters as key-value pairs. """
self.event_publisher_origin
""" Print debug driver events to stdio. @param type String event type. @param transducer String transducer producing the event. @param value Value of the event. """ type, transducer, value)
""" Print debug agent events to stdio. @param event String event type. @param value String event value. """
""" Agent client class provides RPC messaging to the agent service. """
# Increased rpc timeout for agent operations.
########################################################################### # Transaction Management. ###########################################################################
""" Begin an exclusive transaction with the agent. @param acq_timeout An integer in seconds to wait for the transaction. @param exp_timeout An integer in seconds to expire the transaction. @retval Transaction ID UUID string. """
'Expected int or None acquisition timeout.' 'Expected int or None expire timeout.'
'acq_timeout': acq_timeout, 'exp_timeout': exp_timeout }
yield self.rpc_send('start_transaction', params, timeout=rpc_timeout)
else: yield self.rpc_send('start_transaction', params, timeout=self.default_rpc_timeout)
def end_transaction(self, tid): """ End the current transaction. @param tid A uuid string specifying the current transaction to end. """ tid) #yield pu.asleep(1)
########################################################################### # Observatory Facing Interface. ###########################################################################
yield self._check_init() (content, headers, msg) = yield self.rpc_send('hello', text) defer.returnValue(str(content))
def execute_observatory(self, command, transaction_id): """ Execute infrastructure commands related to the Instrument Agent instance. This includes commands for messaging, resource management processes, etc. @param command A command list [command, arg, ..., arg]. @param transaction_id A transaction_id uuid4 or string 'create,' or 'none.' @retval Reply dict {'success': success, 'result': command-specific, 'transaction_id': transaction_id}. """
'Expected a transaction_id str.'
self.rpc_send('execute_observatory', content, timeout=self.default_rpc_timeout)
""" Get data from the cyberinfrastructure side of the agent (registry info, topic locations, messaging parameters, process parameters, etc.) @param params A paramter list [param_arg, ,param_arg]. @param transaction_id A transaction ID uuid4 or string 'create,' or 'none.' @retval A reply dict {'success': success, 'result': {param_arg: (success, val), ..., param_arg: (success, val)},'transaction_id': transaction_id}. """
'Expected a transaction_id str.'
self.rpc_send('get_observatory', content, timeout=self.default_rpc_timeout)
""" Set parameters related to the infrastructure side of the agent (registration information, location, network addresses, etc.) @param params A parameter-value dict {'params': {param_arg: val, ..., param_arg: val}. @param transaction_id A transaction ID uuid4 or string 'create,' or 'none.' @retval Reply dict {'success': success, 'result': {param_arg: success, ..., param_arg: success}, 'transaction_id': transaction_id}. """ 'Expected a transaction_id str.'
self.rpc_send('set_observatory', content, timeout=self.default_rpc_timeout)
""" Retrieve metadata about the observatory configuration parameters. @param params A metadata parameter list [(param_arg, meta_arg), ..., (param_arg, meta_arg)]. @param transaction_id A transaction ID uuid4 or string 'create,' or 'none.' @retval A reply dict {'success': success, 'result': {(param_arg, meta_arg): (success, val), ..., (param_arg, meta_arg): (success, val)}, 'transaction_id': transaction_id}. """ 'Expected a transaction_id str.'
self.rpc_send('get_observatory_metadata', content, timeout=self.default_rpc_timeout)
""" Retrieve the observatory status values, including lifecycle state and other dynamic observatory status values indexed by status keys. @param params A parameter list [status_arg, ..., status_arg]. @param transaction_id A transaction ID uuid4 or string 'create,' or 'none.' @retval Reply dict {'success': success, 'result': {status_arg: (success, val), ..., status_arg: (success, val)}, 'transaction_id': transaction_id} """ 'Expected a transaction_id str.'
self.rpc_send('get_observatory_status', content, timeout=self.default_rpc_timeout)
""" Retrieve the agent capabilities, including observatory and device values, both common and specific to the agent / device. @param params A parameter list [cap_arg, ..., cap_arg]. @param transaction_id A transaction ID uuid4 or string 'create,' or 'none.' @retval Reply dict {'success': success, 'result': {cap_arg: (success, val), ...,cap_arg: (success, val)}, 'transaction_id': transaction_id} """ 'Expected a transaction_id str.'
self.rpc_send('get_capabilities', content, timeout=self.default_rpc_timeout)
########################################################################### # Instrument Facing Interface. ###########################################################################
""" Execute a command on the device fronted by the agent. Commands may be common or specific to the device, with specific commands known through knowledge of the device or a previous get_capabilities query. @param channels A channels list [chan_arg, ..., chan_arg]. @param command A command list [command, arg, ..., argN]). @param transaction_id A transaction ID uuid4 or string 'create,' or 'none.' @retval A reply dict {'success': success, 'result': {chan_arg: (success, command_specific_values), ..., chan_arg: (success, command_specific_values)}, 'transaction_id': transaction_id}. """ 'Expected a transaction_id str.'
'transaction_id': transaction_id} self.rpc_send('execute_device', content, timeout=self.default_rpc_timeout)
""" Get configuration parameters from the instrument. @param params A parameters list [(chan_arg, param_arg), ..., (chan_arg, param_arg)]. @param transaction_id A transaction ID uuid4 or string 'create,' or 'none.' @retval A reply dict {'success': success, 'result': {(chan_arg, param_arg): (success, val), ..., (chan_arg, param_arg): (success, val)}, 'transaction_id': transaction_id} """
'Expected a transaction_id str.'
self.rpc_send('get_device', content, timeout=self.default_rpc_timeout)
""" Set parameters to the instrument side of of the agent. @param params A parameter-value dict {(chan_arg, param_arg): val, ..., (chan_arg, param_arg): val}. @param transaction_id A transaction ID uuid4 or string 'create,' or 'none.' @retval Reply dict {'success': success, 'result': {(chan_arg, param_arg): success, ..., chan_arg, param_arg): success}, 'transaction_id': transaction_id}. """ 'Expected a transaction_id str.'
self.rpc_send('set_device', content, timeout=self.default_rpc_timeout)
""" Retrieve metadata for the device, its transducers and parameters. @param params A metadata parameter list [(chan_arg, param_arg, meta_arg), ..., (chan_arg, param_arg, meta_arg)]. @param transaction_id A transaction ID uuid4 or string 'create,' or 'none.' @retval Reply dict {'success': success, 'result': {(chan_arg, param_arg, meta_arg): (success, val), ..., (chan_arg, param_arg, meta_arg): (success, val)}, 'transaction_id': transaction_id}. """
assert(isinstance(params, list)), 'Expected a parameter list.' assert(isinstance(transaction_id, str)), \ 'Expected a transaction_id str.'
yield self._check_init() content = {'params': params, 'transaction_id': transaction_id} (content, headers, messaage) = yield \ self.rpc_send('get_device_metadata', content, timeout=self.default_rpc_timeout)
assert(isinstance(content, dict)) defer.returnValue(content)
""" Obtain the status of an instrument. This includes non-parameter and non-lifecycle state of the instrument. @param params A parameter list [(chan_arg, status_arg), ..., (chan_arg, status_arg)]. @param transaction_id A transaction ID uuid4 or string 'create,' or 'none.' @retval A reply dict {'success': success, 'result': {(chan_arg, status_arg): (success, val), ..., (chan_arg, status_arg): (success, val)}, 'transaction_id': transaction_id}. """
assert(isinstance(params, list)), 'Expected a parameter list.' assert(isinstance(transaction_id, str)), \ 'Expected a transaction_id str.'
yield self._check_init() content = {'params': params, 'transaction_id': transaction_id} (content, headers, messaage) = yield \ self.rpc_send('get_device_status', content, timeout=self.default_rpc_timeout)
assert(isinstance(content, dict)) defer.returnValue(content)
""" Execute untranslated byte data commands on the device. Must be in direct access mode and possess the correct transaction_id key for the direct access session. @param bytes An untranslated block of data to send to the device. @param transaction_id A transaction ID uuid4 specifying the direct access session. @retval A reply dict {'success': success, 'result': bytes}. """
assert(bytes), 'Expected command bytes.' assert(isinstance(transaction_id, str)), \ 'Expected a transaction_id str.'
yield self._check_init() content = {'bytes': bytes, 'transaction_id': transaction_id} (content, headers, messaage) = yield \ self.rpc_send('execute_device_direct', content, timeout=self.default_rpc_timeout)
assert(isinstance(content, dict)) defer.returnValue(content)
########################################################################### # Publishing interface. ###########################################################################
# op_publish and op_driver_event_occurred are used by the driver # child process and are not invoked through a client.
########################################################################### # Registration interface. ###########################################################################
def register_resource(self, instrument_id):
    """
    Register the resource. Since this is a subclass, make the appropriate
    resource description for the registry and pass that into the
    registration call.

    Not implemented yet: the call currently does nothing.
    @param instrument_id Identifier of the instrument to register
        (currently unused; the sketch below passes it as the
        RegistryIdentity of the instrument reference).
    @retval None.
    """
    # TODO: implement registration. Sketch of the intended implementation,
    # previously kept in a bare string literal:
    #
    #   ia_instance = InstrumentAgentResourceInstance()
    #   ci_params = yield self.get_observatory([driver_address])
    #   ia_instance.driver_process_id = ci_params[driver_address]
    #   ia_instance.instrument_ref = ResourceReference(
    #       RegistryIdentity=instrument_id, RegistryBranch='master')
    #   result = yield ResourceAgentClient.register_resource(self,
    #                                                        ia_instance)
    #   defer.returnValue(result)
    return None
# Spawn of the process using the module name |