# Copyright 2014 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Create / interact with a batch of updates / deletes.

Batches provide the ability to execute multiple operations
in a single request to the Cloud Datastore API.

See
https://cloud.google.com/datastore/docs/concepts/entities#batch_operations
"""

from google.cloud.datastore import helpers
from google.cloud.datastore_v1.types import datastore as _datastore_pb2


class Batch(object):
    """An abstraction representing a collected group of updates / deletes.

    Used to build up a bulk mutation.

    For example, the following snippet of code will put the two ``save``
    operations and the ``delete`` operation into the same mutation, and send
    them to the server in a single API request:

    .. testsetup:: batch

        import uuid

        from google.cloud import datastore

        unique = str(uuid.uuid4())[0:8]
        client = datastore.Client(namespace='ns{}'.format(unique))

    .. doctest:: batch

        >>> entity1 = datastore.Entity(client.key('EntityKind', 1234))
        >>> entity2 = datastore.Entity(client.key('EntityKind', 2345))
        >>> key3 = client.key('EntityKind', 3456)
        >>> batch = client.batch()
        >>> batch.begin()
        >>> batch.put(entity1)
        >>> batch.put(entity2)
        >>> batch.delete(key3)
        >>> batch.commit()

    You can also use a batch as a context manager, in which case
    :meth:`commit` will be called automatically if its block exits without
    raising an exception:

    .. doctest:: batch

        >>> with client.batch() as batch:
        ...     batch.put(entity1)
        ...     batch.put(entity2)
        ...     batch.delete(key3)

    By default, no updates will be sent if the block exits with an error:

    .. doctest:: batch

        >>> def do_some_work(batch):
        ...     return
        >>> with client.batch() as batch:
        ...     do_some_work(batch)
        ...     raise Exception()  # rolls back
        Traceback (most recent call last):
          ...
        Exception

    .. testcleanup:: batch

        with client.batch() as batch:
            batch.delete(client.key('EntityKind', 1234))
            batch.delete(client.key('EntityKind', 2345))

    :type client: :class:`google.cloud.datastore.client.Client`
    :param client: The client used to connect to datastore.
    """

    _id = None  # "protected" attribute, always None for non-transactions

    _INITIAL = 0
    """Enum value for _INITIAL status of batch/transaction."""

    _IN_PROGRESS = 1
    """Enum value for _IN_PROGRESS status of batch/transaction."""

    _ABORTED = 2
    """Enum value for _ABORTED status of batch/transaction."""

    _FINISHED = 3
    """Enum value for _FINISHED status of batch/transaction."""

    def __init__(self, client):
        """Create an empty batch bound to ``client``.

        The batch starts in the ``_INITIAL`` state with no mutations queued;
        :meth:`begin` must be called before :meth:`put` / :meth:`delete`.
        """
        self._client = client
        self._mutations = []
        # Entities queued with a partial key, in the order they were put();
        # ``_commit`` fills in their keys from the commit response.
        self._partial_key_entities = []
        self._status = self._INITIAL

    def current(self):
        """Return the topmost batch / transaction, or None."""
        return self._client.current_batch

    @property
    def project(self):
        """Getter for project in which the batch will run.

        :rtype: :class:`str`
        :returns: The project in which the batch will run.
        """
        return self._client.project

    @property
    def namespace(self):
        """Getter for namespace in which the batch will run.

        :rtype: :class:`str`
        :returns: The namespace in which the batch will run.
        """
        return self._client.namespace

    def _add_partial_key_entity_pb(self):
        """Adds a new mutation for an entity with a partial key.

        :rtype: :class:`.entity_pb2.Entity`
        :returns: The newly created entity protobuf that will be
                  updated and sent with a commit.
        """
        new_mutation = _datastore_pb2.Mutation()
        self._mutations.append(new_mutation)
        return new_mutation.insert

    def _add_complete_key_entity_pb(self):
        """Adds a new mutation for an entity with a completed key.

        :rtype: :class:`.entity_pb2.Entity`
        :returns: The newly created entity protobuf that will be
                  updated and sent with a commit.
        """
        # We use ``upsert`` for entities with completed keys, rather than
        # ``insert`` or ``update``, in order not to create race conditions
        # based on prior existence / removal of the entity.
        new_mutation = _datastore_pb2.Mutation()
        self._mutations.append(new_mutation)
        return new_mutation.upsert

    def _add_delete_key_pb(self):
        """Adds a new mutation for a key to be deleted.

        :rtype: :class:`.entity_pb2.Key`
        :returns: The newly created key protobuf that will be
                  deleted when sent with a commit.
        """
        new_mutation = _datastore_pb2.Mutation()
        self._mutations.append(new_mutation)
        return new_mutation.delete

    @property
    def mutations(self):
        """Getter for the changes accumulated by this batch.

        Every batch is committed with a single commit request containing all
        the work to be done as mutations. Inside a batch, calling :meth:`put`
        with an entity, or :meth:`delete` with a key, builds up the request by
        adding a new mutation. This getter returns the protobuf that has been
        built-up so far.

        :rtype: iterable
        :returns: The list of :class:`.datastore_pb2.Mutation`
                  protobufs to be sent in the commit request.
        """
        return self._mutations

    def put(self, entity):
        """Remember an entity's state to be saved during :meth:`commit`.

        .. note::
           Any existing properties for the entity will be replaced by those
           currently set on this instance.  Already-stored properties which do
           not correspond to keys set on this instance will be removed from
           the datastore.

        .. note::
           Property values which are "text" ('unicode' in Python2, 'str' in
           Python3) map to 'string_value' in the datastore;  values which are
           "bytes" ('str' in Python2, 'bytes' in Python3) map to 'blob_value'.

        When an entity has a partial key, calling :meth:`commit` sends it as
        an ``insert`` mutation and the key is completed. On return,
        the key for the ``entity`` passed in is updated to match the key ID
        assigned by the server.

        :type entity: :class:`google.cloud.datastore.entity.Entity`
        :param entity: the entity to be saved.

        :raises: :class:`~exceptions.ValueError` if the batch is not in
                 progress, if entity has no key assigned, or if the key's
                 ``project`` does not match ours.
        """
        if self._status != self._IN_PROGRESS:
            raise ValueError("Batch must be in progress to put()")

        if entity.key is None:
            raise ValueError("Entity must have a key")

        if self.project != entity.key.project:
            raise ValueError("Key must be from same project as batch")

        if entity.key.is_partial:
            entity_pb = self._add_partial_key_entity_pb()
            # Remember the entity so its key can be completed after commit.
            self._partial_key_entities.append(entity)
        else:
            entity_pb = self._add_complete_key_entity_pb()

        _assign_entity_to_pb(entity_pb, entity)

    def delete(self, key):
        """Remember a key to be deleted during :meth:`commit`.

        :type key: :class:`google.cloud.datastore.key.Key`
        :param key: the key to be deleted.

        :raises: :class:`~exceptions.ValueError` if the batch is not in
                 progress, if key is not complete, or if the key's
                 ``project`` does not match ours.
        """
        if self._status != self._IN_PROGRESS:
            raise ValueError("Batch must be in progress to delete()")

        if key.is_partial:
            raise ValueError("Key must be complete")

        if self.project != key.project:
            raise ValueError("Key must be from same project as batch")

        key_pb = key.to_protobuf()
        self._add_delete_key_pb()._pb.CopyFrom(key_pb._pb)

    def begin(self):
        """Begins a batch.

        This method is called automatically when entering a with
        statement, however it can be called explicitly if you don't want
        to use a context manager.

        Overridden by :class:`google.cloud.datastore.transaction.Transaction`.

        :raises: :class:`ValueError` if the batch has already begun.
        """
        if self._status != self._INITIAL:
            raise ValueError("Batch already started previously.")
        self._status = self._IN_PROGRESS

    def _commit(self, retry, timeout):
        """Commits the batch.

        This is called by :meth:`commit`.

        Sends every accumulated mutation in one commit request, then
        assigns the returned key IDs to the entities that were queued
        with partial keys.

        :type retry: :class:`google.api_core.retry.Retry`
        :param retry: Forwarded to the API call as ``retry`` unless ``None``.

        :type timeout: float
        :param timeout: Forwarded to the API call as ``timeout`` unless
                        ``None``.
        """
        if self._id is None:
            mode = _datastore_pb2.CommitRequest.Mode.NON_TRANSACTIONAL
        else:
            mode = _datastore_pb2.CommitRequest.Mode.TRANSACTIONAL

        # Only pass through the options the caller actually supplied.
        kwargs = {}

        if retry is not None:
            kwargs["retry"] = retry

        if timeout is not None:
            kwargs["timeout"] = timeout

        commit_response_pb = self._client._datastore_api.commit(
            request={
                "project_id": self.project,
                "mode": mode,
                "transaction": self._id,
                "mutations": self._mutations,
            },
            **kwargs,
        )

        _, updated_keys = _parse_commit_response(commit_response_pb)
        # If the back-end returns without error, we are guaranteed that
        # ``commit`` will return keys that match (length and
        # order) directly ``_partial_key_entities``.
        for new_key_pb, entity in zip(updated_keys, self._partial_key_entities):
            new_id = new_key_pb.path[-1].id
            entity.key = entity.key.completed_key(new_id)

    def commit(self, retry=None, timeout=None):
        """Commits the batch.

        This is called automatically upon exiting a with statement,
        however it can be called explicitly if you don't want to use a
        context manager.

        :type retry: :class:`google.api_core.retry.Retry`
        :param retry:
            A retry object used to retry requests. If ``None`` is specified,
            requests will be retried using a default configuration.

        :type timeout: float
        :param timeout:
            Time, in seconds, to wait for the request to complete.
            Note that if ``retry`` is specified, the timeout applies
            to each individual attempt.

        :raises: :class:`~exceptions.ValueError` if the batch is not
                 in progress.
        """
        if self._status != self._IN_PROGRESS:
            raise ValueError("Batch must be in progress to commit()")

        try:
            self._commit(retry=retry, timeout=timeout)
        finally:
            # The batch is marked finished whether or not the request
            # succeeded, so it cannot be committed a second time.
            self._status = self._FINISHED

    def rollback(self):
        """Rolls back the current batch.

        Marks the batch as aborted (can't be used again).

        Overridden by :class:`google.cloud.datastore.transaction.Transaction`.

        :raises: :class:`~exceptions.ValueError` if the batch is not
                 in progress.
        """
        if self._status != self._IN_PROGRESS:
            raise ValueError("Batch must be in progress to rollback()")

        self._status = self._ABORTED

    def __enter__(self):
        """Begin the batch and push it onto the client's batch stack.

        :rtype: :class:`Batch`
        :returns: This batch, for use as the ``with`` target.
        """
        self.begin()
        # NOTE: We make sure begin() succeeds before pushing onto the stack.
        self._client._push_batch(self)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Commit on a clean exit, roll back on an exception.

        The batch is always popped from the client's batch stack, even if
        :meth:`commit` or :meth:`rollback` raises. Nothing is returned, so
        an exception raised inside the ``with`` block propagates.
        """
        try:
            if exc_type is None:
                self.commit()
            else:
                self.rollback()
        finally:
            self._client._pop_batch()


def _assign_entity_to_pb(entity_pb, entity):
    """Copy ``entity`` into ``entity_pb``.

    Helper method for ``Batch.put``.

    :type entity_pb: :class:`.entity_pb2.Entity`
    :param entity_pb: The entity owned by a mutation.

    :type entity: :class:`google.cloud.datastore.entity.Entity`
    :param entity: The entity being updated within the batch / transaction.
    """
    bare_entity_pb = helpers.entity_to_protobuf(entity)
    # Copy the whole converted message (its key field included) into the
    # mutation's entity slot in a single step.
    entity_pb._pb.CopyFrom(bare_entity_pb._pb)


def _parse_commit_response(commit_response):
    """Extract response data from a commit response.

    :type commit_response: :class:`.datastore_pb2.CommitResponse`
    :param commit_response: The response from a commit request; its
                            underlying protobuf is read via ``._pb``.

    :rtype: tuple
    :returns: The pair of the number of index updates and a list of
              :class:`.entity_pb2.Key` for each incomplete key
              that was completed in the commit.
    """
    commit_response_pb = commit_response._pb
    mut_results = commit_response_pb.mutation_results
    index_updates = commit_response_pb.index_updates
    # Only mutation results that carry a key are collected.
    completed_keys = [
        mut_result.key for mut_result in mut_results if mut_result.HasField("key")
    ]  # Message field (Key)
    return index_updates, completed_keys