Coverage for ion/services/dm/ingestion/ingestion : 78.13%
Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
|
#!/usr/bin/env python
@file ion/services/dm/ingestion/ingestion.py @author Michael Meisinger @author David Stuebe @author Dave Foster <dfoster@asascience.com> @author Tim LaRocque (client changes only) @brief service for registering resources
To test this with the Java CC! > scripts/start-cc -h amoeba.ucsd.edu -a sysname=eoitest res/scripts/eoi_demo.py """
# For testing - used in the client
""" An error occured during the begin_ingest op of IngestionService. """ pass
# Declare a few strings:
# Supplememt added event message dict:
""" DM R1 Ingestion service. """
# Declaration of service
# Declare the excluded types for repository operations CDM_DOUBLE_ARRAY_TYPE, CDM_FLOAT_ARRAY_TYPE, CDM_STRING_ARRAY_TYPE, CDM_OPAQUE_ARRAY_TYPE)
# Service class initializer. Basic config, but no yields allowed.
def slc_activate(self):
def op_create_dataset_topics(self, content, headers, msg_in): """ Creates ingestion and notification topics that can be used to publish ingestion data and notifications about ingestion. """
# OOIION-4: made creation of topics via PSC configurable (def: false) due to performance
# @TODO: adapted from temp reg publisher code in publisher_subscriber, update as appropriate
else:
""" Specially derived EventSubscriber that routes received messages into a custom handler that is similar to the main Process.receive method, but eliminates problems and handles Exceptions better. """ def _receive_handler(self, content, msg): """ Let the ondata method handle acking the message. """
""" Determines if the ingestion data topic is a valid topic for ingestion. The topic should have been registered via op_create_dataset_topics prior to ingestion. @TODO: this """
def _prepare_ingest(self, content): """ Factor out the preparation for ingestion so that we can unit test functionality """
# Get the current state of the dataset:
# Get the bounded arrays but not the ndarrays
def _setup_ingestion_topic(self, content, convid):
# TODO: replace this from the msg itself with just dataset id
# TODO: validate ingest_data_topic
def _handle_ingestion_msg(self, payload, msg, convid): """ Handles recv_dataset, recv_chunk, recv_done
This code is basically Process.receive, but without the conversation/user-id business which comprises most of that. It also adds proper error handling in the context of the ingestion, so if one of these messages triggers one, it will error out of the op_ingest call appropriately. """
# set msg._state to anything to prevent auto-acking
else:
# set flag to prevent routing while in process of termination
# ack the message to make the receiver stack happy (subscriber still active here, so this is ok)
# all error handling goes back to op_ingest
def op_ingest(self, content, headers, msg): """ Start the ingestion process by setting up necessary @TODO NO MORE MAGNET.TOPIC """
raise IngestionError('Expected message type PerformIngestRequest, received %s' % str(content), content.ResponseCodes.BAD_REQUEST)
# Keep track of the update branch name
log.info("Timed out in op_perform_ingest") self._defer_ingest.errback(IngestionError('Time out in communication between the JAW and the Ingestion service', content.ResponseCodes.TIMEOUT))
'Notifying caller that ingest is ready by invoking op_ingest_ready() using routing key: "%s"' % content.reply_to)
# exceptions like IngestionError/ApplicationError can be raised while waiting for this deferred - they will # get pushed via errback to here. We mostly just want them to go through the usual exception stack, but # we should send out a failure notification before we do so.
# clear the repository
# we have to notify that there is a failure, so get details and setup the dict to pass to notify_ingest.
# reraise - in the case of ApplicationError, will simply reply to the original sender # do NOT reraise in the case of a timeout on our side - JAW will timeout client-side # ack the msg #yield msg.ack()
# just return from here defer.returnValue(False) else:
finally: # we finished waiting (either success/failure/timeout), cancel the timeout if active
# reset ingestion deferred so we can use it again
# remove subscriber, deactivate it
# reset terminating flag after shutting down Subscriber # WARNING: MESSAGES MAY STILL ROUTE, CHECK STATE OF TIMEOUTCB IN EACH HANDLER
data_details = self.get_data_details(content)
if isinstance(ingest_res, dict): ingest_res.update(data_details) else: ingest_res={EM_ERROR:'Ingestion Failed!'} ingest_res.update(data_details)
resources = []
if ingest_res.has_key(EM_ERROR): log.info("Ingest Failed! %s" % str(ingest_res))
# Make sure we have the master branch checked out: if self.dataset.Repository.current_branch_key != self.dataset.Repository.branchnicknames.get('master'): self.dataset.Repository.reset() yield self.dataset.Repository.checkout(branchname='master')
# Make sure we don't push a dataset that is partly merged or ingested if len(self.dataset.Repository.branches) is not 1:
if self.update_branch_key is None: log.warn(str(self.dataset)) raise IngestionError('Update branch key is not set but the dataset has more than one branch',500)
# Remove the branch that was created self.dataset.Repository.remove_branch(self.update_branch_key)
# Set the LCS to inactive if there was an error during Merge #self.dataset.ResourceLifeCycleState = self.dataset.INACTIVE #@TODO This should only happen if there was an error during merge - since the state is now reset - we don't need it?
else: log.info("Ingest succeeded!")
resources.append(self.dataset)
# If the dataset / source is new if self.dataset.ResourceLifeCycleState != self.dataset.ACTIVE:
self.dataset.ResourceLifeCycleState = self.dataset.ACTIVE
# Do not push the dataset if it has more than one branch if len(self.dataset.Repository.branches) is not 1: log.warn(str(self.dataset)) # raise an exception and kick over the service so the resource is reset raise IngestionError('Dataset was left in an invalid state with more than one branch.',500)
try: yield self.rc.put_instance(self.dataset) except ResourceClientError, rce: ingest_res[EM_ERROR] = 'Ingestion put_instance operation failed!' log.exception('Ingestion put_instance operation failed!')
yield self._notify_ingest(ingest_res)
self.dataset=None self.data_source = None
# now reply ok to the original message yield self.reply_ok(msg)
log.info('op_ingest - Complete')
title = att.GetValue()
references = att.GetValue()
EM_URL:references, EM_DATA_SOURCE:content.datasource_id, EM_DATASET:content.dataset_id, }
def _notify_ingest(self, ingest_res): """ Generate a notification/event that an ingest succeeded.
This method is really not needed but I like it for testing... """
# Report an error with the data source
# Don't use **kw args - it may fail depending on what is in the dict... #yield self._notify_unavailable_publisher.create_and_publish_event(origin=datasource_id, **ingest_res)
else: # Report a successful update to the dataset
# Don't use **kw args - it may fail depending on what is in the dict... #yield self._notify_ingest_publisher.create_and_publish_event(origin=dataset_id, **ingest_res)
log.debug("_ingest_op_recv_dataset received a routed message AFTER shutdown occured (probably during an ApplicationError). Discarding this message!") # set msg._state to anything to prevent auto-ack msg._state = "ACKED" defer.returnValue(None)
# reset timeout
# notify JAW and others via event that we are still processing dataset_id=self.dataset.ResourceIdentity, ingestion_process_id=self.id.full, conv_id=convid, processing_step="dataset")
% str(content), content.ResponseCodes.BAD_REQUEST)
raise IngestionError('Calling recv_dataset in an invalid state. No Dataset checked out to ingest.')
raise IngestionError('Calling recv_dataset in an invalid state. Dataset is already modified.')
# Clear any bounded arrays which are empty. Create content field if it is not present
# Clear empty bounded arrays that may be sent by dac del content.bounded_arrays[i]
continue else: else: var.content = self.dataset.CreateObject(CDM_ARRAY_STRUCTURE_TYPE)
log.debug("_ingest_op_recv_chunk received a routed message AFTER shutdown occured (probably during an ApplicationError). Discarding this message!") # set msg._state to anything to prevent auto-ack msg._state = "ACKED" defer.returnValue(None)
# reset timeout
# this is NOT rpc raise IngestionError('Expected message type SupplementMessageType, received %s' % str(content), content.ResponseCodes.BAD_REQUEST)
raise IngestionError('Calling recv_chunk in an invalid state. No Dataset checked out to ingest.')
raise IngestionError('Calling recv_chunk in an invalid state. Dataset is not on an update branch! (currently: %s)' % str(self.dataset.ResourceLifeCycleState))
# OOIION-191: sanity check field dataset_id disabled as DatasetAgent does not have the information when making these messages. #if content.dataset_id != self.dataset.ResourceIdentity: # raise IngestionError('Calling recv_chunk with a dataset that does not match the received chunk!.')
# notify JAW and others via event that we are still processing dataset_id=self.dataset.ResourceIdentity, ingestion_process_id=self.id.full, conv_id=convid, processing_step="chunk")
# Get the group out of the datset
# get the bounded array out of the message
# Create a blobs message to send to the datastore with the ndarray
# Put it to the datastore except ReceivedError, re: log.error(re) raise IngestionError('Could not put blob in received chunk to the datastore.')
# Now add the bounded array, but not the ndarray to the dataset in the ingestion service except gpb_wrapper.OOIObjectError, oe: log.error(str(oe)) raise IngestionError('Expected variable name %s not found in the dataset' % (content.variable_name))
""" @TODO deal with FMRC datasets and supplements """
log.debug("_ingest_op_recv_done received a routed message AFTER shutdown occured (probably during an ApplicationError). Discarding this message!") # set msg._state to anything to prevent auto-ack msg._state = "ACKED" defer.returnValue(None)
# notify JAW and others via event that we are still processing dataset_id=self.dataset.ResourceIdentity, ingestion_process_id=self.id.full, conv_id=convid, processing_step="done")
raise IngestionError('Expected message type Data Acquasition Complete Message Type, received %s' % str(content), content.ResponseCodes.BAD_REQUEST)
elif st == 2: status = SERVER_ERROR elif st == 3: status = NO_NEW_DATA elif st == 4: status = AGENT_ERROR else: raise IngestionError('Unexpected StatusCode Enum value in Data Acquisition Complete Message')
else:
#@TODO ask dave for help here - how can I chain these callbacks?
elif self.data_source.aggregation_rule == self.data_source.AggregationRule.FMRC:
result = yield self._merge_fmrc_supplement()
# this is NOT rpc
# trigger the op_perform_ingest to complete!
def _merge_overwrite_supplement(self):
# Perform necessary premerge operations and unpack the result cur_etime, \ cur_agg_dim_length, \ sup_root, \ sup_stime, \ sup_etime, \ sup_agg_dim_length, \ sup_agg_dim_name, \ sup_fcst_dim_name, \ is_new_ds, \ result = yield self.__premerge(is_fmrc=False)
# Calculate offsets and indices for positioning the supplement in the dataset
sup_eindex, \ insertion_offset, \ runtime_offset, \ forecast_offset = yield self.__calculate_merge_offsets(is_overwrite, cur_root, sup_root, sup_agg_dim_name, sup_stime, sup_etime, cur_etime, cur_agg_dim_length, sup_agg_dim_length, sup_fcst_dim_name)
# Perform the merge
def _merge_fmrc_supplement(self):
log.debug('_merge_fmrc_supplement - Start') is_overwrite = False
# Perform necessary premerge operations and unpack the result cur_root, \ cur_etime, \ cur_agg_dim_length, \ sup_root, \ sup_stime, \ sup_etime, \ sup_agg_dim_length, \ sup_agg_dim_name, \ sup_fcst_dim_name, \ is_new_ds, \ result = yield self.__premerge(is_fmrc=True)
# Calculate offsets and indices for positioning the supplement in the dataset log.debug('************* START __calculate_merge_offsets() *************')
sup_sindex, \ sup_eindex, \ insertion_offset, \ runtime_offset, \ forecast_offset = yield self.__calculate_merge_offsets(is_overwrite, cur_root, sup_root, sup_agg_dim_name, sup_stime, sup_etime, cur_etime, cur_agg_dim_length, sup_agg_dim_length, sup_fcst_dim_name)
log.debug('************* END __calculate_merge_offsets() *************')
log.info('>> cur_agg_dim_length = %i' % cur_agg_dim_length) log.info('>> sup_agg_dim_length = %i' % sup_agg_dim_length) log.info('>> sup_sindex = %i' % sup_sindex) log.info('>> sup_eindex = %i' % sup_eindex) log.info('>> insertion_offset = %i' % insertion_offset) log.info('>> (FMRC) runtime_offset hours = %i' % runtime_offset) log.info('>> (FMRC) forecast_offset hours = %i' % forecast_offset)
# Perform the merge merge_res = yield self.__merge(is_overwrite, cur_root, sup_root, sup_agg_dim_name, sup_fcst_dim_name, sup_stime, cur_etime, sup_sindex, sup_eindex, insertion_offset, runtime_offset, forecast_offset, is_new_ds) result.update(merge_res)
log.debug('_merge_overlapping_supplement - Complete') defer.returnValue(result)
def _merge_overlapping_supplement(self):
# Perform necessary premerge operations and unpack the result cur_etime, \ cur_agg_dim_length, \ sup_root, \ sup_stime, \ sup_etime, \ sup_agg_dim_length, \ sup_agg_dim_name, \ sup_fcst_dim_name, \ is_new_ds, \ result = yield self.__premerge(is_fmrc=False)
# Calculate offsets and indices for positioning the supplement in the dataset
sup_eindex, \ insertion_offset, \ runtime_offset, \ forecast_offset = yield self.__calculate_merge_offsets(is_overwrite, cur_root, sup_root, sup_agg_dim_name, sup_stime, sup_etime, cur_etime, cur_agg_dim_length, sup_agg_dim_length, sup_fcst_dim_name)
# Perform the merge
""" Performs the following steps to facilitate common merge operations: 1) Provides basic sanity checks on the dataset 2) Ensures merge data is commited in the datasets repository 3) Extracts time aggregation dimension and other fields to be used in subsequent merge operations (such as in _merge_overwrite_supplement() and _merge_overlapping_supplement()
@return: A tuple containing the following items (in order): sup_stime - the start time of the merge dataset in seconds sup_etime - the end time of the merge dataset in seconds """
# A little sanity check on entering recv_done... raise IngestionError('The dataset is in a bad state - there should be two branches (currently %d) in the repository state on entering recv_done.' % len(self.dataset.Repository.branches), 500)
# Commit the current state of the supplement - ingest of new content is complete
# The current branch on entering recv done is the supplement branch
# Merge it with the current state of the dataset in the datastore
#Remove the head for the supplement - there is only one current state once the merge is complete!
# Get the cur_root group of the current state of the dataset
# Get the cur_root group of the supplement we are merging
# Determine which time variable to aggregate on log.info('(FMRC) Determining model time and forecast time dimension names') sup_agg_dim_name = 'run' sup_fcst_dim_name = 'time' try: sup_agg_dim = sup_root.FindDimensionByName(sup_agg_dim_name) except OOIObjectError, oe: raise IngestionError('FMRC Supplement is missing the dimension "%s"' % sup_agg_dim_name)
else:
result={EM_ERROR:'Error during ingestion: No Time variable found!'} defer.returnValue(result)
# Determine the inner most dimension on which we are aggregating
# Add each dimension in reverse order so that the inside dimension is always in front... to determine the time aggregation dimension
#print 'FINAL DIMENSION ORDER:' #print [ dim.name for dim in dimension_order]
# This is the inner most!
# Retrieve various attributes about the time dimensions
# Get the start time of the supplement
except OOIObjectError, oe: raise IngestionError('No start time attribute found in dataset supplement!' + str(oe)) # this is an error - the attribute must be present to determine how to append the data supplement time coordinate!
# Get the end time of the supplement
except OOIObjectError, oe: raise IngestionError('No end time attribute found in dataset supplement!' + str(oe)) # this is an error - the attribute must be present to determine how to append the data supplement time coordinate!
EM_END_DATE:sup_etime*1000})
# Get the start/end time indices of the current dataset and applicable offsets
# This is not an error - it is a new dataset. # ATTENTION: Is there any benefit to determining if there is a gap in the sequence here?
def __calculate_merge_offsets(self, is_overwrite, cur_root, sup_root, sup_agg_dim_name, sup_stime, sup_etime, cur_etime, cur_agg_dim_length, sup_agg_dim_length, sup_fcst_dim_name):
# defaults to length so that if there are no overlaps (during data append) # we adjust the dimension length by adding this value.. otherwise, this # var will be replaced so that it represents the differences between the # number of values being inserted and the number of values that insertion # would replace. # Example 1: When appending data which doesn't overwrite: # insertion_offset = (length - 0) = length # Example 2: When overwriting identical data: # insertion_offset = (length - length = 0 # Example 3: (insetion) When 5 values overwrite 4 values: # insertion =_offset = (5 - 4) = 1
# If cur_etime is None then the current dataset does not exist. # In this case the cur_agg_dim_length is 0 -- and therefore all our offsets will be as well # insertion_offset will be the sup_agg_dim_length, which is equal to the total number of values we are inserting
else:
# First calculate runtime and forecast offsets (these will affect the other offsets) log.info('__calculate_merge_offsets(): Calculating runtime and forecast time (normalization) offsets for FMRC dataset...')
# Get the runtime and forecast time from dataset and supplement log.debug('__calculate_merge_offsets(): Retrieving time variables from dataset and supplement...') try: cur_agg_var = cur_root.FindVariableByName(sup_agg_dim_name) cur_fcst_var = cur_root.FindVariableByName(sup_fcst_dim_name) except OOIObjectError, ex: raise IngestionError('__calculate_merge_offsets(): FMRC Dataset must have (model) Run Time and Forecast Time dimension, respectivly named ("%s", "%s"). Inner Exception: %s' % (sup_agg_dim_name.encode('utf-8'), sup_fcst_dim_name.encode('utf-8'), str(ex)))
try: sup_agg_var = sup_root.FindVariableByName(sup_agg_dim_name) sup_fcst_var = sup_root.FindVariableByName(sup_fcst_dim_name) except OOIObjectError, ex: raise IngestionError('__calculate_merge_offsets(): FMRC Supplement must have (model) Run Time and Forecast Time dimension, respectivly named ("%s", "%s"). Inner Exception: %s' % (sup_agg_dim_name.encode('utf-8'), sup_fcst_dim_name.encode('utf-8'), str(ex)))
# Build a list containing a tuple of time variable name, units string, units base time (millis) log.debug('__calculate_merge_offsets(): Building a list of tuples with (time variable name, units string) for each time variable...') units_list = [] units_list.append((sup_agg_var.name, sup_agg_var.GetUnits())) units_list.append((cur_agg_var.name, cur_agg_var.GetUnits())) units_list.append((sup_fcst_var.name, sup_fcst_var.GetUnits())) units_list.append((cur_fcst_var.name, cur_fcst_var.GetUnits()))
log.debug('__calculate_merge_offsets(): Calculating units base time for each time variable...') import re time_units_regex = 'hours since ([\d]{4}-[\d]{2}-[\d]{2}T[\d]{2}:[\d]{2}:[\d]{2}Z)'
for i in range(len(units_list)): vname = units_list[i][0] units = units_list[i][1] m = re.search(time_units_regex, units)
if m is None: raise IngestionError('Units attribute for time variable "%s" must be "hours since yyyy-MM-ddTHH:mm:ssZ" to perform an FMRC aggregation' % units_list[i][0].encode('utf-8'))
base_time_millis = calendar.timegm(time.strptime(m.group(1), '%Y-%m-%dT%H:%M:%SZ'))
units_list[i] = (vname, units, base_time_millis) log.debug('__calculate_merge_offsets(): Adding tuple values to variable units list: %s' % str(units_list[i]))
# Determine the difference between the units in the dataset and supplement for runtime and forecast time runtime_offset_seconds = units_list[0][2] - units_list[1][2] forecast_offset_seconds = units_list[2][2] - units_list[3][2]
# If the dataset is an FMRC the units of the dataset and supplement may differ -- adding runtime_offset_seconds here # accomodates for this case by normalizing the supplements start and end time to be based on the same units as the # the current dataset. sup_stime += runtime_offset_seconds sup_etime += runtime_offset_seconds
else: # Find where the supplement start time and end time should lay in the dataset (supplement_sindex, supplement_eindex) # Find how the origins in the dataset's data should change according to this position (insertion_offset)
# Adjust indices when the supplement times lay between indices in the current dataset sup_sindex = -sup_sindex - 1
# Calculate the insertion offset -- how bounded_arrays ordered after the overwritten section must be offset
else:
# For overlap - determine how many indices overlap to determine our offsets
# Overlap has some restrictions-- elif time_index == -1: raise IngestionError('Supplement data does not actualy overlap the current data! Supplement will NOT be merged! index:%i' % time_index) else: raise IngestionError('Overlapping supplement data does not match current data! Supplement will NOT be merged! index:%i' % time_index)
else:
def __merge(self, is_overwrite, cur_root, sup_root, sup_agg_dim_name, sup_fcst_dim_name, sup_stime, cur_etime, sup_sindex, sup_eindex, insertion_offset, runtime_offset, forecast_offset, is_new_ds):
### ### Add the dimensions from the supplement to the current state if they are not already there ###
raise IngestionError('Can not ingest supplement with different dimensions than the dataset') else: # ! New dataset -- add all dimensions
else: # ! We are appending an existing dataset - adjust the length of the aggregation dimension
### ### Adjust the values of the model time and forecast time in the supplement if this is FMRC: ### # log.debug('before FMRC time var normalize') # if runtime_offset is not 0: # runtime_var = sup_root.FindVariableByName(sup_agg_dim_name) # need_keys = [ba.GetLink('ndarray').key for ba in runtime_var.content.bounded_arrays] # yield self._fetch_blobs(cur_root.Repository, need_keys) # # # @attention: # # @bug: The following lines fail because the merge dataset cannot be modified, we have to figure out a way to modify these values before merging # # @attention: # for ba in runtime_var.content.bounded_arrays: # for i in range(len(ba.ndarray.value)): # ba.ndarray.value[i] += runtime_offset
# if forecast_offset is not 0: # forecast_var = sup_root.FindVariableByName(sup_fcst_dim_name) # need_keys = [ba.GetLink('ndarray').key for ba in forecast_var.content.bounded_arrays] # yield self._fetch_blobs(cur_root.Repository, need_keys) # # for ba in forecast_var.content.bounded_arrays: # for i in range(len(ba.ndarray.value)): # ba.ndarray.value[i] += forecast_offset
### ### Add/merge the variables from the supplement to the current state if they are not already there ### Merge variable if it is dimensioned on the aggregation dimension (sup_agg_dim) ###
# Step 1: Copy new variables from the supplement into the dataset
else: raise IngestionError('Variable %s does not exist in the dataset. Supplement is invalid!' % var_name)
# Step 2: Skip merge for variables which are not dimensioned on the sup_agg_dim # @todo: check to see if supplement shape and dataset shape don't match (like if time dimensions are at different indices) else:
# @todo: check attributes for variables which are not aggregated....
# Step 3: Merge variable values # If this is a new dataset applying all these offsets is not necessary # Step 3a: (overwrite) Restructure the data in the dataset # This requires the following: # 1) Removing bounded arrays which are totally contained in the supplement # 2) Breaking apart bounded arrays which contain 1 or more overlapping indices # and throwing away the overlapping region # 3) Adjusting the indices of all bounded_arrays positioned after the overwrite
# (contains check) # @todo: add unit tests for this case! # Remove this bounded array log.debug('(contains) Removing bounded array from variable "%s" [index:%i, origin:(%s), size:(%s)]' % ( \ var.name.encode('utf-8'), \ i, \ str([bbb.origin for bbb in ba.bounds]), \ str([bbb.size for bbb in ba.bounds]), \ ))
# (intersects check) # @todo: add unit tests for this case! # Split the bounded array according to what data overlaps
# if the supplement starts after this bounded_array's origin... # Create a new bounded_array containing only values leading up to the supplement (along the agg dimension) log.debug('(intersects) Split bounded array for variable "%s". Created [left] [index:%i, origin:(%s), size:(%s)]' % ( \ var.name.encode('utf-8'), \ i, \ str([bbb.origin for bbb in new_ba.bounds]), \ str([bbb.size for bbb in new_ba.bounds]), \ ))
# Create a new bounded_array containing only values from the end of the supplement to the end of the existing bounded_array (along the agg dim) log.debug('(intersects) Split bounded array for variable "%s". Created [right] [index:%i, origin:(%s), size:(%s)]' % ( \ var.name.encode('utf-8'), \ i, \ str([bbb.origin for bbb in new_ba.bounds]), \ str([bbb.size for bbb in new_ba.bounds]), \ ))
# Either way remove this bounded array log.debug('(intersects) Removing bounded array from variable "%s" [index:%i, origin:(%s), size:(%s)]' % ( \ var.name.encode('utf-8'), \ i, \ str([bbb.origin for bbb in ba.bounds]), \ str([bbb.size for bbb in ba.bounds]), \ ))
# (post insertion check) # @todo: add unit tests for this case! # Adjust the origin using insertion_offset
# Step 3b: Normalize the supplement origins to zero # Since the supplement is ReadOnly calculate the offset here.. and apply to the dataset afterwards # Replace with Dim - min_offset?
# Step 3c: Merge the supplement into the dataset
# Keep track of these in case we need to adjust the values
need_keys = [ba.GetLink('ndarray').key for ba in merge_var.content.bounded_arrays] yield self._fetch_blobs(cur_root.Repository, need_keys)
# @attention: # @bug: The following lines fail because the merge dataset cannot be modified, we have to figure out a way to modify these values before merging # @attention: for ba in new_bas: for i in range(len(ba.ndarray.value)): ba.ndarray.value[i] += runtime_offset
need_keys = [ba.GetLink('ndarray').key for ba in merge_var.content.bounded_arrays] yield self._fetch_blobs(cur_root.Repository, need_keys)
for ba in new_bas: for i in range(len(ba.ndarray.value)): ba.ndarray.value[i] += forecast_offset
# Step 4: Merge variable attributes
for merge_att in merge_var.attributes: log.error('Merge Att: %s, %s, %s' % (merge_att.name, str(merge_att.GetValue()), base64.encodestring(merge_att.MyId)[0:-1]))
for att in var.attributes: log.error('Att: %s, %s, %s' % (att.name, str(att.GetValue()), base64.encodestring(att.MyId)[0:-1]))
#@TODO turn this error detection back on! #raise ImportError('Variable %s attributes are not the same in the supplement!' % var_name)
### ### Add/merge the attributes from the supplement to the current state ### # @TODO Get the vertical positive 'direction!' Deal with attributes accordingly.
except OOIObjectError, oe: log.debug(oe) merge_vertical_positive = None
raise IngestionError('Can not merge a data supplement that switches vertical positive!')
else: # Take which ever one is not None
# @TODO Need a better method to merge these - determine the greater extent of a wrapped coordinate
# @TODO Need a better method to merge these - determine the greater extent of a wrapped coordinate
# Check vert vmin/vmax for NaN, either is NaN or missing, don't merge
# Only check for vmax if vmin is available (if either value is not available, we can't continue)
elif vertical_positive == 'up': cur_root.MergeAttGreater(att_name, sup_root)
else: raise OOIObjectError('Invalid value for Vertical Positive but ion_geospatial_vertical_min is present') else: root_min = cur_root.HasAttribute('ion_geospatial_vertical_min') root_max = cur_root.HasAttribute('ion_geospatial_vertical_max')
# if cur_root doesnt have vmin/vmax, add new attributes with default values... if not root_min and not root_max: cur_root.AddAttribute('ion_geospatial_vertical_min', cur_root.DataType.DOUBLE, float('nan')) cur_root.AddAttribute('ion_geospatial_vertical_max', cur_root.DataType.DOUBLE, float('nan'))
# Check vert vmin/vmax for NaN, either is NaN or missing, don't merge
# Only check for vmax if vmin is available (if either value is not available, we can't continue)
elif vertical_positive == 'up': cur_root.MergeAttLesser(att_name, sup_root)
else: raise OOIObjectError('Invalid value for Vertical Positive but ion_geospatial_vertical_max is present') else: root_min = cur_root.HasAttribute('ion_geospatial_vertical_min') root_max = cur_root.HasAttribute('ion_geospatial_vertical_max')
# if cur_root doesnt have vmin/vmax, add new attributes with default values... if not root_min and not root_max: cur_root.AddAttribute('ion_geospatial_vertical_min', cur_root.DataType.DOUBLE, float('nan')) cur_root.AddAttribute('ion_geospatial_vertical_max', cur_root.DataType.DOUBLE, float('nan'))
# @TODO is this the correct treatment for history?
else:
except OOIObjectError, oe:
log.exception('Attribute merger failed for global attribute: %s' % att_name) result[EM_ERROR] = 'Error during ingestion of global attributes'
except ValueError, ex:
log.exception('Attribute merger failed for global attribute "%s". Cause: %s' % (att_name, str(ex)))
#Parse the atts - try to short cut logic to identify time...
log.debug('Found standard name "time" in variable named: %s' % var.name)
time_vars.append(var) break # Don't continue to iterate attributes
# elif att.name == '_CoordinateAxisType' and (att.GetValue() == 'Time' or att.GetValue() =='RunTime'): # log.debug('Found _CoordinateAxisType "%s" in variable name: %s' % (att.GetValue().encode('utf-8'), var.name.encode('utf-8'))) # # time_vars.append(var) # break # Don't continue to iterate attributes
else:
# Do not raise an exception here if none are found - let the ingestion method handle the issue based on the return
def _get_ndarray_keys(cls, dataset_variable): """ @Brief: Retrieves all the ID reference keys for the ndarrays contained in all the bounded_arrays of the given variable. This method is useful to acquire the keys needed in order to fetch blobs from the dataset since, by default, the ndarrays of variables are excluded when datasets are checked out. """
def _get_ndarray_vals(cls, time_variable): """ @Brief: Retrieves all the values of all the bounded arrays in the given time_variable @return: A sorted list of tuples containing the values index followed by the value at that index. Since a variable's bounded array's may contain duplicate data, this array may contain tuples which specify the same first value. When this is the case, it is useful to ensure that the two values specified at the same index match, otherwise the variable is corrupt. NOTE: This validation is NOT accomplished by this method.
@note: This method will not yet work on multidimensional variables (more than one item in the bounded array's list of bounds). This is because iteration over such a structure is quite complicated and currently unnessary since this method is only used for time variables (true time coordinate variables will only ever have one dimension -- time)
@note: This method assumes all blobs for the components of the given time_variable (bounded_arrays, ndarrays, etc) have been fetched. If they have not it will fail with a KeyError. """ raise IngestionException('_get_ndarray_vals does not support enflating bounded arrays with more than one dimension -- yet')
def _fetch_blobs(self, repo, fetch_keys): """ Helper method to fetch blobs from the Datastore
Determines which keys in fetch_keys are not already loaded in the given repository's index_hash, then fetches the blobs for those remaining keys from the datastore. The repository's index_hash is then updated with the fetched blobs.
@attention: This is more of an 'exgest' facility. Should this method be moved to a move cohesive location (datastore)? @attention: Consider exposing and using datastore.DataStoreWorkbench._get_blobs() instead of this method """ raise IngestionError('Cannot fetch blobs for a non-existant repository')
# Find which keys are needed (those not already in the repository's index_hash)
# workbench_keys = set(self._workbench_cache.keys()) # local_keys = workbench_keys.intersection(need_keys)
# for key in local_keys: # for key in need_keys: # try: # repo.index_hash.get(key) # need_keys.remove(key) # except KeyError, ke: # log.info('Key disappeared - get it from the remote after all')
except ReceivedError, re: log.debug('ReceivedError', str(re)) raise IngestionError('Could not fetch ndarray blobs from the datastore during merge! Cause: "%s"' % re.msg_content)
# Put the new objects in the repository
""" @todo: Add documentation about what negative value returns mean """ # Step 1: Get a handle to an array of values from the given time variable # Step 1a: Gather a list of keys for blobs which need to be fetched
log.debug('>> (ndarray) need_keys = %s' % str(need_keys))
# Step 1b: Fetch all blobs for the ndarrays in the time_var
# Step 1c: Now, grab all the array values from the ndarrays.. log.debug('>> ndarray values = %s' % str(values))
# Step 2: Perform a linear search in the values array for each time in search_times # a) Binary search would most likely perform near its worst case (logN time) - since the # overlapping region should (on average) be fairly small # * Linear search may be optimal for the average case # ** Linear search is "required" when indexing into the current data (rather than the # supplement) because we must traverse multiple bounded arrays
# Step 2a: Perform the linear search to see where in the set of values our search_times lay # @note: this is a good place to check if two entries in the time variables bounded_arrays # contain mismatched values for the same index -- this shouldn't happen however results_dict[search_time] = -(i + 1) search_times_cpy.remove(search_time) # else search_time > val: continue
# don't use the copy of the list after this point -- its contents are indeterminate
# Step 2b: @todo: Make sure that we aren't experiencing a roundoff issue by checking how close # our search_start is to the values before and after it in the list (floats only)
def subset_bounded_array(self, repo, old_ba, idx, min, max): """ Creates a new bounded array which is a subset of the given bounded_array. "repo" is used to construct the resultant object but it is the callers responsibility to attach the bounded array to something (such as a variable)
@param repo: The repository in which the bounded_array subset will be created @param old_ba: The bounded array from which a new subset will be created @param idx: The index of the subset dimension in var.bounds @param min: The minimum index for the subset (inclusive) along the subset dimension (idx) @param max: The maximum index for the subset (exclusive) along the subset dimension (idx)
@note: When determining if the index of an element in the bounded_array lies within the range given by min and max, the origin of that bounded_array is applied first. As such, min and max should specify canonical indices of data values in the datastructure itself rather than specifying indices of values in the given bounded_array's underlying ndarray.
Example: For a 1-D bounded array with: origin=12; size=5; values=[10,11,12,13,14]
yield subset_bounded_array(..., idx=0, min=1, max=4) returns nothing yield subset_bounded_array(..., idx=0, min=13, max=16) returns BA [11,12,13]
"""
# Step 1: Fetch blobs for this bounded_array
# Step 1: Create the bounds
else:
# Step 2: Create the ndarray
# If this value is in our subset range (for the dimension indexed by idx)
# Make sure the actual length of the new ndarray matches the length indicated by # the cumulative sizes of the arrays bounds
""" Class for the client accessing the resource registry. """
# Step 1: Delegate initialization to parent "ServiceClient"
# Step 2: Perform Initialization
# self.rc = ResourceClient(proc=self.proc)
def ingest(self, msg): """ Start the ingest process by passing the Service a topic to communicate on, a routing key for intermediate replies (signaling that the ingest is ready), and a custom timeout for the ingest service (since it may take much longer than the default timeout to complete an ingest) @param msg, GPB 2002/1, a PerformIngestMessage @retval Result is an empty ION Message, reply_ok @GPB{Input,2002,1} """ # Ensure a Process instance exists to send messages FROM... # ...if not, this will spawn a new default instance.
# Invoke [op_]() on the target service 'dispatcher_svc' via RPC
defer.returnValue(content)
def create_dataset_topics(self, msg):
@defer.inlineCallbacks def send_dataset(self, topic, msg): ''' For testing the service...''' yield self._check_init() yield self.proc.send(topic, operation='recv_dataset', content=msg)
@defer.inlineCallbacks def send_chunk(self, topic, msg): ''' For testing the service...''' yield self._check_init() yield self.proc.send(topic, operation='recv_chunk', content=msg)
@defer.inlineCallbacks def send_done(self, topic, msg): ''' For testing the service...''' yield self._check_init() yield self.proc.send(topic, operation='recv_done', content=msg) """
# Spawn off the process using the module name |