Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

280

281

282

283

284

285

286

287

288

289

290

291

292

293

294

295

296

297

298

299

300

301

302

303

304

305

306

307

308

309

310

311

312

313

314

315

316

317

318

319

320

321

322

323

324

325

326

327

328

329

330

331

332

333

334

335

336

337

338

339

340

341

342

343

344

345

346

347

348

349

350

351

352

353

354

355

356

357

358

359

360

361

362

363

364

365

366

367

368

369

370

371

372

373

374

375

376

377

378

379

380

381

#!/usr/bin/env python 

 

""" 

@file ion/interact/conversation.py 

@author Michael Meisinger 

@brief classes for using conversations and conversation types (aka protocols, 

    interaction patterns) 

""" 

 

from twisted.python import failure 

from twisted.python.reflect import namedAny 

from zope.interface import implements, Interface 

 

import ion.util.ionlog 

log = ion.util.ionlog.getLogger(__name__) 

 

from ion.core import ioninit 

from ion.core.exception import ConversationError, ConversationTimeoutError, ConversationUnexpectedError, ConversationFailureError 

from ion.util.state_object import FSMFactory, StateObject, BasicStates 

import ion.util.procutils as pu 

 

# Configuration for this module, obtained by passing the module name to
# ioninit.config()
CONF = ioninit.config(__name__)
# The 'basic_conv_types' configuration entry. ConversationManager copies it
# into its registry of conv_type_id -> class name.
CF_basic_conv_types = CONF['basic_conv_types']

# Conversation type id for no conversation use.
CONV_TYPE_NONE = "none"

 

class IConversationType(Interface):
    """
    Interface for all conversation type instances.

    ConversationManager.load_conversation_type() rejects any class that does
    not implement this interface.
    """
    def new_conversation():
        # Interface declaration only: no self argument and no behavior.
        pass

 

class IConversation(Interface):
    """
    Interface for all conversation instances.

    ConversationManager.new_conversation() raises a ConversationError when the
    object returned by a conversation type does not provide this interface.
    """

 

class ConversationType(object):
    """
    @brief Represents a conversation type. Also known as protocol, interaction
        pattern, session type. Defines ID and roles of conversation.
        Acts as factory for the Conversation instances of a specific type.

    NOTE(review): Conversation.bind_role_local() reads a `roles` attribute
    (role id -> RoleSpec) from its conversation type. It is not defined in
    this base class; presumably concrete subclasses provide it.
    """
    implements(IConversationType)

    # Role id for the default initiator of this conv
    DEFAULT_ROLE_INITIATOR = None

    # Role id for the default counterparty of this conv
    DEFAULT_ROLE_PARTICIPANT = None

    # List of state ids for conversation final states
    FINAL_STATES = ()

    def __init__(self, id):
        """
        @param id    Unique registry identifier of a conversation type
        """
        self.id = id

    def new_conversation(self, **kwargs):
        """
        @brief Factory hook for Conversation instances of this type. This base
            implementation always raises NotImplementedError.
        @param kwargs ConversationManager.new_conversation() passes conv_type
            and conv_id as keyword arguments
        """
        raise NotImplementedError("Not implemented")

 

 

class Conversation(object):
    """
    @brief A single running instance of a conversation type. Records which
        process id is bound to which role and holds the state machine of the
        role played by the local process.
    """
    implements(IConversation)

    def __init__(self, conv_type, conv_id):
        """
        Sets up a conversation instance with no roles bound yet.

        @param conv_type  ConversationType instance
        @param conv_id    Unique registry identifier of a conversation
        """
        self.conv_id = conv_id
        self.conv_type = conv_type
        # The protocol name is the id of the conversation type
        self.protocol = conv_type.id
        # Mapping of role id -> process id
        self.role_bindings = {}
        # Role, process and state object of the local participant; they stay
        # None until bind_role_local() is called
        self.local_role = None
        self.local_process = None
        self.local_fsm = None
        # Holder for a Deferred for blocking (RPC style) send/receive
        self.blocking_deferred = None
        # Marks a timeout in the conversation processing
        self.timeout = None
        # List of (timestamp, msgtype, state, headers) records
        self.conv_log = []

    def bind_role_local(self, role_id, process):
        """
        @brief Binds the given local process to a role and instantiates the
            role class registered for that role in the conversation type.
        @param role_id  role id, used as key into conv_type.roles
        @param process  local process object; its id is stored in the bindings
        """
        self.bind_role(role_id, process.id)
        self.local_role, self.local_process = role_id, process

        # The role spec names the class that tracks the local role state
        role_fsm = self.conv_type.roles[role_id].role_class()
        self.local_fsm = role_fsm
        role_fsm.local_process = process

    def bind_role(self, role_id, process_id):
        """
        @brief Binds a process id to a role id. A role can be bound only once.
        """
        assert role_id not in self.role_bindings, "Cannot bind role %s twice" % role_id
        self.role_bindings[role_id] = process_id

    def get_conv_log_str(self):
        """
        @brief Renders the recorded message log as a multi-line string.
        @retval string with one header line, one line per logged message and
            a closing bracket
        """
        header_keys = ('sender', 'receiver', 'protocol', 'performative',
                       'op', 'user-id', 'status')
        parts = ["CONV_LOG[type=%s, id=%s, state=%s, @process=%s, #messages=%s:\n" % (
            self.protocol, self.conv_id, self.local_fsm._get_state(),
            self.local_process.proc_name, len(self.conv_log))]
        for (ts, mtype, cstate, mhdrs) in self.conv_log:
            # Headers missing from the record are rendered as None
            hstr = "%s -> %s %s:%s:%s; uid=%s, status=%s" % tuple(
                mhdrs.get(key, None) for key in header_keys)
            parts.append(" %d %s: %s >> %s\n" % (ts, mtype, hstr, cstate))
        parts.append("]")
        return "".join(parts)

    def __str__(self):
        return "Conversation(%s)" % (self.__dict__,)

 

class RoleSpec(object):
    """
    @brief Describes one role of a conversation type: the role id together
        with the class that is instantiated for a participant in that role.
    """
    def __init__(self, role_id, role_class):
        """
        @param role_id     identifier of the role
        @param role_class  class called without arguments to create the role
            instance (see Conversation.bind_role_local)
        """
        self.role_id, self.role_class = role_id, role_class

 

#class RoleBinding(object): 

#    """ 

#    @brief Binds a process to a role in a conversation instance. 

#    """ 

#    def __init__(self, role_id, process=None, process_id=None): 

#        self.role_id = role_id 

#        self.process = process 

#        if self.process: 

#            self.process_id = self.process.id 

#        else: 

#            self.process_id = process_id 

 

class ConversationRole(StateObject):
    """
    @brief The view of one participant (=role binding) on a conversation.
        Wraps the FSM that tracks the conversation state for that participant.

    NOTE(review): `self.factory` is not defined in this class; presumably
    subclasses set it to an FSM factory -- confirm.
    """
    def __init__(self):
        StateObject.__init__(self)
        # Build the FSM with this object as target and install it
        self._so_set_fsm(self.factory.create_fsm(self))

    def _so_process(self, event, *args, **kwargs):
        """
        @brief Logs the event and current state, then delegates to StateObject.
        @retval whatever StateObject._so_process returns
        """
        current_state = self._get_state()
        log.debug("Processing Conversation event='%s' in state='%s'" % (event, current_state))
        return StateObject._so_process(self, event, *args, **kwargs)

    def error(self, *args, **kwargs):
        # Only logs; returning failure.Failure(ConversationFailureError()) is
        # disabled in this version
        log.error("Conversation ERROR: Exception %r %r" % (args, kwargs))

    def unexpected(self, *args, **kwargs):
        # Only logs; returning failure.Failure(ConversationUnexpectedError())
        # is disabled in this version
        log.error("Conversation ERROR: UNEXPECTED MSG")

    def timeout(self, *args, **kwargs):
        # Only logs; returning failure.Failure(ConversationTimeoutError()) is
        # disabled in this version
        log.error("Conversation ERROR: TIMEOUT")

 

class ConversationTypeSpec(object):
    """
    Represents a conversation type specification. Base class for specific
    specification languages, such as Scribble, MSC etc.

    Defines no attributes or methods of its own.
    """

 

class ConversationTypeFSMFactory(FSMFactory):
    """
    Factory that instantiates the FSMs of conversation types.

    For a conversation with only two participants, both sides can share the
    same FSM definition and differ only in the behavior of the actions.
    """

    def create_fsm(self, target, memory=None):
        """
        @brief Creates the FSM through FSMFactory and sets its post_action
            flag to True.
        @retval the FSM instance
        """
        new_fsm = FSMFactory.create_fsm(self, target, memory)
        new_fsm.post_action = True
        return new_fsm

    def _create_action_func(self, target, action):
        """
        @retval a function of one argument (the fsm) that calls
            target(action, fsm); the action name is kept in the closure
        """
        def action_target(fsm):
            return target(action, fsm)

        return action_target

 

 

class ConversationManager(object):
    """
    @brief Registry and factory for the conversation types known to a
        container. Also hands out conversation ids.
    """

    # @todo CHANGE: Conversation ID counter
    convIdCnt = 0

    def __init__(self):
        """
        Copies the configured basic conversation types into the registry and
        instantiates every one of them.
        """
        # Dict conv_type_id -> class name of the conversation type class
        self.conv_type_registry = dict(**CF_basic_conv_types)

        # Dict conv_type_id -> ConversationType instance
        self.conv_types = {}
        for ctid in self.conv_type_registry:
            self.conv_types[ctid] = self.load_conversation_type(
                ctid, self.conv_type_registry[ctid])

        log.debug("Loaded and instantiated %s conversation types: %s" % (
                    len(self.conv_types),self.conv_types.keys()))

    def load_conversation_type(self, ct_id, ct_cls_name):
        """
        @brief Resolves a class by its fully qualified name and instantiates it.
        @param ct_id        conversation type id passed to the class as id
        @param ct_cls_name  fully qualified class name
        @retval new conversation type instance
        @throws ConversationError if the class does not implement
            IConversationType
        """
        ct_class = namedAny(ct_cls_name)
        if IConversationType.implementedBy(ct_class):
            return ct_class(id=ct_id)

        raise ConversationError("ConversationType id=%s classname=%s does not implement IConversationType" % (ct_id, ct_cls_name))

    def get_conversation_type(self, conv_type_id):
        """
        @brief Returns the cached conversation type instance, loading it from
            the registry when it is not cached.
        @throws ConversationError if the id is not in the registry
        """
        cached = self.conv_types.get(conv_type_id)
        if cached:
            return cached

        # Not cached: try to load it again from the registry
        ct_class_name = self.conv_type_registry.get(conv_type_id)
        if not ct_class_name:
            raise ConversationError("ConversationType %s not registered" % conv_type_id)

        loaded = self.load_conversation_type(conv_type_id, ct_class_name)
        self.conv_types[conv_type_id] = loaded
        return loaded

    def create_conversation_id(self, prefix=''):
        """
        @brief Increments the counter and builds a new conversation id.
        @param prefix  value prepended (via str) to the counter
        @retval string of the form <prefix>#<counter>
        """
        self.convIdCnt += 1
        return "#".join((str(prefix), str(self.convIdCnt)))

    def new_conversation(self, conv_type_id, conv_id=None):
        """
        @brief Creates a conversation instance of the given type.
        @param conv_type_id  id of a registered conversation type
        @param conv_id       conversation id; a new one is generated when the
            given value is empty
        @throws ConversationError if the created object does not provide
            IConversation
        """
        ct_inst = self.get_conversation_type(conv_type_id)
        if not conv_id:
            conv_id = self.create_conversation_id()

        conv_inst = ct_inst.new_conversation(conv_type=ct_inst, conv_id=conv_id)
        if IConversation.providedBy(conv_inst):
            return conv_inst

        raise ConversationError("Conversation instance %r from ConvType id=%s does not provide IConversation" % (conv_inst, conv_type_id))

 

# Module-level ConversationManager, created when this module is imported.
# ProcessConversationManager.__init__ stores a reference to it.
conv_mgr_instance = ConversationManager()

 

class ProcessConversationManager(object): 

    """ 

    @brief Oversees a set of conversations, e.g. within a process instance 

    """ 

 

    def __init__(self, process):
        """
        @param process  the owning process; other methods of this class read
            its id and proc_name attributes
        """
        self.process = process
        # Dict conv_id -> Conversation instance; entries are removed by
        # check_conversation_state() once a final state is reached
        self.conversations = {}
        # The module-level ConversationManager instance
        self.conv_mgr = conv_mgr_instance

 

    def msg_send(self, message): 

        """ 

        @brief Trigger the FSM for a to-be-sent message and delegate all checking 

            to the callback action function 

        @param message An in-memory standard message object 

        """ 

        conv = self.get_conversation(message['headers']['conv-id']) 

        perf = message['performative'] 

        if conv and conv.local_fsm: 

            log.debug("msg_send(): Processing performative '%s'" % perf) 

            return conv.local_fsm._so_process(perf, message) 

        else: 

            log.debug("msg_send(): NO FSM. Ignoring performative '%s'" % perf) 

 

    def msg_received(self, message): 

        """ 

        @brief Trigger the FSM for a received message and delegate all processing 

            to the callback action function 

        @param message An in-memory standard message object 

        """ 

        #log.debug("msg_received(): %s" % message) 

        conv = message['conversation'] 

        perf = message['performative'] 

        #log.debug("msg_received(): Processing performative '%s'" % perf) 

        return conv.local_fsm._so_process(perf, message) 

 

    def create_conversation_id(self): 

        return self.conv_mgr.create_conversation_id(prefix=self.process.id.full) 

 

    def new_conversation(self, conv_type_id, conv_id=None): 

        conv_id = conv_id or self.create_conversation_id() 

        conv_inst = self.conv_mgr.new_conversation(conv_type_id, conv_id) 

        self.conversations[conv_inst.conv_id] = conv_inst 

        return conv_inst 

 

    def get_conversation(self, conv_id): 

        return self.conversations.get(conv_id, None) 

 

    def get_or_create_conversation(self, conv_id, message, initiator=False): 

        """ 

        @brief Gets cached Conversation instance by conv-id header or creates 

            new instance for for type by protocol header. 

        @param conv_id the conversation id extracted from a message 

        @param message the standard message callback object 

        @param initiator True of this message is being sent, False if received 

        """ 

        conv = self.conversations.get(conv_id, None) 

 

        # If not existing, create new Conversation instance based on protocol header 

        if not conv: 

            conv_type = message['headers'].get('protocol', 'generic') 

 

            log.debug("[%s] NEW local conversation from conv-id=%s: type=%s" % ( 

                    self.process.proc_name, conv_id, conv_type)) 

            conv = self.new_conversation(conv_type, conv_id) 

 

            # Bind roles 

            sender = message['headers'].get('sender', None) 

            if initiator: 

                conv.bind_role_local(conv.conv_type.DEFAULT_ROLE_INITIATOR, self.process) 

                conv.bind_role(conv.conv_type.DEFAULT_ROLE_PARTICIPANT, sender) 

                log.debug("Binding roles initiator(local)=%s, participant=%s" % (self.process.id, sender)) 

            else: 

                conv.bind_role(conv.conv_type.DEFAULT_ROLE_INITIATOR, sender) 

                conv.bind_role_local(conv.conv_type.DEFAULT_ROLE_PARTICIPANT, self.process) 

                log.debug("Binding roles initiator=%s, participant(local)=%s" % (sender, self.process.id)) 

 

        return conv 

 

    def log_conv_message(self, conv, message, msgtype): 

        # Tuple of Timestamp (MS), type, message 

        if conv is None: 

            return 

        hdrs = message.get('headers',{}) 

        if hdrs and type(hdrs) is dict: 

            mhdrs = hdrs.copy() 

            if 'content' in mhdrs: 

                del mhdrs['content'] 

        else: 

            mhdrs = {} 

        msg_rec = (pu.currenttime_ms(), msgtype, conv.local_fsm._get_state(), mhdrs) 

        conv.conv_log.append(msg_rec) 

 

    def check_conversation_state(self, conv): 

        """ 

        @brief Check a conversation state after an event (send, receive) for 

            final and error state. 

        """ 

        if conv is None: 

            return 

        conv_id = conv.conv_id 

        #log.debug("check_conversation_state(), conv=%s, conv_id=%s, state=%s" % (conv, conv_id, conv.local_fsm._get_state())) 

        # Check for final state 

        if conv.local_fsm._get_state() in conv.conv_type.FINAL_STATES: 

            del self.conversations[conv_id] 

            log.info("Conversation FINAL: id=%s. Active conversations: %s" % ( 

                conv_id, len(self.conversations))) 

            # Removed for OOIION-335 Logging message very confusing 

            # log.info("Conversation FINAL log:\n%s" % (conv.get_conv_log_str())) 

 

        # Create a tombstone for later messages and timeouts with this conv_id 

 

        # GC tombstones and conversations