Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

#!/usr/bin/env python 

 

""" 

@author Dorian Raymer 

@author Michael Meisinger 

@brief ION Exchange manager for CC. 

""" 

 

from twisted.internet import defer 

 

import ion.util.ionlog 

log = ion.util.ionlog.getLogger(__name__) 

 

from ion.core.messaging import messaging 

from ion.core.messaging.messaging import MessageSpace, ProcessExchangeSpace, Consumer 

from ion.util.state_object import BasicLifecycleObject 

 

DEFAULT_EXCHANGE_SPACE = 'magnet.topic' 

 

class ExchangeManager(BasicLifecycleObject): 

    """ 

    Manager class for capability container exchange management. 

    """ 

 

    def __init__(self, container): 

        BasicLifecycleObject.__init__(self) 

        self.container = container 

 

        # Container broker connection / vhost parameters 

        self.message_space = None 

 

        # Default exchange space 

        self.exchange_space = None 

 

    # Life cycle 

 

    def on_initialize(self, config, *args, **kwargs): 

        """ 

        """ 

        self.config = config 

 

        # Configure the broker connection 

        hostname = self.config['broker_host'] 

        port = self.config['broker_port'] 

        virtual_host = self.config['broker_vhost'] 

        username = self.config['broker_username'] 

        password = self.config['broker_password'] 

        credfile = self.config['broker_credfile'] 

        heartbeat = int(self.config['broker_heartbeat']) 

 

        if credfile: 

            username, password = open(credfile).read().split() 

 

        # Is a BrokerConnection instance (no action at this point) 

        self.message_space = MessageSpace(self, 

                                hostname=hostname, 

                                port=port, 

                                virtual_host=virtual_host, 

                                username=username, 

                                password=password, 

                                heartbeat=heartbeat) 

 

        return defer.succeed(None) 

 

    @defer.inlineCallbacks 

    def on_activate(self, *args, **kwargs): 

        """ 

        @retval Deferred 

        """ 

        # Initiate the broker connection 

        yield self.message_space.activate() 

        self.client = self.message_space.client 

        self.exchange_space = ProcessExchangeSpace( 

                message_space=self.message_space, 

                name=DEFAULT_EXCHANGE_SPACE) 

 

    @defer.inlineCallbacks 

    def on_terminate(self, *args, **kwargs): 

        """ 

        @retval Deferred 

        """ 

 

        # Close the broker connection 

        yield self.message_space.terminate() 

 

    def on_error(self, *args, **kwargs): 

        raise RuntimeError("Illegal state change for ExchangeManager") 

 

    # API 

 

    @defer.inlineCallbacks 

    def configure_messaging(self, name, config): 

        """ 

        """ 

        if config['name_type'] == 'worker': 

            name_type_f = messaging.worker 

        elif config['name_type'] == 'direct': 

            name_type_f = messaging.direct 

        elif config['name_type'] == 'fanout': 

            name_type_f = messaging.fanout 

        else: 

            raise RuntimeError("Invalid name_type: "+config['name_type']) 

 

        amqp_config = name_type_f(name) 

        amqp_config.update(config) 

        res = yield Consumer.name(self.exchange_space, amqp_config) 

        yield self.exchange_space.store.put(name, amqp_config) 

        defer.returnValue(res) 

 

    @defer.inlineCallbacks 

    def new_consumer(self, name_config): 

        """ 

        @brief create consumer 

        @retval Deferred that fires a consumer instance 

        """ 

        consumer = yield Consumer.name(self.exchange_space, name_config) 

        defer.returnValue(consumer) 

 

    def queue_exists(self, name): 

        """ 

        @brief Check if a queue of the given name exists. 

        @retval A Deferred that returns True or False 

        """ 

        return messaging.check_queue_exists(self.exchange_space.client, name) 

 

    def send(self, to_name, message_data, exchange_space=None, **kwargs): 

        """ 

        Sends a message 

        """ 

        exchange_space = exchange_space or self.container.exchange_manager.exchange_space 

        return exchange_space.send(to_name, message_data, **kwargs) 

 

    def connectionLost(self, reason): 

        """ 

        Event triggered by the messaging manager when the amqp client goes 

        down. 

        The relationship between the exchange manager and the messaging 

        manager is not well defined, so it is only via 'the force' that the 

        messaging manager will understand that it should notify the 

        exchange manager of things like connectionLost 

        """ 

        self.container.exchangeConnectionLost(reason)