# Copyright 2014 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Create / interact with Google Cloud Datastore queries."""

import base64

from google.api_core import page_iterator
from google.cloud._helpers import _ensure_tuple_or_list

from google.cloud.datastore_v1.types import entity as entity_pb2
from google.cloud.datastore_v1.types import query as query_pb2
from google.cloud.datastore import helpers
from google.cloud.datastore.key import Key


_NOT_FINISHED = query_pb2.QueryResultBatch.MoreResultsType.NOT_FINISHED
_NO_MORE_RESULTS = query_pb2.QueryResultBatch.MoreResultsType.NO_MORE_RESULTS

# ``more_results`` values after which ``Iterator`` stops requesting pages
# (see ``Iterator._process_query_results``).
_FINISHED = (
    _NO_MORE_RESULTS,
    query_pb2.QueryResultBatch.MoreResultsType.MORE_RESULTS_AFTER_LIMIT,
    query_pb2.QueryResultBatch.MoreResultsType.MORE_RESULTS_AFTER_CURSOR,
)


class Query(object):
    """A Query against the Cloud Datastore.

    This class serves as an abstraction for creating a query over data
    stored in the Cloud Datastore.

    :type client: :class:`google.cloud.datastore.client.Client`
    :param client: The client used to connect to Datastore.

    :type kind: str
    :param kind: The kind to query.

    :type project: str
    :param project:
        (Optional) The project associated with the query.  If not passed, uses
        the client's value.

    :type namespace: str
    :param namespace:
        (Optional) The namespace to which to restrict results.  If not passed,
        uses the client's value.

    :type ancestor: :class:`~google.cloud.datastore.key.Key`
    :param ancestor:
        (Optional) key of the ancestor to which this query's results are
        restricted.

    :type filters: sequence of tuple[str, str, str]
    :param filters: Property filters applied by this query. Each item
                    is ``(property_name, operator, value)`` and is
                    validated via :meth:`add_filter`.

    :type projection: sequence of string
    :param projection:  fields returned as part of query results.

    :type order: sequence of string
    :param order:  field names used to order query results. Prepend ``-``
                   to a field name to sort it in descending order.

    :type distinct_on: sequence of string
    :param distinct_on: field names used to group query results.

    :raises: ValueError if ``project`` is not passed and no implicit
             default is set.
    """

    OPERATORS = {
        "<=": query_pb2.PropertyFilter.Operator.LESS_THAN_OR_EQUAL,
        ">=": query_pb2.PropertyFilter.Operator.GREATER_THAN_OR_EQUAL,
        "<": query_pb2.PropertyFilter.Operator.LESS_THAN,
        ">": query_pb2.PropertyFilter.Operator.GREATER_THAN,
        "=": query_pb2.PropertyFilter.Operator.EQUAL,
    }
    """Mapping of operator strings and their protobuf equivalents."""

    def __init__(
        self,
        client,
        kind=None,
        project=None,
        namespace=None,
        ancestor=None,
        filters=(),
        projection=(),
        order=(),
        distinct_on=(),
    ):

        self._client = client
        self._kind = kind
        self._project = project or client.project
        self._namespace = namespace or client.namespace
        self._ancestor = ancestor
        self._filters = []
        # Route each supplied filter through add_filter so it gets the same
        # operator / ``__key__`` validation as filters added later.
        for property_name, operator, value in filters:
            self.add_filter(property_name, operator, value)
        self._projection = _ensure_tuple_or_list("projection", projection)
        self._order = _ensure_tuple_or_list("order", order)
        self._distinct_on = _ensure_tuple_or_list("distinct_on", distinct_on)

    @property
    def project(self):
        """Get the project for this Query.

        :rtype: str
        :returns: The project for the query.
        """
        return self._project or self._client.project

    @property
    def namespace(self):
        """This query's namespace

        :rtype: str or None
        :returns: the namespace assigned to this query
        """
        return self._namespace or self._client.namespace

    @namespace.setter
    def namespace(self, value):
        """Update the query's namespace.

        :type value: str
        :param value: the new namespace.

        :raises: :class:`ValueError` if ``value`` is not a string.
        """
        if not isinstance(value, str):
            raise ValueError("Namespace must be a string")
        self._namespace = value

    @property
    def kind(self):
        """Get the Kind of the Query.

        :rtype: str
        :returns: The kind for the query.
        """
        return self._kind

    @kind.setter
    def kind(self, value):
        """Update the Kind of the Query.

        :type value: str
        :param value: updated kind for the query.

        :raises: :class:`TypeError` if ``value`` is not a string.

        .. note::

           The protobuf specification allows for ``kind`` to be repeated,
           but the current implementation returns an error if more than
           one value is passed.  If the back-end changes in the future to
           allow multiple values, this method will be updated to allow passing
           either a string or a sequence of strings.
        """
        if not isinstance(value, str):
            raise TypeError("Kind must be a string")
        self._kind = value

    @property
    def ancestor(self):
        """The ancestor key for the query.

        :rtype: :class:`~google.cloud.datastore.key.Key` or None
        :returns: The ancestor for the query.
        """
        return self._ancestor

    @ancestor.setter
    def ancestor(self, value):
        """Set the ancestor for the query

        :type value: :class:`~google.cloud.datastore.key.Key`
        :param value: the new ancestor key

        :raises: :class:`TypeError` if ``value`` is not a ``Key``.
        """
        if not isinstance(value, Key):
            raise TypeError("Ancestor must be a Key")
        self._ancestor = value

    @ancestor.deleter
    def ancestor(self):
        """Remove the ancestor for the query."""
        self._ancestor = None

    @property
    def filters(self):
        """Filters set on the query.

        :rtype: list of tuple[str, str, str]
        :returns: A copy of the filters set on the query. Each item is
                  ``(property_name, operator, value)``.
        """
        return self._filters[:]

    def add_filter(self, property_name, operator, value):
        """Filter the query based on a property name, operator and a value.

        Expressions take the form of::

          .add_filter('<property>', '<operator>', <value>)

        where property is a property stored on the entity in the datastore
        and operator is one of ``OPERATORS``
        (ie, ``=``, ``<``, ``<=``, ``>``, ``>=``):

        .. testsetup:: query-filter

            import uuid

            from google.cloud import datastore

            client = datastore.Client()

        .. doctest:: query-filter

            >>> query = client.query(kind='Person')
            >>> query = query.add_filter('name', '=', 'James')
            >>> query = query.add_filter('age', '>', 50)

        :type property_name: str
        :param property_name: A property name.

        :type operator: str
        :param operator: One of ``=``, ``<``, ``<=``, ``>``, ``>=``.

        :type value: :class:`int`, :class:`str`, :class:`bool`,
                     :class:`float`, :class:`NoneType`,
                     :class:`datetime.datetime`,
                     :class:`google.cloud.datastore.key.Key`
        :param value: The value to filter on.

        :rtype: :class:`~google.cloud.datastore.query.Query`
        :returns: This query object (allows chaining).

        :raises: :class:`ValueError` if ``operator`` is not one of the
                 specified values, or if a filter names ``'__key__'`` but
                 passes an invalid value (a key is required).
        """
        if operator not in self.OPERATORS:
            error_message = 'Invalid expression: "%s"' % (operator,)
            choices_message = "Please use one of: =, <, <=, >, >=."
            raise ValueError(error_message, choices_message)

        if property_name == "__key__" and not isinstance(value, Key):
            # Wrap ``value`` in a 1-tuple: a bare tuple value would otherwise
            # be unpacked by ``%`` and raise TypeError instead of ValueError.
            raise ValueError('Invalid key: "%s"' % (value,))

        self._filters.append((property_name, operator, value))
        return self

    @property
    def projection(self):
        """Field names returned by the query.

        :rtype: sequence of string
        :returns: Names of fields in query results.
        """
        return self._projection[:]

    @projection.setter
    def projection(self, projection):
        """Set the fields returned by the query.

        :type projection: str or sequence of strings
        :param projection: Each value is a string giving the name of a
                           property to be included in the projection query.
        """
        if isinstance(projection, str):
            projection = [projection]
        self._projection[:] = projection

    def keys_only(self):
        """Set the projection to include only keys."""
        self._projection[:] = ["__key__"]

    def key_filter(self, key, operator="="):
        """Filter on a key.

        :type key: :class:`google.cloud.datastore.key.Key`
        :param key: The key to filter on.

        :type operator: str
        :param operator: (Optional) One of ``=``, ``<``, ``<=``, ``>``, ``>=``.
                         Defaults to ``=``.
        """
        self.add_filter("__key__", operator, key)

    @property
    def order(self):
        """Names of fields used to sort query results.

        :rtype: sequence of string
        :returns: The order(s) set on the query.
        """
        return self._order[:]

    @order.setter
    def order(self, value):
        """Set the fields used to sort query results.

        Sort fields will be applied in the order specified.

        :type value: str or sequence of strings
        :param value: Each value is a string giving the name of the
                      property on which to sort, optionally preceded by a
                      hyphen (-) to specify descending order.
                      Omitting the hyphen implies ascending order.
        """
        if isinstance(value, str):
            value = [value]
        self._order[:] = value

    @property
    def distinct_on(self):
        """Names of fields used to group query results.

        :rtype: sequence of string
        :returns: The "distinct on" fields set on the query.
        """
        return self._distinct_on[:]

    @distinct_on.setter
    def distinct_on(self, value):
        """Set fields used to group query results.

        :type value: str or sequence of strings
        :param value: Each value is a string giving the name of a
                      property to use to group results together.
        """
        if isinstance(value, str):
            value = [value]
        self._distinct_on[:] = value

    def fetch(
        self,
        limit=None,
        offset=0,
        start_cursor=None,
        end_cursor=None,
        client=None,
        eventual=False,
        retry=None,
        timeout=None,
    ):
        """Execute the Query; return an iterator for the matching entities.

        For example:

        .. testsetup:: query-fetch

            import uuid

            from google.cloud import datastore

            unique = str(uuid.uuid4())[0:8]
            client = datastore.Client(namespace='ns{}'.format(unique))


        .. doctest:: query-fetch

            >>> andy = datastore.Entity(client.key('Person', 1234))
            >>> andy['name'] = 'Andy'
            >>> sally = datastore.Entity(client.key('Person', 2345))
            >>> sally['name'] = 'Sally'
            >>> bobby = datastore.Entity(client.key('Person', 3456))
            >>> bobby['name'] = 'Bobby'
            >>> client.put_multi([andy, sally, bobby])
            >>> query = client.query(kind='Person')
            >>> result = list(query.add_filter('name', '=', 'Sally').fetch())
            >>> result
            [<Entity('Person', 2345) {'name': 'Sally'}>]

        .. testcleanup:: query-fetch

            client.delete(andy.key)
            client.delete(sally.key)
            client.delete(bobby.key)

        :type limit: int
        :param limit: (Optional) limit passed through to the iterator.

        :type offset: int
        :param offset: (Optional) offset passed through to the iterator.

        :type start_cursor: bytes
        :param start_cursor: (Optional) cursor passed through to the iterator.

        :type end_cursor: bytes
        :param end_cursor: (Optional) cursor passed through to the iterator.

        :type client: :class:`google.cloud.datastore.client.Client`
        :param client: (Optional) client used to connect to datastore.
                       If not supplied, uses the query's value.

        :type eventual: bool
        :param eventual: (Optional) Defaults to strongly consistent (False).
                                    Setting True will use eventual consistency,
                                    but cannot be used inside a transaction or
                                    will raise ValueError.

        :type retry: :class:`google.api_core.retry.Retry`
        :param retry:
            A retry object used to retry requests. If ``None`` is specified,
            requests will be retried using a default configuration.

        :type timeout: float
        :param timeout:
            Time, in seconds, to wait for the request to complete.
            Note that if ``retry`` is specified, the timeout applies
            to each individual attempt.

        :rtype: :class:`Iterator`
        :returns: The iterator for the query.
        """
        if client is None:
            client = self._client

        return Iterator(
            self,
            client,
            limit=limit,
            offset=offset,
            start_cursor=start_cursor,
            end_cursor=end_cursor,
            eventual=eventual,
            retry=retry,
            timeout=timeout,
        )


class Iterator(page_iterator.Iterator):
    """Represent the state of a given execution of a Query.

    :type query: :class:`~google.cloud.datastore.query.Query`
    :param query: Query object holding permanent configuration (i.e.
                  things that don't change with each page in
                  a results set).

    :type client: :class:`~google.cloud.datastore.client.Client`
    :param client: The client used to make a request.

    :type limit: int
    :param limit: (Optional) Limit the number of results returned.

    :type offset: int
    :param offset: (Optional) Offset used to begin a query.

    :type start_cursor: bytes
    :param start_cursor: (Optional) Cursor to begin paging through
                         query results.

    :type end_cursor: bytes
    :param end_cursor: (Optional) Cursor to end paging through
                       query results.

    :type eventual: bool
    :param eventual: (Optional) Defaults to strongly consistent (False).
                                Setting True will use eventual consistency,
                                but cannot be used inside a transaction or
                                will raise ValueError.

    :type retry: :class:`google.api_core.retry.Retry`
    :param retry:
        A retry object used to retry requests. If ``None`` is specified,
        requests will be retried using a default configuration.

    :type timeout: float
    :param timeout:
        Time, in seconds, to wait for the request to complete.
        Note that if ``retry`` is specified, the timeout applies
        to each individual attempt.
    """

    next_page_token = None

    def __init__(
        self,
        query,
        client,
        limit=None,
        offset=None,
        start_cursor=None,
        end_cursor=None,
        eventual=False,
        retry=None,
        timeout=None,
    ):
        super(Iterator, self).__init__(
            client=client,
            item_to_value=_item_to_entity,
            page_token=start_cursor,
            max_results=limit,
        )
        self._query = query
        self._offset = offset
        self._end_cursor = end_cursor
        self._eventual = eventual
        self._retry = retry
        self._timeout = timeout
        # The attributes below will change over the life of the iterator.
        self._more_results = True
        self._skipped_results = 0

    def _build_protobuf(self):
        """Build a query protobuf.

        Relies on the current state of the iterator.

        :rtype:
            :class:`.query_pb2.Query`
        :returns: The query protobuf object for the current
                  state of the iterator.
        """
        pb = _pb_from_query(self._query)

        start_cursor = self.next_page_token
        if start_cursor is not None:
            pb.start_cursor = base64.urlsafe_b64decode(start_cursor)

        end_cursor = self._end_cursor
        if end_cursor is not None:
            pb.end_cursor = base64.urlsafe_b64decode(end_cursor)

        if self.max_results is not None:
            # Only ask for the results still owed under the overall limit.
            pb.limit = self.max_results - self.num_results

        if start_cursor is None and self._offset is not None:
            # NOTE: We don't need to add an offset to the request protobuf
            #       if we are using an existing cursor, because the offset
            #       is only relative to the start of the result set, not
            #       relative to each page (this method is called per-page)
            pb.offset = self._offset

        return pb

    def _process_query_results(self, response_pb):
        """Process the response from a datastore query.

        Updates ``next_page_token``, ``_skipped_results`` and
        ``_more_results`` from the response batch, and clears
        ``_end_cursor``.

        :type response_pb: :class:`.datastore_pb2.RunQueryResponse`
        :param response_pb: The protobuf response from a ``runQuery`` request.

        :rtype: iterable
        :returns: The next page of entity results.
        :raises ValueError: If ``more_results`` is an unexpected value.
        """
        self._skipped_results = response_pb.batch.skipped_results
        if response_pb.batch.more_results == _NO_MORE_RESULTS:
            self.next_page_token = None
        else:
            self.next_page_token = base64.urlsafe_b64encode(
                response_pb.batch.end_cursor
            )
        # The end cursor is applied only to the first request built after
        # construction; later pages are not constrained by it.
        self._end_cursor = None

        if response_pb.batch.more_results == _NOT_FINISHED:
            self._more_results = True
        elif response_pb.batch.more_results in _FINISHED:
            self._more_results = False
        else:
            raise ValueError("Unexpected value returned for `more_results`.")

        return [result.entity for result in response_pb.batch.entity_results]

    def _next_page(self):
        """Get the next page in the iterator.

        :rtype: :class:`~google.api_core.page_iterator.Page`
        :returns: The next page in the iterator (or :data:`None` if
                  there are no pages left).
        """
        if not self._more_results:
            return None

        query_pb = self._build_protobuf()
        transaction = self.client.current_transaction
        if transaction is None:
            transaction_id = None
        else:
            transaction_id = transaction.id
        read_options = helpers.get_read_options(self._eventual, transaction_id)

        partition_id = entity_pb2.PartitionId(
            project_id=self._query.project, namespace_id=self._query.namespace
        )

        # Only forward retry/timeout when explicitly set, so the API's own
        # defaults apply otherwise.
        kwargs = {}

        if self._retry is not None:
            kwargs["retry"] = self._retry

        if self._timeout is not None:
            kwargs["timeout"] = self._timeout

        response_pb = self.client._datastore_api.run_query(
            request={
                "project_id": self._query.project,
                "partition_id": partition_id,
                "read_options": read_options,
                "query": query_pb,
            },
            **kwargs,
        )

        while (
            response_pb.batch.more_results == _NOT_FINISHED
            and response_pb.batch.skipped_results < query_pb.offset
        ):
            # We haven't finished processing. A likely reason is we haven't
            # skipped all of the results yet. Don't return any results.
            # Instead, rerun query, adjusting offsets. Datastore doesn't process
            # more than 1000 skipped results in a query.
            old_query_pb = query_pb
            query_pb = query_pb2.Query()
            query_pb._pb.CopyFrom(old_query_pb._pb)  # copy for testability
            # Resume from where the server stopped skipping, and only skip
            # what is still outstanding.
            query_pb.start_cursor = response_pb.batch.skipped_cursor
            query_pb.offset -= response_pb.batch.skipped_results

            response_pb = self.client._datastore_api.run_query(
                request={
                    "project_id": self._query.project,
                    "partition_id": partition_id,
                    "read_options": read_options,
                    "query": query_pb,
                },
                **kwargs,
            )

        entity_pbs = self._process_query_results(response_pb)
        return page_iterator.Page(self, entity_pbs, self.item_to_value)


def _pb_from_query(query):
    """Convert a Query instance to the corresponding protobuf.

    :type query: :class:`Query`
    :param query: The source query.

    :rtype: :class:`.query_pb2.Query`
    :returns: A protobuf that can be sent to the protobuf API.  N.b. that
              it does not contain "in-flight" fields for ongoing query
              executions (cursors, offset, limit).
    """
    pb = query_pb2.Query()

    for projection_name in query.projection:
        projection = query_pb2.Projection()
        projection.property.name = projection_name
        pb.projection.append(projection)

    if query.kind:
        kind = query_pb2.KindExpression()
        kind.name = query.kind
        pb.kind.append(kind)

    # All filters (ancestor + property filters) are ANDed together.
    composite_filter = pb.filter.composite_filter
    composite_filter.op = query_pb2.CompositeFilter.Operator.AND

    if query.ancestor:
        ancestor_pb = query.ancestor.to_protobuf()

        # Filter on __key__ HAS_ANCESTOR == ancestor.
        ancestor_filter = composite_filter.filters._pb.add().property_filter
        ancestor_filter.property.name = "__key__"
        ancestor_filter.op = query_pb2.PropertyFilter.Operator.HAS_ANCESTOR
        ancestor_filter.value.key_value.CopyFrom(ancestor_pb._pb)

    for property_name, operator, value in query.filters:
        pb_op_enum = query.OPERATORS.get(operator)

        # Add the specific filter
        property_filter = composite_filter.filters._pb.add().property_filter
        property_filter.property.name = property_name
        property_filter.op = pb_op_enum

        # Set the value to filter on based on the type.
        if property_name == "__key__":
            key_pb = value.to_protobuf()
            property_filter.value.key_value.CopyFrom(key_pb._pb)
        else:
            helpers._set_protobuf_value(property_filter.value, value)

    # Accessing ``pb.filter.composite_filter`` above populated the field;
    # drop it again when no filters were actually added.
    if not composite_filter.filters:
        pb._pb.ClearField("filter")

    for prop in query.order:
        property_order = query_pb2.PropertyOrder()

        if prop.startswith("-"):
            property_order.property.name = prop[1:]
            property_order.direction = property_order.Direction.DESCENDING
        else:
            property_order.property.name = prop
            property_order.direction = property_order.Direction.ASCENDING

        pb.order.append(property_order)

    for distinct_on_name in query.distinct_on:
        ref = query_pb2.PropertyReference()
        ref.name = distinct_on_name
        pb.distinct_on.append(ref)

    return pb


# pylint: disable=unused-argument
def _item_to_entity(iterator, entity_pb):
    """Convert a raw protobuf entity to the native object.

    :type iterator: :class:`~google.api_core.page_iterator.Iterator`
    :param iterator: The iterator that is currently in use.

    :type entity_pb:
        :class:`.entity_pb2.Entity`
    :param entity_pb: An entity protobuf to convert to a native entity.

    :rtype: :class:`~google.cloud.datastore.entity.Entity`
    :returns: The next entity in the page.
    """
    return helpers.entity_from_protobuf(entity_pb)


# pylint: enable=unused-argument