Coverage for ion/services/dm/scheduler/scheduler_service : 82.67%
Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
|
#!/usr/bin/env python
""" @file ion/services/dm/scheduler/scheduler_service.py @date 9/21/10 @author Paul Hubbard @package ion.services.dm.scheduler.service Implementation of the scheduler """
# get configuration
# constants from https://confluence.oceanobservatories.org/display/syseng/Scheduler+Events # import these and use them to schedule your events, they should be in the "desired origin" field
""" message AddTaskRequest { enum _MessageTypeIdentifier { _ID = 2601; _VERSION = 1; }
// desired_origin is where the event notification will originate from // this is not required to be sent... one will be generated if not // interval is seconds between messages // payload is ref to some GPB
optional string desired_origin = 1; optional uint64 interval_seconds = 2; optional sint64 start_time = 3; // format:UNIX epoch, in ms, can be unset, will use current time optional sint64 end_time = 4; // format:UNIX epoch, in ms, can be unset optional string user_id = 5; optional net.ooici.core.link.CASRef payload = 6; }
"""
""" message AddTaskResponse { enum _MessageTypeIdentifier { _ID = 2602; _VERSION = 1; }
// the string guid // the origin is where the event notifications will come from
optional string task_id = 1; optional string origin = 2; }
"""
""" message RmTaskRequest { enum _MessageTypeIdentifier { _ID = 2603; _VERSION = 1; }
// task id is GUID optional string task_id = 1;
} """
""" message QueryTaskRequest { enum _MessageTypeIdentifier { _ID = 2605; _VERSION = 1; }
optional string task_regex = 1;
} """
""" message QueryTaskResponse { enum _MessageTypeIdentifier { _ID = 2606; _VERSION = 1; }
// can be an empty list repeated string task_ids = 1;
} """
""" Raised when invalid params are passed to an op on the scheduler. """ pass
""" First pass at a message-based cron service, where you register a send-to address, interval and payload, and the scheduler will message you when the timer expires. @note this will be subsumed into CEI at some point; consider this a prototype. """ # Declaration of service version='0.1.1', dependencies=['attributestore'])
'desired_origin', 'interval_seconds', 'payload', 'user_id', 'constant', 'start_time', 'end_time' ]
""" Specifically derived IndexStore for scheduler use. We do NOT want to use class variables for storage, we want fresh copies on every instance. """
self.index_store_class = pu.get_class(index_store_class_name) else:
'The back end class for the index store passed to the scheduler service does not implement the required IIndexStore interface.'
# Get the configuration for cassandra - may or may not be used depending on the backend class #self._storage_conf = get_cassandra_configuration()
# maps task_ids to IDelayedCall objects, popped off when callback is called, used to cancel tasks
# will move pub through the lifecycle states with the service
def slc_init(self):
log.info("Instantiating CassandraStore") keyspace = self._storage_conf[PERSISTENT_ARCHIVE]['name']
self.scheduled_events = self.index_store_class(self._username, self._password, self._storage_provider, keyspace, self._column_family)
yield self.scheduled_events.initialize() yield self.scheduled_events.activate()
yield self.register_life_cycle_object(self.scheduled_events) log.info("Done with instantiating the Cassandra store")
else:
def slc_activate(self): # get all items from the store
log.debug("slc_activate: scheduling %s" % task_id)
# could be None try: start_time = int(tdef['start_time']) except ValueError: start_time = None
self._schedule_event(start_time, int(tdef['interval_seconds']), task_id)
""" Called before terminate, this is a good place to tear down the AS and jobs. @todo iterate over the list foreach task in op_query: rm_task(task) """
""" Helper method to schedule and record a callback in the service. Used by op_add_task and on startup.
@param starttime The time to start the callbacks. This is used with the interval to calculate the first callback. If None is specified, will use now. Note: the first callback to occur will not happen immediately; it will occur after the first interval has elapsed, whether starttime is specified or not. This parameter should be specified in UNIX epoch format, in ms. You will have to convert the output from time.time() in Python, or use the IonTime utility class. @param interval The interval to trigger scheduler events, in seconds. @param task_id The task_id to trigger. @param query_result Internal param passed on from service activation which contains the query result already instead of making _send_and_reschedule go get it again. """
# determine first callback time # we started a while ago, so just find what is remaining of the interval from now else: # start time is in THE FUTURE
def op_add_task(self, content, headers, msg): """ @brief Add a new task to the crontab. Interval is in seconds. @param content Message payload, must be a GPB #2601 @param headers Ignored here @param msg Ignored here @retval reply_ok or reply_err """
# need to sanity check this input
raise SchedulerError("start_time %d out of allowable range (%d to %d)" % (starttime, (-sys.maxint-1)*1000, sys.maxint*1000), content.ResponseCodes.BAD_REQUEST)
# now make sure start time + interval is in the same range raise SchedulerError("start_time + interval %d out of allowable range (%d to %d)" % (starttime + msg_interval, (-sys.maxint-1)*1000, sys.maxint*1000), content.ResponseCodes.BAD_REQUEST)
else: # extract, serialize else: payload = None log.warn("Scheduler does not handle end_time yet!") endtime = content.end_time else: user_id = content.user_id else:
log.exception('Required keys in op_add_task content not found!') raise SchedulerError(str(ke), content.ResponseCodes.BAD_REQUEST)
# check to see if the task_id already exists in the store
#create the response: task_id and actual origin
# extract content of message task_id, # ok to use for value? seems kind of silly index_attributes={'task_id': task_id, 'constant': '1', # used for being able to pull all tasks 'user_id': user_id, 'start_time': str(starttime), 'end_time': str(endtime), 'interval_seconds': str(msg_interval), 'desired_origin': desired_origin, 'payload': str(payload)})
# Now that task is stored into registry, add to messaging callback
def op_rm_task(self, content, headers, msg): """ Remove a task from the list/store. Will be dropped from the reactor when the timer fires and _send_and_reschedule checks the registry. """
err = 'required argument task_id not found in message' log.error(err) self.reply_err(msg, {'value': err}) return
# if the task is active, remove it
################################################## # Internal methods
""" Check to see if we're still in the store - if not, we've been removed and should abort the run.
@param query_result Internal param - passed in from Scheduler activation, which has already done a query. Allows us to skip making another query for the info we already have. """
log.error("Query did not find task_id: %s, expected 1, got %d" % (task_id, len(tdefs))) defer.returnValue(False)
else: tdef = query_result
# pop callback object off of scheduled items log.warn("task_id %s no longer in list of callbacks, aborting" % task_id) defer.returnValue(False)
# deserialize and objectify payload
task_id=tdef['task_id'], user_id=tdef['user_id'])
except: log.info('No payload found or payload in incorrect format')
################################################# ## BANDAID FIX FOR 262 RE-OPEN ## ## In live system, Jamie noticed a scheduler crash on this line, saying that the Message/Repository ## is invalidated already before going into this call, which promptly crashes it and no longer ## executes scheduled events. This try/except prevents this from occurring so scheduled events ## will still run. ## ## Proper fix: find out why msg/repo is invalid! ## ################################################# except Exception, ex: log.error("Could not clear repository: %s" % str(ex)) pass
# start time of None is fine, we just happened so we can be sure interval_seconds is just about right
""" Client class for the SchedulerService, simple muster/send/reply. """
def add_task(self, msg): """ @brief Add a recurring task to the scheduler @param msg protocol buffer @GPB(Input,2601,1) @GPB(Output,2602,1) @retval Task ID and origin """
def rm_task(self, msg): """ @brief Remove a task from the scheduler @note If using cassandra, writes are delayed @param msg protocol buffer @GPB(Input,2603,1) @GPB(Output,2604,1) @retval OK or error """ #log.info("In SchedulerServiceClient: rm_task")
# Spawn the process using the module name |