Coverage for ion/services/dm/inventory/dataset_controller : 71.26%
Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
|
#!/usr/bin/env python
@file ion/services/dm/inventory/dataset_controller.py @author David Stuebe @brief An example service definition that can be used as template for resource management. """
HAS_LIFE_CYCLE_STATE_ID, OWNED_BY_ID, DATASET_RESOURCE_TYPE_ID, ANONYMOUS_USER_ID
""" message Dataset { enum _MessageTypeIdentifier { _ID = 10001; _VERSION = 1; } optional net.ooici.core.link.CASRef root_group = 1; } """
""" message IDRef { enum _MessageTypeIdentifier { _ID = 4; _VERSION = 1; } required string key = 1; optional string branch = 3; optional bytes commit = 4; } """
""" message FindDatasetMessage { enum _MessageTypeIdentifier { _ID = 2401; _VERSION = 1; }
optional bool only_mine = 1 ; optional net.ooici.services.coi.LifeCycleState by_life_cycle_state = 2 [default = ACTIVE]; } """
""" message QueryResult{ enum _MessageTypeIdentifier { _ID = 22; _VERSION = 1; } repeated net.ooici.core.link.CASRef idrefs = 1; } """
""" An exception class for the Dataset Controller Service """
""" This class provides the messaging hooks to invoke rsync on receipt of scheduler messages. """
def ondata(self, data):
    """
    Handle a message from the scheduler by invoking the rsync hook.

    @param data the received message; it is not inspected here
    """
    log.debug('Got a rsync update message from the scheduler')
    # Hand the hook's result back to whatever drives this generator
    # (presumably an inlineCallbacks decorator outside this view - confirm).
    pending = self.hook_fn()
    yield pending
""" The Dataset Controller service
"""
# Declaration of service version='0.1.0', dependencies=['scheduler'])
def slc_terminate(self):
    """
    Service life cycle hook: remove this service's task from the scheduler.

    Builds a remove-task request carrying self.sched_task_id and sends it
    through self.ssc (presumably the scheduler service client - confirm).

    NOTE(review): self.sched_task_id is only assigned in
    _create_scheduled_event(); confirm this code cannot run before that.
    """
    log.debug('Removing scheduled task: %s' % self.sched_task_id)

    # Ask the message client for an empty remove-task request and fill it in.
    rm_request = yield self.message_client.create_instance(RMTASK_REQ_TYPE)
    rm_request.task_id = self.sched_task_id

    yield self.ssc.rm_task(rm_request)
# Service class initializer. Basic config, but no yields allowed.
#self.private_key = self.spawn_args.get('private_key' , # CONF.getValue('private_key')) #self.public_key = self.spawn_args.get('public_key' , # CONF.getValue('public_key'))
# As per DS, pull config from spawn args first and config file(s) second CONF.getValue('thredds_ncml_url', default='datactlr@thredds.oceanobservatories.org:/opt/tomcat/ooici_tds_data')) CONF.getValue('update_interval', default=5.0)) CONF.getValue('ncml_path', default='/tmp')) # Which Q to receiver scheduler messages? CONF.getValue('queue_name', default='data_controller_scheduler'))
CONF.getValue('task_id', default=str(uuid.uuid4())))
self.spawn_args.get('do-init', CONF.getValue('do-init', default=False)))
if self.walrus == 'False': self.walrus = False elif self.walrus == 'True': self.walrus = True else: raise Exception("Invalid input to dataset controller: argument 'do-init' is True or False" ) raise Exception("Invalid input to dataset controller: argument 'do-init' is True or False" )
def slc_init(self): """ Service life cycle state. Initialize service here. Can use yields.
Can be called in __init__ or in slc_init... no yield required """
queue_name=self.queue_name, origin=SCHEDULE_TYPE_DSC_RSYNC, process=self) # Add the receiver as a registered life cycle object
log.debug('I am the walrus.') yield self._create_scheduled_event() else:
def _create_scheduled_event(self):
    """
    Register the periodic rsync task with the scheduler.

    Sends an add-task request built from self.update_interval and
    self.task_id, and records the task id from the response in
    self.sched_task_id.
    """
    log.debug('creating scheduled event')

    add_request = yield self.message_client.create_instance(SCHEDULER_ADD_REQ_TYPE)
    # The interval is truncated to whole seconds before it is sent.
    add_request.interval_seconds = int(self.update_interval)
    add_request.task_id = self.task_id
    add_request.desired_origin = SCHEDULE_TYPE_DSC_RSYNC

    log.debug('Sending request to scheduler')
    reply = yield self.ssc.add_task(add_request)
    # Remember the id from the response; slc_terminate uses it to remove the task.
    self.sched_task_id = reply.task_id

    log.debug('got scheduler response OK')
#noinspection PyUnusedLocal
def do_ncml_sync(self):
    """
    @brief On receipt of scheduler message, do rsync with server, moving any
    new ncml files over.

    Writes one NcML file into self.ncml_path for every dataset reference
    returned by _get_active_dataset_resources(), then runs the rsync from
    inside that directory. The previous working directory is saved in
    self.cwd and is restored even when the rsync raises, so a failed sync
    no longer leaves the process sitting in self.ncml_path.

    @retval None
    """
    log.debug('rsync scheduled beginning now')

    # Disabled guard kept from the original implementation:
    #if check_for_ncml_files(self.ncml_path):

    query_result = yield self._get_active_dataset_resources()

    for id_ref in query_result.idrefs:
        create_ncml(id_ref.key, self.ncml_path)

    log.debug('NcML files found, invoking rsync')
    self.cwd = getcwd()
    chdir(self.ncml_path)
    try:
        yield do_complete_rsync(self.ncml_path, self.server_url)
    finally:
        # Restore the working directory on success and on failure alike.
        chdir(self.cwd)

    log.debug('rsync complete')

    # Kept outside the try block: returnValue signals completion by raising.
    defer.returnValue(None)

    #else:
    #    log.debug('No ncml found, doing nothing')
#noinspection PyUnusedLocal def op_create_dataset_resource(self, request, headers, msg): """ @Brief This method creates an empty dataset resource and returns its ID. It assumes that the caller provides an Instrument Info Object in a Resource Configuration Request message which should be made into a resource.
@param params request GPB, ?, Is there anything in the request? What? @retval response, GPB 12/1, a response containing the dataset resource ID """
# Check only the type received and linked object types. All fields are #strongly typed in google protocol buffers! # This will terminate the hello service. As an alternative reply okay with an error message raise DatasetControllerError('Expected Null message type, received %s' % str(request), request.ResponseCodes.BAD_REQUEST)
# Use the resource client to create a resource! ResourceName='CDM Dataset Resource', ResourceDescription='None')
# What state should this be in at this point? #resource.ResourceLifeCycleState = resource.DEVELOPED
# Create a reference to return to the caller # This is one pattern - it exposes the resource to the caller
# pass the reference
# Set a response code in the message envelope
# pfh - create local ncml file as well. These accumulate and are # harvested by the scheduled rsync # Per DS, empty datasets will cause thredds problems # @bug Test with thredds #create_ncml(response.key, self.ncml_path)
# The following line shows how to reply to a message
def op_find_dataset_resources(self, request, headers, msg): """ @Brief find the dataset resources that match the request
@param params request GPB, 2401/1, a request to find datasets @retval ListFindResults Type, GPB 22/1, A list of Dataset Resource References that match the request """
# Check only the type received and linked object types. All fields are #strongly typed in google protocol buffers! # This will terminate the hello service. As an alternative reply okay with an error message raise DatasetControllerError('Expected message type FindDataSetRequest, received %s' % str(request), request.ResponseCodes.BAD_REQUEST)
### Check the type of the configuration request
# Set the predicate search term
# Set the Object search term
### Check the type of the configuration request
# Add a life cycle state request
# Set the predicate search term
# Set the Object search term
# Set the predicate search term
# Set the Object search term
# Get the user to associate with this new resource
# The result is the same type
def _get_active_dataset_resources(self):
    """
    Get the dataset ids needed to create the TDS NcML.

    Builds a predicate/object query with a single pair (TYPE_OF_ID ->
    DATASET_RESOURCE_TYPE_ID) and passes it to self.asc.get_subjects().

    No longer only gets active datasets - should be renamed!

    @retval whatever self.asc.get_subjects() returns for the query
    """
    log.info('_get_active_dataset_resources: ')

    query = yield self.message_client.create_instance(PREDICATE_OBJECT_QUERY_TYPE)

    type_pair = query.pairs.add()

    # Predicate half of the pair: "is of type ..."
    predicate_ref = query.CreateObject(PREDICATE_REFERENCE_TYPE)
    predicate_ref.key = TYPE_OF_ID

    type_pair.predicate = predicate_ref

    # Object half of the pair: "... the dataset resource type"
    dataset_type_ref = query.CreateObject(IDREF_TYPE)
    dataset_type_ref.key = DATASET_RESOURCE_TYPE_ID

    type_pair.object = dataset_type_ref

    # The life cycle state filter below is disabled (kept as an inert string),
    # which is why this method no longer returns only active datasets.
    '''
    #### Add a life cycle state request
    pair = query.pairs.add()

    # Set the predicate search term
    pref = query.CreateObject(PREDICATE_REFERENCE_TYPE)
    pref.key = HAS_LIFE_CYCLE_STATE_ID

    pair.predicate = pref

    # Set the Object search term
    state_ref = query.CreateObject(LCS_REFERENCE_TYPE)
    # @TODO What state should we use?
    state_ref.lcs = state_ref.LifeCycleState.ACTIVE
    pair.object = state_ref
    '''

    subjects = yield self.asc.get_subjects(query)

    # The query result is handed back unchanged.
    defer.returnValue(subjects)
""" Dataset Controller Svc Client """
def create_dataset_resource(self, msg):
def set_dataset_resource_life_cycle(self, msg):
    """
    Send a 'set_dataset_resource_life_cycle' RPC and return the reply content.

    @param msg the request message to send
    @retval the content element of the RPC reply

    NOTE(review): no matching op_set_dataset_resource_life_cycle handler is
    visible in this view of the service - confirm that one exists.
    """
    yield self._check_init()

    # rpc_send resolves to a (content, headers, message) triple; only the
    # content is passed on to the caller.
    (reply_content, reply_headers, reply_msg) = yield self.rpc_send(
        'set_dataset_resource_life_cycle', msg)

    defer.returnValue(reply_content)
def find_dataset_resources(self, msg):
# Spawn of the process using the module name
|