1# Copyright 2014 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Create / interact with Google Cloud Datastore queries."""
16
17import base64
18
19from google.api_core import page_iterator
20from google.cloud._helpers import _ensure_tuple_or_list
21
22from google.cloud.datastore_v1.types import entity as entity_pb2
23from google.cloud.datastore_v1.types import query as query_pb2
24from google.cloud.datastore import helpers
25from google.cloud.datastore.key import Key
26
27
# ``QueryResultBatch.more_results`` values that ``Iterator`` compares against:
# ``_NOT_FINISHED`` keeps paging (and drives the offset-skipping retry loop),
# ``_NO_MORE_RESULTS`` clears the next page token.
_NOT_FINISHED = query_pb2.QueryResultBatch.MoreResultsType.NOT_FINISHED
_NO_MORE_RESULTS = query_pb2.QueryResultBatch.MoreResultsType.NO_MORE_RESULTS

# Batch states after which ``Iterator`` stops requesting further pages.
_FINISHED = (
    _NO_MORE_RESULTS,
    query_pb2.QueryResultBatch.MoreResultsType.MORE_RESULTS_AFTER_LIMIT,
    query_pb2.QueryResultBatch.MoreResultsType.MORE_RESULTS_AFTER_CURSOR,
)
36
37
class Query(object):
    """A Query against the Cloud Datastore.

    This class serves as an abstraction for creating a query over data
    stored in the Cloud Datastore.

    :type client: :class:`google.cloud.datastore.client.Client`
    :param client: The client used to connect to Datastore.

    :type kind: str
    :param kind: The kind to query.

    :type project: str
    :param project:
        (Optional) The project associated with the query.  If not passed, uses
        the client's value.

    :type namespace: str
    :param namespace:
        (Optional) The namespace to which to restrict results.  If not passed,
        uses the client's value.

    :type ancestor: :class:`~google.cloud.datastore.key.Key`
    :param ancestor:
        (Optional) key of the ancestor to which this query's results are
        restricted.

    :type filters: sequence of tuple[str, str, object]
    :param filters: Property filters applied by this query. Each item is a
        ``(property_name, operator, value)`` tuple and is validated through
        :meth:`add_filter`.

    :type projection: sequence of string
    :param projection:  fields returned as part of query results.

    :type order: sequence of string
    :param order:  field names used to order query results. Prepend ``-``
                   to a field name to sort it in descending order.

    :type distinct_on: sequence of string
    :param distinct_on: field names used to group query results.

    :raises: :class:`ValueError` if an item of ``filters`` is rejected by
             :meth:`add_filter`.
    """

    OPERATORS = {
        "<=": query_pb2.PropertyFilter.Operator.LESS_THAN_OR_EQUAL,
        ">=": query_pb2.PropertyFilter.Operator.GREATER_THAN_OR_EQUAL,
        "<": query_pb2.PropertyFilter.Operator.LESS_THAN,
        ">": query_pb2.PropertyFilter.Operator.GREATER_THAN,
        "=": query_pb2.PropertyFilter.Operator.EQUAL,
    }
    """Mapping of operator strings and their protobuf equivalents."""

    def __init__(
        self,
        client,
        kind=None,
        project=None,
        namespace=None,
        ancestor=None,
        filters=(),
        projection=(),
        order=(),
        distinct_on=(),
    ):

        self._client = client
        self._kind = kind
        self._project = project or client.project
        self._namespace = namespace or client.namespace
        self._ancestor = ancestor
        self._filters = []
        # Route every initial filter through add_filter so it is validated.
        for property_name, operator, value in filters:
            self.add_filter(property_name, operator, value)
        self._projection = _ensure_tuple_or_list("projection", projection)
        self._order = _ensure_tuple_or_list("order", order)
        self._distinct_on = _ensure_tuple_or_list("distinct_on", distinct_on)

    @property
    def project(self):
        """Get the project for this Query.

        :rtype: str
        :returns: The project for the query.
        """
        return self._project or self._client.project

    @property
    def namespace(self):
        """This query's namespace

        :rtype: str or None
        :returns: the namespace assigned to this query
        """
        return self._namespace or self._client.namespace

    @namespace.setter
    def namespace(self, value):
        """Update the query's namespace.

        :type value: str
        :param value: the new namespace for the query.

        :raises: :class:`ValueError` if ``value`` is not a string.
        """
        if not isinstance(value, str):
            raise ValueError("Namespace must be a string")
        self._namespace = value

    @property
    def kind(self):
        """Get the Kind of the Query.

        :rtype: str
        :returns: The kind for the query.
        """
        return self._kind

    @kind.setter
    def kind(self, value):
        """Update the Kind of the Query.

        :type value: str
        :param value: updated kind for the query.

        :raises: :class:`TypeError` if ``value`` is not a string.

        .. note::

           The protobuf specification allows for ``kind`` to be repeated,
           but the current implementation returns an error if more than
           one value is passed.  If the back-end changes in the future to
           allow multiple values, this method will be updated to allow passing
           either a string or a sequence of strings.
        """
        if not isinstance(value, str):
            raise TypeError("Kind must be a string")
        self._kind = value

    @property
    def ancestor(self):
        """The ancestor key for the query.

        :rtype: :class:`~google.cloud.datastore.key.Key` or None
        :returns: The ancestor for the query.
        """
        return self._ancestor

    @ancestor.setter
    def ancestor(self, value):
        """Set the ancestor for the query

        :type value: :class:`~google.cloud.datastore.key.Key`
        :param value: the new ancestor key

        :raises: :class:`TypeError` if ``value`` is not a
                 :class:`~google.cloud.datastore.key.Key`.
        """
        if not isinstance(value, Key):
            raise TypeError("Ancestor must be a Key")
        self._ancestor = value

    @ancestor.deleter
    def ancestor(self):
        """Remove the ancestor for the query."""
        self._ancestor = None

    @property
    def filters(self):
        """Filters set on the query.

        :rtype: list of tuple[str, str, object]
        :returns: A copy of the filters set on the query; each item is a
            ``(property_name, operator, value)`` tuple.
        """
        return self._filters[:]

    def add_filter(self, property_name, operator, value):
        """Filter the query based on a property name, operator and a value.

        Expressions take the form of::

          .add_filter('<property>', '<operator>', <value>)

        where property is a property stored on the entity in the datastore
        and operator is one of ``OPERATORS``
        (ie, ``=``, ``<``, ``<=``, ``>``, ``>=``):

        .. testsetup:: query-filter

            import uuid

            from google.cloud import datastore

            client = datastore.Client()

        .. doctest:: query-filter

            >>> query = client.query(kind='Person')
            >>> query = query.add_filter('name', '=', 'James')
            >>> query = query.add_filter('age', '>', 50)

        :type property_name: str
        :param property_name: A property name.

        :type operator: str
        :param operator: One of ``=``, ``<``, ``<=``, ``>``, ``>=``.

        :type value: :class:`int`, :class:`str`, :class:`bool`,
                     :class:`float`, :class:`NoneType`,
                     :class:`datetime.datetime`,
                     :class:`google.cloud.datastore.key.Key`
        :param value: The value to filter on.

        :rtype: :class:`~google.cloud.datastore.query.Query`
        :returns: A query object.

        :raises: :class:`ValueError` if ``operator`` is not one of the
                 specified values, or if a filter names ``'__key__'`` but
                 passes an invalid value (a key is required).
        """
        if self.OPERATORS.get(operator) is None:
            error_message = 'Invalid expression: "%s"' % (operator,)
            choices_message = "Please use one of: =, <, <=, >, >=."
            raise ValueError(error_message, choices_message)

        if property_name == "__key__" and not isinstance(value, Key):
            # Wrap ``value`` in a 1-tuple: a bare tuple value would otherwise
            # be expanded into ``%`` arguments and raise TypeError instead of
            # the documented ValueError.
            raise ValueError('Invalid key: "%s"' % (value,))

        self._filters.append((property_name, operator, value))
        return self

    @property
    def projection(self):
        """Fields names returned by the query.

        :rtype: sequence of string
        :returns: Names of fields in query results.
        """
        return self._projection[:]

    @projection.setter
    def projection(self, projection):
        """Set the fields returned the query.

        :type projection: str or sequence of strings
        :param projection: Each value is a string giving the name of a
                           property to be included in the projection query.
        """
        if isinstance(projection, str):
            projection = [projection]
        self._projection[:] = projection

    def keys_only(self):
        """Set the projection to include only keys."""
        self._projection[:] = ["__key__"]

    def key_filter(self, key, operator="="):
        """Filter on a key.

        :type key: :class:`google.cloud.datastore.key.Key`
        :param key: The key to filter on.

        :type operator: str
        :param operator: (Optional) One of ``=``, ``<``, ``<=``, ``>``, ``>=``.
                         Defaults to ``=``.

        :rtype: :class:`~google.cloud.datastore.query.Query`
        :returns: This query, so calls can be chained like
                  :meth:`add_filter`.

        :raises: :class:`ValueError` if ``operator`` is invalid or ``key``
                 is not a :class:`~google.cloud.datastore.key.Key`.
        """
        return self.add_filter("__key__", operator, key)

    @property
    def order(self):
        """Names of fields used to sort query results.

        :rtype: sequence of string
        :returns: The order(s) set on the query.
        """
        return self._order[:]

    @order.setter
    def order(self, value):
        """Set the fields used to sort query results.

        Sort fields will be applied in the order specified.

        :type value: str or sequence of strings
        :param value: Each value is a string giving the name of the
                      property on which to sort, optionally preceded by a
                      hyphen (-) to specify descending order.
                      Omitting the hyphen implies ascending order.
        """
        if isinstance(value, str):
            value = [value]
        self._order[:] = value

    @property
    def distinct_on(self):
        """Names of fields used to group query results.

        :rtype: sequence of string
        :returns: The "distinct on" fields set on the query.
        """
        return self._distinct_on[:]

    @distinct_on.setter
    def distinct_on(self, value):
        """Set fields used to group query results.

        :type value: str or sequence of strings
        :param value: Each value is a string giving the name of a
                      property to use to group results together.
        """
        if isinstance(value, str):
            value = [value]
        self._distinct_on[:] = value

    def fetch(
        self,
        limit=None,
        offset=0,
        start_cursor=None,
        end_cursor=None,
        client=None,
        eventual=False,
        retry=None,
        timeout=None,
    ):
        """Execute the Query; return an iterator for the matching entities.

        For example:

        .. testsetup:: query-fetch

            import uuid

            from google.cloud import datastore

            unique = str(uuid.uuid4())[0:8]
            client = datastore.Client(namespace='ns{}'.format(unique))


        .. doctest:: query-fetch

            >>> andy = datastore.Entity(client.key('Person', 1234))
            >>> andy['name'] = 'Andy'
            >>> sally = datastore.Entity(client.key('Person', 2345))
            >>> sally['name'] = 'Sally'
            >>> bobby = datastore.Entity(client.key('Person', 3456))
            >>> bobby['name'] = 'Bobby'
            >>> client.put_multi([andy, sally, bobby])
            >>> query = client.query(kind='Person')
            >>> result = list(query.add_filter('name', '=', 'Sally').fetch())
            >>> result
            [<Entity('Person', 2345) {'name': 'Sally'}>]

        .. testcleanup:: query-fetch

            client.delete(andy.key)
            client.delete(sally.key)
            client.delete(bobby.key)

        :type limit: int
        :param limit: (Optional) limit passed through to the iterator.

        :type offset: int
        :param offset: (Optional) offset passed through to the iterator.

        :type start_cursor: bytes
        :param start_cursor: (Optional) cursor passed through to the iterator.

        :type end_cursor: bytes
        :param end_cursor: (Optional) cursor passed through to the iterator.

        :type client: :class:`google.cloud.datastore.client.Client`
        :param client: (Optional) client used to connect to datastore.
                       If not supplied, uses the query's value.

        :type eventual: bool
        :param eventual: (Optional) Defaults to strongly consistent (False).
                                    Setting True will use eventual consistency,
                                    but cannot be used inside a transaction or
                                    will raise ValueError.

        :type retry: :class:`google.api_core.retry.Retry`
        :param retry:
            A retry object used to retry requests. If ``None`` is specified,
            requests will be retried using a default configuration.

        :type timeout: float
        :param timeout:
            Time, in seconds, to wait for the request to complete.
            Note that if ``retry`` is specified, the timeout applies
            to each individual attempt.

        :rtype: :class:`Iterator`
        :returns: The iterator for the query.
        """
        if client is None:
            client = self._client

        return Iterator(
            self,
            client,
            limit=limit,
            offset=offset,
            start_cursor=start_cursor,
            end_cursor=end_cursor,
            eventual=eventual,
            retry=retry,
            timeout=timeout,
        )
442
443
class Iterator(page_iterator.Iterator):
    """Represent the state of a given execution of a Query.

    :type query: :class:`~google.cloud.datastore.query.Query`
    :param query: Query object holding the permanent configuration (i.e.
                  the settings that do not change from one page of a
                  result set to the next).

    :type client: :class:`~google.cloud.datastore.client.Client`
    :param client: The client used to make a request.

    :type limit: int
    :param limit: (Optional) Limit the number of results returned.

    :type offset: int
    :param offset: (Optional) Offset used to begin a query.

    :type start_cursor: bytes
    :param start_cursor: (Optional) Cursor to begin paging through
                         query results.

    :type end_cursor: bytes
    :param end_cursor: (Optional) Cursor to end paging through
                       query results.

    :type eventual: bool
    :param eventual: (Optional) Defaults to strongly consistent (False).
                                Setting True will use eventual consistency,
                                but cannot be used inside a transaction or
                                will raise ValueError.

    :type retry: :class:`google.api_core.retry.Retry`
    :param retry:
        A retry object used to retry requests. If ``None`` is specified,
        requests will be retried using a default configuration.

    :type timeout: float
    :param timeout:
        Time, in seconds, to wait for the request to complete.
        Note that if ``retry`` is specified, the timeout applies
        to each individual attempt.
    """

    next_page_token = None

    def __init__(
        self,
        query,
        client,
        limit=None,
        offset=None,
        start_cursor=None,
        end_cursor=None,
        eventual=False,
        retry=None,
        timeout=None,
    ):
        # The user's start cursor seeds the base class's page token.
        super().__init__(
            client=client,
            item_to_value=_item_to_entity,
            page_token=start_cursor,
            max_results=limit,
        )
        # Settings fixed for this execution.
        self._query = query
        self._offset = offset
        self._end_cursor = end_cursor
        self._eventual = eventual
        self._retry = retry
        self._timeout = timeout
        # Paging state, refreshed from every ``runQuery`` response.
        self._more_results = True
        self._skipped_results = 0

    def _build_protobuf(self):
        """Assemble the query protobuf for the next page request.

        The result depends on the iterator's current paging state
        (page token, remaining limit, pending end cursor).

        :rtype:
            :class:`.query_pb2.Query`
        :returns: The query protobuf object for the current
                  state of the iterator.
        """
        query_pb = _pb_from_query(self._query)

        page_token = self.next_page_token
        if page_token is not None:
            query_pb.start_cursor = base64.urlsafe_b64decode(page_token)

        if self._end_cursor is not None:
            query_pb.end_cursor = base64.urlsafe_b64decode(self._end_cursor)

        if self.max_results is not None:
            # Ask only for what is still missing under the overall limit.
            query_pb.limit = self.max_results - self.num_results

        # The offset is relative to the start of the result set, so it is
        # sent only when there is no start cursor; pages reached through a
        # cursor already resume past the skipped results.
        if page_token is None and self._offset is not None:
            query_pb.offset = self._offset

        return query_pb

    def _process_query_results(self, response_pb):
        """Update paging state from a ``runQuery`` response.

        :type response_pb: :class:`.datastore_pb2.RunQueryResponse`
        :param response_pb: The protobuf response from a ``runQuery`` request.

        :rtype: iterable
        :returns: The entity protobufs contained in this page.
        :raises ValueError: If ``more_results`` is an unexpected value.
        """
        batch = response_pb.batch
        self._skipped_results = batch.skipped_results

        more_results = batch.more_results
        if more_results == _NO_MORE_RESULTS:
            self.next_page_token = None
        else:
            self.next_page_token = base64.urlsafe_b64encode(batch.end_cursor)
        # The caller's end cursor is only sent with the first request.
        self._end_cursor = None

        if more_results == _NOT_FINISHED:
            self._more_results = True
        elif more_results in _FINISHED:
            self._more_results = False
        else:
            raise ValueError("Unexpected value returned for `more_results`.")

        return [entity_result.entity for entity_result in batch.entity_results]

    def _next_page(self):
        """Fetch the next page of results.

        :rtype: :class:`~google.api_core.page_iterator.Page`
        :returns: The next page in the iterator, or :data:`None` once the
                  previous response reported that no results remain.
        """
        if not self._more_results:
            return None

        query_pb = self._build_protobuf()

        transaction = self.client.current_transaction
        transaction_id = None if transaction is None else transaction.id
        read_options = helpers.get_read_options(self._eventual, transaction_id)

        partition_id = entity_pb2.PartitionId(
            project_id=self._query.project, namespace_id=self._query.namespace
        )

        # Only forward retry / timeout when the caller supplied them.
        call_kwargs = {}
        if self._retry is not None:
            call_kwargs["retry"] = self._retry
        if self._timeout is not None:
            call_kwargs["timeout"] = self._timeout

        def _run_query(request_query_pb):
            # Issue one ``runQuery`` RPC for ``request_query_pb``.
            return self.client._datastore_api.run_query(
                request={
                    "project_id": self._query.project,
                    "partition_id": partition_id,
                    "read_options": read_options,
                    "query": request_query_pb,
                },
                **call_kwargs,
            )

        response_pb = _run_query(query_pb)

        while (
            response_pb.batch.more_results == _NOT_FINISHED
            and response_pb.batch.skipped_results < query_pb.offset
        ):
            # The backend stopped before consuming the whole offset (it
            # skips at most about 1000 results per call), so no entities
            # were returned. Resume from where skipping stopped, with the
            # offset reduced by what was already skipped.
            previous_query_pb = query_pb
            query_pb = query_pb2.Query()
            query_pb._pb.CopyFrom(previous_query_pb._pb)  # copy for testability
            query_pb.start_cursor = response_pb.batch.skipped_cursor
            query_pb.offset -= response_pb.batch.skipped_results

            response_pb = _run_query(query_pb)

        entity_pbs = self._process_query_results(response_pb)
        return page_iterator.Page(self, entity_pbs, self.item_to_value)
643
644
def _pb_from_query(query):
    """Translate a :class:`Query` into its ``query_pb2.Query`` protobuf.

    :type query: :class:`Query`
    :param query: The source query.

    :rtype: :class:`.query_pb2.Query`
    :returns: A protobuf that can be sent to the protobuf API. It carries
              only the query's static definition; per-execution fields
              (cursors, offset, limit) are left unset.
    """
    pb = query_pb2.Query()

    for field_name in query.projection:
        projection_pb = query_pb2.Projection()
        projection_pb.property.name = field_name
        pb.projection.append(projection_pb)

    if query.kind:
        kind_pb = query_pb2.KindExpression()
        kind_pb.name = query.kind
        pb.kind.append(kind_pb)

    # Every filter (ancestor plus property filters) is ANDed together.
    composite = pb.filter.composite_filter
    composite.op = query_pb2.CompositeFilter.Operator.AND

    if query.ancestor:
        ancestor_key_pb = query.ancestor.to_protobuf()

        # Encoded as: __key__ HAS_ANCESTOR <ancestor key>.
        has_ancestor = composite.filters._pb.add().property_filter
        has_ancestor.property.name = "__key__"
        has_ancestor.op = query_pb2.PropertyFilter.Operator.HAS_ANCESTOR
        has_ancestor.value.key_value.CopyFrom(ancestor_key_pb._pb)

    for name, op_symbol, operand in query.filters:
        op_enum = query.OPERATORS.get(op_symbol)

        prop_filter = composite.filters._pb.add().property_filter
        prop_filter.property.name = name
        prop_filter.op = op_enum

        # Key filters carry a Key protobuf; anything else goes through the
        # generic value encoder.
        if name == "__key__":
            operand_key_pb = operand.to_protobuf()
            prop_filter.value.key_value.CopyFrom(operand_key_pb._pb)
        else:
            helpers._set_protobuf_value(prop_filter.value, operand)

    # An empty AND filter is dropped so the request carries no filter at all.
    if not composite.filters:
        pb._pb.ClearField("filter")

    for spec in query.order:
        order_pb = query_pb2.PropertyOrder()
        descending = spec.startswith("-")
        order_pb.property.name = spec[1:] if descending else spec
        order_pb.direction = (
            order_pb.Direction.DESCENDING
            if descending
            else order_pb.Direction.ASCENDING
        )
        pb.order.append(order_pb)

    for field_name in query.distinct_on:
        reference_pb = query_pb2.PropertyReference()
        reference_pb.name = field_name
        pb.distinct_on.append(reference_pb)

    return pb
716
717
718# pylint: disable=unused-argument
def _item_to_entity(iterator, entity_pb):
    """Turn one raw entity protobuf from a result page into an Entity.

    Used as the ``item_to_value`` callback of :class:`Iterator`.

    :type iterator: :class:`~google.api_core.page_iterator.Iterator`
    :param iterator: The iterator that is currently in use (unused).

    :type entity_pb:
        :class:`.entity_pb2.Entity`
    :param entity_pb: An entity protobuf to convert to a native entity.

    :rtype: :class:`~google.cloud.datastore.entity.Entity`
    :returns: The next entity in the page.
    """
    del iterator  # Required by the callback signature, but not needed here.
    return helpers.entity_from_protobuf(entity_pb)
733
734
735# pylint: enable=unused-argument
736