Coverage for ion/services/sa/instrument_management : 83.26%
Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
|
#!/usr/bin/env python
@file ion/services/sa/data_acquisition.py @author Michael Meisinger @brief service for data acquisition """
# Setup a subscriber to an event topic #self.msgs = [] #self.pdata=[] #self.max_points=50
self.msgs.append(data) #convert the incoming string into a dict list = msg.additional_data.data_block.split(';') dataDict = {} for entry in list: key, val = entry.split(':') dataDict[key] = val
log.info("IMSSRVC SBE37DataEventSubscriber new dict: %s", dataDict )
description = [('time','string', 'time (Seconds)'), ('temperature','number', 'temperature')] self.pdata.append([str(dataDict['time' ]), float(dataDict['temperature']) ] )
dlen = len(self.pdata) if dlen > self.max_points: self.pdata = self.pdata[dlen-self.max_points : ]
data_table = gviz_api.DataTable(description) data_table.LoadData(self.pdata) #json = data_table.ToJSon(columns_order=("name", "salary"),order_by="salary") json = data_table.ToJSon()
# Make message for the screen below hdr = '<p>Timestamp: %s </p>\n' % pu.currenttime()
page = line_template % {'msg':hdr,'json':json}
#log.info("IMSSRVC SBE37DataEventSubscriber page: %s", page)
#self.queue_result(queue,page,'Google Viz of message counts') """
""" Instrument management service interface. This service provides overall coordination for instrument management within an observatory context. In particular it coordinates the access to the instrument and data product registries and the interaction with instrument agents. """
# Declaration of service version='0.1.0', dependencies=[])
#self.irc = InstrumentRegistryClient(proc=self) #self.dprc = DataProductRegistryClient(proc=self) #self.arc = AgentRegistryClient(proc=self) #self.dpsc = DataPubsubClient(proc=self)
def op_create_new_instrument(self, content, headers, msg): """ Service operation: Accepts a dictionary containing user inputs. Updates the instrument registry. """
# Set the attributes resource.name = str(userInput['name'])
resource.description = str(userInput['description'])
def op_create_new_data_product(self, content, headers, msg): """ Service operation: Accepts a dictionary containing user inputs. Updates the data product registry. Also sets up an ingestion pipeline for an instrument """ dataProductInput = content['dataProductInput']
""" newdp = DataProductResource.create_new_resource() if 'instrumentID' in dataProductInput: inst_id = str(dataProductInput['instrumentID']) int_ref = ResourceReference(RegistryIdentity=inst_id, RegistryBranch='master') newdp.instrument_ref = int_ref
if 'name' in dataProductInput: newdp.name = str(dataProductInput['name'])
if 'description' in dataProductInput: newdp.description = str(dataProductInput['description'])
if 'dataformat' in dataProductInput: newdp.dataformat = str(dataProductInput['dataformat'])
# Step: Create a data stream ## Instantiate a pubsubclient #self.dpsc = DataPubsubClient(proc=self) # ## Create and Register a topic #self.topic = PubSubTopicResource.create('SBE49 Topic',"oceans, oil spill") #self.topic = yield self.dpsc.define_topic(self.topic) #log.debug('DHE: Defined Topic') # #self.publisher = PublisherResource.create('Test Publisher', self, self.topic, 'DataObject') #self.publisher = yield self.dpsc.define_publisher(self.publisher)
res = yield self.dprc.register_data_product(newdp) ref = res.reference(head=True)
yield self.reply_ok(msg, res.encode()) """
def op_execute_command(self, content, headers, msg): """ Service operation: Execute a command on an instrument. """
else: raise ValueError("Input for instrument agent resource ID not present")
else: raise ValueError("Input for command not present")
# Step 1: Extract the arguments from the UI generated message content #commandInput = content['commandInput']
#Retrieve the instrument agent resource from the RR
#create the client for this instrument agent
# Begin an explicit transaction.
#Execute the command
# End the transaction.
def op_get_instrument_state(self, content, headers, msg): """ Service operation: . """ else: raise ValueError("Input for instrument agent resource ID not present")
# Step 1: Extract the arguments from the UI generated message content #commandInput = content['commandInput']
#Retrieve the instrument agent resource from the RR
#create the client for this instrument agent
# Check agent state upon creation. No transaction needed for # get operation.
def op_start_instrument_agent(self, content, headers, msg): """ Service operation: Starts an instrument agent for a type of instrument. """
else: raise ValueError("Input for instrumentID not present")
else: raise ValueError("Input for instrumentResourceID not present")
else: raise ValueError("Input for model not present")
raise ValueError("Only SBE37 supported!")
#yield self._start_container()
# Driver and agent configuration. Configuration data will ultimately be # accessed via some persistence mechanism: platform filesystem # or a device registry. For now, we pass all configuration data # that would be read this way as process arguments. 'ipport':sbe_port, 'ipaddr':sbe_host }
# Process description for the SBE37 driver. 'name':'SBE37_driver', 'module':'ion.agents.instrumentagents.SBE37_driver', 'class':'SBE37Driver', 'spawnargs':{'config':driver_config} }
# Process description for the SBE37 driver client. 'name':'SBE37_client', 'module':'ion.agents.instrumentagents.SBE37_driver', 'class':'SBE37DriverClient', 'spawnargs':{} }
# Spawnargs for the instrument agent. 'driver-desc':driver_desc, 'client-desc':driver_client_desc, 'driver-config':driver_config, 'agent-config':agent_config }
# Process description for the instrument agent. 'name':'instrument_agent', 'module':'ion.agents.instrumentagents.instrument_agent', 'class':'InstrumentAgent', 'spawnargs':spawnargs }
# Processes for the tests. agent_desc ]
# Spawn agent and driver, create agent client. #self.sup1 = yield bootstrap.create_supervisor()
#self.ia_client.register_resource(content['instrumentResourceID'])
#store the new instrument agent in the resource registry
# Set the attributes
#Store the resource in the registry
#Associate this agent to the instrument # Put the association and the resources in the datastore
#https://github.com/ooici/ioncore-python/blob/r1lca/ion/services/dm/presentation/web_viz_consumer.py #https://github.com/ooici/ioncore-python/blob/r1lca/ion/services/dm/distribution/consumers/timeseries_consumer.py
def op_stop_instrument_agent(self, content, headers, msg):
    """
    Service operation: Stops an instrument agent.

    Not implemented yet: the only action taken is to send an error reply
    ("Not yet implemented") for the received message.

    @param content  request payload; not read by this placeholder
    @param headers  message headers; not read by this placeholder
    @param msg      the received message, handed to reply_err
    """
    yield self.reply_err(msg, "Not yet implemented")
def op_start_direct_access(self, content, headers, msg):
    """
    Service operation: Starts direct access mode.

    Placeholder: answers the received message with an error reply
    ("Not yet implemented") and does nothing else.

    @param content  request payload; not read here
    @param headers  message headers; not read here
    @param msg      the received message, handed to reply_err
    """
    error_text = "Not yet implemented"
    pending_reply = self.reply_err(msg, error_text)
    yield pending_reply
def op_stop_direct_access(self, content, headers, msg):
    """
    Service operation: Stops direct access mode.

    Placeholder: answers the received message with an error reply
    ("Not yet implemented") and does nothing else.

    @param content  request payload; not read here
    @param headers  message headers; not read here
    @param msg      the received message, handed to reply_err
    """
    error_text = "Not yet implemented"
    pending_reply = self.reply_err(msg, error_text)
    yield pending_reply
def get_agent_desc_for_instrument(self, instrument_id): log.info("get_agent_desc_for_instrument() instrumentID="+str(instrument_id)) """ int_ref = ResourceReference(RegistryIdentity=instrument_id, RegistryBranch='master') agent_query = InstrumentAgentResourceInstance() agent_query.instrument_ref = int_ref
if not agent_res: defer.returnValue(None) agent_pid = agent_res.proc_id log.info("Agent process id for instrument id %s is: %s" % (instrument_id, agent_pid)) defer.returnValue(agent_pid) """
def get_agent_for_instrument(self, inst_resource_id):
result = None instrument_resource = yield self.rc.get_instance(inst_resource_id) try: results = yield self.ac.find_associations(subject=instrument_resource, predicate_or_predicates=HAS_A_ID)
except AssociationClientError: log.error('AssociationError') defer.returnValue(result)
for association in results: log.info('Associated Source for Instrument: ' + \ association.ObjectReference.key + \ ' is: ' + association.SubjectReference.key)
instrument_agent_resource = yield self.rc.get_instance(association.ObjectReference.key)
""" log.info("get_agent_for_instrument() instrumentID="+str(instrument_id)) int_ref = ResourceReference(RegistryIdentity=instrument_id, RegistryBranch='master') agent_query = InstrumentAgentResourceInstance() agent_query.instrument_ref = int_ref # @todo Need to list the LC state here. WHY??? agent_query.lifecycle = LCStates.developed agents = yield self.arc.find_registered_agent_instance_from_description(agent_query, regex=False) log.info("Found %s agent instances for instrument id %s" % (len(agents), instrument_id)) agent_res = None if len(agents) > 0: agent_res = agents[0] defer.returnValue(agent_res) """
def get_agent_pid_for_instrument(self, instrument_id):
    # NOTE(review): the implementation is disabled. The only thing in this
    # body is the string literal below, which holds the former code (fetch the
    # agent via get_agent_for_instrument, then report its proc_id), so nothing
    # is looked up or logged when this is called. instrument_id is unused.
    """ agent_res = yield self.get_agent_for_instrument(instrument_id) if not agent_res: defer.returnValue(None) agent_pid = agent_res.proc_id log.info("Agent process id for instrument id %s is: %s" % (instrument_id, agent_pid)) defer.returnValue(agent_pid) """
""" Class for the client accessing the instrument management service. """
def create_new_instrument(self, userInput):
def create_new_data_product(self, dataProductInput):
    """
    Send a 'create_new_data_product' request to the service.

    The input is wrapped under the 'dataProductInput' key and sent with
    rpc_send; whatever rpc_send produces is handed to defer.returnValue
    unchanged.

    @param dataProductInput  value sent under the 'dataProductInput' key
    """
    request = {'dataProductInput': dataProductInput}
    response = yield self.rpc_send('create_new_data_product', request)
    defer.returnValue(response)
def start_instrument_agent(self, instrumentID, instrumentResourceID, model): #result = yield self._base_command('start_instrument_agent', reqcont)
def stop_instrument_agent(self, instrumentID):
    """
    Send a 'stop_instrument_agent' request to the service.

    The request carries the instrument ID as its only field and goes through
    _base_command; the value _base_command produces is handed to
    defer.returnValue.

    @param instrumentID  value sent under the 'instrumentID' request key
    """
    request = {'instrumentID': instrumentID}
    reply = yield self._base_command('stop_instrument_agent', request)
    defer.returnValue(reply)
def get_instrument_state(self, instrument_agent_id): #commandInput = {} #reqcont['commandInput'] = commandInput
def execute_command(self, instrument_agent_id, cmd): #commandInput['instrumentID'] = instrumentID #commandInput['command'] = command #if arglist: # argnum = 0 # for arg in arglist: # commandInput['cmdArg'+str(argnum)] = arg # argnum += 1 #reqcont['commandInput'] = commandInput
def _base_command(self, op, content): (cont, hdrs, msg) = yield self.rpc_send(op, content) defer.returnValue(cont)
# Spawn of the process using the module name |