Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

280

281

282

283

284

285

286

287

288

289

290

291

292

293

294

295

296

297

298

299

300

301

302

303

304

305

306

307

308

309

310

311

312

313

314

315

316

317

318

319

320

321

322

323

324

325

326

327

328

329

330

331

332

333

334

335

336

337

338

339

340

341

342

343

344

345

346

347

348

349

350

351

352

353

354

355

356

357

358

359

360

361

362

363

364

365

366

367

368

369

370

371

372

373

374

375

376

377

378

379

380

381

382

383

384

385

386

387

388

389

390

391

392

393

394

395

396

397

398

399

400

401

402

403

404

405

406

407

408

409

410

411

412

413

414

415

416

417

418

419

420

421

422

423

424

425

426

427

428

429

430

431

432

433

434

435

436

437

438

439

440

441

442

443

444

445

446

447

448

449

450

451

452

453

454

455

456

457

458

459

460

461

462

463

464

465

466

467

468

469

470

471

472

473

474

475

476

477

478

479

480

481

482

483

484

485

486

487

488

489

490

491

492

493

494

495

496

497

498

499

500

501

502

503

504

505

506

507

508

509

510

511

512

513

514

515

516

517

518

519

520

521

522

523

524

525

526

527

528

529

530

531

532

533

534

535

536

537

538

539

540

541

542

543

544

545

546

547

548

549

550

551

552

553

554

555

556

557

558

559

560

561

562

563

564

565

566

567

568

569

570

571

572

573

574

575

576

577

578

579

580

581

582

583

584

585

586

587

588

589

590

591

592

593

594

595

596

597

598

599

600

601

602

603

604

605

606

607

608

609

610

611

612

613

614

615

616

617

618

619

620

621

622

623

624

625

626

627

628

629

630

631

632

633

634

635

636

637

638

639

640

641

642

643

644

645

646

647

648

649

650

651

652

653

654

655

656

657

658

659

660

661

662

663

664

665

666

667

668

669

670

671

672

673

674

675

676

677

678

679

680

681

682

683

684

685

686

687

688

689

690

691

692

693

694

695

696

697

698

699

700

701

702

703

704

705

706

707

708

709

710

711

712

713

714

715

716

717

718

719

720

721

722

723

724

725

726

727

728

729

730

731

732

733

734

735

736

737

738

739

740

741

742

743

744

745

746

747

748

749

750

751

752

753

754

755

756

757

758

759

760

761

762

763

764

765

766

767

768

769

770

771

772

773

774

775

776

777

778

779

780

781

782

783

784

785

786

787

788

789

790

791

792

793

794

795

796

797

798

799

800

801

802

803

804

805

806

807

808

809

810

811

812

813

814

815

816

817

818

819

820

821

822

823

824

825

826

827

828

829

830

831

832

833

834

835

836

837

838

839

840

841

842

843

844

845

846

847

848

849

850

851

852

853

854

855

856

857

858

859

860

861

862

863

864

865

866

867

868

869

870

871

872

873

874

875

876

877

878

879

880

881

882

883

884

885

886

887

888

889

890

891

892

893

894

895

896

897

898

899

900

901

902

903

904

905

906

907

908

909

910

911

912

913

914

915

916

917

918

919

920

921

922

923

924

925

926

927

928

929

930

931

932

933

934

935

936

937

938

939

940

941

942

943

944

945

946

947

948

949

950

951

952

953

954

955

956

957

958

959

960

961

962

963

964

965

966

967

968

969

970

971

972

973

974

975

976

977

978

979

980

981

982

983

984

985

986

987

988

989

990

991

992

993

994

995

996

997

998

999

1000

1001

1002

1003

1004

1005

1006

1007

1008

1009

1010

1011

1012

1013

1014

1015

1016

1017

1018

1019

1020

1021

1022

1023

1024

1025

1026

1027

1028

1029

1030

1031

1032

1033

1034

1035

1036

1037

1038

1039

1040

1041

1042

1043

1044

1045

1046

1047

1048

1049

1050

1051

1052

1053

1054

1055

1056

1057

1058

1059

1060

1061

1062

1063

1064

1065

1066

1067

1068

1069

1070

1071

1072

1073

1074

1075

1076

1077

1078

1079

1080

1081

1082

1083

1084

1085

1086

1087

1088

1089

1090

1091

1092

1093

1094

1095

1096

1097

1098

1099

1100

1101

1102

1103

1104

1105

1106

1107

1108

1109

1110

1111

1112

1113

1114

1115

1116

1117

1118

1119

1120

1121

1122

1123

1124

1125

1126

1127

1128

1129

1130

1131

1132

1133

1134

1135

1136

1137

1138

1139

1140

1141

1142

1143

1144

1145

1146

1147

1148

1149

1150

1151

1152

1153

1154

1155

1156

1157

1158

1159

1160

1161

1162

1163

1164

1165

1166

1167

1168

1169

1170

1171

1172

1173

1174

1175

1176

1177

1178

1179

1180

1181

1182

1183

1184

1185

1186

1187

1188

1189

1190

1191

1192

1193

1194

1195

1196

1197

1198

1199

1200

1201

1202

1203

1204

1205

1206

1207

1208

1209

1210

1211

1212

1213

1214

1215

1216

1217

1218

1219

1220

1221

1222

1223

1224

1225

1226

1227

1228

1229

1230

1231

1232

1233

1234

1235

1236

1237

1238

1239

1240

1241

1242

1243

1244

1245

1246

1247

1248

1249

1250

1251

1252

1253

1254

1255

1256

1257

1258

1259

1260

1261

1262

1263

1264

1265

1266

1267

1268

1269

1270

1271

1272

1273

1274

1275

1276

1277

1278

1279

1280

1281

1282

1283

1284

1285

1286

1287

1288

1289

1290

1291

1292

1293

1294

1295

1296

1297

1298

1299

1300

1301

1302

1303

1304

1305

1306

1307

1308

1309

1310

1311

1312

1313

1314

1315

1316

1317

1318

1319

1320

1321

1322

1323

1324

1325

1326

1327

1328

1329

1330

1331

1332

1333

1334

1335

1336

1337

1338

1339

1340

1341

1342

1343

1344

1345

1346

1347

1348

1349

1350

1351

1352

1353

1354

1355

1356

1357

1358

1359

1360

1361

1362

1363

1364

1365

1366

1367

1368

1369

1370

1371

1372

1373

1374

1375

1376

1377

1378

1379

1380

1381

1382

1383

1384

1385

1386

1387

1388

1389

1390

1391

1392

1393

1394

1395

1396

1397

1398

1399

1400

1401

1402

1403

1404

1405

1406

1407

1408

1409

1410

1411

1412

1413

1414

1415

1416

1417

1418

1419

1420

1421

1422

1423

1424

1425

1426

1427

1428

1429

1430

1431

1432

1433

1434

1435

1436

1437

1438

1439

1440

1441

1442

1443

1444

1445

1446

1447

1448

1449

1450

1451

1452

1453

1454

1455

1456

1457

1458

1459

1460

1461

1462

1463

1464

1465

1466

1467

1468

1469

1470

1471

1472

1473

1474

1475

1476

1477

1478

1479

1480

1481

1482

1483

1484

1485

1486

1487

1488

1489

1490

1491

1492

1493

1494

1495

1496

1497

1498

1499

1500

1501

1502

1503

1504

1505

1506

1507

1508

1509

1510

1511

1512

1513

1514

1515

1516

1517

1518

1519

1520

1521

1522

1523

1524

1525

1526

1527

1528

1529

1530

1531

1532

1533

1534

1535

1536

1537

1538

1539

1540

1541

1542

1543

1544

1545

1546

1547

1548

1549

1550

1551

1552

1553

1554

1555

1556

1557

1558

1559

1560

1561

1562

1563

1564

1565

1566

1567

1568

1569

1570

1571

1572

1573

1574

1575

1576

1577

1578

1579

1580

1581

1582

1583

1584

1585

1586

1587

1588

1589

1590

1591

1592

1593

1594

1595

1596

1597

1598

1599

1600

1601

1602

1603

1604

1605

1606

1607

1608

1609

1610

1611

1612

1613

1614

1615

1616

1617

1618

1619

1620

1621

1622

1623

1624

1625

1626

1627

1628

1629

1630

1631

1632

1633

1634

1635

1636

1637

1638

1639

1640

1641

1642

1643

1644

1645

1646

1647

1648

1649

1650

1651

1652

1653

1654

1655

1656

1657

1658

1659

1660

1661

1662

1663

1664

1665

1666

1667

1668

1669

1670

1671

1672

1673

1674

1675

1676

1677

1678

1679

1680

1681

1682

1683

1684

1685

1686

1687

1688

1689

1690

1691

1692

1693

1694

1695

1696

1697

1698

1699

1700

1701

1702

1703

1704

1705

1706

1707

1708

1709

1710

1711

1712

1713

1714

1715

1716

1717

1718

1719

1720

1721

1722

1723

1724

1725

1726

1727

1728

1729

1730

1731

1732

1733

1734

1735

1736

1737

1738

1739

1740

1741

1742

1743

1744

1745

1746

1747

1748

1749

1750

1751

1752

1753

1754

1755

1756

1757

1758

1759

1760

1761

1762

1763

1764

1765

1766

1767

1768

1769

1770

1771

1772

1773

1774

1775

1776

1777

1778

1779

1780

1781

1782

1783

1784

1785

1786

1787

1788

1789

1790

1791

1792

1793

1794

1795

1796

1797

1798

1799

1800

1801

1802

1803

1804

1805

1806

1807

1808

1809

1810

1811

1812

1813

1814

1815

1816

1817

1818

1819

1820

1821

1822

1823

1824

1825

1826

1827

1828

1829

1830

1831

1832

1833

1834

1835

1836

1837

1838

1839

1840

1841

1842

1843

1844

1845

1846

1847

1848

1849

1850

1851

1852

1853

1854

1855

1856

1857

1858

1859

1860

1861

1862

1863

1864

1865

1866

1867

1868

1869

1870

1871

1872

1873

1874

1875

1876

1877

1878

1879

1880

1881

1882

1883

1884

1885

1886

1887

1888

1889

1890

1891

1892

1893

1894

1895

1896

1897

1898

1899

1900

1901

1902

1903

1904

1905

1906

1907

1908

1909

1910

1911

1912

1913

1914

1915

1916

1917

1918

1919

1920

1921

1922

1923

1924

1925

1926

1927

1928

1929

1930

1931

1932

1933

1934

1935

1936

1937

1938

1939

1940

1941

1942

1943

1944

1945

1946

1947

1948

1949

1950

1951

1952

1953

1954

1955

1956

1957

1958

1959

1960

1961

1962

1963

1964

1965

1966

1967

1968

1969

1970

1971

1972

1973

1974

1975

1976

1977

1978

1979

1980

1981

1982

1983

1984

1985

1986

1987

1988

1989

1990

1991

1992

1993

1994

1995

1996

1997

1998

1999

2000

2001

2002

2003

2004

2005

2006

2007

2008

2009

2010

2011

2012

2013

2014

2015

2016

2017

2018

2019

2020

2021

2022

2023

2024

2025

2026

2027

2028

2029

2030

2031

2032

2033

2034

2035

2036

2037

2038

2039

2040

2041

2042

2043

2044

2045

2046

2047

2048

2049

2050

2051

2052

2053

2054

2055

2056

2057

2058

2059

2060

2061

2062

2063

2064

2065

2066

2067

2068

2069

2070

2071

2072

2073

2074

2075

2076

2077

2078

2079

2080

2081

2082

2083

2084

2085

2086

2087

2088

2089

2090

2091

2092

2093

2094

2095

2096

2097

2098

2099

2100

2101

2102

2103

2104

2105

2106

2107

2108

2109

2110

2111

2112

2113

2114

2115

2116

2117

2118

2119

2120

2121

2122

2123

2124

2125

2126

2127

2128

2129

2130

2131

2132

2133

2134

2135

2136

2137

2138

2139

2140

2141

2142

2143

2144

2145

2146

2147

2148

2149

2150

2151

2152

2153

2154

2155

2156

2157

2158

2159

2160

2161

2162

2163

2164

2165

2166

2167

2168

#!/usr/bin/env python 

 

""" 

@file ion/services/coi/datastore.py 

@author David Stuebe 

@author Matt Rodriguez 

@author Dave Foster <dfoster@asascience.com> 

@TODO Deal with preload issue - it must be possible to run preload with a partial datastore, i.e. if predicates are

already there but resource types are not...

 

""" 

import logging 

from ion.core.object.object_utils import CDM_ARRAY_INT32_TYPE, CDM_ARRAY_INT64_TYPE, CDM_ARRAY_UINT64_TYPE, CDM_ARRAY_FLOAT32_TYPE, CDM_ARRAY_FLOAT64_TYPE, CDM_ARRAY_STRING_TYPE, CDM_ARRAY_OPAQUE_TYPE, CDM_ARRAY_UINT32_TYPE, ARRAY_STRUCTURE_TYPE, sha1_to_hex 

from ion.util.cache import LRUDict 

 

import ion.util.ionlog 

log = ion.util.ionlog.getLogger(__name__) 

from twisted.internet import defer 

 

import ion.util.procutils as pu 

from ion.core.process.process import ProcessFactory 

from ion.core.process.service_process import ServiceProcess, ServiceClient 

from ion.core.exception import ReceivedError, ApplicationError 

 

from ion.services.coi.resource_registry import resource_client 

from ion.core.messaging.message_client import MessageClient 

from types import FunctionType 

import math 

 

from ion.core.object import object_utils 

from ion.core.object import gpb_wrapper, repository 

from ion.core.object.workbench import WorkBench, WorkBenchError, PUSH_MESSAGE_TYPE, PULL_MESSAGE_TYPE, PULL_RESPONSE_MESSAGE_TYPE, BLOBS_REQUSET_MESSAGE_TYPE, BLOBS_MESSAGE_TYPE, GET_OBJECT_REQUEST_MESSAGE_TYPE, GET_OBJECT_REPLY_MESSAGE_TYPE, GPBTYPE_TYPE, DATA_REQUEST_MESSAGE_TYPE, DATA_REPLY_MESSAGE_TYPE, DATA_CHUNK_MESSAGE_TYPE, GET_LCS_REQUEST_MESSAGE_TYPE, GET_LCS_RESPONSE_MESSAGE_TYPE 

from ion.core.data import store 

from ion.core.data import cassandra 

#from ion.core.data import cassandra_bootstrap 

from ion.core.data.store import Query 

 

 

from ion.core.data.storage_configuration_utility import BLOB_CACHE, COMMIT_CACHE 

from ion.core.data.storage_configuration_utility import COMMIT_INDEXED_COLUMNS 

from ion.core.data.storage_configuration_utility import REPOSITORY_KEY, BRANCH_NAME 

 

from ion.core.data.storage_configuration_utility import SUBJECT_KEY, SUBJECT_BRANCH, SUBJECT_COMMIT 

from ion.core.data.storage_configuration_utility import PREDICATE_KEY, PREDICATE_BRANCH, PREDICATE_COMMIT 

from ion.core.data.storage_configuration_utility import OBJECT_KEY, OBJECT_BRANCH, OBJECT_COMMIT, STORAGE_PROVIDER, PERSISTENT_ARCHIVE 

 

from ion.core.data.storage_configuration_utility import KEYWORD, VALUE, RESOURCE_OBJECT_TYPE, RESOURCE_LIFE_CYCLE_STATE, get_cassandra_configuration 

 

 

from ion.services.coi.datastore_bootstrap.ion_preload_config import ION_DATASETS, ION_PREDICATES, ION_RESOURCE_TYPES, ION_IDENTITIES, ION_DATA_SOURCES, ION_ROLES, AUTHENTICATED_ROLE_ID 

from ion.services.coi.datastore_bootstrap.ion_preload_config import ID_CFG, TYPE_CFG, PREDICATE_CFG, PRELOAD_CFG, NAME_CFG, DESCRIPTION_CFG, CONTENT_CFG, CONTENT_ARGS_CFG, LCS_CFG, COMMISSIONED 

from ion.services.coi.datastore_bootstrap.ion_preload_config import ION_PREDICATES_CFG, ION_DATASETS_CFG, ION_RESOURCE_TYPES_CFG, ION_IDENTITIES_CFG, root_name, HAS_A_ID, ADMIN_ROLE_ID 

 

from ion.services.coi.datastore_bootstrap.ion_preload_config import TypeMap, ANONYMOUS_USER_ID, ROOT_USER_ID, OWNED_BY_ID, ION_AIS_RESOURCES, ION_AIS_RESOURCES_CFG, OWNER_ID, HAS_ROLE_ID 

 

from ion.core import ioninit 

CONF = ioninit.config(__name__) 

 

 

# Type identifiers built with object_utils.create_type_identifier from an
# object id and a version. Each constant names the object type it identifies.
LINK_TYPE = object_utils.create_type_identifier(object_id=3, version=1)
COMMIT_TYPE = object_utils.create_type_identifier(object_id=8, version=1)
# MUTABLE_TYPE is the type used to rebuild a repository head in
# DataStoreWorkbench._resolve_repo_state.
MUTABLE_TYPE = object_utils.create_type_identifier(object_id=6, version=1)
STRUCTURE_ELEMENT_TYPE = object_utils.create_type_identifier(object_id=1, version=1)

ASSOCIATION_TYPE = object_utils.create_type_identifier(object_id=13, version=1)
TERMINOLOGY_TYPE = object_utils.create_type_identifier(object_id=14, version=1)
IDREF_TYPE = object_utils.create_type_identifier(object_id=4, version=1)

RESOURCE_TYPE = object_utils.create_type_identifier(object_id=1102, version=1)

CDM_BOUNDED_ARRAY_TYPE = object_utils.create_type_identifier(object_id=10021, version=1)

 

class NDArrayWrap(object):
    """
    Lazily-loaded handle on a single ndarray GPB object.

    Instances are meant to be stored in an LRUDict: they report their size
    through __sizeof__, drop their repository entry through clear, and expose
    the array contents through the value property.
    """

    def __init__(self, key, repo, bounds, itembytes, getblobs):
        """
        Record what is needed to fetch the ndarray later and pre-compute its size.

        @param  key         The ndarray's key.
        @param  repo        The repository that fetched objects are loaded into.
        @param  bounds      The bounds of the ndarray; the size attribute of each
                            bound feeds the size calculation.
        @param  itembytes   Number of bytes per item (depends on the array's data type).
        @param  getblobs    A reference to the workbench's _get_blobs callable.
        """
        self._key = key
        self._repo = repo
        self._getblobs = getblobs

        # Nothing is fetched until the value is requested for the first time.
        self._ndarray = None

        if len(bounds) == 0:
            # scalar value: exactly one item
            self._size = itembytes
        else:
            extents = [bound.size for bound in bounds]
            item_count = reduce(lambda total, extent: total * extent, extents)
            self._size = item_count * itembytes

    def __sizeof__(self):
        """
        Return the size computed in the constructor (item count times itembytes).
        """
        return self._size

    def clear(self):
        """
        Called by an LRUDict when this entry is being removed (size constraints etc).

        Deletes this ndarray's key from the repository's index_hash, if present,
        so that the memory can be freed.
        """
        log.debug("NDArrayWrap object clearing")

        if not self._repo.index_hash.has_key(self._key):
            # never loaded (or already removed) - nothing to release
            return

        del self._repo.index_hash[self._key]

    @defer.inlineCallbacks
    def _get_value(self):
        """
        Load (on first use) and return the ndarray's value.

        The first call fetches the blobs through the getblobs callable, merges
        them into the repository's index_hash and loads the element; later calls
        reuse the loaded object. Access this through the value property.
        """
        if self._ndarray is None:
            accept_all = lambda link: True
            fetched = yield self._getblobs(self._repo, [self._key], accept_all)
            self._repo.index_hash.update(fetched)

            element = self._repo.index_hash[self._key]
            self._ndarray = self._repo._load_element(element)

        defer.returnValue(self._ndarray.value)

    value = property(_get_value)

 

class NDArrayLRUDict(LRUDict):
    """
    Least-recently-used dictionary cache specialised for ndarrays.

    Used by DataStoreWorkbench's op_extract_data method. It should only hold
    NDArrayWrap objects; the subclass exists mainly to offer one helper that
    returns an ndarray's value regardless of whether it is cached or loaded yet.
    """

    def __init__(self, limit, repo):
        """
        Keep a reference to the repository and initialise the LRUDict with
        use_size set to true.
        """
        self._repo = repo

        LRUDict.__init__(self, limit, use_size=True)

    @defer.inlineCallbacks
    def get_ndarray_value(self, key, bounds, itembytes, getblobs):
        """
        Return the value of the ndarray identified by key.

        If the key is not in the cache a new NDArrayWrap is created and offered
        to the cache. The wrapper created here is used for this call even if the
        cache does not retain it (e.g. because the ndarray is too large).
        """
        if self.has_key(key):
            wrapped = self.get(key)
        else:
            wrapped = NDArrayWrap(key, self._repo, bounds, itembytes, getblobs)
            self[key] = wrapped
            log.debug("LRUDict loading, item size %d, lru now %d items %d bytes total" % (wrapped._size, len(self.keys()), self.total_size))

        result = yield wrapped.value
        defer.returnValue(result)

 

class DataStoreWorkBenchError(WorkBenchError):
    """
    An Exception class for errors in the data store workbench.

    Raised by DataStoreWorkbench with a message and, in some places, a
    response code as second argument (e.g. 404 when a repository key is not
    found in the commit store).
    """

 

class DataStoreWorkbench(WorkBench): 

 

 

    def __init__(self, process, blob_store, commit_store, cache_size=10**8):
        """
        Initialise the base WorkBench and remember the two backing stores.

        @param  process         Passed through to WorkBench.__init__.
        @param  blob_store      Store that blobs are fetched from by key.
        @param  commit_store    Store that is queried for a repository's commits.
        @param  cache_size      Passed through to WorkBench.__init__.
        """

        WorkBench.__init__(self, process, cache_size)

        self._blob_store = blob_store
        self._commit_store = commit_store

 

 

    def pull(self, *args, **kwargs):
        """
        Not supported on the datastore workbench.

        @raises NotImplementedError always.
        """

        raise NotImplementedError("The Datastore Service can not Pull")

 

    def push(self, *args, **kwargs):
        """
        Not supported on the datastore workbench.

        @raises NotImplementedError always.
        """

        raise NotImplementedError("The Datastore Service can not Push")

 

    @defer.inlineCallbacks 

    def _get_blobs(self, repo, startkeys, filtermethod=None): 

        """ 

        Common blob fetching helper method. 

        Used by checkout and pull. 

 

        @param  repo            Repository for the response. 

        @param  startkeys       The keys that should start the fetching process. 

        @param  filtermethod    A callable to be applied to all children of fetched items. If the callable returns true, 

                                the item is included. 

 

        @returns                A dictionary of keys => blobs. 

        """ 

        # Slightly different machinary here than in the workbench - Could be made more similar? 

        blobs={} 

        keys_to_get=set(startkeys) 

        def_filter = lambda x: True 

        filtermethod = filtermethod or def_filter 

 

        objects = {} 

        new_links_to_get = set() 

        while len(keys_to_get) > 0: 

            new_links_to_get.clear() 

 

            def_list = [] 

            #@TODO - put some error checking here so that we don't overflow due to a stupid request! 

            for key in keys_to_get: 

                # Short cut if we have already got it! 

                wse = repo.index_hash.get(key) 

 

                if wse: 

                    blobs[wse.key]=wse 

                    # get the object 

 

                    obj = repo._load_element(wse) 

 

                    objects[key] = obj 

 

                    # only add new items to get if they meet our criteria, meaning they are not in the excluded type list 

                    new_links_to_get.update(obj.ChildLinks) 

                else: 

                    def_list.append((self._blob_store.get(key), key)) 

 

 

            result_list = yield defer.DeferredList([x[0] for x in def_list]) 

            dl_fails = filter(lambda x: not x[1][0], enumerate(result_list)) 

            if len(dl_fails) > 0: 

                msg = "Errors (%s) getting link from blob store\n\n" % len(dl_fails) 

                for idx, d_res in dl_fails: 

                    msg += "Key: %s, Failure: %s\n" % (sha1_to_hex(def_list[idx][1]), str(d_res[1])) 

 

                raise DataStoreWorkBenchError(msg) 

 

            # now, let's check for Nones to get a summary of errors 

            dl_nones = filter(lambda x: x[1][1] is None, enumerate(result_list)) 

            if len(dl_nones) > 0: 

                msg = "Blobs not found in blob store (%d)" % len(dl_nones) 

                for idx, d_res in dl_nones: 

                    msg += "Key: %s" % sha1_to_hex(def_list[idx][1]) 

 

                raise DataStoreWorkBenchError(msg) 

 

            for result, blob in result_list: 

                # these should never happen becuase we check for them above, but leaving them in for now... 

                assert result==True, 'Error getting link from blob store!' 

                assert blob is not None, 'Blob not found in blob store!' 

 

                wse = gpb_wrapper.StructureElement.parse_structure_element(blob) 

                blobs[wse.key]=wse 

 

                # Add it to the repository index 

                repo.index_hash[wse.key] = wse 

 

                # load the object so we can find its children 

                obj = repo._load_element(wse) 

 

                new_links_to_get.update(obj.ChildLinks) 

 

            keys_to_get.clear() 

            for link in new_links_to_get: 

                if not blobs.has_key(link.key) and filtermethod(link): 

                    keys_to_get.add(link.key) 

 

            for obj in objects.itervalues(): 

                obj.Invalidate() 

 

            objects.clear() 

 

        defer.returnValue(blobs) 

        #return blobs 

 

    @defer.inlineCallbacks 

    def _resolve_repo_state(self, repository_key, fail_if_not_found=True): 

        """ 

        @returns Repo. 

        """ 

 

        log.info('_resolve_repo_state: start') 

 

        repo = self.get_repository(repository_key) 

        if repo is None: 

            #if it does not exist make a new one 

            log.debug('Repository is not loaded - get it from the persistent store') 

 

            repo = repository.Repository(repository_key=repository_key) 

            self.put_repository(repo) 

        else: 

            log.debug('Repository is loaded - merge it with the state in the persistent store') 

 

 

        # Must reconstitute the head and merge with existing 

        mutable_cls = object_utils.get_gpb_class_from_type_id(MUTABLE_TYPE) 

        new_head = repo._wrap_message_object(mutable_cls(), addtoworkspace=False) 

        new_head.repositorykey = repository_key 

 

        q = Query() 

        q.add_predicate_eq(REPOSITORY_KEY, repository_key) 

 

        rows = yield self._commit_store.query(q) 

 

        if fail_if_not_found and len(rows) == 0: 

            self.clear_repository(repo) 

            raise DataStoreWorkBenchError('Repository Key "%s" not found in Datastore' % repository_key, 404)   # @TODO: constant 

 

        log.debug('Found %d commits in the store' % len(rows)) 

 

        for key, columns in rows.items(): 

 

            if key not in repo.index_hash: 

                blob = columns[VALUE] 

                wse = gpb_wrapper.StructureElement.parse_structure_element(blob) 

                repo.index_hash[key] = wse 

            else: 

                wse = repo.index_hash.get(key) 

 

 

            if columns[BRANCH_NAME]: 

                # If this appears to be a head commit 

 

                # Deal with the possiblity that more than one branch points to the same commit 

                branch_names = columns[BRANCH_NAME].split(',') 

 

 

                for name in branch_names: 

 

                    for branch in new_head.branches: 

                        # if the branch already exists in the new_head just add a commitref 

                        if branch.branchkey == name: 

                            link = branch.commitrefs.add() 

                            break 

                    else: 

                        # If not add a new branch 

                        branch = new_head.branches.add() 

                        branch.branchkey = name 

                        link = branch.commitrefs.add() 

 

                    if key not in repo._commit_index: 

                        cref = repo._load_element(wse) 

 

                        ### DO NOT ADD IT TO THE COMMIT INDEX - THE STATE OF THE COMMIT INDEX IS USED IN UPDATING TO THE HEAD! 

                        #repo._commit_index[cref.MyId]=cref 

                        cref.ReadOnly = True 

                    else: 

                        cref = repo._commit_index.get(key) 

 

                    link.SetLink(cref) 

                    link.isleaf=False 

 

                # Check to make sure the mutable is upto date with the commits... 

 

        # Do the update! 

        self._update_repo_to_head(repo, new_head) 

 

 

        log.info('_resolve_repo_state: complete') 

 

        # return repository 

        defer.returnValue(repo) 

 

    @defer.inlineCallbacks 

    def op_pull(self,request, headers, msg): 

        """ 

        The operation which responds to a pull request 

 

        The pull is much higher heat that I would like - it requires decoding the serialized blobs. 

        We should consider storing the child links of each element external to the element - but then the put is 

        high heat... Not sure what to do. 

        """ 

 

        log.info('op_pull!') 

 

        if not hasattr(request, 'MessageType') or request.MessageType != PULL_MESSAGE_TYPE: 

            raise DataStoreWorkBenchError('Invalid pull request. Bad Message Type!', request.ResponseCodes.BAD_REQUEST) 

 

        repo = yield self._resolve_repo_state(request.repository_key) 

        repo.cached = True 

 

        #### 

        # Back to boiler plate op_pull 

        #### 

 

        my_commits = self.list_repository_commits(repo) 

 

        puller_has = request.commit_keys 

 

        puller_needs = set(my_commits).difference(puller_has) 

 

        response = yield self._process.message_client.create_instance(PULL_RESPONSE_MESSAGE_TYPE) 

 

        # Create a structure element and put the serialized content in the response 

        head_element = self.serialize_mutable(repo._dotgit) 

        # Pull out the structure element and use it as the linked object in the message. 

        obj = response.Repository._wrap_message_object(head_element._element) 

 

        response.repo_head_element = obj 

 

        for commit_key in puller_needs: 

            commit_element = repo.index_hash.get(commit_key) 

            if commit_element is None: 

                raise DataStoreWorkBenchError('Repository commit object not found in op_pull', request.ResponseCodes.NOT_FOUND) 

            link = response.commit_elements.add() 

            obj = response.Repository._wrap_message_object(commit_element._element) 

            link.SetLink(obj) 

 

        if request.get_head_content: 

 

            keys = [x.GetLink('objectroot').key for x in repo.current_heads()] 

 

 

            def filtermethod(x): 

                """ 

                Returns true if the passed in link's type is not in the excluded_types list of the passed in message. 

                """ 

                return (x.type not in request.excluded_types) 

 

            blobs = yield self._get_blobs(response.Repository, keys, filtermethod) 

 

            #log.critical( "OBJ GRAPH") 

            #import objgraph 

            #objgraph.show_growth() 

 

            #def log_repo(): 

            #    log.critical("CALLING Clear!!!") 

            #setattr(response.Repository,'noisy_clear', log_repo) 

 

 

            for element in blobs.itervalues(): 

 

 

                link = response.blob_elements.add() 

                obj = response.Repository._wrap_message_object(element._element) 

                link.SetLink(obj) 

 

                #def log_wrapper(): 

                #    log.critical("CALLING INVALIDATE!!!\n%s" % obj.Debug()) 

                #setattr(obj, 'noisy_invalidate',log_wrapper) 

 

 

        yield self._process.reply_ok(msg, content=response) 

 

        log.info('op_pull: Complete!') 

 

 

 

    @defer.inlineCallbacks 

    def op_push(self, pushmsg, headers, msg): 

        """ 

        The Operation which responds to a push. 

 

        Operation does not complete until transfer is complete! 

        """ 

        log.info('op_push!') 

 

        if not hasattr(pushmsg, 'MessageType') or pushmsg.MessageType != PUSH_MESSAGE_TYPE: 

            raise DataStoreWorkBenchError('Invalid push request. Bad Message Type!', pushmsg.ResponseCodes.BAD_REQUEST) 

 

        # A dictionary of the new commits received in the push - sorted by repository 

        new_commits={} 

 

        # A list of the blobs received - does not matter what repo they are in - just jam them into the store 

        new_blob_keys =[] 

 

 

        for repostate in pushmsg.repositories: 

 

 

            repo = yield self._resolve_repo_state(repostate.repository_key, fail_if_not_found=False) 

            repo.cached = True 

 

            repo_keys = set(self.list_repository_blobs(repo)) 

 

            # add a new entry in the new_commits dictionary to store the commits of the push for this repo 

            new_commits[repo.repository_key] = [] 

 

            # Get the set of keys in repostate that are not in repo_keys 

            need_keys = set(repostate.blob_keys).difference(repo_keys) 

 

            workbench_keys = set(self._workbench_cache.keys()) 

 

            local_keys = workbench_keys.intersection(need_keys) 

 

 

            def_commit_list = [] 

            def_blob_list = [] 

            key_list = [] 

            for key in local_keys: 

                try: 

                    repo.index_hash.get(key) 

                    need_keys.remove(key) 

                    continue 

                except KeyError, ke: 

                    log.info('Key disappeared - get it from the remote after all') 

 

                # @TODO Assumption is that this check is less costly than getting it from the remote service 

                key_list.append(key) 

                def_commit_list.append((self._commit_store.has_key(key), key)) 

                def_blob_list.append((self._blob_store.has_key(key), key)) 

 

            if key_list: 

                result_commit_list = yield defer.DeferredList([x[0] for x in def_commit_list]) 

                result_blob_list = yield defer.DeferredList([x[0] for x in def_blob_list]) 

 

                # find issues with either list 

                cl_fails = filter(lambda x: not x[1][0], enumerate(result_commit_list)) 

                bl_fails = filter(lambda x: not x[1][0], enumerate(result_blob_list)) 

 

                if len(cl_fails) > 0 or len(bl_fails) > 0: 

                    msg = "Push had errors (%d) on blob/commit store has_key:\n\n" % len(cl_fails) + len(bl_fails) 

                    for idx, cfail in cl_fails: 

                        msg += "C Key: %s, Failure %s\n" % (sha1_to_hex(def_commit_list[idx][1]), str(cfail[1])) 

                    for idx, bfail in bl_fails: 

                        msg += "B Key: %s, Failure %s\n" % (sha1_to_hex(def_blob_list[idx][1]), str(cfail[1])) 

 

                    # no local modifications at this point - don't need to clear 

                    raise DataStoreWorkBenchError(msg) 

 

                # Remove 

                for key, res1, have_blob, res2, have_commit in zip(key_list, result_blob_list, result_commit_list): 

 

                    if have_blob or have_commit: 

                        need_keys.remove(key) 

 

            if len(need_keys) > 0: 

                blobs_request = yield self._process.message_client.create_instance(BLOBS_REQUSET_MESSAGE_TYPE) 

                blobs_request.blob_keys.extend(need_keys) 

 

                try: 

                    blobs_msg = yield self.fetch_blobs(headers.get('reply-to'), blobs_request) 

                except ReceivedError, re: 

 

                   log.debug('ReceivedError', str(re)) 

 

                   # no local modifications at this point - don't need to clear 

                   raise DataStoreWorkBenchError('Fetch Objects returned an exception! "%s"' % re.msg_content) 

 

 

                for se in blobs_msg.blob_elements: 

                    # Put the new objects in the repository 

                    element = gpb_wrapper.StructureElement(se.GPBMessage) 

                    repo.index_hash[element.key] = element 

 

                    if element.type == COMMIT_TYPE: 

                        new_commits[repo.repository_key].append(element.key) 

                    else: 

                        new_blob_keys.append(element.key) 

 

 

            # Move over the new head object 

            head_element = gpb_wrapper.StructureElement(repostate.repo_head_element.GPBMessage) 

            new_head = repo._load_element(head_element) 

            new_head.Modified = True 

            new_head.MyId = repo.new_id() 

 

            # Now merge the state! 

            self._update_repo_to_head(repo,new_head) 

 

        # Put any new blobs 

        def_list = [] 

        for key in new_blob_keys: 

 

            element = self._workbench_cache.get(key) 

 

            def_list.append(self._blob_store.put(key, element.serialize())) 

 

        # we need to check problems in the put here 

        dl_res = yield defer.DeferredList(def_list) 

        dl_fails = filter(lambda x: not x[1][0], enumerate(dl_res)) 

        if len(dl_fails) > 0: 

            msg = "Errors (%s) putting blob to blob store\n\n" % len(dl_fails) 

            for idx, d_res in dl_fails: 

                msg += "%s\nFailure: %s\n\n" % (str(self._workbench_cache.get(new_blob_keys[idx])), str(d_res[1])) 

 

            for repostate in pushmsg.repositories: 

                self.clear_repository_key(repostate.repository_key) 

 

            raise DataStoreWorkBenchError(msg) 

 

        # now put any new commits that are not at the head 

        def_list = [] 

 

        # list of the keys which are no longer heads 

        clear_head_list=[] 

 

        # list of the new heads to push at the same time 

        new_head_list=[] 

        for repo_key, commit_keys in new_commits.items(): 

            # Get the updated repository 

            repo = self.get_repository(repo_key) 

 

            # any objects in the data structure that were transmitted have already 

            # been updated now it is time to set update the commits 

            # 

 

            branch_names = [] 

            for branch in repo.branches: 

                branch_names.append(branch.branchkey) 

 

            head_keys = [] 

            for cref in repo.current_heads(): 

                head_keys.append( cref.MyId ) 

 

            for key in commit_keys: 

 

                # Set the repository name for the commit 

                attributes = {REPOSITORY_KEY : str(repo_key)} 

                # Set a default branch name to empty 

                attributes[BRANCH_NAME] = '' 

 

                cref = repo._commit_index.get(key) 

 

                # it may not have been loaded during the update process - if not load it now. 

                if not cref: 

                    element = repo.index_hash.get(key) 

                    cref = repo._load_element(element) 

 

                link = cref.GetLink('objectroot') 

                # Extract the GPB Message for comparison with type objects! 

                root_type = link.type.GPBMessage 

 

 

                if root_type == ASSOCIATION_TYPE: 

 

                    attributes[SUBJECT_KEY] = cref.objectroot.subject.key 

                    attributes[SUBJECT_BRANCH] = cref.objectroot.subject.branch 

                    attributes[SUBJECT_COMMIT] = cref.objectroot.subject.commit 

 

                    attributes[PREDICATE_KEY] = cref.objectroot.predicate.key 

                    attributes[PREDICATE_BRANCH] = cref.objectroot.predicate.branch 

                    attributes[PREDICATE_COMMIT] = cref.objectroot.predicate.commit 

 

                    attributes[OBJECT_KEY] = cref.objectroot.object.key 

                    attributes[OBJECT_BRANCH] = cref.objectroot.object.branch 

                    attributes[OBJECT_COMMIT] = cref.objectroot.object.commit 

 

                elif root_type == RESOURCE_TYPE: 

 

 

                    attributes[RESOURCE_OBJECT_TYPE] = cref.objectroot.resource_type.key 

                    attributes[RESOURCE_LIFE_CYCLE_STATE] = str(cref.objectroot.lcs) 

 

 

                elif  root_type == TERMINOLOGY_TYPE: 

                    attributes[KEYWORD] = cref.objectroot.word 

 

                # get the wrapped structure element to put in... 

                wse = self._workbench_cache.get(key) 

 

 

                if key not in head_keys: 

 

                    defd = self._commit_store.put(key = key, 

                                       value = wse.serialize(), 

                                       index_attributes = attributes) 

                    def_list.append((defd, key, wse)) 

 

                else: 

 

                    # We know it is a head - but we need to get the branch name again 

                    for branch in  repo.branches: 

                        # If this is currently the head commit - set the branch name attribute 

                        if cref in branch.commitrefs: 

                            # If this is currently the head commit - set the branch name 

                            if attributes[BRANCH_NAME] == '': 

                                attributes[BRANCH_NAME] = branch.branchkey 

                            else: 

                                attributes[BRANCH_NAME] = ','.join([attributes[BRANCH_NAME],branch.branchkey]) 

 

 

 

                    new_head_list.append({'key':key, 'value':wse.serialize(), 'index_attributes':attributes}) 

 

            # Get the current head list 

            q = Query() 

            q.add_predicate_eq(REPOSITORY_KEY, repo_key) 

            q.add_predicate_gt(BRANCH_NAME, '') 

 

            rows = yield self._commit_store.query(q) 

 

            for key, columns in rows.items(): 

                if key not in head_keys: 

                    clear_head_list.append(key) 

 

                    # Any commit which is currently a head will have the correct branch names set. 

                    # Just delete the branch names for the ones that are no longer heads. 

 

        dl_res = yield defer.DeferredList([x[0] for x in def_list]) 

        dl_fails = filter(lambda x: not x[1][0], enumerate(dl_res)) 

        if len(dl_fails) > 0: 

            msg = "Errors (%s) putting commit to store\n\n" % len(dl_fails) 

            for idx, d_res in dl_fails: 

                _, ckey, cwse = def_list[idx] 

                msg += "Key: %s\nElement: %s\nFailure: %s" % (sha1_to_hex(ckey), str(cwse), str(d_res[1])) 

 

            for repostate in pushmsg.repositories: 

                self.clear_repository_key(repostate.repository_key) 

 

            raise DataStoreWorkBenchError(msg) 

 

        def_list = [] 

        for new_head in new_head_list: 

 

            def_list.append(self._commit_store.put(**new_head)) 

 

        dl_res = yield defer.DeferredList(def_list) 

        dl_fails = filter(lambda x: not x[1][0], enumerate(dl_res)) 

        if len(dl_fails) > 0: 

            msg = "Errors (%s) putting new_head_list commit to store\n\n" % len(dl_fails) 

            for idx, d_res in dl_fails: 

                nhlkey = new_head_list[idx]['key'] 

                nhlval = new_head_list[idx]['value'] 

                msg += "Key: %s\nElement: %s\nFailure: %s" % (sha1_to_hex(nhlkey), str(nhlval), str(d_res[1])) 

 

            for repostate in pushmsg.repositories: 

                self.clear_repository_key(repostate.repository_key) 

 

            raise DataStoreWorkBenchError(msg) 

 

        def_list = [] 

        for key in clear_head_list: 

 

            def_list.append((self._commit_store.update_index(key=key, index_attributes={BRANCH_NAME:''}), key)) 

 

        dl_res = yield defer.DeferredList([x[0] for x in def_list]) 

        dl_fails = filter(lambda x: not x[1][0], enumerate(dl_res)) 

        if len(dl_fails) > 0: 

            msg = "Errors (%s) updating index to commit store\n\n" % len(dl_fails) 

            for idx, d_res in dl_fails: 

                key = def_list[idx][1] 

                msg += "Key: %s, Failure: %s" % (sha1_to_hex(key), str(d_res[1])) 

 

            for repostate in pushmsg.repositories: 

                self.clear_repository_key(repostate.repository_key) 

 

            raise DataStoreWorkBenchError(msg) 

 

        #import pprint 

        #print 'After update to heads' 

        #pprint.pprint(self._commit_store.kvs) 

 

 

        response = yield self._process.message_client.create_instance(MessageContentTypeID=None) 

        response.MessageResponseCode = response.ResponseCodes.OK 

 

        # The following line shows how to reply to a message 

        yield self._process.reply_ok(msg, response) 

        log.info('op_push: Complete!') 

 

    @defer.inlineCallbacks 

    def op_get_lcs(self, request, headers, msg): 

        ''' 

        test: get list of ids, return list of tuples of ids -> LCSs 

        ''' 

 

        log.info("op_get_lcs") 

 

        if not hasattr(request, 'MessageType') or request.MessageType != GET_LCS_REQUEST_MESSAGE_TYPE: 

            raise DataStoreWorkBenchError('Invalid put blobs request. Bad Message Type!', request.ResponseCodes.BAD_REQUEST) 

 

        response = yield self._process.message_client.create_instance(GET_LCS_RESPONSE_MESSAGE_TYPE) 

 

        for repo_key in request.keys: 

            q = Query() 

            q.add_predicate_eq(REPOSITORY_KEY, repo_key) 

            q.add_predicate_gt(BRANCH_NAME, '') 

 

            rows = yield self._commit_store.query(q) 

            if len(rows) == 0: 

                from net.ooici.core.message.ion_message_pb2 import NOT_FOUND 

                raise DataStoreWorkBenchError("op_get_lcs: Repo key (%s) has no rows in store" % repo_key, response_code=NOT_FOUND) 

            if len(rows) > 1: 

                # more than one branch - divergent state, but they may all agree on the LCS, so check that! 

                lcses = [x[RESOURCE_LIFE_CYCLE_STATE] for x in rows.values()] 

                lcsset = set(lcses) 

                if len(lcsset) > 0: 

                    raise DataStoreWorkBenchError("op_get_lcs: Multiple branch heads with differing LCS found for repo key (%s), cannot determine newest" % repo_key) 

 

            key_lcs_pair = response.key_lcs_pairs.add() 

            key_lcs_pair.key = repo_key 

            key_lcs_pair.lcs = int(rows.values()[0][RESOURCE_LIFE_CYCLE_STATE]) 

 

            if log.getEffectiveLevel() <= logging.DEBUG: 

                log.debug("repo_key: %s, LCS: %s" % (repo_key, str(key_lcs_pair.lcs))) 

 

        yield self._process.reply_ok(msg, response) 

        log.info("/op_get_lcs") 

 

    @defer.inlineCallbacks 

    def op_put_blobs(self, request, headers, message): 

        log.info("op_put_blobs") 

        if not hasattr(request, 'MessageType') or request.MessageType != BLOBS_MESSAGE_TYPE: 

            raise DataStoreWorkBenchError('Invalid put blobs request. Bad Message Type!', request.ResponseCodes.BAD_REQUEST) 

 

        def_list = [] 

        for blob in request.blob_elements: 

            def_list.append((self._blob_store.put(blob.key, blob.SerializeToString()), blob.key)) 

 

        dl_res = yield defer.DeferredList([x[0] for x in def_list]) 

        dl_fails = filter(lambda x: not x[1][0], enumerate(dl_res))     # extract, with indexes, which items in the deferred list have False as first member of result tuple 

        if len(dl_fails) > 0: 

            msg = "Failures (%d) putting to blob store:\n\n" % len(dl_fails) 

            for idx, res in dl_fails: 

                msg += "Key: %s, Failure: %s\n" % (sha1_to_hex(def_list[idx][1]), str(res[1])) 

            raise DataStoreWorkBenchError(msg) 

 

        yield self._process.reply_ok(message) 

        log.info("op_put_blobs: Complete!") 

 

    @defer.inlineCallbacks 

    def op_fetch_blobs(self, request, headers, message): 

        """ 

        Send the object back to a requester if you have it! 

        @TODO Update to new message pattern! 

 

        """ 

 

        if not hasattr(request, 'MessageType') or request.MessageType != BLOBS_REQUSET_MESSAGE_TYPE: 

            raise DataStoreWorkBenchError('Invalid fetch objects request. Bad Message Type!', request.ResponseCodes.BAD_REQUEST) 

 

        log.info('op_fetch_blobs: %d Keys' % len(request.blob_keys)) 

 

        response = yield self._process.message_client.create_instance(BLOBS_MESSAGE_TYPE) 

 

        def_list = [] 

        for key in request.blob_keys: 

            element = self._workbench_cache.get(key) 

 

            if element is not None: 

                link = response.blob_elements.add() 

                obj = response.Repository._wrap_message_object(element._element) 

 

                link.SetLink(obj) 

 

                continue 

 

            def_list.append((self._blob_store.get(key), key)) 

 

        res_list = yield defer.DeferredList([x[0] for x in def_list]) 

        dl_fails = filter(lambda x: not x[1][0], enumerate(res_list))     # extract, with indexes, which items in the deferred list have False as first member of result tuple 

        if len(dl_fails) > 0: 

            msg = "Failures (%d) fetching blobs from store:\n\n" % len(dl_fails) 

            for idx, res in dl_fails: 

                msg += "Key: %s, Failure: %s\n" % (sha1_to_hex(def_list[idx][1]), str(res[1])) 

            raise DataStoreWorkBenchError(msg) 

 

        for result, blob in res_list: 

            if blob is None: 

                raise DataStoreWorkBenchError('Invalid fetch objects request. Key Not Found!', request.ResponseCodes.NOT_FOUND) 

 

            element = gpb_wrapper.StructureElement.parse_structure_element(blob) 

            link = response.blob_elements.add() 

            obj = response.Repository._wrap_message_object(element._element) 

 

            link.SetLink(obj) 

 

 

 

        yield self._process.reply_ok(message, response) 

 

        log.info('op_fetch_blobs: Complete!') 

 

    @defer.inlineCallbacks 

    def flush_initialization_to_backend(self): 

        """ 

        Flush any repositories in the backend to the the workbench backend storage 

        """ 

        def_list=[] 

        for repo in self._repos.itervalues(): 

 

            def_list.append((self.flush_repo_to_backend(repo), repo.repository_key)) 

 

        # this is a deferred list of deferred lists 

        odl_res = yield defer.DeferredList([x[0] for x in def_list]) 

 

        # odl_res should always return True for each entry - we're more concerned about them underneath having failures 

        for idx, idl_res in enumerate(odl_res): 

            repo_key = def_list[idx][1] 

            # get failures from this 

            # idl_res is: (True, [(True, bla), (True, bla)...]) 

            idl_fails = filter(lambda x: not x[0], idl_res[1]) 

 

            # we are not concerned with comprehensive errors here, just error out on the first problem we find 

            if len(idl_fails) > 0: 

                raise DataStoreWorkBenchError("flush_initialization_to_backend encountered an error on repository %s" % repo_key) 

 

        #import pprint 

        #print 'After update to heads' 

        #pprint.pprint(self._commit_store.kvs) 

        log.info("Number of repositories:  %s" % len(self._repos)) 

        log.info("Number of blobs: %s " % len(self._workbench_cache)) 

 

        num_commit_keys = map(lambda repo: len(repo._commit_index.keys()), self._repos.values()) 

        log.info("Number of commits: %s " % sum(num_commit_keys)) 

 

        # Now clear the in memory workbench 

        self.clear() 

 

 

 

    def flush_repo_to_backend(self, repo): 

        """ 

        Flush any repositories in the backend to the the workbench backend storage 

        """ 

 

        # This is simpler than a push - all of these are guaranteed to be new objects! 

        def_list = [] 

        for key, element in repo.index_hash.items(): 

 

            def_list.append(self._blob_store.put(key, element.serialize())) 

 

 

        # any objects in the data structure that were transmitted have already 

        # been updated now it is time to set update the commits 

        # 

 

        commit_keys = repo._commit_index.keys() 

 

 

        branch_names = [] 

        for branch in repo.branches: 

            branch_names.append(branch.branchkey) 

 

        head_keys = [] 

        for cref in repo.current_heads(): 

            head_keys.append( cref.MyId ) 

 

        for key in commit_keys: 

 

            # Set the repository name for the commit 

            attributes = {REPOSITORY_KEY : str(repo.repository_key)} 

            # Set a default branch name to empty 

            attributes[BRANCH_NAME] = '' 

 

            cref = repo._commit_index.get(key) 

 

            link = cref.GetLink('objectroot') 

            root_type = link.type.GPBMessage 

 

            if root_type == ASSOCIATION_TYPE: 

                attributes[SUBJECT_KEY] = cref.objectroot.subject.key 

                attributes[SUBJECT_BRANCH] = cref.objectroot.subject.branch 

                attributes[SUBJECT_COMMIT] = cref.objectroot.subject.commit 

 

                attributes[PREDICATE_KEY] = cref.objectroot.predicate.key 

                attributes[PREDICATE_BRANCH] = cref.objectroot.predicate.branch 

                attributes[PREDICATE_COMMIT] = cref.objectroot.predicate.commit 

 

                attributes[OBJECT_KEY] = cref.objectroot.object.key 

                attributes[OBJECT_BRANCH] = cref.objectroot.object.branch 

                attributes[OBJECT_COMMIT] = cref.objectroot.object.commit 

 

            elif root_type == RESOURCE_TYPE: 

 

                attributes[RESOURCE_OBJECT_TYPE] = cref.objectroot.resource_type.key 

                attributes[RESOURCE_LIFE_CYCLE_STATE] = str(cref.objectroot.lcs) 

 

 

            elif  root_type == TERMINOLOGY_TYPE: 

                attributes[KEYWORD] = cref.objectroot.word 

 

            # get the wrapped structure element to put in... 

            wse = self._workbench_cache.get(key) 

 

 

            if key not in head_keys: 

 

                defd = self._commit_store.put(key = key, 

                                   value = wse.serialize(), 

                                   index_attributes = attributes) 

                def_list.append(defd) 

 

            else: 

 

                # We know it is a head - but we need to get the branch name again 

                for branch in  repo.branches: 

                    # If this is currently the head commit - set the branch name attribute 

                    if cref in branch.commitrefs: 

                        # If this is currently the head commit - set the branch name 

                        if attributes[BRANCH_NAME] == '': 

                            attributes[BRANCH_NAME] = branch.branchkey 

                        else: 

                            attributes[BRANCH_NAME] = ','.join([attributes[BRANCH_NAME],branch.branchkey]) 

 

 

                # Now commit it! 

                defd = self._commit_store.put(key = key, 

                                   value = wse.serialize(), 

                                   index_attributes = attributes) 

                def_list.append(defd) 

 

        # this deferred list will be checked by the flush_initialization_to_backend method 

        return defer.DeferredList(def_list) 

 

 

 

 

    @defer.inlineCallbacks 

    def test_existence(self,repo_key): 

        """ 

        For use in initialization - test to see if the repository already exists in the backend 

        """ 

 

        q = Query() 

        q.add_predicate_eq(REPOSITORY_KEY, repo_key) 

 

        rows = yield self._commit_store.query(q) 

 

        defer.returnValue(len(rows)>0) 

 

    @defer.inlineCallbacks 

    def op_extract_data(self, request, headers, message): 

        """ 

        DataRequestMessage / DataReplyMessage 

        """ 

        log.info("op_extract_data") 

 

        if not hasattr(request, 'MessageType') or request.MessageType != DATA_REQUEST_MESSAGE_TYPE: 

            raise DataStoreWorkBenchError('Invalid extract_data request. Bad Message Type!', request.ResponseCodes.BAD_REQUEST) 

 

        # verify there are no 0 strides in the request 

        if 0 in [x.stride for x in request.request_bounds if x.IsFieldSet('stride')]:   # the if makes it so that unset strides don't even get in the list 

            raise DataStoreWorkBenchError('Stride of 0 specified in request_bounds!') 

 

        response = yield self._process.message_client.create_instance(DATA_REPLY_MESSAGE_TYPE) 

 

        log.debug("Extract data request bounds: %s", ["%d+%d,%d" % (x.origin, x.size, x.stride) for x in request.request_bounds]) 

 

        # create an anonymous repo to load things into 

        repo = self.create_repository(root_type=ARRAY_STRUCTURE_TYPE) 

 

        filterlist = [CDM_ARRAY_INT32_TYPE, 

                      CDM_ARRAY_UINT32_TYPE, 

                      CDM_ARRAY_INT64_TYPE, 

                      CDM_ARRAY_UINT64_TYPE, 

                      CDM_ARRAY_FLOAT32_TYPE, 

                      CDM_ARRAY_FLOAT64_TYPE, 

                      CDM_ARRAY_STRING_TYPE, 

                      CDM_ARRAY_OPAQUE_TYPE] 

 

        # get some blobs into the repo 

        blobs = yield self._get_blobs(repo, [request.structure_array_ref], lambda x: x not in filterlist) 

        repo.index_hash.update(blobs) 

 

        # get element pointed to by key 

        se = repo.index_hash[request.structure_array_ref] 

        assert se 

        obj = repo._load_element(se) 

 

        repo.load_links(obj, filterlist) 

 

        # update repository's excluded_types 

        for extype in filterlist: 

            if extype not in repo.excluded_types: 

                repo.excluded_types.append(extype) 

 

        # now onto the fun.  let's traverse all the bounded arrays we find! 

 

        log.debug("op_extract_data: obj has %d bounded arrays" % len(obj.bounded_arrays)) 

 

        # get the type of bounded array we have here 

        assert len(obj.bounded_arrays) > 0 

 

        # a list of matching bounded arrays 

        bounded_includes_list = [] 

        targetshape = [x.size for x in request.request_bounds] 

 

        # =================================================================== 

        # STEP 1: Match bounded arrays 

        # =================================================================== 

 

        # iterate bounded arrays in this object 

        for ba in obj.bounded_arrays: 

 

            # need to be the same rank 

            if not len(ba.bounds) == len(request.request_bounds): 

                raise DataStoreWorkBenchError("Bounds dimensionality mismatch: this ba has %d dims, our request has %d" % (len(ba.bounds), len(request.request_bounds))) 

 

            target_range = [] 

            src_range = [] 

 

            # this for loop is doing a few things: 

            # - checking to see if the ranges intersect for each dimension.  If one does not intersect, the whole array 

            #   is rejected. 

            # - computing the intersection slices for each of those dimensions if they do intersect. 

            # - if we make it through the for without a rejection on a dimension, the else: clause is run, which marks 

            #   an array as being a required to copy array along with the ranges in both target and source. 

            for reqbounds, babounds in zip(request.request_bounds, ba.bounds): 

 

                #log.debug("Cur bounds: %d+%d, Req bounds: %d+%d" % (babounds.origin, babounds.size, reqbounds.origin, reqbounds.size)) 

 

                # this bounds is below our target range 

                # requested end is lower than this bounds start (exclusive) 

                if reqbounds.origin + reqbounds.size <= babounds.origin: 

                    break 

 

                # this bounds is above our target range 

                # requested start is higher than this bounds end (exclusive) 

                if reqbounds.origin >= babounds.origin + babounds.size: 

                    break 

 

                # compute intersections and offsets into request and src bounded arrays 

                isec_start = max(babounds.origin, reqbounds.origin) 

                isec_end = min(babounds.origin + babounds.size, reqbounds.origin + reqbounds.size) 

 

                effective_start = isec_start - reqbounds.origin 

                effective_end = isec_end - reqbounds.origin 

 

                ba_start = isec_start - babounds.origin 

                ba_end = isec_end - babounds.origin 

 

                # append those intersections - each entry represents a range into a dimension 

                target_range.append((effective_start, effective_end)) 

                src_range.append((ba_start, ba_end)) 

 

            else: 

                # all bounds are included, this bounded array is good to go 

                # format: (bounded array, target range of data (multidim), source bounded array range (multidim)) 

                bounded_includes_list.append((ba, target_range, src_range)) 

 

        # sidestep: figure out size of each item in a BA, and set chunk factor to 5mb (default, configurable) 

 

        # our default, safe assumptions say to expect the biggest 

        # we don't actually know what to put for CDM_ARRAY_STRING_TYPE as that varies and CDM_ARRAY_OPAQUE_TYPE, 

        # could be many things. 

        ITEM_SIZE = 8 

 

        if len(bounded_includes_list) > 0: 

            ndarray_type = bounded_includes_list[0][0].GetLink('ndarray').type 

            # @TODO: cmon, the in syntax doesn't use the correct __eq__ overload or whatever? this is silly. 

            if ndarray_type.object_id in [CDM_ARRAY_INT32_TYPE.object_id, CDM_ARRAY_UINT32_TYPE.object_id, CDM_ARRAY_FLOAT32_TYPE.object_id]: 

                ITEM_SIZE = 4 

            elif ndarray_type.object_id in [CDM_ARRAY_INT64_TYPE.object_id, CDM_ARRAY_UINT64_TYPE.object_id, CDM_ARRAY_FLOAT64_TYPE.object_id]: 

                ITEM_SIZE = 8 

 

        # max size for a data chunk AND the LRU dict 

        LRU_DICT_LIMIT = int(CONF.getValue('extract_cache_size', 5 * 1024 * 1024)) 

 

        # @TODO: Bug OOIION-159 is preventing us from setting a proper chunk limit of 5mb. 

        #                       The overflow point appears to be 16482 -> 16483, which in bytes, looks suspiciously 

        #                       like an arithmetic overflow somewhere. 

 

        CHUNK_FACTOR = 15000 #LRU_DICT_LIMIT / ITEM_SIZE       # chunk factor is expressed in # of items, not bytes 

        log.debug("LRU Cache Limit set at %d bytes, CHUNK_FACTOR is %d elements" % (LRU_DICT_LIMIT, CHUNK_FACTOR)) 

 

        # =================================================================== 

        # STEP 2: Compress/Optimize bounded_includes_list for overlap 

        # =================================================================== 

        # @TODO: not needed for R1 

 

        # =================================================================== 

        # STEP 3: Generate a list of matching strips from each BA 

        # =================================================================== 

        striplist = [] 

        strides = [x.stride or 1 for x in request.request_bounds] 

 

        for batuple in bounded_includes_list: 

            ba, targetranges, srcranges = batuple 

 

            # is this a scalar? is there anything to slice? 

            if len(targetshape) == 0: 

                striplist.append((ba, (0, 1), (0, 1), 1, 1)) 

            else: 

                # get slices out of it 

                # get dims of this bounded array 

                ba_shape = [x.size for x in ba.bounds] 

 

                for targetslice, srcslice, laststridelen in self._get_slices(targetshape, ba_shape, targetranges, srcranges, strides): 

                    striplist.append((ba, targetslice, srcslice, targetslice[1]-targetslice[0], laststridelen)) 

 

        log.debug("Number of uncompressed strips: %d" % len(striplist)) 

 

        # =================================================================== 

        # STEP 4: Sort that list of matching strips by start index in target array to start index + length 

        # =================================================================== 

 

        sorted_striplist = sorted(striplist, lambda x, y: x[1][1] < y[1][0]) 

 

        # =================================================================== 

        # STEP 5: Compress any contiguous strips from the same BAs 

        # =================================================================== 

        compressed_striplist = [] 

        accumstrip = None 

        for stripitem in sorted_striplist: 

            ba, targetslice, srcslice, leng, laststridelen = stripitem 

 

            # check for end condition 

            if accumstrip and ba != accumstrip[0]: 

                # push current strip into compressed striplist 

                compressed_striplist.append(accumstrip) 

                accumstrip = None 

 

            # brand new strip, either after we just pushed or starting this loop 

            if accumstrip is None: 

                accumstrip = stripitem[:] 

                continue 

 

            # see if we have a contiguous strip 

            # - current item's source slice start index is the same as the accumulated strip's source end index 

            # - strides are the same (should always be, this is more sanity check) 

            # - the possible accumulated strip's total length is not over the max size for chunking messages

            if srcslice[0] == accumstrip[2][1] and \ 

               laststridelen == accumstrip[4] and \ 

               accumstrip[3] + leng <= CHUNK_FACTOR: 

                # update accumstrip 

                accumstrip = (accumstrip[0], (accumstrip[1][0], targetslice[1]), (accumstrip[2][0], srcslice[1]), srcslice[1] - accumstrip[2][0], accumstrip[4]) 

            else: 

                # not contiguous?  push into list and forget it 

                compressed_striplist.append(accumstrip) 

                accumstrip = stripitem[:] 

 

        # catch the last one 

        if accumstrip is not None: 

            compressed_striplist.append(accumstrip) 

 

        log.debug("Number of compressed strips: %d" % len(compressed_striplist)) 

 

        # =================================================================== 

        # STEP 5b: find overlapping strips and omit them. 

        # =================================================================== 

 

        # @TODO this is extremely naive and would benefit from better BA analysis/compression in step 1 or 2 

        # it's also O(n^2) which is crap. 

 

        non_overlap_striplist = [] 

 

        # rule: if its in the striplist already, it wins. 

        for stripitem in compressed_striplist: 

            ba, targetslice, srcslice, leng, laststridelen = stripitem 

 

            # check to see if this targetslice has been taken care of already 

            for existing_stripitem in non_overlap_striplist: 

                nba, ntargetslice, nsrcslice, nleng, nlaststridelen = existing_stripitem 

 

                # since we're going linearly, we really only have to check the start of this new targetslice 

                if targetslice[0] >= ntargetslice[0] and targetslice[0] < ntargetslice[1]: 

                    # we have an intersection, figure out length of intersection 

                    intlen = ntargetslice[1] - targetslice[0] 

                    log.debug("intersection: %d, %d in %d, %d, len of %d" % (targetslice[0], targetslice[1], ntargetslice[0], ntargetslice[1], intlen)) 

 

                    # decision point: if the whole intersection is covered already, throw it out 

                    if targetslice[0] + intlen >= targetslice[1]: 

                        log.debug("whole strip covered, not adding it") 

                        break # skips else clause of for 

                    else: 

                        # split the current strip item up 

                        targetslice = (targetslice[0] + intlen, targetslice[1]) 

                        srcslice = (srcslice[0] + intlen * laststridelen, srcslice[1])  # src slice is always given without striding applied, so 

                                                                                        # when we update after a split we have to take that stride 

                                                                                        # into account.  If we've shaved off intlen items in target 

                                                                                        # coordinates, we need to shave off intlen * stride in source. 

                        leng = targetslice[1] - targetslice[0] 

 

                        log.debug("split slice into %d,%d -> %d,%d length %d" % (targetslice[0], targetslice[1], srcslice[0], srcslice[1], leng)) 

            else: 

                # no break, means we either don't intersect at all, or we split up to not intersect 

                #log.debug("adding slice") 

                newstripitem = (ba, targetslice, srcslice, leng, laststridelen) 

                non_overlap_striplist.append(newstripitem) 

 

        if log.getEffectiveLevel() <= logging.DEBUG: 

            lennonoverlap = len(non_overlap_striplist) 

            lencstriplist = len(compressed_striplist) 

            log.debug("Number of non-overlapping strips: %d (%d eliminated)" % (lennonoverlap, lencstriplist - lennonoverlap)) 

 

        # replace compressed striplist to work below 

        compressed_striplist = non_overlap_striplist 

 

        # =================================================================== 

        # STEP 6: Generate a list of extractions using heuristics, an "extraction plan" 

        # =================================================================== 

 

        # list of [list of indices into compressed_striplist]

        extraction_plan = [] 

 

        # simple: relying on the LRU cache to free up BAs when done, and that datasets will be laid out in a sane 

        # variety fastest varying dimension will never be broken up over multiple BAs unless dimensionality is one, 

        # we can just assemble each step of the plan to be the maximum chunk size we can fit. 

        curstep = [] 

        curlen = 0 

        for csidx, cstrip in enumerate(compressed_striplist): 

 

            # always need at least one strip 

            if len(curstep) == 0: 

                curstep.append(csidx) 

                curlen += cstrip[3] 

                continue 

 

            # can we fit the new strip? 

            if curlen + cstrip[3] <= CHUNK_FACTOR: 

                curstep.append(csidx) 

            else: 

                extraction_plan.append(curstep) 

                curstep = [csidx] 

                curlen = cstrip[3] 

 

        # catch any leftovers 

        if len(curstep) > 0: 

            extraction_plan.append(curstep) 

 

        # =================================================================== 

        # STEP 7: Perform extractions 

        # =================================================================== 

 

        # create a least-recently-used cache for ndarrays, using 5mb as the default max size 

        ndarray_cache = NDArrayLRUDict(LRU_DICT_LIMIT, repo) 

 

        try: 

            for exidx, exstep in enumerate(extraction_plan): 

                curstrips = [] 

                for sidx in exstep: 

                    curstrips.append(compressed_striplist[sidx]) 

 

                # get the start index.. should be in the first item 

                targetstartidx = curstrips[0][1][0] 

 

                # calculate number of elements we are going to output in this chunk, create temp storage for it 

                elemcount = reduce(lambda x, y: x+y, [x[3] for x in curstrips]) 

                targetndarray = [None] * elemcount 

 

                log.debug("Extraction step %d, # strips: %d, element count: %d, start index: %d" % (exidx, len(curstrips), elemcount, targetstartidx)) 

 

                # ok, now we can perform the extractions on this step 

                targetoffset = 0 

                for curstrip in curstrips: 

                    ba, targetidxs, srcidxs, leng, stride = curstrip 

 

                    # get/possibly load from ndarray_cache 

                    ndobjval = yield ndarray_cache.get_ndarray_value(ba.GetLink('ndarray').key, ba.bounds, ITEM_SIZE, self._get_blobs) 

 

                    srcslice = ndobjval[srcidxs[0]:srcidxs[1]] 

                    if stride == 1: 

                        targetslice = srcslice 

                    else: 

                        targetslice = [d for i, d in enumerate(srcslice) if i % stride == 0] 

 

                    #log.debug("SETTING TNDARRAY[%d:%d]" % (targetoffset, targetoffset+leng)) 

                    targetndarray[targetoffset:targetoffset+leng] = targetslice 

 

                    # add length to target offset 

                    targetoffset += leng 

 

                # ensure we filled this chunk 

                nonelist = [i for i,d in enumerate(targetndarray) if d is None] 

                if len(nonelist) > 0: 

                    log.error("extract_data: Nones found in targetndarray prior to send: %s" % str(nonelist)) 

                    raise DataStoreWorkBenchError("Data extraction did not properly fill in all members of response ndarray!") 

 

                # SEND THIS CHUNK 

 

                # create new message to send 

                chunkmsg = yield self._process.message_client.create_instance(DATA_CHUNK_MESSAGE_TYPE) 

                chunkmsg.seq_number = exidx 

                chunkmsg.seq_max = len(extraction_plan) 

 

                # set info in this chunk 

                chunkmsg.start_index = targetstartidx 

                chunkmsg.done = exidx == len(extraction_plan) - 1       # last chunk message?  set the done flag 

 

                # create the ndarray in this chunk 

                chunkndarray = chunkmsg.CreateObject(curstrips[0][0].GetLink('ndarray').type) 

 

                # these lines blow up with a TypeError if we screwed up the bounds and didn't fill in the targetarray fully, 

                # aka it contains Nones 

                chunkndarray.value[0:elemcount] = targetndarray[:] 

                chunkmsg.ndarray = chunkndarray 

 

                # send this message to the passed in routing key 

                yield self._send_data_chunk(request.data_routing_key, chunkmsg) 

        except Exception, ex: 

            class FakeMsg(object): 

                pass 

            fakemsg = FakeMsg() 

            fakemsg.payload = { 'reply-to': request.data_routing_key, 

                                'protocol': 'rpc'} 

            yield self._process.reply_err(fakemsg, exception=ex) 

            raise ex 

 

        self._process.reply_ok(message, response) 

        log.info("/op_extract_data") 

 

    @defer.inlineCallbacks 

    def _send_data_chunk(self, data_routing_key, chunkmsg): 

        """ 

        Sends a data chunk message (from op_extract_data).  This is split out to facilitate 

        testing via monkeypatching this method. 

        """ 

        log.debug("_send_data_chunk to %s" % data_routing_key) 

        yield self._process.send(data_routing_key, 'noop', chunkmsg) 

 

 

    def _double_xrange(self, start1, end1, start2, end2): 

        """ 

        This is a method just like xrange, but it operates on two diff ranges at once. 

        Used by extract_data when mapping slice ranges for data extraction. 

 

        @NOTE: This is a generator method, not to be confused with one that needs defer.inlineCallbacks decoration! 

        """ 

        counter1 = start1 

        counter2 = start2 

        while counter1 < end1 and counter2 < end2: 

            yield (counter1, counter2) 

 

            counter1 += 1 

            counter2 += 1 

 

    def _get_slices(self, targetdimextents, srcdimextents, targetranges, srcranges, strides, targetidx=0, srcidx=0): 

        """ 

        Returns a tuple of slice ranges (as tuples) that you can use to extract data from slices 

        inside an ndarray. This is used by extract_data. 

 

        @NOTE: This is a generator method, not to be confused with one that needs defer.inlineCallbacks decoration! 

 

        @param  targetdimextents    The dimensional extents of the target bounded array. 

        @param  srcdimextents       The dimensional extents of the source bounded array. 

        @param  targetranges        A list of tuples (one tuple per dimension), specifying a range in each 

                                    dimension that we are storing into. 

        @param  srcranges           A list of tuples (one tuple per dimension), specifying a range in each 

                                    dimension that we are pulling data out of. 

        @param  strides             A list of stride amounts for each dimension. Should be the same length 

                                    as targetranges/srcranges. The last dimension is not factored in here, but is 

                                    returned as the last member of the return tuple. 

        @param  targetidx           An accumulated index value into the target bounded array which gives the 

                                    current index calculated on each recursive call. 

        @param  srcidx              An accumulated index value into the source bounded array which gives the 

                                    current index calculated on each recursive call. 

 

        @returns                    On each yield, a tuple containing two tuples and an integer: a range to the target WITH 

                                    striding applied to all but the last dimension, a range to copy from the source WITHOUT 

                                    striding applied, and the stride length of the last dimension.  

        """ 

 

        # make sure we have sane things here 

        assert len(targetdimextents) == len(srcdimextents) 

        assert len(strides) == len(targetdimextents) 

 

        # build index extent arrays 

        targetidxextents = [None] * len(targetdimextents) 

        srcidxextents = [None] * len(srcdimextents) 

 

        targetidxextents[-1] = 1 

        srcidxextents[-1] = 1 

 

        # reduce target dim extents to extents after applying stride 

        striddentargetextents = [int(math.ceil(targetdimextents[i]/float(strides[i]))) for i in xrange(len(targetdimextents))] 

 

        # calculate index extents into target array, using stridden extents 

        for x in range(len(targetidxextents)-2, -1, -1): 

            targetidxextents[x] = reduce(lambda x,y: x*y, striddentargetextents[x+1:]) 

 

        # calculate index extents into source array 

        for x in range(len(srcidxextents)-2, -1, -1): 

            srcidxextents[x] = reduce(lambda x, y: x*y, srcdimextents[x+1:]) 

 

        def recslice(trs, srs, ts, ss, tstrides, cts=0, css=0, rc=0): 

            """ 

            Recursive slice finder. 

            @param  trs     Target ranges. 

            @param  srs     Source ranges. 

            @param  ts      Target slice (last dimension). 

            @param  ss      Source slice (last dimension). 

            @param  tstrides List of strides in all dimensions. 

            @param  cts     Current target sum, aka index into target array. 

            @param  css     Current source sum, aka index into source array. 

            @param  rc      Recursion count, used to index into targetidxextents/srcidxextents. 

            """ 

            if len(trs) == 0: 

                # exit case: traversed all dimensions, we're on the last dimension, extract our slices 

 

                slicetup = ((int(math.ceil(cts+ts[0]/float(tstrides[0]))), 

                             int(math.ceil(cts+ts[1]/float(tstrides[0])))), 

                             (css+ss[0], css+ss[1]), 

                             tstrides[0]) 

                #log.debug("slicetuple: %s" % str(slicetup)) 

                yield slicetup 

            else: 

                # iterative case: look at current dimension, co-iterate over the ranges in target/src, 

                #                 recurse into recslice again one dimension up until we run out. 

                ctr = trs[0] 

                csr = srs[0] 

                cstride = tstrides[0] 

 

                for tv, sv in self._double_xrange(ctr[0], ctr[1], csr[0], csr[1]): 

                    if tv % cstride == 0: 

                        for xx in recslice(trs[1:], 

                                           srs[1:], 

                                           ts, 

                                           ss, 

                                           tstrides[1:], 

                                           cts+(tv * targetidxextents[rc]),     # calculate actual index offset here and pass it 

                                           css+(sv * srcidxextents[rc]),        # calculate actual index offset here and pass it 

                                           rc+1): 

                            yield xx 

                    else: 

                        log.debug("IGNOREING STRIDED OUT DIM tv/sv %d,%d len dims %d" % (tv, sv, len(trs))) 

 

        # iterate through all recslice generated slicepairs 

        for x in recslice(targetranges[:-1], 

                          srcranges[:-1], 

                          targetranges[-1], 

                          srcranges[-1], 

                          strides[:]): 

            yield x 

 

    @defer.inlineCallbacks 

    def op_get_object(self, request, headers, message): 

        log.info('op_get_object') 

 

        if not hasattr(request, 'MessageType') or request.MessageType != GET_OBJECT_REQUEST_MESSAGE_TYPE: 

            raise DataStoreWorkBenchError('Invalid get_object request. Bad Message Type!', request.ResponseCodes.BAD_REQUEST) 

 

        response = yield self._process.message_client.create_instance(GET_OBJECT_REPLY_MESSAGE_TYPE) 

 

        key = request.object_id.key 

        branchname = request.object_id.branch or 'master' 

        repo = yield self._resolve_repo_state(key)    # gets latest repo state from cassandra 

        assert repo 

        commit = None 

 

        # do we have a treeish to resolve? 

        if request.object_id.treeish: 

            commit = repo.resolve_treeish(request.object_id.treeish, branchname) 

 

            # have to monkey patch from the datastore so it doesn't try to contact itself! 

            @defer.inlineCallbacks 

            def _fake_checkout_remote(commit, excluded_types): 

                link = commit.GetLink('objectroot') 

                yield self._get_blobs(repo, [link.key], lambda x: x.type not in excluded_types) 

 

                # expects to return the root object, so load it again 

                element = repo.index_hash[link.key] 

                root_obj = repo._load_element(element) 

 

                defer.returnValue(root_obj) 

 

            oldcheckout = repo._checkout_remote_commit 

            repo._checkout_remote_commit = _fake_checkout_remote 

 

            yield repo.checkout(branchname, commit_id=commit.MyId) 

 

            repo._checkout_remote_commit = oldcheckout 

 

        repo.cached = True 

 

        # @TODO: use first head for now 

        #comms = repo.current_heads() 

        #if hasattr(repo, "_current_branch"): 

        #    commit = repo._current_branch.commitrefs[0] 

        #else: 

        if commit is None: 

            commit = repo.current_heads()[0] 

 

        log.debug("GetObject: using commit %s" % sha1_to_hex(commit.MyId)) 

 

        link = commit.GetLink('objectroot') 

 

        # get blobs, update into response repository so we don't have to copy 

        blobs = yield self._get_blobs(response.Repository, [link.key], lambda x: x.type not in request.excluded_object_types) 

        response.Repository.index_hash.update(blobs) 

        repo.index_hash.update(blobs) 

 

        # load root object + links 

        element = response.Repository.index_hash[link.key] 

        root_obj = response.Repository._load_element(element) 

 

        excluded_types = [x.GPBMessage for x in request.excluded_object_types] 

        response.Repository.load_links(root_obj, excluded_types) 

 

        # update repository's excluded_types 

        for extype in excluded_types: 

            if extype not in response.Repository.excluded_types: 

                response.Repository.excluded_types.append(extype) 

 

        # fill in response, respond 

        response.retrieved_object = root_obj 

        for ex_type in request.excluded_object_types: 

            link = response.excluded_object_types.add() 

            newex = response.CreateObject(GPBTYPE_TYPE) 

            newex.object_id = ex_type.object_id 

            newex.version = ex_type.version 

            link.SetLink(newex) 

 

        yield self._process.reply_ok(message, response) 

 

        log.info("/op_get_object") 

 

class DataStoreError(ApplicationError):
    """
    An exception class for the data store.

    Raised by DataStoreService.initialize_datastore when a required predicate
    or resource cannot be created during preload.
    """

 

 

class DataStoreService(ServiceProcess): 

    """ 

    The data store is not yet persistent. At the moment all its stored objects 

    are kept in a python dictionary, part of the work bench. This service will 

    be modified to use a persistent store - a set of cache instances to which 

    it will dump data from push ops and retrieve data for pull and fetch ops. 

    """ 

    # Declaration of service 

    declare = ServiceProcess.service_declare(name='datastore', 

                                             version='0.1.0', 

                                             dependencies=[]) 

 

    # The type_map is a map from object type to resource type built from the ion_preload_configs 

    # this is a temporary device until the resource registry is fully architecturally operational. 

    type_map = TypeMap() 

 

 

 

    def __init__(self, *args, **kwargs): 

        # Service class initializer. Basic config, but no yields allowed. 

 

        ServiceProcess.__init__(self, *args, **kwargs) 

 

        self._backend_cls_names = {} 

        self._backend_cls_names[COMMIT_CACHE] = self.spawn_args.get(COMMIT_CACHE, CONF.getValue(COMMIT_CACHE, default='ion.core.data.store.IndexStore')) 

        self._backend_cls_names[BLOB_CACHE] = self.spawn_args.get(BLOB_CACHE, CONF.getValue(BLOB_CACHE, default='ion.core.data.store.Store')) 

 

        self._cache_size = self.spawn_args.get('cache_size', CONF.getValue('cache_size', default=10**8)) 

 

        self._backend_classes={} 

 

        log.info('conf username:%s' % CONF.getValue("username")) 

        self._username = self.spawn_args.get("username", CONF.getValue("username", None)) 

        self._password = self.spawn_args.get("password", CONF.getValue("password",None)) 

 

        self._backend_classes[COMMIT_CACHE] = pu.get_class(self._backend_cls_names[COMMIT_CACHE]) 

        assert store.IIndexStore.implementedBy(self._backend_classes[COMMIT_CACHE]), \ 

            'The back end class to store commit objects passed to the data store does not implement the required IIndexSTORE interface.' 

 

        self._backend_classes[BLOB_CACHE] = pu.get_class(self._backend_cls_names[BLOB_CACHE]) 

        assert store.IStore.implementedBy(self._backend_classes[BLOB_CACHE]), \ 

            'The back end class to store blob objects passed to the data store does not implement the required ISTORE interface.' 

 

        # Declare some variables to hold the store instances 

 

        self.c_store = None 

        self.b_store = None 

 

        # Get the configuration for cassandra - may or may not be used depending on the backend class 

        self._storage_conf = get_cassandra_configuration() 

 

        # Get the arguments for preloading the datastore 

        self.preload = {ION_PREDICATES_CFG:True, 

                        ION_RESOURCE_TYPES_CFG:True, 

                        ION_IDENTITIES_CFG:True, 

                        ION_DATASETS_CFG:False, 

                        ION_AIS_RESOURCES_CFG:False} 

 

        self.preload.update(CONF.getValue(PRELOAD_CFG, {})) 

        self.preload.update(self.spawn_args.get(PRELOAD_CFG, {})) 

 

 

 

        log.info('DataStoreService.__init__()') 

 

 

    @defer.inlineCallbacks 

    def slc_init(self): 

        # Service life cycle state. Initialize service here. Can use yields. 

        if issubclass(self._backend_classes[COMMIT_CACHE], cassandra.CassandraStore): 

            #raise NotImplementedError('Startup for cassandra store is not yet complete') 

            log.info("Instantiating Cassandra Index Store: %s" % self._backend_classes[COMMIT_CACHE]) 

 

            storage_provider = self._storage_conf[STORAGE_PROVIDER] 

            keyspace = self._storage_conf[PERSISTENT_ARCHIVE]['name'] 

 

            self.c_store = self._backend_classes[COMMIT_CACHE](self._username, self._password, storage_provider, keyspace, COMMIT_CACHE) 

 

            yield self.c_store.initialize() 

            yield self.c_store.activate() 

 

            # Can not afford to lazy initialize this value - the first call is a deferred list! 

            query_attributes =  yield self.c_store.get_query_attributes() 

            self.c_store._query_attribute_names = set(query_attributes) 

 

            yield self.register_life_cycle_object(self.c_store) 

 

        else: 

 

            log.info("Clearing The In Memeory Index Store") 

 

            self._backend_classes[COMMIT_CACHE].indices.clear() 

            self._backend_classes[COMMIT_CACHE].kvs.clear() 

 

            log.info("Instantiating In Memeory Index Store") 

            # Pass self for index store service implementation 

            self.c_store = self._backend_classes[COMMIT_CACHE](self, indices=COMMIT_INDEXED_COLUMNS ) 

 

        if issubclass(self._backend_classes[BLOB_CACHE], cassandra.CassandraStore): 

            #raise NotImplementedError('Startup for cassandra store is not yet complete') 

            log.info("Instantiating Cassandra Store: %s" % self._backend_classes[BLOB_CACHE]) 

 

            storage_provider = self._storage_conf[STORAGE_PROVIDER] 

            keyspace = self._storage_conf[PERSISTENT_ARCHIVE]['name'] 

 

            self.b_store = self._backend_classes[BLOB_CACHE](self._username, self._password, storage_provider, keyspace, BLOB_CACHE) 

 

            yield self.b_store.initialize() 

            yield self.b_store.activate() 

 

            yield self.register_life_cycle_object(self.b_store) 

        else: 

 

            log.info("Clearing The In Memeory Store") 

 

            self._backend_classes[BLOB_CACHE].kvs.clear() 

 

            log.info("Instantiating In Memory Store") 

            # Pass self for store service implementation 

            self.b_store = self._backend_classes[BLOB_CACHE](self) 

 

 

        log.info("Created stores") 

 

        # Create a specialized workbench for the datastore which has a persistent back end. 

        self.workbench = DataStoreWorkbench(self, self.b_store, self.c_store, cache_size=self._cache_size) 

 

        # Replace the existing message client in the procss with a new one - that uses the new workbench 

        # Not doing this was the source of a huge memory leak! 

        self.message_client = MessageClient(self) 

 

        yield self.initialize_datastore() 

 

 

    def slc_activate(self): 

 

 

        self.op_fetch_blobs = self.workbench.op_fetch_blobs 

        self.op_pull = self.workbench.op_pull 

        self.op_push = self.workbench.op_push 

        self.op_checkout = self.workbench.op_checkout 

        self.op_get_lcs = self.workbench.op_get_lcs 

        self.op_put_blobs = self.workbench.op_put_blobs 

        self.op_get_object = self.workbench.op_get_object 

        self.op_extract_data = self.workbench.op_extract_data 

 

 

    @defer.inlineCallbacks 

    def initialize_datastore(self): 

        """ 

        This method is used to preload required content into the datastore 

        """ 

 

        if self.preload[ION_PREDICATES_CFG]: 

 

            log.info('Preloading Predicates') 

            for key, value in ION_PREDICATES.items(): 

 

                exists = yield self.workbench.test_existence(value[ID_CFG]) 

                if not exists: 

                    log.info('Preloading Predicate:' + str(value.get(PREDICATE_CFG))) 

                    predicate_repo = self._create_predicate(value) 

                    if predicate_repo is None: 

                        raise DataStoreError('Failed to create predicate: %s' % str(value)) 

                    #@TODO make associations to predicates! 

 

 

 

        # Load the Root User! 

        if self.preload[ION_IDENTITIES_CFG]: 

 

 

            log.info('Preloading Identities and Roles') 

 

            root_description = ION_IDENTITIES.get(root_name) 

            root_exists = yield self.workbench.test_existence(ROOT_USER_ID) 

            if not root_exists: 

                log.info('Preloading ROOT USER') 

 

                resource_instance = self._create_resource(root_description) 

                if resource_instance is None: 

                    raise DataStoreError('Failed to create Identity Resource: %s' % str(root_description)) 

                self._create_ownership_association(resource_instance.Repository, ROOT_USER_ID) 

 

 

            log.info('Preloading Roles') 

 

            for key, value in ION_ROLES.items(): 

                exists = yield self.workbench.test_existence(value[ID_CFG]) 

                if not exists: 

                    log.info('Preloading Role Resources:' + str(value.get(NAME_CFG))) 

 

                    resource_instance = self._create_resource(value) 

                    if resource_instance is None: 

                        raise DataStoreError('Failed to create AIS Resource: %s' % str(value)) 

                    self._create_ownership_association(resource_instance.Repository, ROOT_USER_ID) 

 

            # Root was just created - need to give it a role 

            if not root_exists: 

                create_role_association(self.workbench, ROOT_USER_ID, ADMIN_ROLE_ID) 

 

 

        if self.preload[ION_RESOURCE_TYPES_CFG]: 

            log.info('Preloading Resource Types') 

 

            for key, value in ION_RESOURCE_TYPES.items(): 

 

                exists = yield self.workbench.test_existence(value[ID_CFG]) 

                if not exists: 

                    log.info('Preloading Resource Type:' + str(value.get(NAME_CFG))) 

 

                    resource_instance = self._create_resource(value) 

                    if resource_instance is None: 

                        raise DataStoreError('Failed to create Resource Type Resource: %s' % str(value)) 

                    self._create_ownership_association(resource_instance.Repository, ROOT_USER_ID) 

 

 

        if self.preload[ION_IDENTITIES_CFG]: 

            log.info('Preloading Identities') 

 

            for key, value in ION_IDENTITIES.items(): 

                if key is root_name: 

                    # Don't load root twice... 

                    continue 

 

                exists = yield self.workbench.test_existence(value[ID_CFG]) 

                if not exists: 

                    log.info('Preloading Identity:' + str(value.get(NAME_CFG))) 

 

                    resource_instance = self._create_resource(value) 

                    if resource_instance is None: 

                        raise DataStoreError('Failed to create Identity Resource: %s' % str(value)) 

                    self._create_ownership_association(resource_instance.Repository, value[ID_CFG]) 

 

                    # Add the authenticated role for each user.... 

                    create_role_association(self.workbench, ROOT_USER_ID, AUTHENTICATED_ROLE_ID) 

 

 

        if self.preload[ION_DATASETS_CFG]: 

            log.info('Preloading Data Sets: %d' % len(ION_DATASETS)) 

 

            for key, value in ION_DATASETS.items(): 

                exists = yield self.workbench.test_existence(value[ID_CFG]) 

                if not exists: 

                    log.info('Preloading DataSet:' + str(value.get(NAME_CFG))) 

 

                    resource_instance = self._create_resource(value) 

                    # Do not fail if returning none - may or may not load data from disk 

                    if resource_instance is not None: 

 

                        owner = value.get(OWNER_ID) or ANONYMOUS_USER_ID 

                        log.info('Dataset Owner ID: %s' % owner) 

 

                        self._create_ownership_association(resource_instance.Repository, owner) 

 

                    else: 

                        # Delete this entry from the CONFIG! 

                        del ION_DATASETS[key] 

 

 

            log.info('Preloading Data Sources: %d' % len(ION_DATA_SOURCES)) 

            for key, value in ION_DATA_SOURCES.items(): 

                exists = yield self.workbench.test_existence(value[ID_CFG]) 

                if not exists: 

                    log.info('Preloading DataSource:' + str(value.get(NAME_CFG))) 

 

                    resource_instance = self._create_resource(value) 

                    # Do not fail if returning none - may or may not load data from disk 

                    if resource_instance is not None: 

 

                        owner = value.get(OWNER_ID) or ANONYMOUS_USER_ID 

                        log.info('Datasource Owner ID: %s' % owner) 

 

                        self._create_ownership_association(resource_instance.Repository, owner) 

                    else: 

                        # Delete this entry from the CONFIG! 

                        del ION_DATA_SOURCES[key] 

 

        if self.preload[ION_AIS_RESOURCES_CFG]: 

            log.info('Preloading AIS Resources') 

 

            for key, value in ION_AIS_RESOURCES.items(): 

                exists = yield self.workbench.test_existence(value[ID_CFG]) 

                if not exists: 

                    log.info('Preloading AIS Resource:' + str(value.get(NAME_CFG))) 

 

                    resource_instance = self._create_resource(value) 

                    if resource_instance is None: 

                        raise DataStoreError('Failed to create AIS Resource: %s' % str(value)) 

                    self._create_ownership_association(resource_instance.Repository, ANONYMOUS_USER_ID) 

 

 

 

        yield self.workbench.flush_initialization_to_backend() 

 

 

 

    def _create_predicate(self,description): 

 

        try: 

            predicate_type = description[TYPE_CFG] 

            predicate_key = description[ID_CFG] 

            predicate_word = description[PREDICATE_CFG] 

        except KeyError, ke: 

            log.info(ke) 

            return None 

 

        try: 

            predicate_repository = self.workbench.create_repository(root_type=predicate_type, repository_key=predicate_key) 

            predicate = predicate_repository.root_object 

            predicate.word = predicate_word 

 

            predicate_repository.commit('Predicate instantiated by datastore bootstrap') 

        except WorkBenchError, ex: 

            log.info(ex) 

            return None 

        except repository.RepositoryError, ex: 

            log.info(ex) 

            return None 

 

        return predicate_repository 

 

 

 

    def _create_resource(self, description): 

        """ 

        Helper method to create resource objects during initialization 

        """ 

        try: 

            resource_key = description[ID_CFG] 

            resource_description = description[DESCRIPTION_CFG] 

            resource_name = description[NAME_CFG] 

            resource_type = description[TYPE_CFG] 

            content = description[CONTENT_CFG] 

 

            # LCS is optional! 

            res_lcs = description.get(LCS_CFG) 

        except KeyError, ke: 

            log.info(ke) 

            return None 

 

        # Create this resource with a constant ID from the config file 

        try: 

            resource_repository = self.workbench.create_repository(root_type=RESOURCE_TYPE, repository_key=resource_key) 

        except WorkBenchError, we: 

            log.info(we) 

            return None 

        except repository.RepositoryError, re: 

            log.info(re) 

            return None 

 

        resource = resource_repository.root_object 

 

        # Set the identity of the resource 

        resource.identity = resource_repository.repository_key 

 

        # Create the new resource object 

        res_obj = resource_repository.create_object(resource_type) 

 

        # Set the object as the child of the resource 

        resource.resource_object = res_obj 

 

        # Name and Description is set by the resource client 

        resource.name = resource_name 

        resource.description = resource_description 

 

        # Set the type... 

        object_utils.set_type_from_obj(res_obj, resource.object_type) 

 

        res_type = resource_repository.create_object(IDREF_TYPE) 

        # Get the resource type if it exists - otherwise a default will be set! 

        res_type.key = self.type_map.get(resource_type.object_id) 

        resource.resource_type = res_type 

 

        # State is set to new by default 

        if res_lcs == COMMISSIONED: 

            resource.lcs = resource.LifeCycleState.COMMISSIONED 

        else: 

            resource.lcs = resource.LifeCycleState.ACTIVE 

 

 

 

        resource_instance = resource_client.ResourceInstance(resource_repository) 

 

        # Set the content 

        set_content_ok = True 

        if isinstance(content, dict): 

            # If it is a dictionary, set the content of the resource 

            for k,v in content.items(): 

                setattr(resource_instance,k,v) 

 

        elif isinstance(content, FunctionType): 

            #execute the function on the resource_instance! 

            kwargs = {'has_a_id':HAS_A_ID} 

            if description.has_key(CONTENT_ARGS_CFG): 

                kwargs.update(description[CONTENT_ARGS_CFG]) 

 

            set_content_ok = content(resource_instance, self, **kwargs) 

 

 

        if set_content_ok: 

            resource_instance.Repository.commit('Resource instantiated by datastore bootstrap') 

 

            ### EXTREMELY VERBOSE LOGGING! 

            #log.warn(description) 

            #log.warn(resource_instance) 

            #log.warn(resource_instance.ResourceObject.PPrint()) 

 

            return resource_instance 

 

        else: 

            self.workbench.clear_repository_key(resource_key) 

            log.info('Retrieving content for resource "%s" failed.  This resource instance will not be added to the repository!' % resource_name) 

            return None 

 

 

 

    def _create_ownership_association(self, repo_object, user_id): 

 

        association_repo = self.workbench.create_repository(ASSOCIATION_TYPE) 

 

        # Set the subject 

        id_ref = association_repo.create_object(IDREF_TYPE) 

        repo_object.set_repository_reference(id_ref, current_state=True) 

        association_repo.root_object.subject = id_ref 

 

        # Set the predicate 

        id_ref = association_repo.create_object(IDREF_TYPE) 

        owned_by_repo = self.workbench.get_repository(OWNED_BY_ID) 

        if owned_by_repo is None: 

            raise DataStoreError('Owned_By predicate not found during preload.') 

        owned_by_repo.set_repository_reference(id_ref, current_state=True) 

 

        association_repo.root_object.predicate = id_ref 

 

        # Set teh Object 

        id_ref = association_repo.create_object(IDREF_TYPE) 

        owner_repo = self.workbench.get_repository(user_id) 

        if owner_repo is None: 

            raise DataStoreError('Owner resource not found during preload.') 

        owner_repo.set_repository_reference(id_ref, current_state=True) 

 

        association_repo.root_object.object = id_ref 

 

        association_repo.commit('Ownership association created for preloaded object.') 

 

 

        return association_repo 

 

 

class DataStoreClient(ServiceClient):
    """
    Client for the datastore service.

    Every public method sends its content argument as an RPC to the service
    operation of the same name and returns a Deferred that fires with the
    content of the reply.
    """

    def __init__(self, *args, **kwargs):
        # The target is always the datastore - any targetname passed in by
        # the caller is overwritten
        kwargs['targetname'] = 'datastore'
        ServiceClient.__init__(self, *args, **kwargs)

    @defer.inlineCallbacks
    def _rpc_for_content(self, op, content):
        """
        Shared body of all the public operations: wait for _check_init(),
        send content to the operation named op and fire with the reply
        content only - the reply headers and message are dropped.
        """
        yield self._check_init()

        (reply, headers, msg) = yield self.rpc_send(op, content)
        defer.returnValue(reply)

    def push(self, content):
        """Send content to the 'push' operation; fires with the reply content."""
        return self._rpc_for_content('push', content)

    def pull(self, content):
        """Send content to the 'pull' operation; fires with the reply content."""
        return self._rpc_for_content('pull', content)

    def checkout(self, content):
        """Send content to the 'checkout' operation; fires with the reply content."""
        return self._rpc_for_content('checkout', content)

    def fetch_blobs(self, content):
        """Send content to the 'fetch_blobs' operation; fires with the reply content."""
        return self._rpc_for_content('fetch_blobs', content)

    def get_lcs(self, content):
        """Send content to the 'get_lcs' operation; fires with the reply content."""
        return self._rpc_for_content('get_lcs', content)

    def put_blobs(self, content):
        """Send content to the 'put_blobs' operation; fires with the reply content."""
        return self._rpc_for_content('put_blobs', content)

    def get_object(self, content):
        """Send content to the 'get_object' operation; fires with the reply content."""
        return self._rpc_for_content('get_object', content)

    def extract_data(self, content):
        """Send content to the 'extract_data' operation; fires with the reply content."""
        return self._rpc_for_content('extract_data', content)

 

#    @defer.inlineCallbacks 

#    def get_preloaded_datasets_dict(self): 

#        """ 

#        Retrieve the dictionary of preloaded dataset IDs 

#        """ 

##        yield self._check_init() 

# 

#        log.info("@@@--->>> DataStoreClient: Sending RPC message.  OP = 'get_preloaded_datasets_dict'") 

##        (content, headers, msg) = yield self.rpc_send('get_preloaded_datasets_dict', None) 

## 

##        log.info("<<<---@@@ DataStoreClient: Incoming rpc reply to op: 'get_preloaded_datasets_dict'") 

##        log.debug("... Content\t" + str(content)) 

##        log.debug("... Headers\t" + str(headers)) 

##        log.debug("... Message\t" + str(msg)) 

## 

##        defer.returnValue(content) 

#        yield 

#        defer.returnValue({}) 

 

def create_role_association(workbench, user_id, role_id):
    """
    Create and commit an association: user_id -- has role --> role_id

    The subject references the repository stored under user_id, the
    predicate the repository stored under HAS_ROLE_ID and the object the
    repository stored under role_id, each at its current state.

    Returns the committed association repository. Raises DataStoreError when
    the workbench has no repository for user_id, HAS_ROLE_ID or role_id.
    """
    association_repo = workbench.create_repository(ASSOCIATION_TYPE)

    # Set the subject - the user that receives the role
    id_ref = association_repo.create_object(IDREF_TYPE)
    user_repo = workbench.get_repository(user_id)
    if user_repo is None:
        raise DataStoreError('User Repository not found during preload.')
    user_repo.set_repository_reference(id_ref, current_state=True)
    association_repo.root_object.subject = id_ref

    # Set the predicate - looked up in the workbench by HAS_ROLE_ID
    id_ref = association_repo.create_object(IDREF_TYPE)
    has_role_repo = workbench.get_repository(HAS_ROLE_ID)
    if has_role_repo is None:
        raise DataStoreError('Has_Role predicate not found during preload.')
    has_role_repo.set_repository_reference(id_ref, current_state=True)
    association_repo.root_object.predicate = id_ref

    # Set the object - the role itself
    id_ref = association_repo.create_object(IDREF_TYPE)
    role_repo = workbench.get_repository(role_id)
    if role_repo is None:
        raise DataStoreError('Role resource not found during preload.')
    role_repo.set_repository_reference(id_ref, current_state=True)
    association_repo.root_object.object = id_ref

    # This is a role association - the message used to be a copy of the one
    # written by the ownership helper
    association_repo.commit('Role association created for preloaded object.')

    return association_repo

 

# Module-level ProcessFactory wrapping DataStoreService.
# NOTE(review): presumably the container looks this `factory` name up when it
# spawns the process by module name - confirm against the spawn code.
factory = ProcessFactory(DataStoreService)