Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

""" 

AMQP Messaging Client Protocol  

 

These are extensions and fixes to the txamqp library 

""" 

 

import os 

from time import time 

 

from twisted.internet import defer 

from twisted.internet import protocol 

from twisted.python import failure 

 

from txamqp import spec 

from txamqp.content import Content 

from txamqp.client import TwistedDelegate 

from txamqp.protocol import AMQClient, AMQChannel, Frame 

from txamqp.queue import TimeoutDeferredQueue 

 

import ion.util.ionlog 

log = ion.util.ionlog.getLogger(__name__) 

from ion.core import messaging 

SPEC_PATH = os.path.join(messaging.__path__[0], 'amqp0-8.xml') 

 

class IDPool(object): 

    """ 

    Create a pool of IDs to allow reuse. The "new_id" function generates the next 

    valid ID from the previous one. If not given, defaults to incrementing an integer. 

    """ 

 

    def __init__(self, new_id=None): 

        if new_id is None: new_id = lambda x: x + 1 

 

        self.ids_in_use = set() 

        self.ids_free = set() 

        self.new_id = new_id 

        self.last_id = 0 

 

    def get_id(self): 

        if len(self.ids_free) > 0: 

            return self.ids_free.pop() 

 

        self.last_id = id_ = self.new_id(self.last_id) 

        self.ids_in_use.add(id_) 

        return id_ 

 

    def release_id(self, the_id): 

        if the_id in self.ids_in_use: 

            self.ids_in_use.remove(the_id) 

            self.ids_free.add(the_id) 

 

 

class HeartbeatExpired(Exception): 

    """ 

    """ 

 

class ChannelWithCallback(AMQChannel): 

 

    def __init__(self, id, outgoing, client): 

        AMQChannel.__init__(self, id, outgoing) 

        self.deliver_callback = lambda dev_null: dev_null 

        self.client = client 

 

    def set_consumer_callback(self, callback): 

        self.deliver_callback = callback 

 

    def close(self, reason): 

        AMQChannel.close(self, reason) 

        self.client._channel_closed(self.id) 

 

    def channel_close(self): 

        def close_cb(r): 

            self.close(r) 

            return r 

        d = super(ChannelWithCallback, self).channel_close() 

        d.addCallback(close_cb) 

        return d 

 

 

class AMQPProtocol(AMQClient): 

    """ Improvements to the txamqp implementation. 

    """ 

 

    channelClass=ChannelWithCallback 

    #next_channel_id = 0 

    closed = True 

 

    def __init__(self, *args, **kwargs): 

        AMQClient.__init__(self, *args, **kwargs) 

        self.id_pool = IDPool() 

 

    def channel(self, id=None): 

        """Overrides AMQClient. Changes:  

            1) no need to return deferred. The channelLock doesn't protect 

            against any race conditions; the channel reference is returned, 

            so any number of those references could exist already.  

            2) auto channel numbering 

            3) replace deferred queue for basic_deliver(s) with simple 

               buffer(list) 

        """ 

        if id is None: 

            #self.next_channel_id += 1 

            #id = self.next_channel_id 

            id = self.id_pool.get_id() 

        try: 

            ch = self.channels[id] 

        except KeyError: 

            ch = self.channelFactory(id, self.outgoing, self) 

            self.channels[id] = ch 

        return ch 

 

    def _channel_closed(self, id): 

        if self.channels.has_key(id): 

            del self.channels[id] 

            self.id_pool.release_id(id) 

 

    def queue(self, key): 

        """ This is the channel basic_deliver queue 

        """ 

        try: 

            q = self.queues[key] 

        except KeyError: 

            q = TimeoutDeferredQueue() 

            self.queues[key] = q 

        return q 

 

    def connectionMade(self): 

        self.closed = False 

        AMQClient.connectionMade(self) 

 

    def processFrame(self, frame): 

        ch = self.channel(frame.channel) 

        if frame.payload.type == Frame.HEARTBEAT: 

            self.lastHBReceived = time() 

        else: 

            ch.dispatch(frame, self.work) 

        if self.heartbeatInterval > 0: 

            self.reschedule_checkHB() 

 

    def frameLengthExceeded(self): 

        """ 

        """ 

 

    def checkHeartbeat(self): 

        if self.checkHB.active(): 

            self.checkHB.cancel() 

        log.critical('AMQP Heartbeat expired on client side') 

        self.transport.loseConnection(failure.Failure(HeartbeatExpired('Heartbeat expired.'))) 

 

 

class ConnectionCreator(object): 

    """Create AMQP Client. 

    The AMQP Client uses one persistent connection, so a Factory is not 

    necessary. 

 

    Client Creator is initialized with AMQP Broker configuration. 

 

    ConnectTCP is called with TCP configuration. 

    """ 

 

    protocol = AMQPProtocol 

    spec_cache = {} 

 

    def __init__(self, reactor, username='guest', password='guest', 

                                vhost='/', delegate=None, 

                                spec_path=SPEC_PATH, 

                                heartbeat=0): 

        self.reactor = reactor 

        self.username = username 

        self.password = password 

        self.vhost = vhost 

 

        # Cache the specs for enormous speedups on subsequent connections 

        cache = ConnectionCreator.spec_cache 

        if spec_path in cache: 

            self.spec = cache[spec_path] 

        else: 

            self.spec = cache.setdefault(spec_path, spec.load(spec_path)) 

 

        self.heartbeat = heartbeat 

        if delegate is None: 

            delegate = TwistedDelegate() 

        self.delegate = delegate 

        self.connector = None 

 

    def connectTCP(self, host, port, timeout=30, bindAddress=None): 

        """Connect to remote Broker host, return a Deferred of resulting protocol 

        instance. 

        """ 

        d = defer.Deferred() 

        p = self.protocol(self.delegate, 

                                    self.vhost, 

                                    self.spec, 

                                    heartbeat=self.heartbeat) 

        p.factory = self 

        f = protocol._InstanceFactory(self.reactor, p, d) 

        self.connector = self.reactor.connectTCP(host, port, f, timeout=timeout, 

                bindAddress=bindAddress) 

        def auth_cb(conn): 

            d = conn.authenticate(self.username, self.password) 

            d.addCallback(lambda _: conn) 

            return d 

        d.addCallback(auth_cb) 

        return d