Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

280

281

282

283

284

285

286

287

288

289

290

291

292

293

294

295

296

297

298

299

300

301

302

303

304

305

306

307

308

309

310

311

312

313

314

315

316

317

318

319

320

321

322

323

324

325

326

327

328

329

330

331

332

333

334

335

336

337

338

339

340

341

342

343

344

345

346

347

348

349

350

351

352

353

354

355

356

357

358

359

360

361

362

363

364

365

366

367

368

369

370

371

372

373

374

375

376

377

378

379

380

381

382

383

384

385

386

387

388

389

390

391

392

393

394

395

396

397

398

399

400

401

402

403

404

405

406

407

408

409

410

411

412

413

414

415

416

417

418

419

420

#!/usr/bin/env python 

 

""" 

@file ion/play/hello_resource.py 

@author David Stuebe 

@brief Exchange management service and client for creating exchange spaces, exchange names, queues and bindings.

""" 

 

import ion.util.ionlog 

from ion.core.process.process import ProcessFactory 

from ion.core.process.service_process import ServiceProcess, ServiceClient 

from ion.core import ioninit 

from twisted.internet import defer 

 

import ion.services.coi.exchange.resource_wrapper as res_wrapper 

from ion.services.coi.exchange.resource_wrapper import ServiceHelper, ClientHelper 

from ion.services.coi.exchange.broker_controller import BrokerController 

from ion.services.coi.exchange.exchange_types import ExchangeTypes 

 

# Configuration object for this module, looked up by the module's name.
CONF = ioninit.config(__name__)

# Module-level logger, named after this module.
log = ion.util.ionlog.getLogger(__name__)

 

 

class EMSError(Exception):
    """
    Error class for the exchange management service (EMS) code in this
    module.

    Within this file it is raised by
    ExchangeManagementClient.create_exchangename() when the requested
    exchange name type is not one of the supported constants.
    """

 

class ExchangeManagementService(ServiceProcess): 

 

 

 

    # Declaration of service 

    declare = ServiceProcess.service_declare(name='exchange_management', 

                                             version='0.1.0', 

                                             dependencies=[]) 

 

    def __init__(self, *args, **kwargs): 

        ServiceProcess.__init__(self, *args, **kwargs) 

 

    @defer.inlineCallbacks 

    def slc_init(self): 

        log.info("ExchangeManagementService.slc_init(self)") 

        self.helper = ServiceHelper(self) 

        self.controller = BrokerController() 

        self.exchange_types = ExchangeTypes(self.controller) 

        yield self.controller.start() 

 

 

 

 

    @defer.inlineCallbacks 

    def slc_deactivate(self): 

        log.info("ExchangeManagementService.slc_terminate(self)") 

        yield self.controller.stop() 

 

 

    @defer.inlineCallbacks 

    def op_create_object(self, object, headers, msg): 

        """ 

        For testing purposes only.   

        """ 

        log.debug('op_create_object()') 

        object = yield self.helper.create_object( 

                    object, 

                    "TestObject", 

                    "This is not a valid system object." 

        ) 

        response = yield self.helper.push_object(object) 

        yield self.reply_ok(msg, response.resource_reference) 

 

 

    @defer.inlineCallbacks 

    def op_get_object(self, sha1, headers, msg): 

        """ 

        For testing purposes only.   

        """ 

        log.debug('op_get_object()') 

        object = yield self.helper.get_object(sha1) 

        yield self.reply_ok(msg, object) 

 

 

    # EXCHANGESPACE CRUD 

 

    @defer.inlineCallbacks 

    def op_create_exchangespace(self, exchangespace, headers, msg): 

        """ 

        Creates an ExchangeSpace distributed resource from the parameter  

        exchangespace.  The following restrictions are enforced:  request.name  

        must be defined, must be a uniquely named ExchangeSpace, and must  

        not already exist in the system.  request.description must not be 

        a trivial string and should provide a useful description of the 

        ExchangeSpace.         

        """ 

        log.debug('op_create_exchangespace()') 

 

        # Object creation 

        object = yield self.helper.create_object(exchangespace, "Name", "Description") 

 

        # Field validation 

        try: 

            name = exchangespace.configuration.name.strip() 

            description = exchangespace.configuration.description 

            if len(name) == 0: 

                raise res_wrapper.ExchangeManagementError("exchangespace.name is invalid") 

        except res_wrapper.ExchangeManagementError, err: 

            yield self.reply_err(msg, str(err)) 

            return 

 

        # Field population 

        object.name = name 

        object.description = description 

 

        # Response 

        response = yield self.helper.push_object(object) 

        log.debug('Created exchangespace.  id: %s', response.resource_reference.key) 

        yield self.reply_ok(msg, response) 

 

 

 

    @defer.inlineCallbacks 

    def op_create_exchangename(self, exchangename, headers, msg): 

        """ 

        Creates an ExchangeSpace distributed resource from the parameter  

        exchangespace.  The following restrictions are enforced:  request.name  

        must be defined, must be a uniquely named ExchangeSpace, and must  

        not already exist in the system.  request.description must not be 

        a trivial string and should provide a useful description of the 

        ExchangeSpace.         

         

        net.ooici.services.coi.exchange_management.proto defines 

        the following Exchange types: 

            PROCESS = 1; 

            SERVICE = 2; 

            EXCHANGE_POINT = 3; 

            QUEUE = 4; 

 

        """ 

        log.debug('op_create_exchangename()') 

 

        # Object creation 

        object = yield self.helper.create_object(exchangename, "Name", "Description") 

 

        # Field validation 

        try: 

            name = exchangename.configuration.name.strip() 

            description = exchangename.configuration.description 

            exchangespace = exchangename.configuration.exchangespace.strip() 

            if len(name) == 0: 

                raise res_wrapper.ExchangeManagementError("exchangename.name is required") 

            if len(exchangespace) == 0: 

                raise res_wrapper.ExchangeManagementError("exchangename.exchangespace is required") 

 

        except res_wrapper.ExchangeManagementError, err: 

            yield self.reply_err(msg, str(err)) 

            return 

 

        # Field population 

        object.name = name 

        object.description = description 

 

        #if object.type 

        yield self.exchange_types.create_exchange_point(exchangespace, name) 

 

 

        # Response 

        response = yield self.helper.push_object(object) 

        log.debug('Created exchangename.  id: %s', response.resource_reference.key) 

        yield self.reply_ok(msg, response) 

 

 

    @defer.inlineCallbacks 

    def op_create_queue(self, queue, headers, msg): 

        """ 

        Creates a queue and binds it to the appropriate namespace and exchange. 

        """ 

        q = queue.MessageObject 

        desc = q.configuration.description 

        qname = q.configuration.name 

        xn = q.configuration.exchangename 

        xs = q.configuration.exchangespace 

 

        self.controller.create_queue(xs + "." + xn) 

 

        log.debug('op_create_queue()') 

 

        # Object creation 

        yield self.reply_ok(msg, None) 

 

 

    @defer.inlineCallbacks 

    def op_create_binding(self, binding, headers, msg): 

        """ 

        Creates a queue and binds it to the appropriate namespace and exchange. 

        """ 

        b = binding.MessageObject 

        desc = b.configuration.description 

        bname = b.configuration.name 

        xn = b.configuration.exchangename 

        xs = b.configuration.exchangespace 

        topic = b.configuration.topic 

        q = b.configuration.queuename 

 

        self.controller.bind( 

                name = xs + "." + xn + "." + q, 

                exchange = xs + "." + xn, 

                routing_key = topic 

        ) 

 

        log.debug('op_create_queue()') 

 

        # Object creation 

        yield self.reply_ok(msg, None) 

 

 

    @defer.inlineCallbacks 

    def op_update_exchangename(self, request, headers, msg): 

        """ 

        Updates an ExchangeSpace distributed resource using the parameter  

        request.  

        """ 

        log.info('op_update_exchangename: ') 

        yield self.reply_ok(msg) 

 

 

    @defer.inlineCallbacks 

    def op_set_exchangename_life_cycle(self, request, headers, msg): 

        """ 

        Sets the ExchangeName resource life cycle.  All changes are subject  

        to ownership and permission check. 

        """ 

        log.info('op_set_exchangename_life_cycle: ') 

        yield self.reply_ok(msg) 

 

 

 

class ExchangeManagementClient(ServiceClient):
    """
    Client for the 'exchange_management' service.

    Each public method builds a request object through a ClientHelper,
    fills in its configuration fields and sends it to the matching service
    operation with rpc_send().  All methods return a Deferred that fires
    with the content of the reply.
    """

    def __init__(self, proc=None, **kwargs):
        """
        @param proc
                the process handed to both the ClientHelper and the
                ServiceClient base class.
        @param kwargs
                passed on to ServiceClient; 'targetname' defaults to
                "exchange_management" when not supplied.
        """
        self.helper = ClientHelper(proc)
        kwargs.setdefault('targetname', "exchange_management")
        ServiceClient.__init__(self, proc, **kwargs)

    @defer.inlineCallbacks
    def _create_object(self, msg):
        """
        Used for testing purposes only.

        Sends msg to the 'create_object' operation and returns the content
        of the reply.
        """
        yield self._check_init()
        (content, _headers, _reply) = yield self.rpc_send('create_object', msg)
        defer.returnValue(content)

    @defer.inlineCallbacks
    def _get_object(self, msg):
        """
        Used for testing purposes only.

        Sends msg to the 'get_object' operation and returns the content of
        the reply.
        """
        yield self._check_init()
        (content, _headers, _reply) = yield self.rpc_send('get_object', msg)
        defer.returnValue(content)

    @defer.inlineCallbacks
    def create_exchangespace(self,
            name,
            description
            ):
        """
        Creates an ExchangeSpace.
        @param name
                a string uniquely identifying the ExchangeSpace
                in all scopes and contexts.
        @param description
                a free text string containing a description of
                the ExchangeSpace.
        @retval the content of the service reply.
        """
        yield self._check_init()
        msg = yield self.helper.create_object(res_wrapper.exchangespace_type)
        msg.configuration.name = name
        msg.configuration.description = description

        (content, _headers, _reply) = yield self.rpc_send('create_exchangespace', msg)
        defer.returnValue(content)

    @defer.inlineCallbacks
    def create_exchangename(
            self,
            name,
            description,
            exchangespace,
            type='EXCHANGE_POINT'
        ):
        """
        Creates an ExchangeName.
        @param name
                a string uniquely identifying the ExchangeName
                in the scope of the ExchangeSpace.
        @param description
                a free text string containing a description of
                the ExchangeName.
        @param exchangespace
                a string uniquely identifying the ExchangeSpace
                to which this ExchangeName will belong.  This
                must be previously defined with a call to
                create_exchangespace()
        @param type
                a string that must contain one of the following
                constants:  'EXCHANGE_POINT', 'PROCESS', 'SERVICE'.
        @retval the content of the service reply.
        @raise EMSError (through the returned Deferred) when type is not
                one of the constants listed above.
        """
        # Reject an unsupported type before doing any other work; the
        # exception still reaches the caller through the returned Deferred.
        if type not in ('EXCHANGE_POINT', 'PROCESS', 'SERVICE'):
            raise EMSError('Invalid type specified in create_exchangename operation')

        yield self._check_init()

        msg = yield self.helper.create_object(res_wrapper.exchangename_type)
        msg.configuration.name = name
        msg.configuration.description = description
        msg.configuration.exchangespace = exchangespace
        # The accepted strings are exactly the attribute names on the
        # message's Type enumeration.
        msg.configuration.type = getattr(msg.configuration.Type, type)

        (content, _headers, _reply) = yield self.rpc_send('create_exchangename', msg)
        defer.returnValue(content)

    @defer.inlineCallbacks
    def create_queue(
            self,
            name,
            description,
            exchangespace,
            exchangename
            ):
        """
        Creates a Queue.
        @param name
                a string uniquely identifying the Queue
                in the scope of the ExchangeSpace and
                ExchangeName.
        @param description
                a free text string containing a description of
                the Queue.
        @param exchangespace
                a string uniquely identifying the ExchangeSpace
                to which ExchangeName belongs.  This must be
                previously defined with a call to create_exchangespace()
        @param exchangename
                a string uniquely identifying the ExchangeName
                to which this queue will be bound.  This must be
                previously defined with a call to create_exchangename()
        @retval the content of the service reply.
        """
        yield self._check_init()

        msg = yield self.helper.create_object(res_wrapper.queue_type)
        msg.configuration.name = name
        msg.configuration.description = description
        msg.configuration.exchangespace = exchangespace
        msg.configuration.exchangename = exchangename

        (content, _headers, _reply) = yield self.rpc_send('create_queue', msg)
        defer.returnValue(content)

    @defer.inlineCallbacks
    def create_binding(
            self,
            name,
            description,
            exchangespace,
            exchangename,
            queuename,
            topic
        ):
        """
        Creates a Binding.
        @param name
                a string identifying the Binding.
        @param description
                a free text string containing a description of
                the Binding.
        @param exchangespace
                a string uniquely identifying the ExchangeSpace
                to which ExchangeName belongs.  This must be
                previously defined with a call to create_exchangespace()
        @param exchangename
                a string uniquely identifying the ExchangeName
                to which the queue will be bound.  This must be
                previously defined with a call to create_exchangename()
        @param queuename
                a string identifying the Queue to bind.
        @param topic
                the topic string stored in the request; the service
                uses it as the routing key of the binding.
        @retval the content of the service reply.
        """
        yield self._check_init()

        msg = yield self.helper.create_object(res_wrapper.binding_type)

        msg.configuration.name = name
        msg.configuration.description = description
        msg.configuration.exchangespace = exchangespace
        msg.configuration.exchangename = exchangename
        msg.configuration.queuename = queuename
        msg.configuration.topic = topic

        # Must target the binding operation: sending this message to
        # 'create_queue' would create a queue and never bind anything.
        (content, _headers, _reply) = yield self.rpc_send('create_binding', msg)
        defer.returnValue(content)

 

 

 

# Module-level process factory wrapping ExchangeManagementService.
# NOTE(review): presumably the container looks up this `factory` name when
# spawning the module as a process -- confirm against ProcessFactory usage.
factory = ProcessFactory(ExchangeManagementService)