1# Copyright 2015 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4"""A module for storing and getting objects from datastore.
5
6App Engine datastore limits entity size to less than 1 MB; this module
7supports storing larger objects by splitting the data and using multiple
8datastore entities.
9
10Although this module contains ndb.Model classes, these are not intended
11to be used directly by other modules.
12
13Example:
14  john = Account()
15  john.username = 'John'
16  john.userid = 123
17  stored_object.Set(john.userid, john)
18"""
19from __future__ import print_function
20from __future__ import division
21from __future__ import absolute_import
22
23import cPickle as pickle
24
25from google.appengine.ext import ndb
26
# Maximum number of serialized (pickled) bytes stored in a single PartEntity.
# App Engine limits an entity to less than 1 MB, so each chunk is kept below
# that limit.
_CHUNK_SIZE = 10 ** 6
29
30
@ndb.synctasklet
@ndb.transactional(propagation=ndb.TransactionOptions.ALLOWED, xg=True)
def Get(key):
  """Synchronously fetches the object stored under key.

  Args:
    key: String key value.

  Returns:
    The object previously stored with Set, or None if nothing is stored
    under key.
  """
  stored_value = yield GetAsync(key)
  raise ndb.Return(stored_value)
44
45
@ndb.tasklet
@ndb.transactional(propagation=ndb.TransactionOptions.ALLOWED, xg=True)
def GetAsync(key):
  """Asynchronously fetches the object stored under key.

  Args:
    key: String key value.

  Returns:
    (via ndb.Return) The unpickled object, or None when no MultipartEntity
    exists for key.
  """
  multipart = yield ndb.Key(MultipartEntity, key).get_async()
  result = None
  if multipart:
    # Load and unpickle the PartEntity children into the container.
    yield multipart.GetPartsAsync()
    result = multipart.GetData()
  raise ndb.Return(result)
54
55
@ndb.synctasklet
@ndb.transactional(propagation=ndb.TransactionOptions.ALLOWED, xg=True)
def Set(key, value):
  """Synchronously stores value under key in datastore.

  Args:
    key: String key value.
    value: Any pickleable object.
  """
  pending_write = SetAsync(key, value)
  yield pending_write
66
67
@ndb.tasklet
@ndb.transactional(propagation=ndb.TransactionOptions.ALLOWED, xg=True)
def SetAsync(key, value):
  """Asynchronously stores value under key, reusing an existing entity.

  Args:
    key: String key value.
    value: Any pickleable object.
  """
  container_key = ndb.Key(MultipartEntity, key)
  # Fall back to a fresh container when nothing is stored under key yet.
  container = (yield container_key.get_async()) or MultipartEntity(id=key)
  container.SetData(value)
  yield container.PutAsync()
76
77
@ndb.synctasklet
@ndb.transactional(propagation=ndb.TransactionOptions.ALLOWED, xg=True)
def Delete(key):
  """Synchronously removes the value stored under key from datastore."""
  pending_delete = DeleteAsync(key)
  yield pending_delete
83
84
@ndb.tasklet
@ndb.transactional(propagation=ndb.TransactionOptions.ALLOWED, xg=True)
def DeleteAsync(key):
  """Asynchronously deletes the MultipartEntity for key and all its parts.

  Does nothing when no entity is stored under key.
  """
  container_key = ndb.Key(MultipartEntity, key)
  # Read before deleting so that a missing key causes no write at all, which
  # avoids contention errors:
  # "Every attempt to create, update, or delete an entity takes place in the
  # context of a transaction,"
  # "There is a write throughput limit of about one transaction per second
  # within a single entity group"
  existing = yield container_key.get_async()
  if existing:
    # Remove the container and its PartEntity children in parallel.
    yield (container_key.delete_async(),
           MultipartEntity.DeleteAsync(container_key))
100
101
class MultipartEntity(ndb.Model):
  """Root entity for one stored value; the data lives in PartEntity children.

  The pickled value is split by _Serialize, and chunk i (1-based) is stored in
  the PartEntity keyed ndb.Key(PartEntity, i, parent=self.key). The unpickled
  value is held only in memory via GetData/SetData; it is not a datastore
  property.
  """

  # Number of PartEntity children used to store the serialized data.
  size = ndb.IntegerProperty(default=0, indexed=False)

  @ndb.tasklet
  @ndb.transactional(propagation=ndb.TransactionOptions.ALLOWED, xg=True)
  def GetPartsAsync(self):
    """Loads the PartEntity children and unpickles them into this entity.

    After completion the value is available from GetData(). Does nothing when
    size is 0.
    """
    if not self.size:
      raise ndb.Return(None)

    # Derive child keys from self.key itself, exactly as PutAsync parents the
    # parts. The previous self.key.string_id() form returned None for
    # integer ids and so built an invalid key.
    part_keys = [
        ndb.Key(PartEntity, i, parent=self.key)
        for i in range(1, self.size + 1)
    ]
    part_entities = yield ndb.get_multi_async(part_keys)
    # Missing parts are skipped, so the joined data is incomplete in that case.
    serialized = ''.join(p.value for p in part_entities if p is not None)
    self.SetData(pickle.loads(serialized))

  @classmethod
  @ndb.tasklet
  @ndb.transactional(propagation=ndb.TransactionOptions.ALLOWED, xg=True)
  def DeleteAsync(cls, key):
    """Deletes every PartEntity whose ancestor is key.

    Args:
      key: ndb.Key of a MultipartEntity. The MultipartEntity itself is not
        deleted here.
    """
    part_keys = yield PartEntity.query(ancestor=key).fetch_async(keys_only=True)
    yield ndb.delete_multi_async(part_keys)

  @ndb.tasklet
  @ndb.transactional(propagation=ndb.TransactionOptions.ALLOWED, xg=True)
  def PutAsync(self):
    """Stores the GetData() value as PartEntity children plus this entity.

    Parts left over from an earlier, larger value are deleted.
    """
    part_list = [
        PartEntity(id=i, parent=self.key, value=part)
        for i, part in enumerate(_Serialize(self.GetData()), 1)
    ]
    # Until it is overwritten below, self.size still holds the part count of
    # the previously written value (0 for a new entity). Parts beyond the new
    # count would never be read again, because reads stop at self.size.
    # Delete them now instead of leaving them orphaned until DeleteAsync.
    stale_keys = [
        ndb.Key(PartEntity, i, parent=self.key)
        for i in range(len(part_list) + 1, (self.size or 0) + 1)
    ]
    self.size = len(part_list)
    if stale_keys:
      yield ndb.delete_multi_async(stale_keys)
    yield ndb.put_multi_async(part_list + [self])

  def GetData(self):
    """Returns the in-memory value, or None if none was set or loaded."""
    return getattr(self, '_data', None)

  def SetData(self, data):
    """Sets the in-memory value that PutAsync serializes."""
    setattr(self, '_data', data)
147
148
class PartEntity(ndb.Model):
  """One chunk of the pickled data that belongs to a MultipartEntity.

  Keys have the shape
    ndb.Key('MultipartEntity', multipart_entity_id, 'PartEntity', part_index)
  where part_index counts from 1.
  """
  # Raw bytes of this chunk; never indexed (BlobProperty's default).
  value = ndb.BlobProperty(indexed=False)
156
157
def _Serialize(value):
  """Pickles value and cuts the result into _CHUNK_SIZE-byte pieces.

  Args:
    value: Any object that pickle protocol 2 can serialize.

  Returns:
    A list of strings which, concatenated in order, give the pickled form of
    value. Every piece except possibly the last is exactly _CHUNK_SIZE long.
  """
  pickled = pickle.dumps(value, 2)
  return [pickled[start:start + _CHUNK_SIZE]
          for start in range(0, len(pickled), _CHUNK_SIZE)]
174