Coverage for ion/core/messaging/messaging : 85.86%
Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
|
#!/usr/bin/env python
@author Dorian Raymer @author Michael Meisinger @brief AMQP configuration factories as a function of application/service level names. """
""" This class defines handlers for asynchronous amqp events (events the broker can raise at any time). """
""" """
"""implement this to handle messages that could not be delivered to a queue or consumer """ log.warning("""basic.return event received! This means the broker\ could not guarantee reliable delivery for a message sent\ from a process in this container.""")
"""implement this to handle a broker flow control request """ log.warning('channel.flow event received')
"""implement this to handle a broker channel.alert notification. """ log.warning('channel.alert event received')
"""The AMQClient protocol calls this as a result of a connectionLost event. The TwistedDelegate.close method finishes shutting off the client, and we get a chance to react to the event here. """
""" Represents a connection to a broker vhost with credentials. Follows a basic life cycle. """
virtual_host='/', username='guest', password='guest', heartbeat=0): """ @param exchange_manager So we can link our states. If I'm in a bad state, then I need to notify ExchangeManager. """
# Immediately transition to READY state
""" Nothing to do here. What needed to be done was done in __init__ """ #self.connection = connection.BrokerConnection(*args, **kwargs) #self.closing = False # State that determines if we expect a close event
#assert not self.connection._connection, "Already connected to broker" # interface...one day vhost=self.virtual_host, delegate=amqpEvents, heartbeat=self.heartbeat, username=self.username, password=self.password)
raise NotImplementedError("Not implemented")
"""Close the AMQP broker connection by calling connection_close on channel 0""" return
#raise RuntimeError("Illegal state change for MessageSpace") self.exchange_manager.error(*args, **kwargs)
""" @brief The processing of a message has the ability to raise a FatalError exception. This lets the container know that the process should be killed. Exceptions other than FatalError are ignored. @note A configurable list of Exceptions could be added to this class, to enable more types of fatal exceptions. """ log.warning('MessageSpace delivery error') log.warning(str(reason)) if reason.check(FatalError): self.exchange_manager.container.fatalError(reason)
""" AMQP Client triggers this event when it has a conenctionLost event itself. """ self.exchange_manager.connectionLost(reason) # perpetuate the event
def __repr__(self): params = ['hostname', 'userid', 'password', 'virtual_host', ] s = "MessageSpace(hostname=%s,port=%s,virtual_host=%s" % (self.hostname, self.port, self.virtual_host) s += ")" return s
""" give it a name and a connection """ #self.connection = self.message_space.connection
# @todo remove: Store of messaging names
""" Exchange Space with support for only process participants. Such participants can uniquely identified by name. Services and fanout names fall into the same category. """
""" Exchange Space with support for topic trees (Exchange Points). Such participants have a name but """
""" High-level messaging name. Encapsulates messaging (amqp) details
Might also retain name config dict OR might just be the config """
""" Represents an AMQP exchange (name and type) in the context of an Exchange Space.
Currently, the Container has a default space (vhost '/'). An exchange point is just a well known exchange processes can send messages through. It abstracts away amqp details like exchange type, persistance options, etc. The Container has a default exchange called 'magnet.topic'. The default exchange is a topic exchange; topic is generaly useful and flexible
The amqp exchange parameters stored in this class only have to do with with the amqp method exchange_declare. """
""" @param connection the broker connection. It's called space becasue it is also a natural namespace, but not yet the one Magnet will use for it's "Exchange Space" """ 'exchange_type':self.exchange_type, 'durable':self.durable, 'auto_delete':self.auto_delete, }
################################################################## ## Transition code copied from carrot
"""The message has already been acknowledged."""
"""A message received by the broker. This re-presents the amqp basic content message from txamp. This is kind of a vestige from carrot. This also adds functionality to the received message; actions take place in the context of an amqp channel. """
"content type", "content encoding", "headers", "delivery mode", "priority", "correlation id", "reply to", "expiration", "message id", "timestamp", "type", "user id", "app id", "cluster id", ): amqp_message.content.properties.get(attr_name, None)) """Deserialize the message body, returning the original python structure sent by the publisher.""" self.content_encoding)
def payload(self): """The decoded message."""
"""Acknowledge this message as being processed., This will remove the message from the queue.
:raises MessageStateError: If the message has already been acknowledged/requeued/rejected.
""" raise self.MessageStateError( "Message already acknowledged with state: %s" % self._state)
"""Reject this message.
The message will be discarded by the server.
:raises MessageStateError: If the message has already been acknowledged/requeued/rejected.
""" if self.acknowledged: raise self.MessageStateError( "Message already acknowledged with state: %s" % self._state) d = self.channel.basic_reject(self.delivery_tag, requeue=False) self._state = "REJECTED" return d
"""Reject this message and put it back on the queue.
You must not use this method as a means of selecting messages to process.
:raises MessageStateError: If the message has already been acknowledged/requeued/rejected.
""" if self.acknowledged: raise self.MessageStateError( "Message already acknowledged with state: %s" % self._state) d = self.channel.basic_reject(self.delivery_tag, requeue=True) self._state = "REQUEUED" return d
def acknowledged(self):
## ##############################################################
def check_queue_exists(client, name):
""" Consumer for AMQP. """
exchange=None, routing_key=None, exchange_type="direct", durable=False, exclusive=False, auto_delete=True, no_ack=True, binding_key=None, **kwargs): # **kwargs is a sloppy hack
def new(cls, client, **kwargs): """ Use this creator for deferred instantiation. inst.declare returns a deferred that will fire with the Consumer instance. """ chan = client.channel() d = chan.channel_open() def instantiate(result, chan, **kwargs): inst = cls(chan, **kwargs) return inst.declare() d.addCallback(instantiate, chan, **kwargs) return d
def declare(self): """Declares the queue, the exchange and binds the queue to the exchange."""
# In the current design, exchange is always defined type=self.exchange_type, durable=self.durable, auto_delete=self.auto_delete)
# Specific queue name given durable=self.durable, exclusive=self.exclusive, auto_delete=self.auto_delete) else: # Generate internal unique name durable=self.durable, exclusive=self.exclusive, auto_delete=self.auto_delete) # remember the queue name the broker made for us
exchange=self.exchange, routing_key=routing_key, arguments=arguments)
global_=False)
def name(cls, ex_space, config): """ @brief configure name with out creating a consumer yet. @param ex_space is the broker connection, and (in the current design) the exchange information. @param config is a dict of amqp options that __init__ extracts. """
no_ack=self.no_ack, consumer_tag=self.consumer_tag, nowait=False)
if self._consuming: self._consuming = False return self.channel.basic_cancel(consumer_tag=self.consumer_tag) return defer.succeed(None)
""" Close the amqp channel, deactivating the Consumer. """ return defer.succeed(None)
""" Publisher for one message
delivery_modes: 1 - non persistent 2 - persistent """
routing_key=None, exchange_type="direct", delivery_mode=1, durable=False, exclusive=False, auto_delete=True, immediate=False, mandatory=False, **kwargs): # **kwargs is a sloppy hack
def declare(self):
type=self.exchange_type, durable=self.durable, auto_delete=self.auto_delete)
def name(cls, ex_space, config): """ Factory to create new Publisher instance from given params """ raise RuntimeError("Publisher.name(): No config given")
# Are we doing an exchange declare on every send???
content_type=None, content_encoding=None, serializer=None, reply_to=None): """With any data, serialize it and encapsulate it in a AMQP message with the proper headers set."""
# No content_type? Then we're serializing the data internally. message_data) = serialization.encode(message_data, serializer=serializer) else: # If the programmer doesn't want us to serialize, # make sure content_encoding is set. if isinstance(message_data, unicode): if not content_encoding: content_encoding = 'utf-8' message_data = message_data.encode(content_encoding)
# If they passed in a string, we can't know anything # about it. So assume it's binary data. elif not content_encoding: content_encoding = 'binary'
priority=priority, content_type=content_type, content_encoding=content_encoding, reply_to=reply_to)
content_type=None, content_encoding=None, headers=None, reply_to=None, correlation_id=None, expiration=None, message_id=None, timestamp=None, type=None, user_id=None, app_id=None, cluster_id=None): """Encapsulate data into a AMQP message. This method should be reconciled with interceptor functionality and the ion message format. """
'content type':content_type, 'content encoding':content_encoding, 'application headers':headers, 'delivery mode':delivery_mode, 'priority':priority, 'correlation id':correlation_id, 'reply to':reply_to, 'expiration':expiration, 'message id':message_id, 'timestamp':timestamp, 'type':type, 'user id':user_id, 'app id':app_id, 'cluster id':cluster_id, }
mandatory=False, immediate=False, priority=0, content_type=None, content_encoding=None, reply_to=None, headers=None, serializer=None): """Send a message.
:keyword content_type: The messages content_type. If content_type is set, no serialization occurs as it is assumed this is either a binary object, or you've done your own serialization. Leave blank if using built-in serialization as our library properly sets content_type.
:keyword content_encoding: The character set in which this object is encoded. Use "binary" if sending in raw binary objects. Leave blank if using built-in serialization as our library properly sets content_encoding.
"""
delivery_mode=delivery_mode, content_type=content_type, content_encoding=content_encoding, serializer=serializer, reply_to=reply_to) exchange=self.exchange, routing_key=routing_key, mandatory=self.mandatory, immediate=self.immediate)
""" Close the amqp channel, deactivating the Consumer. """ return defer.succeed(None)
'queue' : name, 'binding_key' : name, 'exclusive' : False, 'mandatory' : True, 'no_ack' : False, 'auto_delete' : True, 'routing_key' : name, 'immediate' : False, }
'queue' : name, 'binding_key' : name, 'exclusive' : True, 'mandatory' : True, 'no_ack' : False, 'auto_delete' : True, 'routing_key' : name, 'immediate' : False, }
'queue' : '', 'binding_key' : name, 'exclusive' : True, 'mandatory' : True, 'no_ack' : False, 'auto_delete' : True, 'routing_key' : name, 'immediate' : False, }
#def consume_on(name, config_factory=''): # """ # @param name Name others send messages to (queue name) # @param type messaging pattern; Experimental way to configure message system. # @brief Experimental mechanism for applications to create more # sophisticated message patterns. # @retval defer.Deferred that fires a consumer instance # # notes: # Create a consumer based on an existing queue/ messaging pattern set up in # the broker. # # """ # amqp_conf = config_factory(name) # consumer = messaging.Consumer(ioninit.container_instance.exchange_manager.message_space.connection, **amqp_conf) # yield consumer.declare() |