# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""A module for storing and getting objects from datastore.

App Engine datastore limits entity size to less than 1 MB; this module
supports storing larger objects by splitting the data and using multiple
datastore entities.

Although this module contains ndb.Model classes, these are not intended
to be used directly by other modules.

Example:
  john = Account()
  john.username = 'John'
  john.userid = 123
  stored_object.Set(john.userid, john)
"""
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import

try:
  import cPickle as pickle
except ImportError:
  # Python 3 has no cPickle module; plain pickle provides the same API.
  import pickle

from google.appengine.ext import ndb

# Max bytes per entity.
_CHUNK_SIZE = 1000 * 1000


@ndb.synctasklet
@ndb.transactional(propagation=ndb.TransactionOptions.ALLOWED, xg=True)
def Get(key):
  """Gets the value.

  Args:
    key: Key value that the object was stored under.

  Returns:
    The value stored for key, or None if no entity exists for key.
  """
  result = yield GetAsync(key)
  raise ndb.Return(result)


@ndb.tasklet
@ndb.transactional(propagation=ndb.TransactionOptions.ALLOWED, xg=True)
def GetAsync(key):
  """Tasklet version of Get.

  Args:
    key: Key value that the object was stored under.

  Returns:
    The value stored for key, or None if no entity exists for key.
  """
  entity = yield ndb.Key(MultipartEntity, key).get_async()
  if not entity:
    raise ndb.Return(None)
  # Loads and unpickles the parts, caching the result on the entity.
  yield entity.GetPartsAsync()
  raise ndb.Return(entity.GetData())


@ndb.synctasklet
@ndb.transactional(propagation=ndb.TransactionOptions.ALLOWED, xg=True)
def Set(key, value):
  """Sets the value in datastore.

  Args:
    key: Key value to store the object under.
    value: A pickleable value to be stored.
  """
  yield SetAsync(key, value)


@ndb.tasklet
@ndb.transactional(propagation=ndb.TransactionOptions.ALLOWED, xg=True)
def SetAsync(key, value):
  """Tasklet version of Set.

  Args:
    key: Key value to store the object under.
    value: A pickleable value to be stored.
  """
  # Reuse the existing container when there is one so that PutAsync knows how
  # many parts the previous value occupied.
  entity = yield ndb.Key(MultipartEntity, key).get_async()
  if not entity:
    entity = MultipartEntity(id=key)
  entity.SetData(value)
  yield entity.PutAsync()


@ndb.synctasklet
@ndb.transactional(propagation=ndb.TransactionOptions.ALLOWED, xg=True)
def Delete(key):
  """Deletes the value in datastore.

  Args:
    key: Key value that the object was stored under.
  """
  yield DeleteAsync(key)


@ndb.tasklet
@ndb.transactional(propagation=ndb.TransactionOptions.ALLOWED, xg=True)
def DeleteAsync(key):
  """Tasklet version of Delete.

  Args:
    key: Key value that the object was stored under.
  """
  multipart_entity_key = ndb.Key(MultipartEntity, key)
  # Check if the entity exists before attempting to delete it in order to avoid
  # contention errors.
  #   "Every attempt to create, update, or delete an entity takes place in the
  #   context of a transaction,"
  #   "There is a write throughput limit of about one transaction per second
  #   within a single entity group"
  entity = yield multipart_entity_key.get_async()
  if not entity:
    return
  # Remove the container and all of its parts concurrently.
  yield (multipart_entity_key.delete_async(),
         MultipartEntity.DeleteAsync(multipart_entity_key))


class MultipartEntity(ndb.Model):
  """Container for PartEntity.

  The deserialized value is not an ndb property; it is cached on the instance
  through SetData and read back through GetData.
  """

  # Number of PartEntity entities used to store the serialized data.
  size = ndb.IntegerProperty(default=0, indexed=False)

  @ndb.tasklet
  @ndb.transactional(propagation=ndb.TransactionOptions.ALLOWED, xg=True)
  def GetPartsAsync(self):
    """Deserializes data from multiple PartEntity.

    Fetches the `size` child parts, joins their blobs in index order, unpickles
    the result and caches it so that GetData returns it. Does nothing when
    `size` is zero.
    """
    if not self.size:
      raise ndb.Return(None)

    # Build the child keys from self.key (as PutAsync does) rather than from
    # string_id(), which is None when the entity has an integer id.
    part_keys = [
        ndb.Key(PartEntity, index, parent=self.key)
        for index in range(1, self.size + 1)
    ]
    part_entities = yield ndb.get_multi_async(part_keys)
    # Blob values are byte strings; b'' keeps the join valid on Python 2 and 3.
    # NOTE(review): missing parts are skipped, so an incomplete value is
    # expected to surface as an unpickling error below -- confirm this is the
    # intended failure mode.
    serialized = b''.join(p.value for p in part_entities if p is not None)
    # The payload is only ever written by PutAsync; pickle must not be used on
    # data from untrusted sources.
    self.SetData(pickle.loads(serialized))

  @classmethod
  @ndb.tasklet
  @ndb.transactional(propagation=ndb.TransactionOptions.ALLOWED, xg=True)
  def DeleteAsync(cls, key):
    """Deletes every PartEntity that is a descendant of key.

    Args:
      key: The ndb.Key of the MultipartEntity whose parts should be removed.
    """
    part_keys = yield PartEntity.query(ancestor=key).fetch_async(keys_only=True)
    yield ndb.delete_multi_async(part_keys)

  @ndb.tasklet
  @ndb.transactional(propagation=ndb.TransactionOptions.ALLOWED, xg=True)
  def PutAsync(self):
    """Stores serialized data over multiple PartEntity.

    Pickles the cached value, writes one PartEntity per chunk (ids start at 1)
    together with this container, and removes parts left over from a
    previously stored value that needed more chunks.
    """
    part_list = [
        PartEntity(id=index, parent=self.key, value=part)
        for index, part in enumerate(_Serialize(self.GetData()), 1)
    ]
    # self.size still holds the part count of the previously stored value (0
    # for a new entity). Parts beyond the new count would never be read again,
    # so delete them instead of leaving them orphaned.
    previous_size = self.size or 0
    stale_keys = [
        ndb.Key(PartEntity, index, parent=self.key)
        for index in range(len(part_list) + 1, previous_size + 1)
    ]
    self.size = len(part_list)
    pending = list(ndb.put_multi_async(part_list + [self]))
    pending.extend(ndb.delete_multi_async(stale_keys))
    yield pending

  def GetData(self):
    """Returns the cached deserialized value, or None if none has been set."""
    return getattr(self, '_data', None)

  def SetData(self, data):
    """Caches the deserialized value on this instance (not persisted)."""
    self._data = data


class PartEntity(ndb.Model):
  """Holds a part of serialized data for MultipartEntity.

  This entity key has the form:
    ndb.Key('MultipartEntity', multipart_entity_id, 'PartEntity', part_index)
  """
  # One chunk (at most _CHUNK_SIZE bytes) of the pickled value.
  value = ndb.BlobProperty()


def _Serialize(value):
  """Serializes value and returns a list of its parts.

  Args:
    value: A pickleable value.

  Returns:
    A list of byte strings: the value pickled with protocol 2 and split into
    pieces of at most _CHUNK_SIZE bytes, in order.
  """
  serialized = pickle.dumps(value, 2)
  return [
      serialized[start:start + _CHUNK_SIZE]
      for start in range(0, len(serialized), _CHUNK_SIZE)
  ]