Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

280

281

282

283

284

285

286

287

288

289

290

291

292

293

294

295

296

297

298

299

300

301

302

303

304

305

306

307

308

309

310

311

312

313

314

315

316

317

318

319

320

321

322

323

324

325

326

327

328

329

330

331

332

333

334

335

336

337

338

339

340

341

342

343

344

345

346

347

348

349

350

351

352

353

354

355

356

357

358

359

360

361

362

363

364

365

366

367

368

369

370

371

372

373

374

375

376

377

378

379

380

381

382

383

384

385

386

387

388

389

390

391

392

393

394

395

396

397

398

399

400

401

402

403

404

405

406

407

408

409

410

411

412

413

414

415

416

417

418

419

420

421

422

423

424

425

426

427

428

429

430

431

432

433

434

435

436

437

438

439

440

441

442

443

444

445

446

447

448

449

450

451

452

453

454

455

456

457

458

459

460

461

462

463

464

465

466

467

468

469

470

471

472

473

474

475

476

477

478

479

480

481

482

483

484

485

486

487

488

489

490

491

492

493

494

495

496

497

498

499

500

501

502

503

504

505

506

507

508

509

510

511

512

513

514

515

516

517

518

519

520

521

522

523

524

525

526

527

528

529

530

531

532

533

534

535

536

537

538

539

540

541

542

543

544

545

546

547

548

549

550

551

552

553

554

555

556

557

558

559

560

561

562

563

564

565

566

567

568

569

570

571

572

573

574

575

576

577

578

579

580

581

582

583

584

585

586

587

588

589

590

591

592

593

594

595

596

597

598

599

600

601

602

603

604

605

606

607

608

609

610

611

612

613

614

615

616

617

618

619

620

621

622

623

624

625

626

627

628

629

630

631

632

633

634

635

636

637

638

639

640

641

642

643

644

645

646

647

648

649

650

651

652

653

654

655

656

657

658

659

660

661

662

663

664

665

666

667

668

669

670

671

672

673

674

675

676

677

678

679

680

681

682

683

684

685

686

687

688

689

690

691

692

#!/usr/bin/env python 

 

""" 

@file ion/integration/ais/manage_data_resource_subscription/manage_data_resource_subscription.py 

@author Dave Everett, Bill Bollenbacher 

@brief The worker class that implements the subscribeDataResource function for the AIS  (workflow #105) 

""" 

 

import ion.util.ionlog 

log = ion.util.ionlog.getLogger(__name__) 

import logging 

from twisted.internet import defer 

import time 

 

from ion.core.exception import ReceivedApplicationError, ApplicationError 

 

from ion.services.coi.resource_registry.association_client import AssociationClient, AssociationClientError 

from ion.services.coi.datastore_bootstrap.ion_preload_config import HAS_A_ID, \ 

                                                                    TYPE_OF_ID, \ 

                                                                    DISPATCHER_RESOURCE_TYPE_ID 

 

from ion.services.coi.resource_registry.resource_client import ResourceClientError 

 

from ion.services.dm.distribution.publisher_subscriber import PublisherFactory 

from ion.services.dm.distribution.events import NewSubscriptionEventPublisher, DelSubscriptionEventPublisher 

from ion.services.dm.inventory.association_service import AssociationServiceClient 

from ion.services.dm.inventory.association_service import PREDICATE_OBJECT_QUERY_TYPE, IDREF_TYPE 

from ion.util.iontime import IonTime 

 

from ion.integration.ais.notification_alert_service import NotificationAlertServiceClient 

 

from ion.integration.ais.common.spatial_temporal_bounds import SpatialTemporalBounds 

 

from ion.core.object import object_utils 

 

from ion.integration.ais.ais_object_identifiers import AIS_RESPONSE_MSG_TYPE, \ 

                                                       AIS_REQUEST_MSG_TYPE, \ 

                                                       AIS_RESPONSE_ERROR_TYPE, \ 

                                                       SUBSCRIBE_DATA_RESOURCE_REQ_TYPE, \ 

                                                       SUBSCRIBE_DATA_RESOURCE_RSP_TYPE, \ 

                                                       GET_SUBSCRIPTION_LIST_RESP_TYPE, \ 

                                                       DELETE_SUBSCRIPTION_REQ_TYPE, \ 

                                                       DELETE_SUBSCRIPTION_RSP_TYPE 

 

 

# FIXME: don't need all of these

# Type identifiers for the GPB objects created by this module.
PREDICATE_REFERENCE_TYPE = object_utils.create_type_identifier(object_id=25, version=1)
DISPATCHER_RESOURCE_TYPE = object_utils.create_type_identifier(object_id=7002, version=1)
DISPATCHER_WORKFLOW_RESOURCE_TYPE = object_utils.create_type_identifier(object_id=7003, version=1)

# Excerpt from ion-object-definitions/net/ooici/core/message/resource_request.proto
#
#   message ResourceConfigurationRequest{
#       enum _MessageTypeIdentifier {
#         _ID = 10;
#         _VERSION = 1;
#       }
#
#       // The identifier for the resource to configure
#       optional net.ooici.core.link.CASRef resource_reference = 1;
#
#       // The desired configuration object
#       optional net.ooici.core.link.CASRef configuration = 2;
#   }
RESOURCE_CFG_REQUEST_TYPE = object_utils.create_type_identifier(object_id=10, version=1)

# Excerpt from ion-object-definitions/net/ooici/core/message/resource_request.proto
#
#   message ResourceConfigurationResponse{
#       enum _MessageTypeIdentifier {
#         _ID = 12;
#         _VERSION = 1;
#       }
#
#       // The identifier for the resource to configure
#       optional net.ooici.core.link.CASRef resource_reference = 1;
#
#       // The desired configuration object
#       optional net.ooici.core.link.CASRef configuration = 2;
#
#       optional string result = 3;
#   }
RESOURCE_CFG_RESPONSE_TYPE = object_utils.create_type_identifier(object_id=12, version=1)

 

 

class ManageDataResourceSubscription(object): 

 

    def __init__(self, ais): 

        log.debug('ManageDataResourceSubscription.__init__()') 

        self.mc  = ais.mc 

        self.rc  = ais.rc 

        self.ac  = AssociationClient(proc=ais) 

        self.asc = AssociationServiceClient(proc=ais) 

 

        self.ais = ais 

        # Lazy initialize this when it is needed 

        #self.pfn = PublisherFactory(publisher_type=NewSubscriptionEventPublisher, process=ais) 

        self.pfn = None 

 

        # Lazy initialize this when it is needed 

        #self.pfd = PublisherFactory(publisher_type=DelSubscriptionEventPublisher, process=ais) 

        self.pfd = None 

 

        self.nac = NotificationAlertServiceClient(proc=ais) 

        self.metadataCache = ais.getMetadataCache() 

 

 

    @defer.inlineCallbacks 

    def update(self, msg): 

        """ 

        @brief update the subscription to a data resource  

        @param msg GPB,   

        @GPB{Input,9209,1} 

        @GPB{Returns,9210,1} 

        @retval success 

        """ 

        log.info('ManageDataResourceSubscription.update()\n') 

        # repackage the subscription info into a one item list for the delete() call 

        reqMsg = yield self.mc.create_instance(AIS_REQUEST_MSG_TYPE) 

        reqMsg.message_parameters_reference = reqMsg.CreateObject(DELETE_SUBSCRIPTION_REQ_TYPE) 

        reqMsg.message_parameters_reference.subscriptions.add(); 

        reqMsg.message_parameters_reference.subscriptions[0].user_ooi_id  = msg.message_parameters_reference.subscriptionInfo.user_ooi_id 

        reqMsg.message_parameters_reference.subscriptions[0].data_src_id  = msg.message_parameters_reference.subscriptionInfo.data_src_id 

        Response = yield self.delete(reqMsg) 

        if Response.MessageType != AIS_RESPONSE_ERROR_TYPE: 

            Response = yield self.create(msg) 

        defer.returnValue(Response) 

 

 

    @defer.inlineCallbacks 

    def create(self, msg): 

        """ 

        @brief subscribe to a data resource  

        @param msg GPB,  

        @GPB{Input,9203,1} 

        @GPB{Returns,9204,1} 

        @retval success 

        """ 

        if  self.ais.AnalyzeTiming != None: 

            self.ais.TimeStamps.StartTime = time.time() 

            self.ais.TimeStamps.LastTime = self.ais.TimeStamps.StartTime 

            log.warning('ManageDataResourceSubscription.create: started at ' + str(self.ais.TimeStamps.StartTime)) 

 

        # check that the GPB is correct type & has a payload 

        result = yield self._CheckRequest(msg) 

        if result != None: 

            result.error_str = "AIS.ManageDataResourceSubscription.create: " + result.error_str 

            defer.returnValue(result) 

 

        # check that subscriptionInfo is present in GPB 

        if not msg.message_parameters_reference.IsFieldSet('subscriptionInfo'): 

             # build AIS error response 

             Response = yield self.mc.create_instance(AIS_RESPONSE_ERROR_TYPE) 

             Response.error_num = Response.ResponseCodes.BAD_REQUEST 

             Response.error_str = "AIS.ManageDataResourceSubscription.create: Required field [subscriptionInfo] not found in message" 

             defer.returnValue(Response) 

 

        # check that AisDatasetMetadataType is present in GPB 

        if not msg.message_parameters_reference.IsFieldSet('datasetMetadata'): 

             # build AIS error response 

             Response = yield self.mc.create_instance(AIS_RESPONSE_ERROR_TYPE) 

             Response.error_num = Response.ResponseCodes.BAD_REQUEST 

             Response.error_str = "AIS.ManageDataResourceSubscription.create: Required field [datasetMetadata] not found in message" 

             defer.returnValue(Response) 

 

        # check that ooi_id is present in GPB 

        if not msg.message_parameters_reference.subscriptionInfo.IsFieldSet('user_ooi_id'): 

             # build AIS error response 

             Response = yield self.mc.create_instance(AIS_RESPONSE_ERROR_TYPE) 

             Response.error_num = Response.ResponseCodes.BAD_REQUEST 

             Response.error_str = "AIS.ManageDataResourceSubscription.create: Required field [user_ooi_id] not found in message" 

             defer.returnValue(Response) 

 

        if not msg.message_parameters_reference.subscriptionInfo.IsFieldSet('data_src_id'): 

             # build AIS error response 

             Response = yield self.mc.create_instance(AIS_RESPONSE_ERROR_TYPE) 

             Response.error_num = Response.ResponseCodes.BAD_REQUEST 

             Response.error_str = "AIS.ManageDataResourceSubscription.create: Required field [data_src_id] not found in message" 

             defer.returnValue(Response) 

 

        # check that subscription type enum is present in GPB 

        if not msg.message_parameters_reference.subscriptionInfo.IsFieldSet('subscription_type'): 

             # build AIS error response 

             Response = yield self.mc.create_instance(AIS_RESPONSE_ERROR_TYPE) 

             Response.error_num = Response.ResponseCodes.BAD_REQUEST 

             Response.error_str = "AIS.ManageDataResourceSubscription.create: Required field [subscription_type] not found in message" 

             defer.returnValue(Response) 

 

        userID = msg.message_parameters_reference.subscriptionInfo.user_ooi_id 

        msg.message_parameters_reference.subscriptionInfo.date_registered = IonTime().time_ms 

 

        try: 

            log.debug("create: calling notification alert service addSubscription()") 

            yield self.nac.addSubscription(msg) 

            if  self.ais.AnalyzeTiming != None: 

                log.warning('ManageDataResourceSubscription.create: added subscription ' + self.ais.TimeStamp()) 

 

            Response = yield self.mc.create_instance(AIS_RESPONSE_MSG_TYPE) 

            Response.message_parameters_reference.add() 

 

            # 

            # Now determine the subscription type; if dispatcher, we need to create 

            # a dispatcher workflow.  But first: 

            # 1. Find the dispatcher associated with this user. 

            # 2. Find the dispatcher workflow associated with this subscription. 

            # 3. Delete the dispatcher workflow. 

            # 

            if ((msg.message_parameters_reference.subscriptionInfo.subscription_type == msg.message_parameters_reference.subscriptionInfo.SubscriptionType.DISPATCHER) or 

               (msg.message_parameters_reference.subscriptionInfo.subscription_type == msg.message_parameters_reference.subscriptionInfo.SubscriptionType.EMAILANDDISPATCHER)): 

 

                # 

                # There should be a dispatcher associated with this user; find it now. 

                # 

 

                log.info('Getting user resource instance') 

                try: 

                    self.userRes = yield self.rc.get_instance(userID) 

                except ResourceClientError: 

                    errString = 'Error getting instance of userID: ' + userID 

                    log.error(errString) 

                    # build AIS error response 

                    Response = yield self.mc.create_instance(AIS_RESPONSE_ERROR_TYPE) 

                    Response.error_num = Response.ResponseCodes.INTERNAL_SERVER_ERROR 

                    Response.error_str = "AIS.ManageDataResourceSubscription.create: " + errString 

                    defer.returnValue(Response) 

                log.info('Got user resource instance: ' + self.userRes.ResourceIdentity) 

                self.userID = self.userRes.ResourceIdentity 

                if  self.ais.AnalyzeTiming != None: 

                    log.warning('ManageDataResourceSubscription.create: got user instance ' + self.ais.TimeStamp()) 

 

                dispatcherID = yield self.__findDispatcher(self.userRes) 

                if (dispatcherID is None): 

                    errString = 'Dispatcher not found for userID' + self.userID 

                    log.error(errString) 

                    # build AIS error response 

                    Response = yield self.mc.create_instance(AIS_RESPONSE_ERROR_TYPE) 

                    Response.error_num = Response.ResponseCodes.NOT_FOUND 

                    Response.error_str = "AIS.ManageDataResourceSubscription.create: " + errString 

                    defer.returnValue(Response) 

                else: 

                    log.info('FOUND DISPATCHER: ' + dispatcherID) 

                if  self.ais.AnalyzeTiming != None: 

                    log.warning('ManageDataResourceSubscription.create: found dispatcher ' + self.ais.TimeStamp()) 

 

                # 

                # Create a dispatcher workflow 

                # 

                yield self.__createDispatcherWorkflow(msg.message_parameters_reference, dispatcherID) 

                if  self.ais.AnalyzeTiming != None: 

                    log.warning('ManageDataResourceSubscription.create: created workflow ' + self.ais.TimeStamp()) 

 

            Response.message_parameters_reference[0] = Response.CreateObject(SUBSCRIBE_DATA_RESOURCE_RSP_TYPE) 

            Response.message_parameters_reference[0].success  = True 

 

            defer.returnValue(Response) 

 

        except ReceivedApplicationError, ex: 

            log.info('ManageDataResourceSubscription.createDataResourceSubscription(): Error attempting to addSubscription(): %s' %ex.msg_content.MessageResponseBody) 

 

            Response = yield self.mc.create_instance(AIS_RESPONSE_ERROR_TYPE) 

 

            Response.error_num =  ex.msg_content.MessageResponseCode 

            Response.error_str =  "AIS.ManageDataResourceSubscription.create: " + ex.msg_content.MessageResponseBody 

            defer.returnValue(Response) 

 

 

    @defer.inlineCallbacks 

    def __findDispatcher(self, userRes): 

 

        # get the user's associations 

        Associations = yield self.ac.find_associations(subject=userRes) 

        log.debug('Found ' + str(len(Associations)) + ' associations for user ' + userRes.ResourceIdentity) 

 

        # get the dispatcher resources out of the Association Service 

        request = yield self.mc.create_instance(PREDICATE_OBJECT_QUERY_TYPE) 

        pair = request.pairs.add() 

 

        # Set the predicate search term 

        pref = request.CreateObject(PREDICATE_REFERENCE_TYPE) 

        pref.key = TYPE_OF_ID 

        pair.predicate = pref 

 

        # Set the Object search term 

        type_ref = request.CreateObject(IDREF_TYPE) 

        type_ref.key = DISPATCHER_RESOURCE_TYPE_ID 

        pair.object = type_ref 

 

        Dispatchers = yield self.asc.get_subjects(request) 

        log.debug('Found ' + str(len(Dispatchers.idrefs)) + ' dispatchers.') 

 

        for Association in Associations: 

            for Dispatcher in Dispatchers.idrefs: 

                log.info('a=%s, d=%s'%(str(Association.ObjectReference.key), str(Dispatcher.key))) 

                if Association.ObjectReference.key == Dispatcher.key: 

                    defer.returnValue(Dispatcher.key) 

        defer.returnValue(None) 

 

 

    @defer.inlineCallbacks 

    def __createDispatcherWorkflow(self, createInfo, dispatcherID): 

 

        if  self.ais.AnalyzeTiming != None: 

            log.warning('ManageDataResourceSubscription.__createDispatcherWorkflow: started at ' + self.ais.TimeStamp()) 

 

        log.debug('__createDispatcherWorkflow') 

 

        dispatcherRes = yield self.rc.get_instance(dispatcherID) 

        subscriptionInfo = createInfo.subscriptionInfo 

        datasetInfo = createInfo.datasetMetadata 

        if  self.ais.AnalyzeTiming != None: 

            log.warning('ManageDataResourceSubscription.__createDispatcherWorkflow: got dispatcher instance ' + self.ais.TimeStamp()) 

 

        # 

        # Create the dispatcher workflow resource 

        # 

        dwfRes = yield self.rc.create_instance(DISPATCHER_WORKFLOW_RESOURCE_TYPE, ResourceName = 'DispatcherWorkflow') 

        workflowID = dwfRes.ResourceIdentity 

        dwfRes.dataset_id = datasetInfo.data_resource_id 

        dwfRes.user_ooi_id = subscriptionInfo.user_ooi_id 

        dwfRes.workflow_path = subscriptionInfo.dispatcher_script_path 

        yield self.rc.put_instance(dwfRes) 

        if  self.ais.AnalyzeTiming != None: 

            log.warning('ManageDataResourceSubscription.__createDispatcherWorkflow: put workflow instance ' + self.ais.TimeStamp()) 

 

        log.debug('Creating association between dispatcherID: ' + dispatcherID + ' and workflowID: ' + workflowID) 

 

        # 

        # Create an association between the workflow and the dispatcher 

        # 

        try: 

            association = yield self.ac.create_association(dispatcherRes, HAS_A_ID, dwfRes) 

            if association not in dispatcherRes.ResourceAssociationsAsSubject: 

                log.error('Error: subject not in association!') 

            if association not in dwfRes.ResourceAssociationsAsObject: 

                log.error('Error: object not in association') 

 

            # 

            # Put the association in datastore 

            # 

            log.debug('Storing association: ' + str(association)) 

            yield self.rc.put_instance(association) 

            if  self.ais.AnalyzeTiming != None: 

                log.warning('ManageDataResourceSubscription.__createDispatcherWorkflow: put association ' + self.ais.TimeStamp()) 

 

        except AssociationClientError, ex: 

            errString = 'Error creating assocation between dispatcherID: ' + dispatcherID + ' and workflowID: ' + workflowID + '. ex: ' + str(ex) 

            log.error(errString) 

            # build AIS error response 

            Response = yield self.mc.create_instance(AIS_RESPONSE_ERROR_TYPE) 

            Response.error_num = Response.ResponseCodes.INTERNAL_SERVER_ERROR 

            Response.error_str = errString 

            defer.returnValue(Response) 

 

 

        if self.pfn is None: 

            pubfact = PublisherFactory(publisher_type=NewSubscriptionEventPublisher, process=self.ais) 

            self.pfn = yield pubfact.build() 

            if  self.ais.AnalyzeTiming != None: 

                log.warning('ManageDataResourceSubscription.__createDispatcherWorkflow: built publisher factory ' + self.ais.TimeStamp()) 

 

 

        # Publish the new subscription notification 

        yield self.pfn.create_and_publish_event(dispatcher_workflow = dwfRes.ResourceObject, origin = dispatcherID) 

        if  self.ais.AnalyzeTiming != None: 

            log.warning('ManageDataResourceSubscription.__createDispatcherWorkflow: published event ' + self.ais.TimeStamp()) 

 

        defer.returnValue(None) 

 

 

    @defer.inlineCallbacks 

    def delete(self, msg): 

        """ 

        @brief delete the subscription to a data resource  

        @param msg GPB,  

        @GPB{Input,9205,1} 

        @GPB{Returns,9206,1} 

        @retval success 

        """ 

        if  self.ais.AnalyzeTiming != None: 

            self.ais.TimeStamps.StartTime = time.time() 

            self.ais.TimeStamps.LastTime = self.ais.TimeStamps.StartTime 

            log.warning('ManageDataResourceSubscription.delete: started at ' + str(self.ais.TimeStamps.StartTime)) 

 

        # check that the GPB is correct type & has a payload 

        result = yield self._CheckRequest(msg) 

        if result != None: 

            result.error_str = "AIS.ManageDataResourceSubscription.delete: " + result.error_str 

            defer.returnValue(result) 

 

        # check that subscriptionInfo is present in GPB 

        if not msg.message_parameters_reference.IsFieldSet('subscriptions'): 

             # build AIS error response 

             Response = yield self.mc.create_instance(AIS_RESPONSE_ERROR_TYPE) 

             Response.error_num = Response.ResponseCodes.BAD_REQUEST 

             Response.error_str = "AIS.ManageDataResourceSubscription.delete: Required field [subscriptions] not found in message" 

             defer.returnValue(Response) 

 

        for Subscription in msg.message_parameters_reference.subscriptions: 

            # check that user_ooi_id is present in GPB 

            if not Subscription.IsFieldSet('user_ooi_id'): 

                # build AIS error response 

                Response = yield self.mc.create_instance(AIS_RESPONSE_ERROR_TYPE, MessageName='AIS error response') 

                Response.error_num = Response.ResponseCodes.BAD_REQUEST 

                Response.error_str = "AIS.ManageDataResourceSubscription.delete: Required field [user_ooi_id] not found in message" 

                defer.returnValue(Response) 

 

            # check that data_src_id is present in GPB 

            if not Subscription.IsFieldSet('data_src_id'): 

                # build AIS error response 

                Response = yield self.mc.create_instance(AIS_RESPONSE_ERROR_TYPE, MessageName='AIS error response') 

                Response.error_num = Response.ResponseCodes.BAD_REQUEST 

                Response.error_str = "AIS.ManageDataResourceSubscription.delete: Required field [data_src_id] not found in message" 

                defer.returnValue(Response) 

 

            reqMsg = yield self.mc.create_instance(AIS_REQUEST_MSG_TYPE) 

            reqMsg.message_parameters_reference = reqMsg.CreateObject(SUBSCRIBE_DATA_RESOURCE_REQ_TYPE) 

            reqMsg.message_parameters_reference.subscriptionInfo.user_ooi_id = Subscription.user_ooi_id 

            reqMsg.message_parameters_reference.subscriptionInfo.data_src_id = Subscription.data_src_id 

 

            try: 

                log.debug("delete: calling notification alert service getSubscription()") 

                subscription = yield self.nac.getSubscription(reqMsg) 

                log.info('getSubscription returned:\n %s'%subscription.message_parameters_reference[0].subscriptionListResults[0]) 

                if  self.ais.AnalyzeTiming != None: 

                    log.warning('ManageDataResourceSubscription.delete: got subscription ' + self.ais.TimeStamp()) 

 

                log.debug("delete: calling notification alert service removeSubscription()") 

                yield self.nac.removeSubscription(reqMsg) 

                if  self.ais.AnalyzeTiming != None: 

                    log.warning('ManageDataResourceSubscription.delete: removed subscription ' + self.ais.TimeStamp()) 

 

                # Now determine if subscription type includes a dispatcher.  If so, we need to delete 

                # the dispatcher workflow by: 

                #   1. Finding the dispatcher associated with this user. 

                #   2. Finding the dispatcher workflow associated with this subscription. 

                #   3. Deleting the dispatcher workflow. 

 

                SubscriptionInfo = subscription.message_parameters_reference[0].subscriptionListResults[0].subscriptionInfo 

                if ((SubscriptionInfo.subscription_type == SubscriptionInfo.SubscriptionType.DISPATCHER) or 

                    (SubscriptionInfo.subscription_type == SubscriptionInfo.SubscriptionType.EMAILANDDISPATCHER)): 

                    log.info("delete: deleting dispatcher workflow") 

 

                    log.info('Getting user resource instance') 

                    UserID = reqMsg.message_parameters_reference.subscriptionInfo.user_ooi_id 

                    try: 

                        self.userRes = yield self.rc.get_instance(UserID) 

                    except ResourceClientError: 

                        errString = 'Error getting instance of userID: ' + UserID 

                        log.error(errString) 

                        # build AIS error response 

                        Response = yield self.mc.create_instance(AIS_RESPONSE_ERROR_TYPE) 

                        Response.error_num = Response.ResponseCodes.INTERNAL_SERVER_ERROR 

                        Response.error_str = "AIS.ManageDataResourceSubscription.delete: " + errString 

                        defer.returnValue(Response) 

                    log.info('Got user resource instance: ' + self.userRes.ResourceIdentity) 

                    if  self.ais.AnalyzeTiming != None: 

                        log.warning('ManageDataResourceSubscription.delete: got user instance ' + self.ais.TimeStamp()) 

 

                    # get the user's dispatcher 

                    dispatcherID = yield self.__findDispatcher(self.userRes) 

                    if (dispatcherID is None): 

                        # build AIS error response 

                        Response = yield self.mc.create_instance(AIS_RESPONSE_ERROR_TYPE) 

                        Response.error_num = Response.ResponseCodes.NOT_FOUND 

                        errString = 'Dispatcher not found for userID' + UserID 

                        Response.error_str = "AIS.ManageDataResourceSubscription.delete: " + errString 

                        defer.returnValue(Response) 

                    else: 

                        log.info('FOUND DISPATCHER %s for user %s'%(dispatcherID, UserID)) 

 

                    if  self.ais.AnalyzeTiming != None: 

                        log.warning('ManageDataResourceSubscription.delete: found dispatcher ' + self.ais.TimeStamp()) 

                    # delete the workflow 

                    Reply = yield self.__deleteDispatcherWorkflow(SubscriptionInfo, dispatcherID) 

                    if  self.ais.AnalyzeTiming != None: 

                        log.warning('ManageDataResourceSubscription.delete: removed workflow ' + self.ais.TimeStamp()) 

                    defer.returnValue(Reply) 

 

            except ReceivedApplicationError, ex: 

                log.info('ManageDataResourceSubscription.delete(): Error attempting to remove Subscription(): %s' %ex.msg_content.MessageResponseBody) 

                Response = yield self.mc.create_instance(AIS_RESPONSE_ERROR_TYPE) 

                Response.error_num =  ex.msg_content.MessageResponseCode 

                Response.error_str =  "AIS.ManageDataResourceSubscription.delete: " + ex.msg_content.MessageResponseBody 

                defer.returnValue(Response) 

 

            except ApplicationError, ex: 

                log.info('ManageDataResourceSubscription.delete(): Error attempting to remove Subscription(): %s' %ex) 

                Response = yield self.mc.create_instance(AIS_RESPONSE_ERROR_TYPE) 

                Response.error_num =  "AIS.ManageDataResourceSubscription.delete: " + ex.response_code 

                Response.error_str =  str(ex) 

                defer.returnValue(Response) 

 

        Response = yield self.mc.create_instance(AIS_RESPONSE_MSG_TYPE) 

        Response.message_parameters_reference.add() 

        Response.message_parameters_reference[0] = Response.CreateObject(DELETE_SUBSCRIPTION_RSP_TYPE) 

        Response.message_parameters_reference[0].success  = True 

        defer.returnValue(Response) 

 

 

    @defer.inlineCallbacks 

    def __deleteDispatcherWorkflow(self, SubscriptionInfo, dispatcherID): 

 

        log.debug('__deleteDispatcherWorkflow') 

 

        dispatcherRes = yield self.rc.get_instance(dispatcherID) 

        (Association, wkflRes) = yield self.__findWorkflowAssociation(dispatcherRes, SubscriptionInfo) 

        if Association == None: 

            errString = 'Error finding workflow for user ' + SubscriptionInfo.user_ooi_id + \ 

                        ' and data resource ' + SubscriptionInfo.data_src_id + \ 

                        ' on dispatcher ' + dispatcherID 

            log.error(errString) 

            # build AIS error response 

            Response = yield self.mc.create_instance(AIS_RESPONSE_ERROR_TYPE) 

            Response.error_num = Response.ResponseCodes.INTERNAL_SERVER_ERROR 

            Response.error_str = errString 

            defer.returnValue(Response) 

 

        Association.SetNull() 

        wkflRes._set_life_cycle_state(wkflRes.RETIRED) 

        try: 

           yield self.rc.put_resource_transaction([wkflRes]) 

        except ApplicationError, ex: 

            log.info('ManageDataResourceSubscription.__deleteDispatcherWorkflow(): Error attempting to retire workflow & association: %s' %ex) 

            Response = yield self.mc.create_instance(AIS_RESPONSE_ERROR_TYPE) 

            Response.error_num =  ex.msg_content.MessageResponseCode 

            Response.error_str =  ex.msg_content.MessageResponseBody 

            defer.returnValue(Response) 

 

        # 

        # Create the dispatcher workflow for delete event  # 

        dwfRes = yield self.rc.create_instance(DISPATCHER_WORKFLOW_RESOURCE_TYPE, ResourceName = 'Delete DispatcherWorkflow') 

        dwfRes.dataset_id = SubscriptionInfo.data_src_id 

        dwfRes.workflow_path = SubscriptionInfo.dispatcher_script_path 

        # Publish the delete subscription notification 

 

        if self.pfd is None: 

            pubfact = PublisherFactory(publisher_type=DelSubscriptionEventPublisher, process=self.ais) 

            self.pfd = yield pubfact.build() 

 

        yield self.pfd.create_and_publish_event(dispatcher_workflow = dwfRes.ResourceObject, origin = dispatcherID) 

 

        Response = yield self.mc.create_instance(AIS_RESPONSE_MSG_TYPE) 

        Response.message_parameters_reference.add() 

        Response.message_parameters_reference[0] = Response.CreateObject(DELETE_SUBSCRIPTION_RSP_TYPE) 

        Response.message_parameters_reference[0].success  = True 

        defer.returnValue(Response) 

 

 

    @defer.inlineCallbacks 

    def __findWorkflowAssociation(self, dispatcherRes, SubscriptionInfo): 

 

        log.debug('__findWorkflowAssociation') 

        # get the dispatcher's associations 

        Associations = yield self.ac.find_associations(subject=dispatcherRes) 

        log.debug('Found ' + str(len(Associations)) + ' associations for dispatcher ' + dispatcherRes.ResourceIdentity) 

 

        # search for workflow with same userID and dataResourceID as in subscription 

        for Association in Associations: 

            log.debug('Asso = \n'+str(Association)) 

            try: 

                log.debug('getting '+str(Association.ObjectReference.key)) 

                Ref = yield self.rc.get_instance(Association.ObjectReference.key) 

            except ResourceClientError: 

                log.error('Error getting instance of Resource: ' + Association.ObjectReference.key) 

                continue 

            if log.getEffectiveLevel() <= logging.DEBUG: 

                log.debug('Ref = \n'+str(Ref)) 

            if Ref.ResourceObjectType != DISPATCHER_WORKFLOW_RESOURCE_TYPE: 

                continue 

            if ((Ref.user_ooi_id == SubscriptionInfo.user_ooi_id) and 

                (Ref.dataset_id == SubscriptionInfo.data_src_id)): 

                defer.returnValue([Association, Ref]) 

        defer.returnValue([None, None]) 

 

 

    @defer.inlineCallbacks 

    def find(self, msg): 

        """ 

        @brief find all subscriptions for a data resource 

        @param msg GPB,  

        @GPB{Input,9203,1} 

        @GPB{Returns,9204,1} 

        @retval success 

        """ 

        log.info('ManageDataResourceSubscription.findDataResourceSubscriptions()') 

 

        # check that the GPB is correct type & has a payload 

        result = yield self._CheckRequest(msg) 

        if result != None: 

            log.error("AIS.findDataResourceSubscriptions: %s " %(result.error_str)) 

            result.error_str = "AIS.findDataResourceSubscriptions " + result.error_str 

            defer.returnValue(result) 

 

 

        try: 

            log.debug('find: Calling NAS.getSubscriptionList service') 

            reply = yield self.nac.getSubscriptionList(msg) 

            numSubsReturned = len(reply.message_parameters_reference[0].subscriptionListResults) 

            log.debug('getSubscriptionList returned: ' + str(numSubsReturned) + ' subscriptions.') 

        except ReceivedApplicationError, ex: 

            log.info('AIS.ManageDataResourceSubscription.find(): Error calling NAS.getSubscriptionList: %s' %ex) 

            Response = yield self.mc.create_instance(AIS_RESPONSE_ERROR_TYPE) 

            Response.error_num =  ex.msg_content.MessageResponseCode 

            Response.error_str =  ex.msg_content.MessageResponseBody 

            defer.returnValue(Response) 

 

        # 

        # Instantiate a bounds object, and load it up with the given bounds 

        # info 

        # 

        bounds = SpatialTemporalBounds() 

        bounds.loadBounds(msg.message_parameters_reference.dataBounds) 

 

        # create response message 

        respMsg = yield self.mc.create_instance(AIS_RESPONSE_MSG_TYPE) 

        respMsg.message_parameters_reference.add() 

        respMsg.message_parameters_reference[0] = respMsg.CreateObject(GET_SUBSCRIPTION_LIST_RESP_TYPE) 

 

        # 

        # Now iterate through the list, filtering by the bounds.  If no metadata 

        # is found, log an error (shouldn't happen) 

        # 

        j = 0 

        for result in reply.message_parameters_reference[0].subscriptionListResults: 

            dSetResID = result.datasetMetadata.data_resource_id 

            dSetMetadata = yield self.metadataCache.getDSetMetadata(dSetResID) 

            if dSetMetadata is None: 

                log.error('Metadata not found for dataset: %s' %(dSetResID)) 

            else: 

                log.debug('Metadata found for dataset: %s' %(dSetResID)) 

                if bounds.isInBounds(dSetMetadata): 

                    respMsg.message_parameters_reference[0].subscriptionListResults.add() 

                    self.__loadSubscriptionListResultsMsg(respMsg.message_parameters_reference[0].subscriptionListResults[j], result) 

                    j = j + 1 

 

        defer.returnValue(respMsg) 

 

 

    def __loadSubscriptionListResultsMsg(self, respMsg, result): 

        # 

        # Private utility method to build a subscription list response message. 

        #  

        respMsg.subscriptionInfo.user_ooi_id = result.subscriptionInfo.user_ooi_id 

        respMsg.subscriptionInfo.data_src_id = result.subscriptionInfo.data_src_id 

        respMsg.subscriptionInfo.subscription_type = result.subscriptionInfo.subscription_type 

        respMsg.subscriptionInfo.email_alerts_filter = result.subscriptionInfo.email_alerts_filter 

        respMsg.subscriptionInfo.dispatcher_alerts_filter = result.subscriptionInfo.dispatcher_alerts_filter 

        respMsg.subscriptionInfo.dispatcher_script_path = result.subscriptionInfo.dispatcher_script_path 

        respMsg.subscriptionInfo.date_registered = result.subscriptionInfo.date_registered 

 

        respMsg.datasetMetadata.user_ooi_id = result.datasetMetadata.user_ooi_id 

        respMsg.datasetMetadata.data_resource_id = result.datasetMetadata.data_resource_id 

        respMsg.datasetMetadata.title = result.datasetMetadata.title 

        respMsg.datasetMetadata.institution = result.datasetMetadata.institution 

        respMsg.datasetMetadata.source = result.datasetMetadata.source 

        respMsg.datasetMetadata.references = result.datasetMetadata.references 

        respMsg.datasetMetadata.summary = result.datasetMetadata.summary 

        respMsg.datasetMetadata.conventions = result.datasetMetadata.conventions 

        respMsg.datasetMetadata.comment = result.datasetMetadata.comment 

        respMsg.datasetMetadata.ion_time_coverage_start = result.datasetMetadata.ion_time_coverage_start 

        respMsg.datasetMetadata.ion_time_coverage_end = result.datasetMetadata.ion_time_coverage_end 

        respMsg.datasetMetadata.ion_geospatial_lat_min = result.datasetMetadata.ion_geospatial_lat_min 

        respMsg.datasetMetadata.ion_geospatial_lat_max = result.datasetMetadata.ion_geospatial_lat_max 

        respMsg.datasetMetadata.ion_geospatial_lon_min = result.datasetMetadata.ion_geospatial_lon_min 

        respMsg.datasetMetadata.ion_geospatial_lon_max = result.datasetMetadata.ion_geospatial_lon_max 

        respMsg.datasetMetadata.ion_geospatial_vertical_min = result.datasetMetadata.ion_geospatial_vertical_min 

        respMsg.datasetMetadata.ion_geospatial_vertical_max = result.datasetMetadata.ion_geospatial_vertical_max 

        respMsg.datasetMetadata.ion_geospatial_vertical_positive = result.datasetMetadata.ion_geospatial_vertical_positive 

        respMsg.datasetMetadata.download_url = result.datasetMetadata.download_url 

 

 

    @defer.inlineCallbacks 

    def _CheckRequest(self, request): 

        # Check for correct request protocol buffer type 

        if request.MessageType != AIS_REQUEST_MSG_TYPE: 

            # build AIS error response 

            Response = yield self.mc.create_instance(AIS_RESPONSE_ERROR_TYPE, MessageName='AIS error response') 

            Response.error_num = Response.ResponseCodes.BAD_REQUEST 

            Response.error_str = 'Bad message type receieved, ignoring' 

            defer.returnValue(Response) 

 

        # Check payload in message 

        if not request.IsFieldSet('message_parameters_reference'): 

            # build AIS error response 

            Response = yield self.mc.create_instance(AIS_RESPONSE_ERROR_TYPE, MessageName='AIS error response') 

            Response.error_num = Response.ResponseCodes.BAD_REQUEST 

            Response.error_str = "Required field [message_parameters_reference] not found in message" 

            defer.returnValue(Response) 

 

        defer.returnValue(None)