Coverage for ion/services/dm/distribution/publisher_subscriber : 93.88%
#!/usr/bin/env python
"""
@file ion/services/dm/distribution/publisher_subscriber.py
@author Paul Hubbard
@author Dave Foster <dfoster@asascience.com>
@brief Publisher/Subscriber classes for attaching to processes
"""
""" An object that is registerable with the PubSubController.
Provides a mixin-like base class intended for multiple inheritance. It is a mixin because PSC registration is optional and more of a bolt-on style of operation. This base class contains the common setup code; the Publisher and Subscriber override it as appropriate.
The initializer of this class performs imports that may fail due to cyclical dependencies (import this module too early and they will fail). It handles this and sets a "_pscr_import_ok" attr on the class to indicate whether the imports succeeded. """
# save these imports as locals - so we don't have to do another protected import
# flag
except ImportError: log.warn("Could not import MC or PSC in PSCRegisterable base, likely a circular reference")
credentials=None, **kwargs): ''' Common PSC setup methods shared between pub/sub derived implementations. Call this baseclass method first.
@returns A boolean indicating success. Returns False only if the PSCRegisterable was not set up correctly at startup, which indicates an import error caused by a cyclical import. In that case, PSC registration is not possible. '''
log.warn("Trying to psc_setup when _pscr_import_ok not true!")
defer.returnValue(False)
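The deferred-import guard described in the PSCRegisterable docstring can be sketched roughly as follows. This is a minimal illustration, not the original implementation: `LazyImportMixin`, `_module_name` and `_import_ok` are hypothetical names, the standard-library `json` module stands in for the MC and PSC clients, and the flag is stored on the instance rather than on the class.

```python
import importlib
import logging

log = logging.getLogger(__name__)


class LazyImportMixin(object):
    """Hypothetical stand-in for the import guard in PSCRegisterable."""

    # Assumption: "json" stands in for the modules the real class imports.
    _module_name = "json"

    def __init__(self):
        try:
            # The import runs at construction time, not at module load time,
            # so a circular import has had a chance to resolve by now.
            self._module = importlib.import_module(self._module_name)
            self._import_ok = True
        except ImportError:
            log.warning("Could not import %s, likely a circular reference",
                        self._module_name)
            self._module = None
            self._import_ok = False

    def setup(self):
        """Return False instead of raising when the import failed earlier."""
        if not self._import_ok:
            log.warning("Trying to set up when _import_ok is not true!")
            return False
        return True


class BrokenImport(LazyImportMixin):
    # Deliberately unimportable module name, to exercise the failure path.
    _module_name = "module_that_does_not_exist_xyz"
```

Callers check the return value of `setup()` rather than catching `ImportError` themselves, which keeps registration a bolt-on step that may be skipped.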
#noinspection PyUnusedLocal
"""
@brief This represents publishers of (mostly) science data. Intended use is to be instantiated within another class/process/codebase, as an object for sending data to OOI.
"""
""" Initializer for a Publisher.
@param xp_name      Exchange Point name.
@param routing_key  The routing key this publisher will use. May be None or overridden when calling publish.
@param credentials  Credentials to use.
@param process      The owning process of this Publisher. Must be specified.
"""
# TODO: will the user specify this? will the PSC get it?
'exchange_type' : 'topic',
'durable': False,
'mandatory': True,
'immediate': False,
'warn_if_exists': False }
# we use base Receiver here as we only send with it, no consumption which the base Receiver doesn't do well
# monkey patch receiver as we don't want any of its initialize or activate items running, but we want it to be in the right state pass
pass
# call base class boilerplate
log.warn("Publisher could not register with PSC")
defer.returnValue(False)
# Register the publisher and save the casref
msg.credentials = kwargs['credentials']
"""
@brief Publish data on a specified resource id/topic
@param data         Data, OOI-format, protocol-buffer encoded
@param routing_key  Routing key to publish data on. Normally the Publisher uses the routing key specified at construction time, but this param may be overridden here.
@retval Deferred on send, not RPC
"""
# set up the sender/sender-name to make it look as if the owning process is doing the sending,
# which at some level it technically is.
'content' : data,
'headers' : {'sender-name' : self._process.proc_name },
'operation' : None,
'sender' : self._process.id.full }
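As a rough sketch of the publish step, the envelope below uses the four keys visible in the fragment above ('content', 'headers', 'operation', 'sender'). The helper names `build_envelope` and `resolve_routing_key` are hypothetical, and the rule that a per-call routing key wins over the construction-time one is taken from the publish docstring; how the original code tests for "not given" is an assumption.

```python
def resolve_routing_key(default_key, override=None):
    """Pick the per-call routing key if one was given, else the default.

    Assumption: "not given" means None; the original may test differently.
    """
    return override if override is not None else default_key


def build_envelope(data, proc_name, proc_id):
    """Build a message dict that names the owning process as the sender."""
    return {
        'content': data,
        'headers': {'sender-name': proc_name},
        'operation': None,   # plain send, not an RPC operation
        'sender': proc_id,
    }


# Usage with made-up values for the owning process.
envelope = build_envelope(b"payload", "sample_proc", "host.1")
key = resolve_routing_key("science.data", override=None)
```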
# =================================================================================
""" A factory class for building Publisher objects. """
""" Initializer. Sets default properties for calling the build method.
These defaults are overridden by specifying the same-named keyword arguments to the build method.
@param routing_key     Name of the routing key to publish messages on. Typically not set as a factory-wide setting.
@param xp_name         Name of exchange point to use
@param credentials     Placeholder for auth* tokens
@param process         Owning process of the Publisher.
@param publisher_type  Specific derived Publisher type to construct. You can define a custom Publisher derived class for any custom behavior. If left None, the standard Publisher class is used.
"""
""" Creates a publisher and calls register on it.
The parameters passed to this method take defaults that were set up when this PublisherFactory was initialized. If None is specified for any of the parameters, or they are not filled out as keyword arguments, the defaults take precedence.
@param routing_key     The AMQP routing key that the Publisher will publish its data to.
@param xp_name         Name of exchange point to use
@param credentials     Placeholder for auth* tokens
@param process         Owning process of the Publisher.
@param publisher_type  Specific derived Publisher type to construct. You can define a custom Publisher derived class for any custom behavior. If left None, the standard Publisher class is used.
"""
# OOIION-4: making automatic registration of publishers optional due to speed
# Register does the PSC invocations
"""
@brief This represents subscribers, both user-driven and internal (e.g. dataset persister)
@note All returns are HTTP return codes, 2xx for success, etc, unless otherwise noted.
@todo Need a subscriber receiver that can hook into the topic xchg mechanism
"""
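The default-and-override behaviour described in the factory docstrings can be sketched as below. This is an illustration under stated assumptions: `SketchPublisher` and `SketchPublisherFactory` are hypothetical stand-ins, the build is synchronous (the original appears to use Twisted deferreds), and registration with the PSC is left out entirely.

```python
class SketchPublisher(object):
    """Hypothetical stand-in for Publisher; it only stores its settings."""

    def __init__(self, xp_name=None, routing_key=None, credentials=None,
                 process=None):
        self.xp_name = xp_name
        self.routing_key = routing_key
        self.credentials = credentials
        self.process = process


class SketchPublisherFactory(object):
    """Holds factory-wide defaults that build() may override per call."""

    def __init__(self, xp_name=None, routing_key=None, credentials=None,
                 process=None, publisher_type=None):
        self._defaults = dict(xp_name=xp_name, routing_key=routing_key,
                              credentials=credentials, process=process)
        self._publisher_type = publisher_type

    def build(self, routing_key=None, xp_name=None, credentials=None,
              process=None, publisher_type=None):
        overrides = dict(xp_name=xp_name, routing_key=routing_key,
                         credentials=credentials, process=process)
        settings = dict(self._defaults)
        # A None argument means "not specified", so the default is kept.
        settings.update((k, v) for k, v in overrides.items() if v is not None)
        cls = publisher_type or self._publisher_type or SketchPublisher
        return cls(**settings)


factory = SketchPublisherFactory(xp_name="xp.default", routing_key="rk.default")
pub = factory.build(routing_key="rk.custom")
```

Here `pub` keeps the factory-wide exchange point but uses the routing key passed to `build`.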
""" Initializer for Subscribers.
@param xp_name      Exchange Point name.
@param binding_key  The binding key (AMQP) to attach this Subscriber to. Can be left blank if attaching to an existing queue.
@param queue_name   Queue name to attach to. If the queue exists, attaches to it. If not, it is created. If left blank, an anonymous queue is created.
@param credentials  Credentials for the Subscriber.
@param process      Owning process of this Subscriber. Must be specified.
@param durable      If the queue is durable (AMQP), which means the broker will recreate the queue if it restarts. This should not be specified with an anonymous queue name.
@param auto_delete  If the queue has the auto_delete setting (AMQP), which means that when the last consumer detaches from that queue, the queue is deleted.
"""
# sanity check: worth a warning but not an assert
log.warn("Subscriber() - specified no queue name with durable %s or auto_delete %s" % (str(durable), str(auto_delete)))
# set up comms details
'exchange_type' : 'topic', # TODO
'durable': durable,
'auto_delete': auto_delete,
'mandatory': True,
'immediate': False,
'warn_if_exists': False,
'routing_key' : self._binding_key, # may be None, if so, no binding is made to the queue (routing_key is incorrectly named in the dict used by Receiver)
'queue' : self._queue_name, # may be None, if so, the queue is made anonymously (and stored in receiver's consumer.queue attr)
}
# TODO: name? str(len(process._registered_life_cycle_objects)) handler=self._receive_handler, consumer_config=consumer_config)
pass
def on_activate(self, *args, **kwargs):
def on_terminate(self, *args, **kwargs):
def register(self):
"""
Registers this Subscriber with the PSC. Call this prior to calling initialize, as default initialize will try to create a queue.
"""
# call base class boilerplate
log.warn("Subscriber could not register with PSC")
defer.returnValue(False)
# Subscriber
# @note Not using credentials yet
log.debug('Sorry, ignoring the credentials!')
msg.queue_name = self._queue_name
def subscribe(self):
""" """
"""
@brief Remove a subscription
@retval Return code only
"""
# @TODO: is this even used anywhere?
#yield self._pscr_pscc.unsubscribe(self._resource_id)
def _receive_handler(self, data, msg):
"""
Default handler for messages received by the SubscriberReceiver. Acks the message and calls the ondata handler.
@param data  Data packet/message matching subscription
@param msg   Message instance
@return The return value of ondata.
"""
finally:
#noinspection PyUnusedLocal
"""
@brief Data callback, in the pattern of the current subscriber code
@param data Data packet/message matching subscription
@retval None, may daisy chain output back into messaging system
"""
raise NotImplementedError('Must be implemented by subclass')
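The receive path described above (call ondata, ack the message, return ondata's result) can be sketched as follows. This is a synchronous simplification with hypothetical names (`FakeMessage`, `SketchSubscriber`, `CollectingSubscriber`). Placing the ack in the `finally` clause is an assumption based on the surviving `finally:` fragment.

```python
class FakeMessage(object):
    """Hypothetical message object that only records whether it was acked."""

    def __init__(self):
        self.acked = False

    def ack(self):
        self.acked = True


class SketchSubscriber(object):
    def _receive_handler(self, data, msg):
        try:
            # Delegate to the user-supplied callback and pass its result on.
            return self.ondata(data)
        finally:
            # Assumption: the ack happens even if ondata raises.
            msg.ack()

    def ondata(self, data):
        raise NotImplementedError('Must be implemented by subclass')


class CollectingSubscriber(SketchSubscriber):
    """Derived subscriber that stores each packet and returns the count."""

    def __init__(self):
        self.seen = []

    def ondata(self, data):
        self.seen.append(data)
        return len(self.seen)
```

With this shape, a subclass only overrides `ondata`; acknowledgement stays in one place in the base class.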
# =================================================================================
""" Factory to create Subscribers. """
""" Initializer. Sets default properties for calling the build method.
These defaults are overridden by specifying the same-named keyword arguments to the build method.
@param xp_name          Name of exchange point to use
@param binding_key      The binding key to use for the Subscriber. If specified, the queue will have this binding key bound to it.
@param queue_name       The queue name to use for the Subscriber. If specified, the queue may either exist or be created. If not specified, an anonymous queue is created.
@param subscriber_type  Specific derived Subscriber type to use. You can define a custom Subscriber derived class if you want to share the implementation across multiple Subscribers. If left None, the standard Subscriber class is used.
@param process          Process that Subscribers will be attached to.
"""
""" Creates a subscriber.
The parameters passed to this method take defaults that were set up when this SubscriberFactory was initialized. If None is specified for any of the parameters, or they are not filled out as keyword arguments, the defaults take precedence.
@param proc             The process the subscriber should attach to. May be None to create an anonymous process contained in the Subscriber instance itself.
@param xp_name          Name of exchange point to use
@param binding_key      The binding key to use for the Subscriber. If specified, the queue will have this binding key bound to it.
@param queue_name       The queue name to use for the Subscriber. If specified, the queue may either exist or be created. If not specified, an anonymous queue is created.
@param subscriber_type  Specific derived Subscriber type to use. You can define a custom Subscriber derived class if you want to share the implementation across multiple Subscribers. If left None, the standard Subscriber class is used.
@param handler          A handler method to replace the Subscriber's ondata method. This is typically a bound method of the process owning this Subscriber, but may be any callable taking a data param. If this is left None, the subscriber_type must be set to a derived Subscriber that overrides the ondata method.
@param process          Process that Subscribers will be attached to.
@param credentials      Subscriber credentials (not currently used).
"""
process=process, credentials=credentials, *args, **kwargs)
# OOIION-4: making automatic registration of subscribers optional due to access speed
# brings the subscriber up to the same state as the process
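The `handler` parameter of the build method, which replaces the Subscriber's ondata method, might work along these lines. The classes `HandlerSubscriber` and `SketchSubscriberFactory` are hypothetical, the sketch is synchronous, and it omits registration and the step that brings the subscriber up to the state of its process.

```python
class HandlerSubscriber(object):
    """Hypothetical stand-in for Subscriber; it only stores its settings."""

    def __init__(self, xp_name=None, binding_key=None, queue_name=None,
                 process=None):
        self.xp_name = xp_name
        self.binding_key = binding_key
        self.queue_name = queue_name
        self.process = process

    def ondata(self, data):
        raise NotImplementedError('Must be implemented by subclass')


class SketchSubscriberFactory(object):
    def __init__(self, xp_name=None, subscriber_type=None, process=None):
        self._xp_name = xp_name
        self._subscriber_type = subscriber_type
        self._process = process

    def build(self, xp_name=None, binding_key=None, queue_name=None,
              subscriber_type=None, handler=None, process=None):
        cls = subscriber_type or self._subscriber_type or HandlerSubscriber
        sub = cls(xp_name=xp_name or self._xp_name,
                  binding_key=binding_key,
                  queue_name=queue_name,
                  process=process or self._process)
        if handler is not None:
            # Any callable taking one data argument replaces ondata.
            sub.ondata = handler
        return sub


received = []
factory = SketchSubscriberFactory(xp_name="xp.default")
sub = factory.build(binding_key="science.data", handler=received.append)
sub.ondata("packet")
```

Assigning the callable on the instance leaves the class untouched, so other subscribers built from the same factory keep their own handlers.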