Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

280

281

282

283

284

285

286

287

288

289

290

291

292

293

294

295

296

297

298

299

300

301

302

303

304

305

306

307

308

309

310

311

312

313

314

315

316

317

318

319

320

321

322

323

324

325

326

327

328

329

330

331

332

333

334

335

336

337

338

339

340

341

342

343

344

345

346

347

348

349

350

351

352

353

354

355

356

357

358

359

360

361

362

363

364

365

366

367

368

369

370

371

372

373

374

375

376

377

378

379

380

381

382

383

384

385

386

387

388

389

390

391

392

393

394

395

396

397

398

399

400

401

402

403

404

405

406

407

408

409

410

411

412

413

414

415

416

417

418

419

420

421

422

423

424

425

426

427

428

429

430

431

432

433

434

435

436

437

438

439

440

#!/usr/bin/env python 

 

 

""" 

@file ion/services/dm/preservation/cassandra_manager_agent.py 

@author David Stuebe 

@author Matt Rodriguez 

@brief agent controlling preservation of OOI Data 

""" 

 

import ion.util.ionlog 

log = ion.util.ionlog.getLogger(__name__) 

from twisted.internet import defer 

 

from ion.core.process.process import ProcessFactory 

from ion.core.process.service_process import ServiceProcess, ServiceClient 

from ion.core.object import object_utils 

from ion.core.data.cassandra import CassandraDataManager, CassandraStorageResource 

 

 

from ion.services.coi.resource_registry.resource_client import ResourceClient 

 

from ion.core.messaging.message_client import MessageClient 

 

from ion.core import ioninit 

CONF = ioninit.config(__name__) 

 

host_list_type = object_utils.create_type_identifier(object_id=2501, version=1) 

simple_password_type = object_utils.create_type_identifier(object_id=2502, version=1) 

cassandra_credential_type =  object_utils.create_type_identifier(object_id=2503, version=1) 

columndef_type =  object_utils.create_type_identifier(object_id=2508, version=1) 

 

 

# Persistent Technology Resource: 

cassandra_cluster_type =  object_utils.create_type_identifier(object_id=2504, version=1) 

 

# Persistent Archive Resource: 

cassandra_keyspace_type =  object_utils.create_type_identifier(object_id=2506, version=1) 

 

# Cache Resource: 

cassandra_column_family_type =  object_utils.create_type_identifier(object_id=2507, version=1) 

resource_response_type = object_utils.create_type_identifier(object_id=12, version=1) 

 

 

 

class CassandraManagerServiceException(Exception): 

    """ 

    Exceptions that originate in the CassandraManagerService class 

    """ 

 

class CassandraManagerService(ServiceProcess): 

    """ 

    @brief Cassandra Manager Agent interface 

 

    @TODO change to inherit from agent process 

    The agent uses a single manager client connection to cassandra to configure 

    cassandra persistent archive and cache resources.  

     

    Resources can be created by the agent 

    Resources can be modified by the agent 

    Resources can be deleted by the agent 

     

    The agent is also used during a boot strap procedure...= 

    """ 

 

    # Declaration of service 

    declare = ServiceProcess.service_declare(name='cassandra_manager_agent', version='0.1.0', dependencies=[]) 

 

    def __init__(self, *args, **kwargs): 

        # Service class initializer. Basic config, but no yields allowed. 

        ServiceProcess.__init__(self, *args, **kwargs) 

 

        self._host = self.spawn_args.get('bootstrap_args',  CONF.getValue('host')) 

        self._port = self.spawn_args.get('bootstrap_args', CONF.getValue('port')) 

        self._username = self.spawn_args.get('bootstrap_args', CONF.getValue('cassandra_username')) 

        self._password = self.spawn_args.get('bootstrap_args',CONF.getValue('cassandra_password')) 

        # Create a Resource Client  

        self.rc = ResourceClient(proc=self) 

        self.mc = MessageClient(proc=self) 

        log.info('CassandraManagerAgent.__init__()') 

 

 

 

    @defer.inlineCallbacks 

    def slc_activate(self, *args): 

        """ 

        Activation can be automatic when the process is spawned or triggered by 

        a message from the client. 

         

        First, default to bootstrapping from the spawn args. Create all resources 

        by describing the cluster! 

         

        Second, add hooks to override the spawn args and take the storage resource 

        reference from a message in op_activate - connecting to an already active 

        system! 

         

        """ 

        #Hard code the storage resource for now. Eventually pass all this into spawn_args 

        if self._host is None: 

            raise CassandraManagerServiceException("The hostname for the Cassandra cluster is not set.") 

 

        if self._port is None: 

            raise CassandraManagerServiceException("The port for the Cassandra cluster is not set.") 

 

        if self._username is None: 

            raise CassandraManagerServiceException("The username for the credentials to authenticate to the Cassandra cluster is not set.") 

 

        if self._password is None: 

            raise CassandraManagerServiceException("The password for the credentials to authenticate to the Cassandra cluster is not set.") 

 

        ### Create a persistence_technology resource - for cassandra a CassandraCluster object 

        cassandra_cluster = yield self.rc.create_instance(cassandra_cluster_type, ResourceName="Cassandra cluster", ResourceDescription="OOI Cassandra cluster") 

        #persistent_technology_repository, cassandra_cluster  = self.wb.init_repository(cassandra_cluster_type) 

 

        # Set only one host and port in the host list for now 

        cas_host = cassandra_cluster.hosts.add() 

        cas_host.host = self._host 

        cas_host.port = self._port 

 

        ### Create a Credentials resource - for cassandra a SimplePassword object 

        #cache_repository, simple_password  = self.wb.init_repository(simple_password_type) 

        simple_password = yield self.rc.create_instance(simple_password_type, ResourceName="Cassandra credentials", ResourceDescription="OOI Cassandra credentials") 

        simple_password.username = self._username 

        simple_password.password = self._password 

 

        storage_resource = CassandraStorageResource(cassandra_cluster, credentials=simple_password) 

        manager = CassandraDataManager(storage_resource) 

 

        yield self.register_life_cycle_object(manager) 

 

        self.manager = manager 

        #yield self._bootstrap() 

 

 

    @defer.inlineCallbacks 

    def _bootstrap(self): 

        """ 

        Query the cassandra cluster creating resources for each keyspace  

        and column family. 

         

        @note the persistent archive resources do not have reference to the cache resources. 

        """ 

        log.info("In CassandraManagerService._bootstrap method") 

        desc = yield self.manager._describe_keyspaces() 

        log.info(str(desc)) 

        for ks in desc: 

            persistent_archive = yield self.rc.create_instance(cassandra_keyspace_type, ResourceName=ks.name, ResourceDescription="description of " + ks.name) 

            persistent_archive.name = ks.name 

            for cf in ks.cf_defs: 

                cache = yield self.rc.create_instance(cassandra_column_family_type, ResourceName=cf.name, ResourceDescription="description of " + cf.name) 

                cache.name = cf.name 

                column_index = 0 

                for col in cf.column_metadata: 

                    column = yield self.rc.create_instance(columndef_type, ResourceName=col.name, ResourceDescription="description of " + col.name) 

                    column.column_name = col.name 

                    column.validation_class = col.validation_class 

                    if col.index_name is not None: 

                        column.index_name = col.index_name 

                    if col.index_type is not None: 

                        column.index_type = col.index_type 

                    cache.column_metadata.add() 

                    cache.column_metadata[column_index] = self.rc.reference_instance(column) 

                    column_index = column_index + 1 

 

 

    def __set_persistent_archive_resource_attrs(self, request, resource): 

        """ 

        @param request is the request message passed into the service 

        @param resource is the ion resource that is to be configured by copying attributes from the request 

        @param attrs is a list of attribute names 

        @brief copy the attributes from the request to the ION resource. 

        @todo this should happen in a __setattr__ method 

        """ 

        if not request.IsFieldSet('configuration'): 

            raise CassandraManagerServiceException("The request is not properly formatted. It must have a configuration attribute") 

 

        attrs = ["name", "strategy_class", "replication_factor"] 

        for attr in attrs: 

            if request.configuration.IsFieldSet(attr): 

                setattr(resource, attr, getattr(request.configuration, attr)) 

            else: 

                resource.ClearField(attr) 

        return resource 

 

    def __set_cache_resource_attrs(self, request, resource): 

        """ 

        @param request is the request message passed into the service 

        @param resource is the ion resource that is to be configured by copying attributes from the request 

        @param attrs is a list of attribute names 

        @brief copy the attributes from the request to the ION resource. 

        @todo this should happen in a __setattr__ method 

        """ 

        if not request.IsFieldSet('cache_configuration'): 

            raise CassandraManagerServiceException("The request is not properly formatted. It must have a cache_configuration attribute") 

 

        attrs = ["name"] 

        for attr in attrs: 

            if request.cache_configuration.IsFieldSet(attr): 

                setattr(resource, attr, getattr(request.cache_configuration, attr)) 

            else: 

                resource.ClearField(attr) 

        return resource 

 

    @defer.inlineCallbacks 

    def op_create_persistent_archive(self, request, headers, msg): 

        """ 

        Service operation: define a new archive object 

        """ 

        log.info("Called CassandraManagerService.op_create_persistent_archive") 

        persistent_archive = request.configuration 

        log.info("request.configuration %s" % (request.configuration,)) 

        persistent_archive_resource = yield self.rc.create_instance(cassandra_keyspace_type, ResourceName=persistent_archive.name, 

                                  ResourceDescription="A description") 

 

 

        log.info("Created resource") 

        #Set fields of the persistent_archive_resource from persistent_archive 

        self.__set_persistent_archive_resource_attrs(request, persistent_archive_resource) 

 

        yield self.rc.put_instance(persistent_archive_resource, "A commit message") 

        log.info("Put resource into datastore") 

        yield self.manager.create_persistent_archive(persistent_archive) 

 

        response = yield self.mc.create_instance(resource_response_type, MessageName="create_persistent_archive_response") 

 

 

 

        response.resource_reference = self.rc.reference_instance(persistent_archive_resource) 

 

        # pass the current configuration 

        response.configuration = persistent_archive_resource.ResourceObject 

 

        # pass the result of the create operation... 

        response.result = 'Created' 

 

        # The following line shows how to reply to a message 

        yield self.reply_ok(msg, response) 

 

 

    @defer.inlineCallbacks 

    def op_update_persistent_archive(self, request, headers, msg): 

        """ 

        Service operation: update the persistent_archive 

         

        """ 

        log.info("In op_update_persistent_archive") 

        persistent_archive = request.configuration 

        persistent_archive_resource = yield self.rc.get_instance(request.resource_reference) 

        #Check to see if the resource has changed 

        self.__set_persistent_archive_resource_attrs(request, persistent_archive_resource) 

        log.info("put instance") 

        yield self.rc.put_instance(persistent_archive_resource) 

        #Do the business logic 

        yield self.manager.update_persistent_archive(persistent_archive) 

 

        log.info("created response") 

        response = yield self.mc.create_instance(resource_response_type, MessageName="update_persistent_archive_response") 

        response.resource_reference = self.rc.reference_instance(persistent_archive_resource) 

 

        response.configuration = persistent_archive_resource.ResourceObject 

        response.result = 'Updated' 

        yield self.reply_ok(msg, response) 

 

    @defer.inlineCallbacks 

    def op_delete_persistent_archive(self, request, headers, msg): 

        """ 

        Service operation: delete the archive 

        """ 

 

        log.info("request: " + str(request)) 

        persistent_archive = request.configuration 

        persistent_archive_resource = yield self.rc.get_instance(request.resource_reference) 

        #We want to delete this resource from the registry here 

 

        yield self.manager.remove_persistent_archive(persistent_archive) 

        response = yield self.mc.create_instance(resource_response_type, MessageName="delete_persistent_archive_response") 

        response.resource_reference = self.rc.reference_instance(persistent_archive_resource) 

 

        response.configuration = persistent_archive_resource.ResourceObject 

        response.result = 'Deleted' 

        yield self.reply_ok(msg, response) 

 

    @defer.inlineCallbacks 

    def op_create_cache(self, request, headers, msg): 

        """ 

        Service operation: define a new persistent archive object 

         

        These methods need a way to pass the persistent_archive and cache objects together in the 

        request.  

        """ 

        keyspace = request.persistent_archive.name 

        column_family = request.cache_configuration.name 

 

        cache_resource = yield self.rc.create_instance(cassandra_column_family_type, ResourceName=request.cache_configuration.name, ResourceDescription="Apt description") 

        log.info("Created resource") 

        #Set fields of the persistent_archive_resource from persistent_archive 

        self.__set_cache_resource_attrs(request, cache_resource) 

 

 

        yield self.rc.put_instance(cache_resource, "A commit message") 

        log.info("Put resource into datastore") 

        log.info("Creating column family %s.%s" % (keyspace, column_family)) 

        yield self.manager.create_cache(request.persistent_archive, request.cache_configuration) 

 

        response = yield self.mc.create_instance(resource_response_type, MessageName="create_persistent_archive_response") 

 

 

        response.resource_reference = self.rc.reference_instance(cache_resource) 

 

        # pass the current configuration 

        response.configuration = cache_resource.ResourceObject 

 

        # pass the result of the create operation... 

        response.result = 'Created' 

 

        # The following line shows how to reply to a message 

        yield self.reply_ok(msg, response) 

 

 

    @defer.inlineCallbacks 

    def op_update_cache(self, request, headers, msg): 

        """ 

        Service operation: update a cache object 

         

        """ 

        log.info("In op_update_cache method") 

 

        cache_resource = yield self.rc.get_instance(request.resource_reference) 

 

        self.__set_cache_resource_attrs(request, cache_resource) 

 

        """ 

        Copying the column_metadata into the resource.  

        In the future we may want to copy the column_metadata from the request into the  

        cache resource. This will allow us to keep track of the different columns in the ION 

        resource registry.  

        cache = request.cache_configuration 

        index = 0         

        for column in cache.column_metadata: 

            

            cache_resource.column_metadata.add() 

            cache_resource.column_metadata[index] = column 

            index = index + 1 

        """ 

        yield self.rc.put_instance(cache_resource, "A commit message") 

 

        #Do the business logic 

        yield self.manager.update_cache(request.persistent_archive, request.cache_configuration) 

 

        log.info("created response") 

        response = yield self.mc.create_instance(resource_response_type, MessageName="update_cache_response") 

        response.resource_reference = self.rc.reference_instance(cache_resource) 

 

        response.configuration = cache_resource.ResourceObject 

        response.result = 'Updated' 

        yield self.reply_ok(msg, response) 

 

 

    @defer.inlineCallbacks 

    def op_delete_cache(self, request, headers, msg): 

        """ 

        Service operation: remove the cache object 

        """ 

 

        #yield self.rc.put_instance(cache_resource, "A commit message") 

 

        #Do the business logic 

        yield self.manager.remove_cache(request.persistent_archive, request.cache_configuration) 

 

        log.info("created response") 

        response = yield self.mc.create_instance(resource_response_type, MessageName="delete_cache_response") 

        response.result = 'Deleted' 

        yield self.reply_ok(msg, response) 

 

# Spawn of the process using the module name 

factory = ProcessFactory(CassandraManagerService) 

 

 

class CassandraManagerClient(ServiceClient): 

    def __init__(self, proc=None, **kwargs): 

        if not 'targetname' in kwargs: 

            kwargs['targetname'] = 'cassandra_manager_agent' 

        ServiceClient.__init__(self, proc, **kwargs) 

 

    @defer.inlineCallbacks 

    def create_persistent_archive(self, persistent_archive): 

        """ 

        @brief create a new archive 

        @param persistent_archive is an ion resource which defines the properties of a Keyspace 

        """ 

        log.info("Called CassandraManagerClient.create_persistent_archive") 

        (content, headers, msg) = yield self.rpc_send('create_persistent_archive', persistent_archive) 

        defer.returnValue(content) 

 

    @defer.inlineCallbacks 

    def update_persistent_archive(self, persistent_archive): 

        """ 

        @brief update an archive 

        @param persistent_archive is an ion resource which defines the properties of a Keyspace 

        """ 

        (content, headers, msg) = yield self.rpc_send('update_persistent_archive', persistent_archive) 

        defer.returnValue(content) 

 

    @defer.inlineCallbacks 

    def delete_persistent_archive(self, persistent_archive): 

        """ 

        @brief remove an archive 

        @param persistent_archive is an ion resource which defines the properties of a Keyspace 

        """ 

        (content, headers, msg) = yield self.rpc_send('delete_persistent_archive', persistent_archive) 

        defer.returnValue(content) 

 

    @defer.inlineCallbacks 

    def create_cache(self, cassandra_request): 

        """ 

        @brief create a new archive 

        @param cassandra_request is an ion resource which defines the properties of a Keyspace 

        """ 

        log.info("Called CassandraManagerClient.create_cache") 

        (content, headers, msg) = yield self.rpc_send('create_cache', cassandra_request) 

        defer.returnValue(content) 

 

    @defer.inlineCallbacks 

    def update_cache(self, cassandra_request): 

        """ 

        @brief update an archive 

        @param cassandra_request is an ion resource which defines the properties of a Keyspace 

        """ 

        (content, headers, msg) = yield self.rpc_send('update_cache', cassandra_request) 

        defer.returnValue(content) 

 

    @defer.inlineCallbacks 

    def delete_cache(self, cassandra_request): 

        """ 

        @brief remove an archive 

        @param cassandra_request is an ion resource which defines the properties of a Keyspace 

        """ 

        (content, headers, msg) = yield self.rpc_send('delete_cache', cassandra_request) 

        defer.returnValue(content)