Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

#!/usr/bin/env python 

 

""" 

@file ion/services/dm/distribution/eventmonitor.py 

@author Dave Foster <dfoster@asascience.com> 

@brief Event Monitoring Service 

""" 

 

from ion.core.object import object_utils 

from ion.core.messaging.message_client import MessageClient 

from twisted.internet import defer 

from ion.core.process.service_process import ServiceProcess, ServiceClient 

from ion.core.process.process import ProcessFactory 

from ion.services.dm.distribution.publisher_subscriber import SubscriberFactory 

from ion.services.dm.distribution.events import EventSubscriber 

from uuid import uuid4 

import time 

 

import ion.util.ionlog 

# Module-level logger, named after this module.
log = ion.util.ionlog.getLogger(__name__)

 

# Name of the events exchange point. It is not referenced by any other code
# visible in this module.
EVENTS_EXCHANGE_POINT="events.topic"

 

# Type identifiers (object ids 2335-2340, all version 1) for the message
# objects exchanged with the event monitor service.
# Expected content type of a subscribe request (see EventMonitorService.op_subscribe).
EVENTMONITOR_SUBSCRIBE_MESSAGE_TYPE     = object_utils.create_type_identifier(object_id=2335, version=1)

# Type of the reply created by op_subscribe (carries session_id / subscription_id).
EVENTMONITOR_SUBSCRIBE_RESPONSE_TYPE    = object_utils.create_type_identifier(object_id=2336, version=1)

# presumably the content type of an unsubscribe request -- not used in this module, confirm against callers
EVENTMONITOR_UNSUBSCRIBE_MESSAGE_TYPE   = object_utils.create_type_identifier(object_id=2337, version=1)

# presumably the content type of a getdata request -- not used in this module, confirm against callers
EVENTMONITOR_GETDATA_MESSAGE_TYPE       = object_utils.create_type_identifier(object_id=2338, version=1)

# Type of the reply created by op_getdata (carries session_id and per-subscription data).
EVENTMONITOR_DATA_MESSAGE_TYPE          = object_utils.create_type_identifier(object_id=2339, version=1)

# presumably the type of one per-subscription entry in the data reply -- not used in this module, confirm
EVENTMONITOR_SUBDATA_TYPE               = object_utils.create_type_identifier(object_id=2340, version=1)

 

class EventMonitorService(ServiceProcess):
    """
    Service that subscribes to events on behalf of client sessions and buffers
    the received event messages in memory until a client asks for them.

    All state lives in self._subs, a dict keyed by session id:

        { session_id: { 'last_request_time': <value set by _bump_timestamp>,
                        'subscribers': { subid: { 'subscriber': <subscriber>,
                                                  'msgs': [<msg>, ...] } } } }

    NOTE(review): buffered messages are only discarded when a subscription is
    removed via op_unsubscribe; nothing in this class prunes them otherwise.
    """

    # Declaration of service
    declare = ServiceProcess.service_declare(name='event_monitor',
                                             version='0.1.0',
                                             dependencies=["pubsub"])

    def slc_init(self, *args, **kwargs):
        """
        Sets up the (initially empty) subscription table, the subscriber factory
        and the message client, then delegates to ServiceProcess.slc_init.
        """
        self._subs = {}
        self._subfactory = SubscriberFactory(process=self)
        self._mc = MessageClient(proc=self)
        ServiceProcess.slc_init(self, *args, **kwargs)

    def _handle_msg(self, session_id, subid, msg):
        """
        Subscriber callback: appends an incoming event message to the buffer of
        the subscription identified by (session_id, subid).

        Messages for a subscription that is not registered are logged and
        dropped instead of raising inside the subscriber's handler.

        @param session_id   session the subscription belongs to.
        @param subid        subscription id within that session.
        @param msg          received message; msg['content'] is the event object.
        """
        log.debug("message for you sir %s %s %s" % (session_id, subid, str(msg['content'].datetime)))

        session = self._subs.get(session_id)
        if session is None or subid not in session['subscribers']:
            # Can happen when an event is delivered before op_subscribe has
            # registered the subscriber, or after op_unsubscribe removed it.
            # There is no buffer to put the message in, so drop it.
            log.warning("dropping event for unknown subscription %s %s" % (session_id, subid))
            return

        # presumably keeps the event object usable after this handler returns so
        # op_getdata can link it into a response later -- TODO confirm
        msg['content'].Repository.persistent = True

        session['subscribers'][subid]['msgs'].append(msg)

    def _bump_timestamp(self, session_id):
        """
        Records the current time as the session's last request time.

        @param session_id   an existing session id (internal invariant).
        @retval the recorded time, as returned by time.time().
        """
        assert session_id in self._subs
        curtime = time.time()
        self._subs[session_id]['last_request_time'] = curtime
        return curtime

    @defer.inlineCallbacks
    def op_subscribe(self, content, headers, msg):
        """
        Requests a new subscription.
        Expects the content to be an EVENTMONITOR_SUBSCRIBE_MESSAGE_TYPE with a session_id and information about the subscription.

        Replies with an EVENTMONITOR_SUBSCRIBE_RESPONSE_TYPE carrying the
        session_id and the newly generated subscription_id.
        """

        # extract message contents
        session_id      = content.session_id
        event_id        = content.event_id
        origin          = content.origin
        # NOTE: a subscriber type from the request is not consulted; an
        # EventSubscriber is always built (evaluating a type name taken from
        # the request would be unsafe).

        # create new subscription id. Ids are only 6 characters long, so make
        # sure we do not reuse one that is already active in this session --
        # that would overwrite (and leak) the existing subscriber.
        taken = self._subs.get(session_id, {}).get('subscribers', {})
        subid = str(uuid4())[:6]
        while subid in taken:
            subid = str(uuid4())[:6]

        # create the subscriber
        sub = yield self._subfactory.build(subscriber_type=EventSubscriber,
                                           event_id=event_id,
                                           origin=origin,
                                           handler=lambda m: self._handle_msg(session_id, subid, m))

        # store this subscriber locally (TODO: for now)
        session = self._subs.setdefault(session_id, {'last_request_time': '',
                                                     'subscribers': {}})
        session['subscribers'][subid] = {'subscriber': sub, 'msgs': []}
        self._bump_timestamp(session_id)

        # generate response
        response = yield self._mc.create_instance(EVENTMONITOR_SUBSCRIBE_RESPONSE_TYPE)
        response.session_id = session_id
        response.subscription_id = subid

        yield self.reply_ok(msg, response)

    @defer.inlineCallbacks
    def op_unsubscribe(self, content, headers, msg):
        """
        Terminates an existing subscription.
        Requires a session_id, and if no subscription_id is specified, will remove all subscribers for that session_id.

        Unknown session or subscription ids are ignored; the reply is always ok.
        """

        # extract message contents
        session_id      = content.session_id
        subscription_id = content.subscription_id

        # try to look it up
        termsubs = []
        if session_id in self._subs:
            if not subscription_id:
                # "not specified" covers both None and an empty value
                session = self._subs.pop(session_id)
                termsubs.extend(entry['subscriber'] for entry in session['subscribers'].values())
            else:
                entry = self._subs[session_id]['subscribers'].pop(subscription_id, None)
                if entry is not None:
                    termsubs.append(entry['subscriber'])

        # terminate collected active subscribers
        # NOTE(review): the result of terminate() is not waited on -- confirm
        # whether it returns a Deferred that should be yielded.
        for sub in termsubs:
            sub.terminate()

        yield self.reply_ok(msg)

    @defer.inlineCallbacks
    def op_getdata(self, content, headers, msg):
        """
        Returns buffered events for a session.

        Reads session_id, timestamp and subscriber_id from the content. Only
        events whose datetime is >= the timestamp are returned. When no
        timestamp is given, the session's last request time is used and then
        updated to the current time. When subscriber_id is non-empty, only the
        listed subscriptions are included.

        Replies with an EVENTMONITOR_DATA_MESSAGE_TYPE; it contains only the
        session_id if the session does not exist.
        """

        # extract possible message contents
        session_id      = content.session_id
        timestamp       = content.timestamp
        subscriber_ids  = content.subscriber_id  # may be empty

        # generate response. We give back a nearly empty response if the session id does not exist
        response = yield self._mc.create_instance(EVENTMONITOR_DATA_MESSAGE_TYPE)
        response.session_id = session_id

        if session_id in self._subs:
            session = self._subs[session_id]

            if not timestamp:
                timestamp = session['last_request_time']
                self._bump_timestamp(session_id)

            try:
                timestamp = float(timestamp)
            except (TypeError, ValueError):
                # not convertible to a number: do not filter anything out
                timestamp = 0.0

            log.debug("get_data(): filtering against timestamp [%s]" % str(timestamp))

            for subid, subdata in session['subscribers'].items():

                # skip if we have a list of sub ids to give back and this subid is not in the list
                if len(subscriber_ids) > 0 and subid not in subscriber_ids:
                    continue

                dataobj = response.data.add()
                dataobj.subscription_id = subid
                dataobj.subscription_desc = subdata['subscriber']._binding_key
                for event in subdata['msgs']:
                    if event['content'].datetime >= timestamp:
                        link = dataobj.events.add()
                        link.SetLink(event['content'].MessageObject)

        yield self.reply_ok(msg, response)

 

class EventMonitorServiceClient(ServiceClient):
    """
    Client for the event monitor service.

    Each public method sends its argument as the content of the RPC operation
    of the same name and fires its Deferred with the content of the reply.
    """

    def __init__(self, proc=None, **kwargs):
        """
        @param proc     passed through to ServiceClient.
        @param kwargs   passed through to ServiceClient; 'targetname' defaults
                        to "event_monitor" when the caller does not supply it.
        """
        kwargs.setdefault('targetname', "event_monitor")
        ServiceClient.__init__(self, proc, **kwargs)

    @defer.inlineCallbacks
    def subscribe(self, msg):
        """
        Sends msg to the 'subscribe' operation.

        @retval Deferred firing with the reply content.
        """
        yield self._check_init()

        reply_content, _reply_headers, _reply_msg = yield self.rpc_send('subscribe', msg)
        defer.returnValue(reply_content)

    @defer.inlineCallbacks
    def unsubscribe(self, msg):
        """
        Sends msg to the 'unsubscribe' operation.

        @retval Deferred firing with the reply content.
        """
        yield self._check_init()

        reply_content, _reply_headers, _reply_msg = yield self.rpc_send('unsubscribe', msg)
        defer.returnValue(reply_content)

    @defer.inlineCallbacks
    def getdata(self, msg):
        """
        Sends msg to the 'getdata' operation.

        @retval Deferred firing with the reply content.
        """
        yield self._check_init()

        reply_content, _reply_headers, _reply_msg = yield self.rpc_send('getdata', msg)
        defer.returnValue(reply_content)

 

# Module-level process factory wrapping EventMonitorService.
# presumably looked up by name when this module is spawned as a process -- confirm
factory = ProcessFactory(EventMonitorService)