Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

#!/usr/bin/env python 

 

""" 

@file ion/core/process/proc_manager.py 

@author Michael Meisinger 

@brief Process Manager for capability container 

""" 

 

import types 

 

from twisted.internet import defer 

from twisted.python.reflect import namedAny 

 

import ion.util.ionlog 

log = ion.util.ionlog.getLogger(__name__) 

 

from ion.core import ioninit 

from ion.core.exception import ConfigurationError 

from ion.core.intercept.interceptor import Interceptor 

from ion.core.process import process 

from ion.core.process.cprocess import ContainerProcess, IContainerProcess, Invocation 

from ion.util.state_object import BasicLifecycleObject 

import ion.util.procutils as pu 

 

class InterceptorSystem(Interceptor): 

    """ 

    Container interceptor system class. 

    """ 

 

    def __init__(self): 

        BasicLifecycleObject.__init__(self) 

 

        self.interceptors = {} 

        self.paths = {} 

 

    # Life cycle 

 

    @defer.inlineCallbacks 

    def on_initialize(self, config, *args, **kwargs): 

        """ 

        """ 

        #log.debug("Initializing InterceptorSystem from config: %s" % config) 

        yield self._init_system(config) 

        log.info("InterceptorSystem initialized: %s interceptors %s, %s paths loaded: %s" % ( 

            len(self.interceptors), self.interceptors.keys(), len(self.paths), self.paths.keys())) 

 

    def on_activate(self, *args, **kwargs): 

        """ 

        @retval Deferred 

        """ 

        return defer.succeed(None) 

 

    def on_terminate(self, *args, **kwargs): 

        """ 

        @retval Deferred 

        """ 

        return defer.succeed(None) 

 

    def on_error(self, cause= None, *args, **kwargs): 

        if cause: 

            log.error("Process error: %s" % cause) 

            pass 

        else: 

            raise RuntimeError("Illegal InterceptorSystem state change") 

            # TODO: A path is not a list, but like a FSM 

            # have priorities and alternative routes 

 

    # API 

    @defer.inlineCallbacks 

    def process(self, invocation): 

        """ 

        @param invocation container object for parameters 

        @retval invocation instance, may be modified 

        """ 

        pathname = invocation.path 

        path = self.paths.get(pathname, None) 

        if not path: 

            raise RuntimeError("Path %s unknown" % invocation.path) 

        for path_element in path: 

            invocation.path = pathname 

            intc = path_element['interceptor_instance'] 

            #log.debug("Process path %s step %s" % (invocation.path, path_element['name'])) 

            try: 

                invocation = yield defer.maybeDeferred(intc.process, invocation) 

            except Exception, ex: 

                log.exception("Error in interceptor path %s step %s" % ( 

                    invocation.path, path_element['name'])) 

                invocation.error(str(ex)) 

                raise ex 

 

            # Continuation 

            if invocation.status == Invocation.STATUS_DROP: 

                #log.debug("Process path %s step %s: DROP" % (invocation.path, path_element['name'])) 

                break 

            if invocation.status == Invocation.STATUS_DONE: 

                #log.debug("Process path %s step %s: DONE" % (invocation.path, path_element['name'])) 

                break 

        defer.returnValue(invocation) 

 

    # Helpers 

 

    @defer.inlineCallbacks 

    def _init_system(self, config): 

        if not config or not type(config) is dict: 

            raise ConfigurationError("InterceptorSystem config invalid %r" % config) 

        if not 'interceptors' in config: 

            raise ConfigurationError("InterceptorSystem config must have 'interceptors'") 

        interceptors = config['interceptors'] 

        for intname in interceptors: 

            int_config = interceptors[intname] 

            intcls = yield self._create_interceptor(intname, int_config) 

            if intname in self.interceptors: 

                raise ConfigurationError("InterceptorSystem interceptor %s already configured" % intname) 

            self.interceptors[intname] = intcls 

 

        if 'stack' in config: 

            istack = config['stack'] 

            out_path = self._create_intercept_path(istack) 

            in_path = self._reversed_intercept_path(out_path) 

            self.paths[Invocation.PATH_OUT] = out_path 

            self.paths[Invocation.PATH_IN] = in_path 

 

        if 'paths' in config: 

            raise NotImplementedError("Not implemented") 

 

    @defer.inlineCallbacks 

    def _create_interceptor(self, name, config): 

        #log.debug("Create Interceptor '%s' from config: %s" % (name, config)) 

        intcls_name = config['classname'] 

        intcls = namedAny(intcls_name) 

        if not IContainerProcess.implementedBy(intcls): 

            raise ConfigurationError("Interceptor '%s' class %s not an interceptor" % (name, intcls_name)) 

        intc = intcls(name) 

        intcargs = config.get('args', {}) 

        if not type(intcargs) is dict: 

            raise ConfigurationError("Interceptor '%s' args must be dict %r" % (name, intcargs)) 

        yield intc.initialize(**intcargs) 

        yield intc.activate() 

        #log.debug("Interceptor '%s' created and activated: %r" % (name, intc)) 

        defer.returnValue(intc) 

 

    def _create_intercept_path(self, config): 

        if not type(config) in (list, tuple): 

            raise ConfigurationError("InterceptorSystem stack must be list %r" % (config)) 

        path = [] 

        for path_elem in config: 

            if not type(path_elem) is dict: 

                raise ConfigurationError("InterceptorSystem stack element must be dict %r" % (path_elem)) 

            if not 'interceptor' in path_elem: 

                raise ConfigurationError("InterceptorSystem stack element must have interceptor %r" % (path_elem)) 

            pe_intc = path_elem['interceptor'] 

            intc = self.interceptors.get(pe_intc, None) 

            if not intc: 

                raise ConfigurationError("InterceptorSystem stack element interceptor %s not found" % (pe_intc)) 

 

            path_elem['interceptor_instance'] = intc 

            path_elem['name'] = path_elem.get('name', None) or intc.name 

            path.append(path_elem) 

        return path 

 

    def _reversed_intercept_path(self, int_path): 

        assert type(int_path) is list 

        return list(reversed(int_path))