Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

#!/usr/bin/env python 

 

""" 

@file ion/core/object/codec.py 

@author David Stuebe 

@brief Interceptor for encoding and decoding ION messages 

""" 

 

from twisted.internet import defer 

 

import ion.util.ionlog 

log = ion.util.ionlog.getLogger(__name__) 

 

from ion.core.intercept.interceptor import EnvelopeInterceptor 

from google.protobuf.internal import decoder 

 

from ion.core.object import gpb_wrapper 

from ion.core.object import repository 

from net.ooici.core.container import container_pb2 

from ion.core.object import object_utils 

from ion.core.messaging import message_client 

 

ION_MESSAGE_TYPE = object_utils.create_type_identifier(object_id=11, version=1) 

 

STRUCTURE_ELEMENT_TYPE = object_utils.create_type_identifier(object_id=1, version=1) 

STRUCTURE_TYPE = object_utils.create_type_identifier(object_id=2, version=1) 

 

ION_R1_GPB = 'ION R1 GPB' 

 

class CodecError(Exception): 

    """ 

    An error class for problems that occur in the codec 

    """ 

 

 

class ObjectCodecInterceptor(EnvelopeInterceptor): 

    """ 

    Interceptor that decodes the serialized content in a message. 

    The object returned is the root of a repository structure. It is not yet added to the workbench and completely 

    separate from the process until it finishes the interceptor stack! 

    """ 

    def before(self, invocation): 

 

        # Only mess with ION_R1_GPB encoded objects... 

        if isinstance(invocation.content, dict) and ION_R1_GPB == invocation.content['encoding']: 

            raw_content = invocation.content['content'] 

            unpacked_content = unpack_structure(raw_content) 

 

            if hasattr(unpacked_content, 'ObjectType') and unpacked_content.ObjectType == ION_MESSAGE_TYPE: 

                # If this content should be returned in a Message Instance 

                unpacked_content = message_client.MessageInstance(unpacked_content.Repository) 

 

 

            invocation.content['content'] = unpacked_content 

 

        return invocation 

 

    def after(self, invocation): 

        """ 

        Encode a Message Instance to a serialized form. 

        Also possible to encode a gpb_wrapper for backward compatibility. 

        """ 

 

        content = invocation.message['content'] 

 

        if isinstance(content, (message_client.MessageInstance, gpb_wrapper.Wrapper)): 

 

            # Turn of access to shared process object Cache 

            content.Repository.index_hash.has_cache = False 

 

            invocation.message['content'] = pack_structure(content) 

 

            invocation.message['encoding'] = ION_R1_GPB 

 

            # Turn it back on. 

            content.Repository.index_hash.has_cache = True 

 

 

        return invocation 

 

 

 

def pack_structure(content): 

    """ 

    Pack all children of the content stucture into a message. 

    Return the content as a serialized container object. 

    """ 

 

    repo = getattr(content, 'Repository', None) 

    if repo is None: 

        raise CodecError('Pack Structure received content which does not have a valid Repository') 

 

    if not repo.status == repo.UPTODATE: 

        comment='Commiting to send message with wrapper object' 

        repo.commit(comment=comment) 

 

    # only put StructureElements in this, please. 

    obj_set=set() 

 

    # Get the serialized root object 

    root_obj = repo.root_object 

    root_obj_se = repo.index_hash.get(root_obj.MyId) 

 

    items = set([root_obj]) 

 

    # extract the excluded_object_types list if we have one! 

    excluded_object_types = [] 

    if hasattr(content, 'excluded_object_types') and len(content.excluded_object_types) > 0: 

        log.debug("Codec pack_structure has %d excluded_object_types" % len(content.excluded_object_types)) 

        excluded_object_types = [x.GPBMessage for x in content.excluded_object_types] 

 

    # Recurse through the DAG and add the keys to a set - obj_set. 

    while len(items) > 0: 

        child_items = set() 

        for item in items: 

 

            # Add this item to the set we are sending 

            if item not in obj_set: 

 

                for link in item.ChildLinks: 

 

                    # if this link's key is not in the index_hash, then its type must be in the excluded_type list we 

                    # pull out of the message above. if not, we have an error. 

 

                    hashobj = repo.index_hash.get(link.key, None) 

                    if hashobj is None: 

                        # link is a CASRef to a GPBType 

                        if link.GPBMessage.type not in excluded_object_types: 

                            raise CodecError("Hashed CREF not found (and not excluded)! Please call David") 

                    else: 

                        # store the object we just pulled out of the index_hash for passing to the _pack_container method 

                        obj_set.add(hashobj) 

 

                        # load this object so we can examine its childlinks - should be simple extraction from 

                        # repo._workspace, but use the public method. 

                        subobj = repo.get_linked_object(link) 

                        child_items.add(subobj) 

 

        items = child_items 

 

    container_structure = _pack_container(root_obj_se, obj_set) 

    serialized = container_structure.SerializeToString() 

 

    log.debug('pack_structure: Packing Complete!') 

 

    return serialized 

 

def _pack_container(head, objects): 

    """ 

    Helper for the sender to pack message content into a container in order 

    """ 

    log.debug('_pack_container: Packing container head and object_keys!') 

    # An unwrapped GPB Structure message to put stuff into! 

    cs = object_utils.get_gpb_class_from_type_id(STRUCTURE_TYPE)() 

 

 

    cs.head.key = head.key 

 

    cs.head.type.object_id =  head.type.object_id 

    cs.head.type.version =  head.type.version 

 

    cs.head.isleaf = head.isleaf 

    cs.head.value = head.value 

 

    for item in objects: 

 

        se = cs.items.add() 

 

        # Can not set the pointer directly... must set the components 

        se.key = item.key 

        se.isleaf = item.isleaf 

        se.type.object_id = item.type.object_id 

        se.type.version = item.type.version 

 

        # @TODO - How can we measure memory usage here to make sure this is the okay? 

        se.value = item.value # Let python's object manager keep track of the pointer to the big things! 

 

 

    log.debug('_pack_container: Packed container!') 

    return cs 

 

def unpack_structure(serialized_container): 

    """ 

    Take a serialized container object and load a repository with its contents 

    """ 

    log.debug('unpack_structure: Unpacking Structure!') 

    head, obj_dict = _unpack_container(serialized_container) 

 

    assert len(obj_dict) > 0, 'There should be objects in the container!' 

 

    repo = repository.Repository() 

 

    repo.index_hash.update(obj_dict) 

 

    # Load the object and set it as the workspace root 

    root_obj = repo._load_element(head) 

    repo.root_object = root_obj 

 

    repo.branch(nickname='master') 

 

    # attempt to extract a list of excluded objects, if the message contains the field 'excluded_object_types' 

    excluded_types = [] 

    if hasattr(root_obj, 'message_object') and hasattr(root_obj.message_object, 'excluded_object_types'): 

        log.debug("Codec unpack_structure has %d excluded_object_types set in field" % len(root_obj.message_object.excluded_object_types)) 

        excluded_types = [x.GPBMessage for x in root_obj.message_object.excluded_object_types] 

 

    # Now load the rest of the linked objects - down to the leaf nodes. 

    repo.load_links(root_obj, excluded_types) 

 

    # append the excluded object types in the repo (load links no longer does this) 

    for extype in excluded_types: 

        if extype not in repo.excluded_types: 

            repo.excluded_types.append(extype) 

 

    # Create a commit to record the state when the message arrived 

    cref = repo.commit(comment='Message for you Sir!') 

 

 

    log.debug('unpack_structure: returning root_obj') 

 

    return root_obj 

 

 

 

def _unpack_container(serialized_container): 

    """ 

    Helper for the receiver for unpacking message content 

    Returns the head object and items as wrapped structure elements 

    """ 

 

    log.debug('_unpack_container: Unpacking Container') 

    # An unwrapped GPB Structure message to put stuff into! 

    cs = object_utils.get_gpb_class_from_type_id(STRUCTURE_TYPE)() 

 

    try: 

        cs.ParseFromString(serialized_container) 

    except decoder._DecodeError, de: 

        log.debug('Received invalid content - decode error: "%s"' % str(de)) 

        raise CodecError('Could not decode message content as a GPB container structure!') 

 

    # Return arguments 

    obj_dict={} 

 

    head = gpb_wrapper.StructureElement(cs.head) 

    obj_dict[head.key] = head 

 

 

    for se in cs.items: 

        wse = gpb_wrapper.StructureElement(se) 

 

        obj_dict[wse.key] = wse 

 

    log.debug('_unpack_container: returning head and dictionary of %d objects' % len(obj_dict)) 

 

    return head, obj_dict