1# Copyright 2014 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Create / interact with a batch of updates / deletes.
16
17Batches provide the ability to execute multiple operations
18in a single request to the Cloud Datastore API.
19
20See
21https://cloud.google.com/datastore/docs/concepts/entities#batch_operations
22"""
23
24from google.cloud.datastore import helpers
25from google.cloud.datastore_v1.types import datastore as _datastore_pb2
26
27
class Batch(object):
    """An abstraction representing a collected group of updates / deletes.

    Used to build up a bulk mutation.

    For example, the following snippet of code will put the two ``save``
    operations and the ``delete`` operation into the same mutation, and send
    them to the server in a single API request:

    .. testsetup:: batch

        import uuid

        from google.cloud import datastore

        unique = str(uuid.uuid4())[0:8]
        client = datastore.Client(namespace='ns{}'.format(unique))

    .. doctest:: batch

        >>> entity1 = datastore.Entity(client.key('EntityKind', 1234))
        >>> entity2 = datastore.Entity(client.key('EntityKind', 2345))
        >>> key3 = client.key('EntityKind', 3456)
        >>> batch = client.batch()
        >>> batch.begin()
        >>> batch.put(entity1)
        >>> batch.put(entity2)
        >>> batch.delete(key3)
        >>> batch.commit()

    You can also use a batch as a context manager, in which case
    :meth:`commit` will be called automatically if its block exits without
    raising an exception:

    .. doctest:: batch

        >>> with client.batch() as batch:
        ...     batch.put(entity1)
        ...     batch.put(entity2)
        ...     batch.delete(key3)

    By default, no updates will be sent if the block exits with an error:

    .. doctest:: batch

        >>> def do_some_work(batch):
        ...     return
        >>> with client.batch() as batch:
        ...     do_some_work(batch)
        ...     raise Exception()  # rolls back
        Traceback (most recent call last):
          ...
        Exception

    .. testcleanup:: batch

        with client.batch() as batch:
            batch.delete(client.key('EntityKind', 1234))
            batch.delete(client.key('EntityKind', 2345))

    :type client: :class:`google.cloud.datastore.client.Client`
    :param client: The client used to connect to datastore.
    """

    _id = None  # "protected" attribute, always None for non-transactions

    _INITIAL = 0
    """Enum value for _INITIAL status of batch/transaction."""

    _IN_PROGRESS = 1
    """Enum value for _IN_PROGRESS status of batch/transaction."""

    _ABORTED = 2
    """Enum value for _ABORTED status of batch/transaction."""

    _FINISHED = 3
    """Enum value for _FINISHED status of batch/transaction."""

    def __init__(self, client):
        """Create an empty batch bound to ``client``.

        The batch starts in the ``_INITIAL`` state with no queued mutations;
        :meth:`begin` must be called (directly or by entering a ``with``
        block) before :meth:`put` / :meth:`delete` / :meth:`commit`.
        """
        self._client = client
        # Mutation protobufs queued so far, in the order they were added.
        self._mutations = []
        # Entities put with a partial key; their keys are completed from the
        # commit response in ``_commit``.
        self._partial_key_entities = []
        self._status = self._INITIAL

    def current(self):
        """Return the topmost batch / transaction, or None."""
        return self._client.current_batch

    @property
    def project(self):
        """Getter for project in which the batch will run.

        :rtype: :class:`str`
        :returns: The project in which the batch will run.
        """
        return self._client.project

    @property
    def namespace(self):
        """Getter for namespace in which the batch will run.

        :rtype: :class:`str`
        :returns: The namespace in which the batch will run.
        """
        return self._client.namespace

    def _add_partial_key_entity_pb(self):
        """Adds a new mutation for an entity with a partial key.

        The mutation is appended to the batch immediately; the caller fills
        in the returned ``insert`` entity afterwards.

        :rtype: :class:`.entity_pb2.Entity`
        :returns: The newly created entity protobuf that will be
                  updated and sent with a commit.
        """
        new_mutation = _datastore_pb2.Mutation()
        self._mutations.append(new_mutation)
        return new_mutation.insert

    def _add_complete_key_entity_pb(self):
        """Adds a new mutation for an entity with a completed key.

        The mutation is appended to the batch immediately; the caller fills
        in the returned ``upsert`` entity afterwards.

        :rtype: :class:`.entity_pb2.Entity`
        :returns: The newly created entity protobuf that will be
                  updated and sent with a commit.
        """
        # We use ``upsert`` for entities with completed keys, rather than
        # ``insert`` or ``update``, in order not to create race conditions
        # based on prior existence / removal of the entity.
        new_mutation = _datastore_pb2.Mutation()
        self._mutations.append(new_mutation)
        return new_mutation.upsert

    def _add_delete_key_pb(self):
        """Adds a new mutation for a key to be deleted.

        The mutation is appended to the batch immediately; the caller fills
        in the returned ``delete`` key afterwards.

        :rtype: :class:`.entity_pb2.Key`
        :returns: The newly created key protobuf that will be
                  deleted when sent with a commit.
        """
        new_mutation = _datastore_pb2.Mutation()
        self._mutations.append(new_mutation)
        return new_mutation.delete

    @property
    def mutations(self):
        """Getter for the changes accumulated by this batch.

        Every batch is committed with a single commit request containing all
        the work to be done as mutations. Inside a batch, calling :meth:`put`
        with an entity, or :meth:`delete` with a key, builds up the request by
        adding a new mutation. This getter returns the protobuf that has been
        built-up so far.

        The returned list is the batch's own internal list, not a copy.

        :rtype: iterable
        :returns: The list of :class:`.datastore_pb2.Mutation`
                  protobufs to be sent in the commit request.
        """
        return self._mutations

    def put(self, entity):
        """Remember an entity's state to be saved during :meth:`commit`.

        .. note::
           Any existing properties for the entity will be replaced by those
           currently set on this instance.  Already-stored properties which do
           not correspond to keys set on this instance will be removed from
           the datastore.

        .. note::
           Property values which are "text" ('unicode' in Python2, 'str' in
           Python3) map to 'string_value' in the datastore;  values which are
           "bytes" ('str' in Python2, 'bytes' in Python3) map to 'blob_value'.

        When an entity has a partial key, calling :meth:`commit` sends it as
        an ``insert`` mutation and the key is completed. On return,
        the key for the ``entity`` passed in is updated to match the key ID
        assigned by the server.

        :type entity: :class:`google.cloud.datastore.entity.Entity`
        :param entity: the entity to be saved.

        :raises: :class:`ValueError` if the batch is not in
                 progress, if entity has no key assigned, or if the key's
                 ``project`` does not match ours.
        """
        if self._status != self._IN_PROGRESS:
            raise ValueError("Batch must be in progress to put()")

        if entity.key is None:
            raise ValueError("Entity must have a key")

        if self.project != entity.key.project:
            raise ValueError("Key must be from same project as batch")

        if entity.key.is_partial:
            entity_pb = self._add_partial_key_entity_pb()
            # Remembered so ``_commit`` can assign the server-allocated ID.
            self._partial_key_entities.append(entity)
        else:
            entity_pb = self._add_complete_key_entity_pb()

        _assign_entity_to_pb(entity_pb, entity)

    def delete(self, key):
        """Remember a key to be deleted during :meth:`commit`.

        :type key: :class:`google.cloud.datastore.key.Key`
        :param key: the key to be deleted.

        :raises: :class:`ValueError` if the batch is not in
                 progress, if key is not complete, or if the key's
                 ``project`` does not match ours.
        """
        if self._status != self._IN_PROGRESS:
            raise ValueError("Batch must be in progress to delete()")

        if key.is_partial:
            raise ValueError("Key must be complete")

        if self.project != key.project:
            raise ValueError("Key must be from same project as batch")

        key_pb = key.to_protobuf()
        # Copy into the key owned by the freshly queued ``delete`` mutation.
        self._add_delete_key_pb()._pb.CopyFrom(key_pb._pb)

    def begin(self):
        """Begins a batch.

        This method is called automatically when entering a with
        statement, however it can be called explicitly if you don't want
        to use a context manager.

        Overridden by :class:`google.cloud.datastore.transaction.Transaction`.

        :raises: :class:`ValueError` if the batch has already begun.
        """
        if self._status != self._INITIAL:
            raise ValueError("Batch already started previously.")
        self._status = self._IN_PROGRESS

    def _commit(self, retry, timeout):
        """Commits the batch.

        This is called by :meth:`commit`.

        Sends every queued mutation in one commit request, then completes the
        keys of entities that were put with a partial key using the keys
        returned in the response.

        :type retry: :class:`google.api_core.retry.Retry`
        :param retry: Forwarded to the API call only when not ``None``.

        :type timeout: float
        :param timeout: Forwarded to the API call only when not ``None``.
        """
        # ``_id`` is only set for transactions, which selects the commit mode.
        if self._id is None:
            mode = _datastore_pb2.CommitRequest.Mode.NON_TRANSACTIONAL
        else:
            mode = _datastore_pb2.CommitRequest.Mode.TRANSACTIONAL

        # Omit unset options so the API layer applies its own defaults.
        kwargs = {}

        if retry is not None:
            kwargs["retry"] = retry

        if timeout is not None:
            kwargs["timeout"] = timeout

        commit_response_pb = self._client._datastore_api.commit(
            request={
                "project_id": self.project,
                "mode": mode,
                "transaction": self._id,
                "mutations": self._mutations,
            },
            **kwargs,
        )

        _, updated_keys = _parse_commit_response(commit_response_pb)
        # If the back-end returns without error, we are guaranteed that
        # ``commit`` will return keys that match (length and
        # order) directly ``_partial_key_entities``.
        for new_key_pb, entity in zip(updated_keys, self._partial_key_entities):
            new_id = new_key_pb.path[-1].id
            entity.key = entity.key.completed_key(new_id)

    def commit(self, retry=None, timeout=None):
        """Commits the batch.

        This is called automatically upon exiting a with statement,
        however it can be called explicitly if you don't want to use a
        context manager.

        The batch is marked as finished whether or not the underlying commit
        request succeeds, so it cannot be committed a second time.

        :type retry: :class:`google.api_core.retry.Retry`
        :param retry:
            A retry object used to retry requests. If ``None`` is specified,
            requests will be retried using a default configuration.

        :type timeout: float
        :param timeout:
            Time, in seconds, to wait for the request to complete.
            Note that if ``retry`` is specified, the timeout applies
            to each individual attempt.

        :raises: :class:`ValueError` if the batch is not
                 in progress.
        """
        if self._status != self._IN_PROGRESS:
            raise ValueError("Batch must be in progress to commit()")

        try:
            self._commit(retry=retry, timeout=timeout)
        finally:
            self._status = self._FINISHED

    def rollback(self):
        """Rolls back the current batch.

        Marks the batch as aborted (can't be used again).

        Overridden by :class:`google.cloud.datastore.transaction.Transaction`.

        :raises: :class:`ValueError` if the batch is not
                 in progress.
        """
        if self._status != self._IN_PROGRESS:
            raise ValueError("Batch must be in progress to rollback()")

        self._status = self._ABORTED

    def __enter__(self):
        """Begin the batch and push it onto the client's batch stack.

        :rtype: :class:`Batch`
        :returns: This batch, for use as the ``with ... as`` target.
        """
        self.begin()
        # NOTE: We make sure begin() succeeds before pushing onto the stack.
        self._client._push_batch(self)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Commit on a clean exit, roll back if the block raised.

        The batch is popped from the client's stack in either case, even if
        the commit / rollback itself raises. Nothing is returned, so an
        exception raised inside the ``with`` block keeps propagating.
        """
        try:
            if exc_type is None:
                self.commit()
            else:
                self.rollback()
        finally:
            self._client._pop_batch()
360
361
def _assign_entity_to_pb(entity_pb, entity):
    """Copy ``entity`` into ``entity_pb``.

    Helper method for ``Batch.put``.

    ``entity_pb`` is overwritten in place (via ``CopyFrom`` on its underlying
    protobuf) rather than replaced, so the mutation that owns it sees the
    new contents.

    :type entity_pb: :class:`.entity_pb2.Entity`
    :param entity_pb: The entity owned by a mutation.

    :type entity: :class:`google.cloud.datastore.entity.Entity`
    :param entity: The entity being updated within the batch / transaction.
    """
    bare_entity_pb = helpers.entity_to_protobuf(entity)
    # The converted protobuf already carries the entity's key, so a single
    # whole-message copy is sufficient (the former key self-copy was a no-op).
    entity_pb._pb.CopyFrom(bare_entity_pb._pb)
376
377
378def _parse_commit_response(commit_response):
379    """Extract response data from a commit response.
380
381    :type commit_response_pb: :class:`.datastore_pb2.CommitResponse`
382    :param commit_response_pb: The protobuf response from a commit request.
383
384    :rtype: tuple
385    :returns: The pair of the number of index updates and a list of
386              :class:`.entity_pb2.Key` for each incomplete key
387              that was completed in the commit.
388    """
389    commit_response_pb = commit_response._pb
390    mut_results = commit_response_pb.mutation_results
391    index_updates = commit_response_pb.index_updates
392    completed_keys = [
393        mut_result.key for mut_result in mut_results if mut_result.HasField("key")
394    ]  # Message field (Key)
395    return index_updates, completed_keys
396