Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

#!/usr/bin/env python 

 

""" 

@file ion/core/process/proc_manager.py 

@author Michael Meisinger 

@brief Process Manager for capability container 

""" 

 

import types 

 

from twisted.internet import defer 

 

import ion.util.ionlog 

log = ion.util.ionlog.getLogger(__name__) 

 

from ion.core import ioninit 

from ion.core.process import process 

from ion.core.process.process import Process 

from ion.core.process.process import IProcess, ProcessDesc, ProcessInstantiator 

from ion.core.data.store import Store 

from ion.util.state_object import BasicLifecycleObject 

import ion.util.procutils as pu 

 

class ProcessManager(BasicLifecycleObject): 

    """ 

    Manager class for capability container process management. 

    """ 

 

    def __init__(self, container): 

        BasicLifecycleObject.__init__(self) 

        self.container = container 

        self.supervisor = None 

 

        # TEMP: KVS pid (str) -> Process Instance 

        self.process_registry = Store() 

        self.process_registry.kvs = {} # Give it its own backend... 

 

    # Life cycle 

 

    def on_initialize(self, config, *args, **kwargs): 

        """ 

        """ 

        self.config = config 

        return defer.succeed(None) 

 

    def on_activate(self, *args, **kwargs): 

        """ 

        @retval Deferred 

        """ 

        return defer.succeed(None) 

 

    def on_terminate(self, *args, **kwargs): 

        """ 

        @retval Deferred 

        """ 

        if self.supervisor and self.supervisor._get_state() != 'TERMINATED': 

            return self.supervisor.shutdown() 

        else: 

            return defer.succeed(None) 

 

    def on_error(self, *args, **kwargs): 

        raise RuntimeError("Illegal state change for ProcessManager") 

 

    # API 

 

    @defer.inlineCallbacks 

    def spawn_processes(self, procs, sup=None): 

        """ 

        Spawns a list of processes. 

        @param procs  list of processes (as description dict) to start up 

        @param sup  spawned Process instance acting as supervisor 

        @retval Deferred -> Process instance 

        """ 

        children = [] 

        for procDef in procs: 

            child = ProcessDesc(**procDef) 

            children.append(child) 

 

        if sup == None: 

            sup = yield self.create_supervisor() 

 

        assert IProcess.providedBy(sup), "Parent must provide IProcess" 

        assert sup._get_state() in ("READY", "ACTIVE"), "Illegal parent process state" 

 

        log.info("Spawning %s child processes for sup=[%s]" % (len(children), sup.proc_name)) 

        for child in children: 

            child_id = yield sup.spawn_child(child) 

 

        #log.debug("process_ids: "+ str(process.procRegistry.kvs)) 

 

        defer.returnValue(sup) 

 

    @defer.inlineCallbacks 

    def spawn_process(self, procdesc, parent, node=None, activate=True): 

        """ 

        @brief Spawns a process description with the initialized attributes. 

        @param procdesc a ProcessDesc instance 

        @param parent the process instance that should be set as supervisor 

        @param node the container id where process should be spawned; None for local 

        @retval Deferred -> Id with process id 

        """ 

        if not parent: 

            parent = yield self.create_supervisor() 

            procdesc.sup_process = parent 

        assert isinstance(procdesc, ProcessDesc), "procdesc must be ProcessDesc" 

        assert IProcess.providedBy(parent), "parent must be IProcess" 

 

        # The supervisor process id, system name and proc name are provided 

        # as spawn args, in addition to any give arguments. 

        spawnargs = procdesc.spawn_args or {} 

        spawnargs['proc-name'] = procdesc.proc_name 

        spawnargs['sup-id'] = parent.id.full 

        spawnargs['sys-name'] = ioninit.sys_name 

 

        log.info('Spawning name=%s on node=%s' % (procdesc.proc_name, procdesc.proc_node)) 

        if node: 

            raise RuntimeError('Cannot spawn %s on node=%s (yet)' % ( 

                    procdesc.proc_class, procdesc.proc_node)) 

        else: 

            process = yield self.spawn_process_local( 

                    module=procdesc.proc_module, 

                    spawnargs=spawnargs, 

                    activate=activate) 

 

            defer.returnValue(process.id) 

 

    @defer.inlineCallbacks 

    def spawn_process_local(self, module, space=None, spawnargs=None, activate=True): 

        """ 

        @brief Spawn a process from a module, in the local container 

        @param space is message space or None for container default space 

        """ 

        if not space: 

            space = ioninit.container_instance.exchange_manager.message_space 

        if spawnargs == None: 

            spawnargs = {} 

 

        # Importing process module 

        proc_mod = pu.get_module(module) 

 

        process = yield ProcessInstantiator.spawn_from_module( 

                module=proc_mod, 

                spawnargs=spawnargs, 

                container=self.container, 

                activate=activate) 

        yield self.register_local_process(process) 

 

        defer.returnValue(process) 

 

    # Sequence number of supervisors 

    sup_seq = 0 

 

    @defer.inlineCallbacks 

    def create_supervisor(self): 

        """ 

        Creates a supervisor process. There is only one root supervisor. 

        @retval Deferred -> supervisor Process instance 

        """ 

        if self.supervisor: 

            defer.returnValue(self.supervisor) 

 

        # Makes the boostrap a process 

        log.info("Spawning supervisor") 

        if self.sup_seq == 0: 

            supname = "bootstrap" 

        else: 

            supname = "supervisor."+str(self.sup_seq) 

 

        spawnargs = {'proc-name':supname} 

        sup = process.factory.build(spawnargs) 

        supId = yield sup.spawn() 

        yield process.procRegistry.put(supname, str(supId)) 

        self.sup_seq += 1 

        self.supervisor = sup 

        defer.returnValue(sup) 

 

    @defer.inlineCallbacks 

    def activate_process(self, parent, pid): 

        process = yield self.get_local_process(pid) 

        if process: 

            yield process.activate() 

 

    @defer.inlineCallbacks 

    def terminate_process(self, parent, pid): 

        # Must send this request from the backend receiver 

        (content, headers, msg) = yield parent.rpc_send(str(pid), 

                                                'terminate', {}, {'quiet':True}) 

        defer.returnValue(headers) 

 

    def register_local_process(self, process): 

        """ 

        @retval Deferred 

        """ 

        assert IProcess.providedBy(process), "process must be IProcess" 

        return self.process_registry.put(process.id.full, process) 

 

    def unregister_local_process(self, pid): 

        """ 

        @retval Deferred 

        """ 

        if not pid: 

            return defer.succeed(None) 

        else: 

            return self.process_registry.remove(str(pid)) 

 

    def get_local_process(self, pid): 

        """ 

        @retval Deferred -> IProcess instance 

        """ 

        if not pid: 

            return defer.succeed(None) 

        else: 

            return self.process_registry.get(str(pid))