1#The MIT License (MIT) 2#Copyright (c) 2014 Microsoft Corporation 3 4#Permission is hereby granted, free of charge, to any person obtaining a copy 5#of this software and associated documentation files (the "Software"), to deal 6#in the Software without restriction, including without limitation the rights 7#to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 8#copies of the Software, and to permit persons to whom the Software is 9#furnished to do so, subject to the following conditions: 10 11#The above copyright notice and this permission notice shall be included in all 12#copies or substantial portions of the Software. 13 14#THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 15#IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 16#FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 17#AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 18#LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 19#OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 20#SOFTWARE. 21 22"""Document client class for the Azure Cosmos database service. 23""" 24 25import requests 26from requests.adapters import HTTPAdapter 27 28import six 29import azure.cosmos.base as base 30import azure.cosmos.documents as documents 31import azure.cosmos.constants as constants 32import azure.cosmos.http_constants as http_constants 33import azure.cosmos.query_iterable as query_iterable 34import azure.cosmos.runtime_constants as runtime_constants 35import azure.cosmos.request_object as request_object 36import azure.cosmos.synchronized_request as synchronized_request 37import azure.cosmos.global_endpoint_manager as global_endpoint_manager 38import azure.cosmos.routing.routing_map_provider as routing_map_provider 39import azure.cosmos.session as session 40import azure.cosmos.utils as utils 41import os 42 43class CosmosClient(object): 44 """Represents a document client. 45 46 Provides a client-side logical representation of the Azure Cosmos 47 service. This client is used to configure and execute requests against the 48 service. 49 50 The service client encapsulates the endpoint and credentials used to access 51 the Azure Cosmos service. 52 """ 53 54 class _QueryCompatibilityMode: 55 Default = 0 56 Query = 1 57 SqlQuery = 2 58 59 # default number precisions 60 _DefaultNumberHashPrecision = 3 61 _DefaultNumberRangePrecision = -1 62 63 # default string precision 64 _DefaultStringHashPrecision = 3 65 _DefaultStringRangePrecision = -1 66 67 def __init__(self, 68 url_connection, 69 auth, 70 connection_policy=None, 71 consistency_level=documents.ConsistencyLevel.Session): 72 """ 73 :param str url_connection: 74 The URL for connecting to the DB server. 75 :param dict auth: 76 Contains 'masterKey' or 'resourceTokens', where 77 auth['masterKey'] is the default authorization key to use to 78 create the client, and auth['resourceTokens'] is the alternative 79 authorization key. 80 :param documents.ConnectionPolicy connection_policy: 81 The connection policy for the client. 82 :param documents.ConsistencyLevel consistency_level: 83 The default consistency policy for client operations. 84 85 if url_connection and auth are not provided, 86 COSMOS_ENDPOINT and COSMOS_KEY environment variables will be used. 87 """ 88 89 self.url_connection = url_connection or os.environ.get('COSMOS_ENDPOINT') 90 91 self.master_key = None 92 self.resource_tokens = None 93 if auth is not None: 94 self.master_key = auth.get('masterKey') 95 self.resource_tokens = auth.get('resourceTokens') 96 97 if auth.get('permissionFeed'): 98 self.resource_tokens = {} 99 for permission_feed in auth['permissionFeed']: 100 resource_parts = permission_feed['resource'].split('/') 101 id = resource_parts[-1] 102 self.resource_tokens[id] = permission_feed['_token'] 103 else: 104 self.master_key = os.environ.get('COSMOS_KEY') 105 106 self.connection_policy = (connection_policy or 107 documents.ConnectionPolicy()) 108 109 self.partition_resolvers = {} 110 111 self.partition_key_definition_cache = {} 112 113 self.default_headers = { 114 http_constants.HttpHeaders.CacheControl: 'no-cache', 115 http_constants.HttpHeaders.Version: 116 http_constants.Versions.CurrentVersion, 117 http_constants.HttpHeaders.UserAgent: 118 utils._get_user_agent(), 119 # For single partition query with aggregate functions we would try to accumulate the results on the SDK. 120 # We need to set continuation as not expected. 121 http_constants.HttpHeaders.IsContinuationExpected: False 122 } 123 124 if consistency_level != None: 125 self.default_headers[ 126 http_constants.HttpHeaders.ConsistencyLevel] = consistency_level 127 128 # Keeps the latest response headers from server. 129 self.last_response_headers = None 130 131 if consistency_level == documents.ConsistencyLevel.Session: 132 '''create a session - this is maintained only if the default consistency level 133 on the client is set to session, or if the user explicitly sets it as a property 134 via setter''' 135 self.session = session.Session(self.url_connection) 136 else: 137 self.session = None 138 139 self._useMultipleWriteLocations = False 140 self._global_endpoint_manager = global_endpoint_manager._GlobalEndpointManager(self) 141 142 # creating a requests session used for connection pooling and re-used by all requests 143 self._requests_session = requests.Session() 144 145 if self.connection_policy.ConnectionRetryConfiguration is not None: 146 adapter = HTTPAdapter(max_retries=self.connection_policy.ConnectionRetryConfiguration) 147 self._requests_session.mount('http://', adapter) 148 self._requests_session.mount('https://', adapter) 149 150 if self.connection_policy.ProxyConfiguration and self.connection_policy.ProxyConfiguration.Host: 151 host = connection_policy.ProxyConfiguration.Host 152 url = six.moves.urllib.parse.urlparse(host) 153 proxy = host if url.port else host + ":" + str(connection_policy.ProxyConfiguration.Port) 154 proxyDict = {url.scheme : proxy} 155 self._requests_session.proxies.update(proxyDict) 156 157 # Query compatibility mode. 158 # Allows to specify compatibility mode used by client when making query requests. Should be removed when 159 # application/sql is no longer supported. 160 self._query_compatibility_mode = CosmosClient._QueryCompatibilityMode.Default 161 162 # Routing map provider 163 self._routing_map_provider = routing_map_provider._SmartRoutingMapProvider(self) 164 165 database_account = self._global_endpoint_manager._GetDatabaseAccount() 166 self._global_endpoint_manager.force_refresh(database_account) 167 168 @property 169 def Session(self): 170 """ Gets the session object from the client """ 171 return self.session 172 173 @Session.setter 174 def Session(self, session): 175 """ Sets a session object on the document client 176 This will override the existing session 177 """ 178 self.session = session 179 180 @property 181 def WriteEndpoint(self): 182 """Gets the curent write endpoint for a geo-replicated database account. 183 """ 184 return self._global_endpoint_manager.get_write_endpoint() 185 186 @property 187 def ReadEndpoint(self): 188 """Gets the curent read endpoint for a geo-replicated database account. 189 """ 190 return self._global_endpoint_manager.get_read_endpoint() 191 192 def RegisterPartitionResolver(self, database_link, partition_resolver): 193 """Registers the partition resolver associated with the database link 194 195 :param str database_link: 196 Database Self Link or ID based link. 197 :param object partition_resolver: 198 An instance of PartitionResolver. 199 200 """ 201 if not database_link: 202 raise ValueError("database_link is None or empty.") 203 204 if partition_resolver is None: 205 raise ValueError("partition_resolver is None.") 206 207 self.partition_resolvers = {base.TrimBeginningAndEndingSlashes(database_link): partition_resolver} 208 209 210 def GetPartitionResolver(self, database_link): 211 """Gets the partition resolver associated with the database link 212 213 :param str database_link: 214 Database self link or ID based link. 215 216 :return: 217 An instance of PartitionResolver. 218 :rtype: object 219 220 """ 221 if not database_link: 222 raise ValueError("database_link is None or empty.") 223 224 return self.partition_resolvers.get(base.TrimBeginningAndEndingSlashes(database_link)) 225 226 227 def CreateDatabase(self, database, options=None): 228 """Creates a database. 229 230 :param dict database: 231 The Azure Cosmos database to create. 232 :param dict options: 233 The request options for the request. 234 235 :return: 236 The Database that was created. 237 :rtype: dict 238 239 """ 240 if options is None: 241 options = {} 242 243 CosmosClient.__ValidateResource(database) 244 path = '/dbs' 245 return self.Create(database, path, 'dbs', None, None, options) 246 247 def ReadDatabase(self, database_link, options=None): 248 """Reads a database. 249 250 :param str database_link: 251 The link to the database. 252 :param dict options: 253 The request options for the request. 254 255 :return: 256 The Database that was read. 257 :rtype: dict 258 259 """ 260 if options is None: 261 options = {} 262 263 path = base.GetPathFromLink(database_link) 264 database_id = base.GetResourceIdOrFullNameFromLink(database_link) 265 return self.Read(path, 'dbs', database_id, None, options) 266 267 def ReadDatabases(self, options=None): 268 """Reads all databases. 269 270 :param dict options: 271 The request options for the request. 272 273 :return: 274 Query Iterable of Databases. 275 :rtype: 276 query_iterable.QueryIterable 277 278 """ 279 if options is None: 280 options = {} 281 282 return self.QueryDatabases(None, options) 283 284 def QueryDatabases(self, query, options=None): 285 """Queries databases. 286 287 :param (str or dict) query: 288 :param dict options: 289 The request options for the request. 290 291 :return: Query Iterable of Databases. 292 :rtype: 293 query_iterable.QueryIterable 294 295 """ 296 if options is None: 297 options = {} 298 299 def fetch_fn(options): 300 return self.__QueryFeed('/dbs', 301 'dbs', 302 '', 303 lambda r: r['Databases'], 304 lambda _, b: b, 305 query, 306 options), self.last_response_headers 307 return query_iterable.QueryIterable(self, query, options, fetch_fn) 308 309 def ReadContainers(self, database_link, options=None): 310 """Reads all collections in a database. 311 312 :param str database_link: 313 The link to the database. 314 :param dict options: 315 The request options for the request. 316 317 :return: Query Iterable of Collections. 318 :rtype: 319 query_iterable.QueryIterable 320 321 """ 322 if options is None: 323 options = {} 324 325 return self.QueryContainers(database_link, None, options) 326 327 def QueryContainers(self, database_link, query, options=None): 328 """Queries collections in a database. 329 330 :param str database_link: 331 The link to the database. 332 :param (str or dict) query: 333 :param dict options: 334 The request options for the request. 335 336 :return: Query Iterable of Collections. 337 :rtype: 338 query_iterable.QueryIterable 339 340 """ 341 if options is None: 342 options = {} 343 344 path = base.GetPathFromLink(database_link, 'colls') 345 database_id = base.GetResourceIdOrFullNameFromLink(database_link) 346 def fetch_fn(options): 347 return self.__QueryFeed(path, 348 'colls', 349 database_id, 350 lambda r: r['DocumentCollections'], 351 lambda _, body: body, 352 query, 353 options), self.last_response_headers 354 return query_iterable.QueryIterable(self, query, options, fetch_fn) 355 356 def CreateContainer(self, database_link, collection, options=None): 357 """Creates a collection in a database. 358 359 :param str database_link: 360 The link to the database. 361 :param dict collection: 362 The Azure Cosmos collection to create. 363 :param dict options: 364 The request options for the request. 365 366 :return: The Collection that was created. 367 :rtype: dict 368 369 """ 370 if options is None: 371 options = {} 372 373 CosmosClient.__ValidateResource(collection) 374 path = base.GetPathFromLink(database_link, 'colls') 375 database_id = base.GetResourceIdOrFullNameFromLink(database_link) 376 return self.Create(collection, 377 path, 378 'colls', 379 database_id, 380 None, 381 options) 382 383 def ReplaceContainer(self, collection_link, collection, options=None): 384 """Replaces a collection and return it. 385 386 :param str collection_link: 387 The link to the collection entity. 388 :param dict collection: 389 The collection to be used. 390 :param dict options: 391 The request options for the request. 392 393 :return: 394 The new Collection. 395 :rtype: 396 dict 397 398 """ 399 if options is None: 400 options = {} 401 402 CosmosClient.__ValidateResource(collection) 403 path = base.GetPathFromLink(collection_link) 404 collection_id = base.GetResourceIdOrFullNameFromLink(collection_link) 405 return self.Replace(collection, 406 path, 407 'colls', 408 collection_id, 409 None, 410 options) 411 412 def ReadContainer(self, collection_link, options=None): 413 """Reads a collection. 414 415 :param str collection_link: 416 The link to the document collection. 417 :param dict options: 418 The request options for the request. 419 420 :return: 421 The read Collection. 422 :rtype: 423 dict 424 425 """ 426 if options is None: 427 options = {} 428 429 path = base.GetPathFromLink(collection_link) 430 collection_id = base.GetResourceIdOrFullNameFromLink(collection_link) 431 return self.Read(path, 432 'colls', 433 collection_id, 434 None, 435 options) 436 437 def CreateUser(self, database_link, user, options=None): 438 """Creates a user. 439 440 :param str database_link: 441 The link to the database. 442 :param dict user: 443 The Azure Cosmos user to create. 444 :param dict options: 445 The request options for the request. 446 447 :return: 448 The created User. 449 :rtype: 450 dict 451 452 """ 453 if options is None: 454 options = {} 455 456 database_id, path = self._GetDatabaseIdWithPathForUser(database_link, user) 457 return self.Create(user, 458 path, 459 'users', 460 database_id, 461 None, 462 options) 463 464 def UpsertUser(self, database_link, user, options=None): 465 """Upserts a user. 466 467 :param str database_link: 468 The link to the database. 469 :param dict user: 470 The Azure Cosmos user to upsert. 471 :param dict options: 472 The request options for the request. 473 474 :return: 475 The upserted User. 476 :rtype: dict 477 """ 478 if options is None: 479 options = {} 480 481 database_id, path = self._GetDatabaseIdWithPathForUser(database_link, user) 482 return self.Upsert(user, 483 path, 484 'users', 485 database_id, 486 None, 487 options) 488 489 def _GetDatabaseIdWithPathForUser(self, database_link, user): 490 CosmosClient.__ValidateResource(user) 491 path = base.GetPathFromLink(database_link, 'users') 492 database_id = base.GetResourceIdOrFullNameFromLink(database_link) 493 return database_id, path 494 495 496 def ReadUser(self, user_link, options=None): 497 """Reads a user. 498 499 :param str user_link: 500 The link to the user entity. 501 :param dict options: 502 The request options for the request. 503 504 :return: 505 The read User. 506 :rtype: 507 dict 508 509 """ 510 if options is None: 511 options = {} 512 513 path = base.GetPathFromLink(user_link) 514 user_id = base.GetResourceIdOrFullNameFromLink(user_link) 515 return self.Read(path, 'users', user_id, None, options) 516 517 def ReadUsers(self, database_link, options=None): 518 """Reads all users in a database. 519 520 :params str database_link: 521 The link to the database. 522 :params dict options: 523 The request options for the request. 524 :return: 525 Query iterable of Users. 526 :rtype: 527 query_iterable.QueryIterable 528 529 """ 530 if options is None: 531 options = {} 532 533 return self.QueryUsers(database_link, None, options) 534 535 def QueryUsers(self, database_link, query, options=None): 536 """Queries users in a database. 537 538 :param str database_link: 539 The link to the database. 540 :param (str or dict) query: 541 :param dict options: 542 The request options for the request. 543 544 :return: 545 Query Iterable of Users. 546 :rtype: 547 query_iterable.QueryIterable 548 549 """ 550 if options is None: 551 options = {} 552 553 path = base.GetPathFromLink(database_link, 'users') 554 database_id = base.GetResourceIdOrFullNameFromLink(database_link) 555 def fetch_fn(options): 556 return self.__QueryFeed(path, 557 'users', 558 database_id, 559 lambda r: r['Users'], 560 lambda _, b: b, 561 query, 562 options), self.last_response_headers 563 return query_iterable.QueryIterable(self, query, options, fetch_fn) 564 565 def DeleteDatabase(self, database_link, options=None): 566 """Deletes a database. 567 568 :param str database_link: 569 The link to the database. 570 :param dict options: 571 The request options for the request. 572 573 :return: 574 The deleted Database. 575 :rtype: 576 dict 577 578 """ 579 if options is None: 580 options = {} 581 582 path = base.GetPathFromLink(database_link) 583 database_id = base.GetResourceIdOrFullNameFromLink(database_link) 584 return self.DeleteResource(path, 585 'dbs', 586 database_id, 587 None, 588 options) 589 590 def CreatePermission(self, user_link, permission, options=None): 591 """Creates a permission for a user. 592 593 :param str user_link: 594 The link to the user entity. 595 :param dict permission: 596 The Azure Cosmos user permission to create. 597 :param dict options: 598 The request options for the request. 599 600 :return: 601 The created Permission. 602 :rtype: 603 dict 604 605 """ 606 if options is None: 607 options = {} 608 609 path, user_id = self._GetUserIdWithPathForPermission(permission, user_link) 610 return self.Create(permission, 611 path, 612 'permissions', 613 user_id, 614 None, 615 options) 616 617 def UpsertPermission(self, user_link, permission, options=None): 618 """Upserts a permission for a user. 619 620 :param str user_link: 621 The link to the user entity. 622 :param dict permission: 623 The Azure Cosmos user permission to upsert. 624 :param dict options: 625 The request options for the request. 626 627 :return: 628 The upserted permission. 629 :rtype: 630 dict 631 632 """ 633 if options is None: 634 options = {} 635 636 path, user_id = self._GetUserIdWithPathForPermission(permission, user_link) 637 return self.Upsert(permission, 638 path, 639 'permissions', 640 user_id, 641 None, 642 options) 643 644 def _GetUserIdWithPathForPermission(self, permission, user_link): 645 CosmosClient.__ValidateResource(permission) 646 path = base.GetPathFromLink(user_link, 'permissions') 647 user_id = base.GetResourceIdOrFullNameFromLink(user_link) 648 return path, user_id 649 650 651 def ReadPermission(self, permission_link, options=None): 652 """Reads a permission. 653 654 :param str permission_link: 655 The link to the permission. 656 :param dict options: 657 The request options for the request. 658 659 :return: 660 The read permission. 661 :rtype: 662 dict 663 664 """ 665 if options is None: 666 options = {} 667 668 path = base.GetPathFromLink(permission_link) 669 permission_id = base.GetResourceIdOrFullNameFromLink(permission_link) 670 return self.Read(path, 671 'permissions', 672 permission_id, 673 None, 674 options) 675 676 def ReadPermissions(self, user_link, options=None): 677 """Reads all permissions for a user. 678 679 :param str user_link: 680 The link to the user entity. 681 :param dict options: 682 The request options for the request. 683 684 :return: 685 Query Iterable of Permissions. 686 :rtype: 687 query_iterable.QueryIterable 688 689 """ 690 if options is None: 691 options = {} 692 693 return self.QueryPermissions(user_link, None, options) 694 695 def QueryPermissions(self, user_link, query, options=None): 696 """Queries permissions for a user. 697 698 :param str user_link: 699 The link to the user entity. 700 :param (str or dict) query: 701 :param dict options: 702 The request options for the request. 703 704 :return: 705 Query Iterable of Permissions. 706 :rtype: 707 query_iterable.QueryIterable 708 709 """ 710 if options is None: 711 options = {} 712 713 path = base.GetPathFromLink(user_link, 'permissions') 714 user_id = base.GetResourceIdOrFullNameFromLink(user_link) 715 def fetch_fn(options): 716 return self.__QueryFeed(path, 717 'permissions', 718 user_id, 719 lambda r: r['Permissions'], 720 lambda _, b: b, 721 query, 722 options), self.last_response_headers 723 return query_iterable.QueryIterable(self, query, options, fetch_fn) 724 725 def ReplaceUser(self, user_link, user, options=None): 726 """Replaces a user and return it. 727 728 :param str user_link: 729 The link to the user entity. 730 :param dict user: 731 :param dict options: 732 The request options for the request. 733 734 :return: 735 The new User. 736 :rtype: 737 dict 738 739 """ 740 if options is None: 741 options = {} 742 743 CosmosClient.__ValidateResource(user) 744 path = base.GetPathFromLink(user_link) 745 user_id = base.GetResourceIdOrFullNameFromLink(user_link) 746 return self.Replace(user, 747 path, 748 'users', 749 user_id, 750 None, 751 options) 752 753 def DeleteUser(self, user_link, options=None): 754 """Deletes a user. 755 756 :param str user_link: 757 The link to the user entity. 758 :param dict options: 759 The request options for the request. 760 761 :return: 762 The deleted user. 763 :rtype: 764 dict 765 766 """ 767 if options is None: 768 options = {} 769 770 path = base.GetPathFromLink(user_link) 771 user_id = base.GetResourceIdOrFullNameFromLink(user_link) 772 return self.DeleteResource(path, 773 'users', 774 user_id, 775 None, 776 options) 777 778 def ReplacePermission(self, permission_link, permission, options=None): 779 """Replaces a permission and return it. 780 781 :param str permission_link: 782 The link to the permission. 783 :param dict permission: 784 :param dict options: 785 The request options for the request. 786 787 :return: 788 The new Permission. 789 :rtype: 790 dict 791 792 """ 793 if options is None: 794 options = {} 795 796 CosmosClient.__ValidateResource(permission) 797 path = base.GetPathFromLink(permission_link) 798 permission_id = base.GetResourceIdOrFullNameFromLink(permission_link) 799 return self.Replace(permission, 800 path, 801 'permissions', 802 permission_id, 803 None, 804 options) 805 806 def DeletePermission(self, permission_link, options=None): 807 """Deletes a permission. 808 809 :param str permission_link: 810 The link to the permission. 811 :param dict options: 812 The request options for the request. 813 814 :return: 815 The deleted Permission. 816 :rtype: 817 dict 818 819 """ 820 if options is None: 821 options = {} 822 823 path = base.GetPathFromLink(permission_link) 824 permission_id = base.GetResourceIdOrFullNameFromLink(permission_link) 825 return self.DeleteResource(path, 826 'permissions', 827 permission_id, 828 None, 829 options) 830 831 def ReadItems(self, collection_link, feed_options=None): 832 """Reads all documents in a collection. 833 834 :param str collection_link: 835 The link to the document collection. 836 :param dict feed_options: 837 838 :return: 839 Query Iterable of Documents. 840 :rtype: 841 query_iterable.QueryIterable 842 843 """ 844 if feed_options is None: 845 feed_options = {} 846 847 return self.QueryItems(collection_link, None, feed_options) 848 849 def QueryItems(self, database_or_Container_link, query, options=None, partition_key=None): 850 """Queries documents in a collection. 851 852 :param str database_or_Container_link: 853 The link to the database when using partitioning, otherwise link to the document collection. 854 :param (str or dict) query: 855 :param dict options: 856 The request options for the request. 857 :param str partition_key: 858 Partition key for the query(default value None) 859 860 :return: 861 Query Iterable of Documents. 862 :rtype: 863 query_iterable.QueryIterable 864 865 """ 866 database_or_Container_link = base.TrimBeginningAndEndingSlashes(database_or_Container_link) 867 868 if options is None: 869 options = {} 870 871 if(base.IsDatabaseLink(database_or_Container_link)): 872 # Python doesn't have a good way of specifying an overloaded constructor, and this is how it's generally overloaded constructors are specified(by calling a @classmethod) and returning the 'self' instance 873 return query_iterable.QueryIterable.PartitioningQueryIterable(self, query, options, database_or_Container_link, partition_key) 874 else: 875 path = base.GetPathFromLink(database_or_Container_link, 'docs') 876 collection_id = base.GetResourceIdOrFullNameFromLink(database_or_Container_link) 877 def fetch_fn(options): 878 return self.__QueryFeed(path, 879 'docs', 880 collection_id, 881 lambda r: r['Documents'], 882 lambda _, b: b, 883 query, 884 options), self.last_response_headers 885 return query_iterable.QueryIterable(self, query, options, fetch_fn, database_or_Container_link) 886 887 def QueryItemsChangeFeed(self, collection_link, options=None): 888 """Queries documents change feed in a collection. 889 890 :param str collection_link: 891 The link to the document collection. 892 :param dict options: 893 The request options for the request. 894 options may also specify partition key range id. 895 896 :return: 897 Query Iterable of Documents. 898 :rtype: 899 query_iterable.QueryIterable 900 901 """ 902 903 partition_key_range_id = None 904 if options is not None and 'partitionKeyRangeId' in options: 905 partition_key_range_id = options['partitionKeyRangeId'] 906 907 return self._QueryChangeFeed(collection_link, "Documents" , options, partition_key_range_id) 908 909 def _QueryChangeFeed(self, collection_link, resource_type, options=None, partition_key_range_id=None): 910 """Queries change feed of a resource in a collection. 911 912 :param str collection_link: 913 The link to the document collection. 914 :param str resource_type: 915 The type of the resource. 916 :param dict options: 917 The request options for the request. 918 :param str partition_key_range_id: 919 Specifies partition key range id. 920 921 :return: 922 Query Iterable of Documents. 923 :rtype: 924 query_iterable.QueryIterable 925 926 """ 927 if options is None: 928 options = {} 929 options['changeFeed'] = True 930 931 resource_key_map = {'Documents' : 'docs'} 932 933 # For now, change feed only supports Documents and Partition Key Range resouce type 934 if resource_type not in resource_key_map: 935 raise NotImplementedError(resource_type + " change feed query is not supported.") 936 937 resource_key = resource_key_map[resource_type] 938 path = base.GetPathFromLink(collection_link, resource_key) 939 collection_id = base.GetResourceIdOrFullNameFromLink(collection_link) 940 def fetch_fn(options): 941 return self.__QueryFeed(path, 942 resource_key, 943 collection_id, 944 lambda r: r[resource_type], 945 lambda _, b: b, 946 None, 947 options, 948 partition_key_range_id), self.last_response_headers 949 return query_iterable.QueryIterable(self, None, options, fetch_fn, collection_link) 950 951 def _ReadPartitionKeyRanges(self, collection_link, feed_options=None): 952 """Reads Partition Key Ranges. 953 954 :param str collection_link: 955 The link to the document collection. 956 :param dict feed_options: 957 958 :return: 959 Query Iterable of PartitionKeyRanges. 960 :rtype: 961 query_iterable.QueryIterable 962 963 """ 964 if feed_options is None: 965 feed_options = {} 966 967 return self._QueryPartitionKeyRanges(collection_link, None, feed_options) 968 969 def _QueryPartitionKeyRanges(self, collection_link, query, options=None): 970 """Queries Partition Key Ranges in a collection. 971 972 :param str collection_link: 973 The link to the document collection. 974 :param (str or dict) query: 975 :param dict options: 976 The request options for the request. 977 978 :return: 979 Query Iterable of PartitionKeyRanges. 980 :rtype: 981 query_iterable.QueryIterable 982 983 """ 984 if options is None: 985 options = {} 986 987 path = base.GetPathFromLink(collection_link, 'pkranges') 988 collection_id = base.GetResourceIdOrFullNameFromLink(collection_link) 989 def fetch_fn(options): 990 return self.__QueryFeed(path, 991 'pkranges', 992 collection_id, 993 lambda r: r['PartitionKeyRanges'], 994 lambda _, b: b, 995 query, 996 options), self.last_response_headers 997 return query_iterable.QueryIterable(self, query, options, fetch_fn) 998 999 def CreateItem(self, database_or_Container_link, document, options=None): 1000 """Creates a document in a collection. 1001 1002 :param str database_or_Container_link: 1003 The link to the database when using partitioning, otherwise link to the document collection. 1004 :param dict document: 1005 The Azure Cosmos document to create. 1006 :param dict options: 1007 The request options for the request. 1008 :param bool options['disableAutomaticIdGeneration']: 1009 Disables the automatic id generation. If id is missing in the body and this 1010 option is true, an error will be returned. 1011 1012 :return: 1013 The created Document. 1014 :rtype: 1015 dict 1016 1017 """ 1018 # Python's default arguments are evaluated once when the function is defined, not each time the function is called (like it is in say, Ruby). 1019 # This means that if you use a mutable default argument and mutate it, you will and have mutated that object for all future calls to the function as well. 1020 # So, using a non-mutable deafult in this case(None) and assigning an empty dict(mutable) inside the method 1021 # For more details on this gotcha, please refer http://docs.python-guide.org/en/latest/writing/gotchas/ 1022 if options is None: 1023 options = {} 1024 1025 # We check the link to be document collection link since it can be database link in case of client side partitioning 1026 if(base.IsItemContainerLink(database_or_Container_link)): 1027 options = self._AddPartitionKey(database_or_Container_link, document, options) 1028 1029 collection_id, document, path = self._GetContainerIdWithPathForItem(database_or_Container_link, document, options) 1030 return self.Create(document, 1031 path, 1032 'docs', 1033 collection_id, 1034 None, 1035 options) 1036 1037 def UpsertItem(self, database_or_Container_link, document, options=None): 1038 """Upserts a document in a collection. 1039 1040 :param str database_or_Container_link: 1041 The link to the database when using partitioning, otherwise link to the document collection. 1042 :param dict document: 1043 The Azure Cosmos document to upsert. 1044 :param dict options: 1045 The request options for the request. 1046 :param bool options['disableAutomaticIdGeneration']: 1047 Disables the automatic id generation. If id is missing in the body and this 1048 option is true, an error will be returned. 1049 1050 :return: 1051 The upserted Document. 1052 :rtype: 1053 dict 1054 1055 """ 1056 # Python's default arguments are evaluated once when the function is defined, not each time the function is called (like it is in say, Ruby). 1057 # This means that if you use a mutable default argument and mutate it, you will and have mutated that object for all future calls to the function as well. 1058 # So, using a non-mutable deafult in this case(None) and assigning an empty dict(mutable) inside the method 1059 # For more details on this gotcha, please refer http://docs.python-guide.org/en/latest/writing/gotchas/ 1060 if options is None: 1061 options = {} 1062 1063 # We check the link to be document collection link since it can be database link in case of client side partitioning 1064 if(base.IsItemContainerLink(database_or_Container_link)): 1065 options = self._AddPartitionKey(database_or_Container_link, document, options) 1066 1067 collection_id, document, path = self._GetContainerIdWithPathForItem(database_or_Container_link, document, options) 1068 return self.Upsert(document, 1069 path, 1070 'docs', 1071 collection_id, 1072 None, 1073 options) 1074 1075 PartitionResolverErrorMessage = "Couldn't find any partition resolvers for the database link provided. Ensure that the link you used when registering the partition resolvers matches the link provided or you need to register both types of database link(self link as well as ID based link)." 1076 1077 # Gets the collection id and path for the document 1078 def _GetContainerIdWithPathForItem(self, database_or_Container_link, document, options): 1079 1080 if not database_or_Container_link: 1081 raise ValueError("database_or_Container_link is None or empty.") 1082 1083 if document is None: 1084 raise ValueError("document is None.") 1085 1086 CosmosClient.__ValidateResource(document) 1087 document = document.copy() 1088 if (not document.get('id') and 1089 not options.get('disableAutomaticIdGeneration')): 1090 document['id'] = base.GenerateGuidId() 1091 1092 collection_link = database_or_Container_link 1093 1094 if(base.IsDatabaseLink(database_or_Container_link)): 1095 partition_resolver = self.GetPartitionResolver(database_or_Container_link) 1096 1097 if(partition_resolver != None): 1098 collection_link = partition_resolver.ResolveForCreate(document) 1099 else: 1100 raise ValueError(CosmosClient.PartitionResolverErrorMessage) 1101 1102 path = base.GetPathFromLink(collection_link, 'docs') 1103 collection_id = base.GetResourceIdOrFullNameFromLink(collection_link) 1104 return collection_id, document, path 1105 1106 def ReadItem(self, document_link, options=None): 1107 """Reads a document. 1108 1109 :param str document_link: 1110 The link to the document. 1111 :param dict options: 1112 The request options for the request. 1113 1114 :return: 1115 The read Document. 1116 :rtype: 1117 dict 1118 1119 """ 1120 if options is None: 1121 options = {} 1122 1123 path = base.GetPathFromLink(document_link) 1124 document_id = base.GetResourceIdOrFullNameFromLink(document_link) 1125 return self.Read(path, 1126 'docs', 1127 document_id, 1128 None, 1129 options) 1130 1131 def ReadTriggers(self, collection_link, options=None): 1132 """Reads all triggers in a collection. 1133 1134 :param str collection_link: 1135 The link to the document collection. 1136 :param dict options: 1137 The request options for the request. 1138 1139 :return: 1140 Query Iterable of Triggers. 1141 :rtype: 1142 query_iterable.QueryIterable 1143 1144 """ 1145 if options is None: 1146 options = {} 1147 1148 return self.QueryTriggers(collection_link, None, options) 1149 1150 def QueryTriggers(self, collection_link, query, options=None): 1151 """Queries triggers in a collection. 1152 1153 :param str collection_link: 1154 The link to the document collection. 1155 :param (str or dict) query: 1156 :param dict options: 1157 The request options for the request. 1158 1159 :return: 1160 Query Iterable of Triggers. 1161 :rtype: 1162 query_iterable.QueryIterable 1163 1164 """ 1165 if options is None: 1166 options = {} 1167 1168 path = base.GetPathFromLink(collection_link, 'triggers') 1169 collection_id = base.GetResourceIdOrFullNameFromLink(collection_link) 1170 def fetch_fn(options): 1171 return self.__QueryFeed(path, 1172 'triggers', 1173 collection_id, 1174 lambda r: r['Triggers'], 1175 lambda _, b: b, 1176 query, 1177 options), self.last_response_headers 1178 return query_iterable.QueryIterable(self, query, options, fetch_fn) 1179 1180 def CreateTrigger(self, collection_link, trigger, options=None): 1181 """Creates a trigger in a collection. 1182 1183 :param str collection_link: 1184 The link to the document collection. 1185 :param dict trigger: 1186 :param dict options: 1187 The request options for the request. 1188 1189 :return: 1190 The created Trigger. 1191 :rtype: 1192 dict 1193 1194 """ 1195 if options is None: 1196 options = {} 1197 1198 collection_id, path, trigger = self._GetContainerIdWithPathForTrigger(collection_link, trigger) 1199 return self.Create(trigger, 1200 path, 1201 'triggers', 1202 collection_id, 1203 None, 1204 options) 1205 1206 def UpsertTrigger(self, collection_link, trigger, options=None): 1207 """Upserts a trigger in a collection. 1208 1209 :param str collection_link: 1210 The link to the document collection. 1211 :param dict trigger: 1212 :param dict options: 1213 The request options for the request. 1214 1215 :return: 1216 The upserted Trigger. 1217 :rtype: 1218 dict 1219 1220 """ 1221 if options is None: 1222 options = {} 1223 1224 collection_id, path, trigger = self._GetContainerIdWithPathForTrigger(collection_link, trigger) 1225 return self.Upsert(trigger, 1226 path, 1227 'triggers', 1228 collection_id, 1229 None, 1230 options) 1231 1232 def _GetContainerIdWithPathForTrigger(self, collection_link, trigger): 1233 CosmosClient.__ValidateResource(trigger) 1234 trigger = trigger.copy() 1235 if trigger.get('serverScript'): 1236 trigger['body'] = str(trigger.pop('serverScript', '')) 1237 elif trigger.get('body'): 1238 trigger['body'] = str(trigger['body']) 1239 1240 path = base.GetPathFromLink(collection_link, 'triggers') 1241 collection_id = base.GetResourceIdOrFullNameFromLink(collection_link) 1242 return collection_id, path, trigger 1243 1244 1245 def ReadTrigger(self, trigger_link, options=None): 1246 """Reads a trigger. 1247 1248 :param str trigger_link: 1249 The link to the trigger. 1250 :param dict options: 1251 The request options for the request. 1252 1253 :return: 1254 The read Trigger. 1255 :rtype: 1256 dict 1257 1258 """ 1259 if options is None: 1260 options = {} 1261 1262 path = base.GetPathFromLink(trigger_link) 1263 trigger_id = base.GetResourceIdOrFullNameFromLink(trigger_link) 1264 return self.Read(path, 'triggers', trigger_id, None, options) 1265 1266 def ReadUserDefinedFunctions(self, collection_link, options=None): 1267 """Reads all user defined functions in a collection. 1268 1269 :param str collection_link: 1270 The link to the document collection. 1271 :param dict options: 1272 The request options for the request. 1273 1274 :return: 1275 Query Iterable of UDFs. 1276 :rtype: 1277 query_iterable.QueryIterable 1278 1279 """ 1280 if options is None: 1281 options = {} 1282 1283 return self.QueryUserDefinedFunctions(collection_link, None, options) 1284 1285 def QueryUserDefinedFunctions(self, collection_link, query, options=None): 1286 """Queries user defined functions in a collection. 1287 1288 :param str collection_link: 1289 The link to the collection. 1290 :param (str or dict) query: 1291 :param dict options: 1292 The request options for the request. 1293 1294 :return: 1295 Query Iterable of UDFs. 1296 :rtype: 1297 query_iterable.QueryIterable 1298 1299 """ 1300 if options is None: 1301 options = {} 1302 1303 path = base.GetPathFromLink(collection_link, 'udfs') 1304 collection_id = base.GetResourceIdOrFullNameFromLink(collection_link) 1305 def fetch_fn(options): 1306 return self.__QueryFeed(path, 1307 'udfs', 1308 collection_id, 1309 lambda r: r['UserDefinedFunctions'], 1310 lambda _, b: b, 1311 query, 1312 options), self.last_response_headers 1313 return query_iterable.QueryIterable(self, query, options, fetch_fn) 1314 1315 def CreateUserDefinedFunction(self, collection_link, udf, options=None): 1316 """Creates a user defined function in a collection. 1317 1318 :param str collection_link: 1319 The link to the collection. 1320 :param str udf: 1321 :param dict options: 1322 The request options for the request. 1323 1324 :return: 1325 The created UDF. 1326 :rtype: 1327 dict 1328 1329 """ 1330 if options is None: 1331 options = {} 1332 1333 collection_id, path, udf = self._GetContainerIdWithPathForUDF(collection_link, udf) 1334 return self.Create(udf, 1335 path, 1336 'udfs', 1337 collection_id, 1338 None, 1339 options) 1340 1341 def UpsertUserDefinedFunction(self, collection_link, udf, options=None): 1342 """Upserts a user defined function in a collection. 1343 1344 :param str collection_link: 1345 The link to the collection. 1346 :param str udf: 1347 :param dict options: 1348 The request options for the request. 1349 1350 :return: 1351 The upserted UDF. 1352 :rtype: 1353 dict 1354 1355 """ 1356 if options is None: 1357 options = {} 1358 1359 collection_id, path, udf = self._GetContainerIdWithPathForUDF(collection_link, udf) 1360 return self.Upsert(udf, 1361 path, 1362 'udfs', 1363 collection_id, 1364 None, 1365 options) 1366 1367 def _GetContainerIdWithPathForUDF(self, collection_link, udf): 1368 CosmosClient.__ValidateResource(udf) 1369 udf = udf.copy() 1370 if udf.get('serverScript'): 1371 udf['body'] = str(udf.pop('serverScript', '')) 1372 elif udf.get('body'): 1373 udf['body'] = str(udf['body']) 1374 1375 path = base.GetPathFromLink(collection_link, 'udfs') 1376 collection_id = base.GetResourceIdOrFullNameFromLink(collection_link) 1377 return collection_id, path, udf 1378 1379 1380 def ReadUserDefinedFunction(self, udf_link, options=None): 1381 """Reads a user defined function. 1382 1383 :param str udf_link: 1384 The link to the user defined function. 1385 :param dict options: 1386 The request options for the request. 1387 1388 :return: 1389 The read UDF. 1390 :rtype: 1391 dict 1392 1393 """ 1394 if options is None: 1395 options = {} 1396 1397 path = base.GetPathFromLink(udf_link) 1398 udf_id = base.GetResourceIdOrFullNameFromLink(udf_link) 1399 return self.Read(path, 'udfs', udf_id, None, options) 1400 1401 def ReadStoredProcedures(self, collection_link, options=None): 1402 """Reads all store procedures in a collection. 1403 1404 :param str collection_link: 1405 The link to the document collection. 1406 :param dict options: 1407 The request options for the request. 1408 1409 :return: 1410 Query Iterable of Stored Procedures. 1411 :rtype: 1412 query_iterable.QueryIterable 1413 1414 """ 1415 if options is None: 1416 options = {} 1417 1418 return self.QueryStoredProcedures(collection_link, None, options) 1419 1420 def QueryStoredProcedures(self, collection_link, query, options=None): 1421 """Queries stored procedures in a collection. 1422 1423 :param str collection_link: 1424 The link to the document collection. 1425 :param (str or dict) query: 1426 :param dict options: 1427 The request options for the request. 1428 1429 :return: 1430 Query Iterable of Stored Procedures. 1431 :rtype: 1432 query_iterable.QueryIterable 1433 1434 """ 1435 if options is None: 1436 options = {} 1437 1438 path = base.GetPathFromLink(collection_link, 'sprocs') 1439 collection_id = base.GetResourceIdOrFullNameFromLink(collection_link) 1440 def fetch_fn(options): 1441 return self.__QueryFeed(path, 1442 'sprocs', 1443 collection_id, 1444 lambda r: r['StoredProcedures'], 1445 lambda _, b: b, 1446 query, 1447 options), self.last_response_headers 1448 return query_iterable.QueryIterable(self, query, options, fetch_fn) 1449 1450 def CreateStoredProcedure(self, collection_link, sproc, options=None): 1451 """Creates a stored procedure in a collection. 1452 1453 :param str collection_link: 1454 The link to the document collection. 1455 :param str sproc: 1456 :param dict options: 1457 The request options for the request. 1458 1459 :return: 1460 The created Stored Procedure. 1461 :rtype: 1462 dict 1463 1464 """ 1465 if options is None: 1466 options = {} 1467 1468 collection_id, path, sproc = self._GetContainerIdWithPathForSproc(collection_link, sproc) 1469 return self.Create(sproc, 1470 path, 1471 'sprocs', 1472 collection_id, 1473 None, 1474 options) 1475 1476 def UpsertStoredProcedure(self, collection_link, sproc, options=None): 1477 """Upserts a stored procedure in a collection. 1478 1479 :param str collection_link: 1480 The link to the document collection. 1481 :param str sproc: 1482 :param dict options: 1483 The request options for the request. 1484 1485 :return: 1486 The upserted Stored Procedure. 1487 :rtype: 1488 dict 1489 1490 """ 1491 if options is None: 1492 options = {} 1493 1494 collection_id, path, sproc = self._GetContainerIdWithPathForSproc(collection_link, sproc) 1495 return self.Upsert(sproc, 1496 path, 1497 'sprocs', 1498 collection_id, 1499 None, 1500 options) 1501 1502 def _GetContainerIdWithPathForSproc(self, collection_link, sproc): 1503 CosmosClient.__ValidateResource(sproc) 1504 sproc = sproc.copy() 1505 if sproc.get('serverScript'): 1506 sproc['body'] = str(sproc.pop('serverScript', '')) 1507 elif sproc.get('body'): 1508 sproc['body'] = str(sproc['body']) 1509 path = base.GetPathFromLink(collection_link, 'sprocs') 1510 collection_id = base.GetResourceIdOrFullNameFromLink(collection_link) 1511 return collection_id, path, sproc 1512 1513 1514 def ReadStoredProcedure(self, sproc_link, options=None): 1515 """Reads a stored procedure. 1516 1517 :param str sproc_link: 1518 The link to the stored procedure. 1519 :param dict options: 1520 The request options for the request. 1521 1522 :return: 1523 The read Stored Procedure. 1524 :rtype: 1525 dict 1526 1527 """ 1528 if options is None: 1529 options = {} 1530 1531 path = base.GetPathFromLink(sproc_link) 1532 sproc_id = base.GetResourceIdOrFullNameFromLink(sproc_link) 1533 return self.Read(path, 'sprocs', sproc_id, None, options) 1534 1535 def ReadConflicts(self, collection_link, feed_options=None): 1536 """Reads conflicts. 1537 1538 :param str collection_link: 1539 The link to the document collection. 1540 :param dict feed_options: 1541 1542 :return: 1543 Query Iterable of Conflicts. 1544 :rtype: 1545 query_iterable.QueryIterable 1546 1547 """ 1548 if feed_options is None: 1549 feed_options = {} 1550 1551 return self.QueryConflicts(collection_link, None, feed_options) 1552 1553 def QueryConflicts(self, collection_link, query, options=None): 1554 """Queries conflicts in a collection. 1555 1556 :param str collection_link: 1557 The link to the document collection. 1558 :param (str or dict) query: 1559 :param dict options: 1560 The request options for the request. 1561 1562 :return: 1563 Query Iterable of Conflicts. 1564 :rtype: 1565 query_iterable.QueryIterable 1566 1567 """ 1568 if options is None: 1569 options = {} 1570 1571 path = base.GetPathFromLink(collection_link, 'conflicts') 1572 collection_id = base.GetResourceIdOrFullNameFromLink(collection_link) 1573 def fetch_fn(options): 1574 return self.__QueryFeed(path, 1575 'conflicts', 1576 collection_id, 1577 lambda r: r['Conflicts'], 1578 lambda _, b: b, 1579 query, 1580 options), self.last_response_headers 1581 return query_iterable.QueryIterable(self, query, options, fetch_fn) 1582 1583 def ReadConflict(self, conflict_link, options=None): 1584 """Reads a conflict. 1585 1586 :param str conflict_link: 1587 The link to the conflict. 1588 :param dict options: 1589 1590 :return: 1591 The read Conflict. 1592 :rtype: 1593 dict 1594 1595 """ 1596 if options is None: 1597 options = {} 1598 1599 path = base.GetPathFromLink(conflict_link) 1600 conflict_id = base.GetResourceIdOrFullNameFromLink(conflict_link) 1601 return self.Read(path, 1602 'conflicts', 1603 conflict_id, 1604 None, 1605 options) 1606 1607 def DeleteContainer(self, collection_link, options=None): 1608 """Deletes a collection. 1609 1610 :param str collection_link: 1611 The link to the document collection. 1612 :param dict options: 1613 The request options for the request. 1614 1615 :return: 1616 The deleted Collection. 1617 :rtype: 1618 dict 1619 1620 """ 1621 if options is None: 1622 options = {} 1623 1624 path = base.GetPathFromLink(collection_link) 1625 collection_id = base.GetResourceIdOrFullNameFromLink(collection_link) 1626 return self.DeleteResource(path, 1627 'colls', 1628 collection_id, 1629 None, 1630 options) 1631 1632 def ReplaceItem(self, document_link, new_document, options=None): 1633 """Replaces a document and returns it. 1634 1635 :param str document_link: 1636 The link to the document. 1637 :param dict new_document: 1638 :param dict options: 1639 The request options for the request. 1640 1641 :return: 1642 The new Document. 1643 :rtype: 1644 dict 1645 1646 """ 1647 CosmosClient.__ValidateResource(new_document) 1648 path = base.GetPathFromLink(document_link) 1649 document_id = base.GetResourceIdOrFullNameFromLink(document_link) 1650 1651 # Python's default arguments are evaluated once when the function is defined, not each time the function is called (like it is in say, Ruby). 1652 # This means that if you use a mutable default argument and mutate it, you will and have mutated that object for all future calls to the function as well. 1653 # So, using a non-mutable deafult in this case(None) and assigning an empty dict(mutable) inside the function so that it remains local 1654 # For more details on this gotcha, please refer http://docs.python-guide.org/en/latest/writing/gotchas/ 1655 if options is None: 1656 options = {} 1657 1658 # Extract the document collection link and add the partition key to options 1659 collection_link = base.GetItemContainerLink(document_link) 1660 options = self._AddPartitionKey(collection_link, new_document, options) 1661 1662 return self.Replace(new_document, 1663 path, 1664 'docs', 1665 document_id, 1666 None, 1667 options) 1668 1669 def DeleteItem(self, document_link, options=None): 1670 """Deletes a document. 1671 1672 :param str document_link: 1673 The link to the document. 1674 :param dict options: 1675 The request options for the request. 1676 1677 :return: 1678 The deleted Document. 1679 :rtype: 1680 dict 1681 1682 """ 1683 if options is None: 1684 options = {} 1685 1686 path = base.GetPathFromLink(document_link) 1687 document_id = base.GetResourceIdOrFullNameFromLink(document_link) 1688 return self.DeleteResource(path, 1689 'docs', 1690 document_id, 1691 None, 1692 options) 1693 1694 def CreateAttachment(self, document_link, attachment, options=None): 1695 """Creates an attachment in a document. 1696 1697 :param str document_link: 1698 The link to the document. 1699 :param dict attachment: 1700 The Azure Cosmos attachment to create. 1701 :param dict options: 1702 The request options for the request. 1703 1704 :return: 1705 The created Attachment. 1706 :rtype: 1707 dict 1708 1709 """ 1710 if options is None: 1711 options = {} 1712 1713 document_id, path = self._GetItemIdWithPathForAttachment(attachment, document_link) 1714 return self.Create(attachment, 1715 path, 1716 'attachments', 1717 document_id, 1718 None, 1719 options) 1720 1721 def UpsertAttachment(self, document_link, attachment, options=None): 1722 """Upserts an attachment in a document. 1723 1724 :param str document_link: 1725 The link to the document. 1726 :param dict attachment: 1727 The Azure Cosmos attachment to upsert. 1728 :param dict options: 1729 The request options for the request. 1730 1731 :return: 1732 The upserted Attachment. 1733 :rtype: 1734 dict 1735 1736 """ 1737 if options is None: 1738 options = {} 1739 1740 document_id, path = self._GetItemIdWithPathForAttachment(attachment, document_link) 1741 return self.Upsert(attachment, 1742 path, 1743 'attachments', 1744 document_id, 1745 None, 1746 options) 1747 1748 def _GetItemIdWithPathForAttachment(self, attachment, document_link): 1749 CosmosClient.__ValidateResource(attachment) 1750 path = base.GetPathFromLink(document_link, 'attachments') 1751 document_id = base.GetResourceIdOrFullNameFromLink(document_link) 1752 return document_id, path 1753 1754 def CreateAttachmentAndUploadMedia(self, 1755 document_link, 1756 readable_stream, 1757 options=None): 1758 """Creates an attachment and upload media. 1759 1760 :param str document_link: 1761 The link to the document. 1762 :param (file-like stream object) readable_stream: 1763 :param dict options: 1764 The request options for the request. 1765 1766 :return: 1767 The created Attachment. 1768 :rtype: 1769 dict 1770 1771 """ 1772 if options is None: 1773 options = {} 1774 1775 document_id, initial_headers, path = self._GetItemIdWithPathForAttachmentMedia(document_link, options) 1776 return self.Create(readable_stream, 1777 path, 1778 'attachments', 1779 document_id, 1780 initial_headers, 1781 options) 1782 1783 def UpsertAttachmentAndUploadMedia(self, 1784 document_link, 1785 readable_stream, 1786 options=None): 1787 """Upserts an attachment and upload media. 1788 1789 :param str document_link: 1790 The link to the document. 1791 :param (file-like stream object) readable_stream: 1792 :param dict options: 1793 The request options for the request. 1794 1795 :return: 1796 The upserted Attachment. 1797 :rtype: 1798 dict 1799 1800 """ 1801 if options is None: 1802 options = {} 1803 1804 document_id, initial_headers, path = self._GetItemIdWithPathForAttachmentMedia(document_link, options) 1805 return self.Upsert(readable_stream, 1806 path, 1807 'attachments', 1808 document_id, 1809 initial_headers, 1810 options) 1811 1812 def _GetItemIdWithPathForAttachmentMedia(self, document_link, options): 1813 initial_headers = dict(self.default_headers) 1814 1815 # Add required headers slug and content-type. 1816 if options.get('slug'): 1817 initial_headers[http_constants.HttpHeaders.Slug] = options['slug'] 1818 1819 if options.get('contentType'): 1820 initial_headers[http_constants.HttpHeaders.ContentType] = ( 1821 options['contentType']) 1822 else: 1823 initial_headers[http_constants.HttpHeaders.ContentType] = ( 1824 runtime_constants.MediaTypes.OctetStream) 1825 1826 path = base.GetPathFromLink(document_link, 'attachments') 1827 document_id = base.GetResourceIdOrFullNameFromLink(document_link) 1828 return document_id, initial_headers, path 1829 1830 1831 def ReadAttachment(self, attachment_link, options=None): 1832 """Reads an attachment. 1833 1834 :param str attachment_link: 1835 The link to the attachment. 1836 :param dict options: 1837 The request options for the request. 1838 1839 :return: 1840 The read Attachment. 1841 :rtype: 1842 dict 1843 1844 """ 1845 if options is None: 1846 options = {} 1847 1848 path = base.GetPathFromLink(attachment_link) 1849 attachment_id = base.GetResourceIdOrFullNameFromLink(attachment_link) 1850 return self.Read(path, 1851 'attachments', 1852 attachment_id, 1853 None, 1854 options) 1855 1856 def ReadAttachments(self, document_link, options=None): 1857 """Reads all attachments in a document. 1858 1859 :param str document_link: 1860 The link to the document. 1861 :param dict options: 1862 The request options for the request. 1863 1864 :return: 1865 Query Iterable of Attachments. 1866 :rtype: 1867 query_iterable.QueryIterable 1868 1869 """ 1870 if options is None: 1871 options = {} 1872 1873 return self.QueryAttachments(document_link, None, options) 1874 1875 def QueryAttachments(self, document_link, query, options=None): 1876 """Queries attachments in a document. 1877 1878 :param str document_link: 1879 The link to the document. 1880 :param (str or dict) query: 1881 :param dict options: 1882 The request options for the request. 1883 1884 :return: 1885 Query Iterable of Attachments. 1886 :rtype: 1887 query_iterable.QueryIterable 1888 1889 """ 1890 if options is None: 1891 options = {} 1892 1893 path = base.GetPathFromLink(document_link, 'attachments') 1894 document_id = base.GetResourceIdOrFullNameFromLink(document_link) 1895 1896 def fetch_fn(options): 1897 return self.__QueryFeed(path, 1898 'attachments', 1899 document_id, 1900 lambda r: r['Attachments'], 1901 lambda _, b: b, 1902 query, 1903 options), self.last_response_headers 1904 return query_iterable.QueryIterable(self, query, options, fetch_fn) 1905 1906 1907 def ReadMedia(self, media_link): 1908 """Reads a media. 1909 1910 When self.connection_policy.MediaReadMode == 1911 documents.MediaReadMode.Streamed, returns a file-like stream object; 1912 otherwise, returns a str. 1913 1914 :param str media_link: 1915 The link to the media. 1916 1917 :return: 1918 The read Media. 1919 :rtype: 1920 str or file-like stream object 1921 1922 """ 1923 default_headers = self.default_headers 1924 1925 path = base.GetPathFromLink(media_link) 1926 media_id = base.GetResourceIdOrFullNameFromLink(media_link) 1927 attachment_id = base.GetAttachmentIdFromMediaId(media_id) 1928 headers = base.GetHeaders(self, 1929 default_headers, 1930 'get', 1931 path, 1932 attachment_id, 1933 'media', 1934 {}) 1935 1936 # ReadMedia will always use WriteEndpoint since it's not replicated in readable Geo regions 1937 request = request_object._RequestObject('media', documents._OperationType.Read) 1938 result, self.last_response_headers = self.__Get(path, 1939 request, 1940 headers) 1941 return result 1942 1943 def UpdateMedia(self, media_link, readable_stream, options=None): 1944 """Updates a media and returns it. 1945 1946 :param str media_link: 1947 The link to the media. 1948 :param (file-like stream object) readable_stream: 1949 :param dict options: 1950 The request options for the request. 1951 1952 :return: 1953 The updated Media. 1954 :rtype: 1955 str or file-like stream object 1956 1957 """ 1958 if options is None: 1959 options = {} 1960 1961 initial_headers = dict(self.default_headers) 1962 1963 # Add required headers slug and content-type in case the body is a stream 1964 if options.get('slug'): 1965 initial_headers[http_constants.HttpHeaders.Slug] = options['slug'] 1966 1967 if options.get('contentType'): 1968 initial_headers[http_constants.HttpHeaders.ContentType] = ( 1969 options['contentType']) 1970 else: 1971 initial_headers[http_constants.HttpHeaders.ContentType] = ( 1972 runtime_constants.MediaTypes.OctetStream) 1973 1974 path = base.GetPathFromLink(media_link) 1975 media_id = base.GetResourceIdOrFullNameFromLink(media_link) 1976 attachment_id = base.GetAttachmentIdFromMediaId(media_id) 1977 headers = base.GetHeaders(self, 1978 initial_headers, 1979 'put', 1980 path, 1981 attachment_id, 1982 'media', 1983 options) 1984 1985 # UpdateMedia will use WriteEndpoint since it uses PUT operation 1986 request = request_object._RequestObject('media', documents._OperationType.Update) 1987 result, self.last_response_headers = self.__Put(path, 1988 request, 1989 readable_stream, 1990 headers) 1991 1992 self._UpdateSessionIfRequired(headers, result, self.last_response_headers) 1993 return result 1994 1995 def ReplaceAttachment(self, attachment_link, attachment, options=None): 1996 """Replaces an attachment and returns it. 1997 1998 :param str attachment_link: 1999 The link to the attachment. 2000 :param dict attachment: 2001 :param dict options: 2002 The request options for the request. 2003 2004 :return: 2005 The replaced Attachment 2006 :rtype: 2007 dict 2008 2009 """ 2010 if options is None: 2011 options = {} 2012 2013 CosmosClient.__ValidateResource(attachment) 2014 path = base.GetPathFromLink(attachment_link) 2015 attachment_id = base.GetResourceIdOrFullNameFromLink(attachment_link) 2016 return self.Replace(attachment, 2017 path, 2018 'attachments', 2019 attachment_id, 2020 None, 2021 options) 2022 2023 def DeleteAttachment(self, attachment_link, options=None): 2024 """Deletes an attachment. 2025 2026 :param str attachment_link: 2027 The link to the attachment. 2028 :param dict options: 2029 The request options for the request. 2030 2031 :return: 2032 The deleted Attachment. 2033 :rtype: 2034 dict 2035 2036 """ 2037 if options is None: 2038 options = {} 2039 2040 path = base.GetPathFromLink(attachment_link) 2041 attachment_id = base.GetResourceIdOrFullNameFromLink(attachment_link) 2042 return self.DeleteResource(path, 2043 'attachments', 2044 attachment_id, 2045 None, 2046 options) 2047 2048 def ReplaceTrigger(self, trigger_link, trigger, options=None): 2049 """Replaces a trigger and returns it. 2050 2051 :param str trigger_link: 2052 The link to the trigger. 2053 :param dict trigger: 2054 :param dict options: 2055 The request options for the request. 2056 2057 :return: 2058 The replaced Trigger. 2059 :rtype: 2060 dict 2061 2062 """ 2063 if options is None: 2064 options = {} 2065 2066 CosmosClient.__ValidateResource(trigger) 2067 trigger = trigger.copy() 2068 if trigger.get('serverScript'): 2069 trigger['body'] = str(trigger['serverScript']) 2070 elif trigger.get('body'): 2071 trigger['body'] = str(trigger['body']) 2072 2073 path = base.GetPathFromLink(trigger_link) 2074 trigger_id = base.GetResourceIdOrFullNameFromLink(trigger_link) 2075 return self.Replace(trigger, 2076 path, 2077 'triggers', 2078 trigger_id, 2079 None, 2080 options) 2081 2082 def DeleteTrigger(self, trigger_link, options=None): 2083 """Deletes a trigger. 2084 2085 :param str trigger_link: 2086 The link to the trigger. 2087 :param dict options: 2088 The request options for the request. 2089 2090 :return: 2091 The deleted Trigger. 2092 :rtype: 2093 dict 2094 2095 """ 2096 if options is None: 2097 options = {} 2098 2099 path = base.GetPathFromLink(trigger_link) 2100 trigger_id = base.GetResourceIdOrFullNameFromLink(trigger_link) 2101 return self.DeleteResource(path, 2102 'triggers', 2103 trigger_id, 2104 None, 2105 options) 2106 2107 def ReplaceUserDefinedFunction(self, udf_link, udf, options=None): 2108 """Replaces a user defined function and returns it. 2109 2110 :param str udf_link: 2111 The link to the user defined function. 2112 :param dict udf: 2113 :param dict options: 2114 The request options for the request. 2115 2116 :return: 2117 The new UDF. 2118 :rtype: 2119 dict 2120 2121 """ 2122 if options is None: 2123 options = {} 2124 2125 CosmosClient.__ValidateResource(udf) 2126 udf = udf.copy() 2127 if udf.get('serverScript'): 2128 udf['body'] = str(udf['serverScript']) 2129 elif udf.get('body'): 2130 udf['body'] = str(udf['body']) 2131 2132 path = base.GetPathFromLink(udf_link) 2133 udf_id = base.GetResourceIdOrFullNameFromLink(udf_link) 2134 return self.Replace(udf, 2135 path, 2136 'udfs', 2137 udf_id, 2138 None, 2139 options) 2140 2141 def DeleteUserDefinedFunction(self, udf_link, options=None): 2142 """Deletes a user defined function. 2143 2144 :param str udf_link: 2145 The link to the user defined function. 2146 :param dict options: 2147 The request options for the request. 2148 2149 :return: 2150 The deleted UDF. 2151 :rtype: 2152 dict 2153 2154 """ 2155 if options is None: 2156 options = {} 2157 2158 path = base.GetPathFromLink(udf_link) 2159 udf_id = base.GetResourceIdOrFullNameFromLink(udf_link) 2160 return self.DeleteResource(path, 2161 'udfs', 2162 udf_id, 2163 None, 2164 options) 2165 2166 def ExecuteStoredProcedure(self, sproc_link, params, options=None): 2167 """Executes a store procedure. 2168 2169 :param str sproc_link: 2170 The link to the stored procedure. 2171 :param dict params: 2172 List or None 2173 :param dict options: 2174 The request options for the request. 2175 2176 :return: 2177 The Stored Procedure response. 2178 :rtype: 2179 dict 2180 2181 """ 2182 if options is None: 2183 options = {} 2184 2185 initial_headers = dict(self.default_headers) 2186 initial_headers.update({ 2187 http_constants.HttpHeaders.Accept: ( 2188 runtime_constants.MediaTypes.Json) 2189 }) 2190 2191 if params and not type(params) is list: 2192 params = [params] 2193 2194 path = base.GetPathFromLink(sproc_link) 2195 sproc_id = base.GetResourceIdOrFullNameFromLink(sproc_link) 2196 headers = base.GetHeaders(self, 2197 initial_headers, 2198 'post', 2199 path, 2200 sproc_id, 2201 'sprocs', 2202 options) 2203 2204 # ExecuteStoredProcedure will use WriteEndpoint since it uses POST operation 2205 request = request_object._RequestObject('sprocs', documents._OperationType.ExecuteJavaScript) 2206 result, self.last_response_headers = self.__Post(path, 2207 request, 2208 params, 2209 headers) 2210 return result 2211 2212 def ReplaceStoredProcedure(self, sproc_link, sproc, options=None): 2213 """Replaces a stored procedure and returns it. 2214 2215 :param str sproc_link: 2216 The link to the stored procedure. 2217 :param dict sproc: 2218 :param dict options: 2219 The request options for the request. 2220 2221 :return: 2222 The replaced Stored Procedure. 2223 :rtype: 2224 dict 2225 2226 """ 2227 if options is None: 2228 options = {} 2229 2230 CosmosClient.__ValidateResource(sproc) 2231 sproc = sproc.copy() 2232 if sproc.get('serverScript'): 2233 sproc['body'] = str(sproc['serverScript']) 2234 elif sproc.get('body'): 2235 sproc['body'] = str(sproc['body']) 2236 2237 path = base.GetPathFromLink(sproc_link) 2238 sproc_id = base.GetResourceIdOrFullNameFromLink(sproc_link) 2239 return self.Replace(sproc, 2240 path, 2241 'sprocs', 2242 sproc_id, 2243 None, 2244 options) 2245 2246 def DeleteStoredProcedure(self, sproc_link, options=None): 2247 """Deletes a stored procedure. 2248 2249 :param str sproc_link: 2250 The link to the stored procedure. 2251 :param dict options: 2252 The request options for the request. 2253 2254 :return: 2255 The deleted Stored Procedure. 2256 :rtype: 2257 dict 2258 2259 """ 2260 if options is None: 2261 options = {} 2262 2263 path = base.GetPathFromLink(sproc_link) 2264 sproc_id = base.GetResourceIdOrFullNameFromLink(sproc_link) 2265 return self.DeleteResource(path, 2266 'sprocs', 2267 sproc_id, 2268 None, 2269 options) 2270 2271 def DeleteConflict(self, conflict_link, options=None): 2272 """Deletes a conflict. 2273 2274 :param str conflict_link: 2275 The link to the conflict. 2276 :param dict options: 2277 The request options for the request. 2278 2279 :return: 2280 The deleted Conflict. 2281 :rtype: 2282 dict 2283 2284 """ 2285 if options is None: 2286 options = {} 2287 2288 path = base.GetPathFromLink(conflict_link) 2289 conflict_id = base.GetResourceIdOrFullNameFromLink(conflict_link) 2290 return self.DeleteResource(path, 2291 'conflicts', 2292 conflict_id, 2293 None, 2294 options) 2295 2296 def ReplaceOffer(self, offer_link, offer): 2297 """Replaces an offer and returns it. 2298 2299 :param str offer_link: 2300 The link to the offer. 2301 :param dict offer: 2302 2303 :return: 2304 The replaced Offer. 2305 :rtype: 2306 dict 2307 2308 """ 2309 CosmosClient.__ValidateResource(offer) 2310 path = base.GetPathFromLink(offer_link) 2311 offer_id = base.GetResourceIdOrFullNameFromLink(offer_link) 2312 return self.Replace(offer, path, 'offers', offer_id, None, None) 2313 2314 def ReadOffer(self, offer_link): 2315 """Reads an offer. 2316 2317 :param str offer_link: 2318 The link to the offer. 2319 2320 :return: 2321 The read Offer. 2322 :rtype: 2323 dict 2324 2325 """ 2326 path = base.GetPathFromLink(offer_link) 2327 offer_id = base.GetResourceIdOrFullNameFromLink(offer_link) 2328 return self.Read(path, 'offers', offer_id, None, {}) 2329 2330 def ReadOffers(self, options=None): 2331 """Reads all offers. 2332 2333 :param dict options: 2334 The request options for the request 2335 2336 :return: 2337 Query Iterable of Offers. 2338 :rtype: 2339 query_iterable.QueryIterable 2340 2341 """ 2342 if options is None: 2343 options = {} 2344 2345 return self.QueryOffers(None, options) 2346 2347 def QueryOffers(self, query, options=None): 2348 """Query for all offers. 2349 2350 :param (str or dict) query: 2351 :param dict options: 2352 The request options for the request 2353 2354 :return: 2355 Query Iterable of Offers. 2356 :rtype: 2357 query_iterable.QueryIterable 2358 2359 """ 2360 if options is None: 2361 options = {} 2362 2363 def fetch_fn(options): 2364 return self.__QueryFeed('/offers', 2365 'offers', 2366 '', 2367 lambda r: r['Offers'], 2368 lambda _, b: b, 2369 query, 2370 options), self.last_response_headers 2371 return query_iterable.QueryIterable(self, query, options, fetch_fn) 2372 2373 def GetDatabaseAccount(self, url_connection=None): 2374 """Gets database account info. 2375 2376 :return: 2377 The Database Account. 2378 :rtype: 2379 documents.DatabaseAccount 2380 2381 """ 2382 if url_connection is None: 2383 url_connection = self.url_connection 2384 2385 initial_headers = dict(self.default_headers) 2386 headers = base.GetHeaders(self, 2387 initial_headers, 2388 'get', 2389 '', # path 2390 '', # id 2391 '', # type 2392 {}) 2393 2394 request = request_object._RequestObject('databaseaccount', documents._OperationType.Read, url_connection) 2395 result, self.last_response_headers = self.__Get('', 2396 request, 2397 headers) 2398 database_account = documents.DatabaseAccount() 2399 database_account.DatabasesLink = '/dbs/' 2400 database_account.MediaLink = '/media/' 2401 if (http_constants.HttpHeaders.MaxMediaStorageUsageInMB in 2402 self.last_response_headers): 2403 database_account.MaxMediaStorageUsageInMB = ( 2404 self.last_response_headers[ 2405 http_constants.HttpHeaders.MaxMediaStorageUsageInMB]) 2406 if (http_constants.HttpHeaders.CurrentMediaStorageUsageInMB in 2407 self.last_response_headers): 2408 database_account.CurrentMediaStorageUsageInMB = ( 2409 self.last_response_headers[ 2410 http_constants.HttpHeaders.CurrentMediaStorageUsageInMB]) 2411 database_account.ConsistencyPolicy = result.get(constants._Constants.UserConsistencyPolicy) 2412 2413 # WritableLocations and ReadableLocations fields will be available only for geo-replicated database accounts 2414 if constants._Constants.WritableLocations in result: 2415 database_account._WritableLocations = result[constants._Constants.WritableLocations] 2416 if constants._Constants.ReadableLocations in result: 2417 database_account._ReadableLocations = result[constants._Constants.ReadableLocations] 2418 if constants._Constants.EnableMultipleWritableLocations in result: 2419 database_account._EnableMultipleWritableLocations = result[constants._Constants.EnableMultipleWritableLocations] 2420 2421 self._useMultipleWriteLocations = self.connection_policy.UseMultipleWriteLocations and database_account._EnableMultipleWritableLocations 2422 return database_account 2423 2424 def Create(self, body, path, type, id, initial_headers, options=None): 2425 """Creates a Azure Cosmos resource and returns it. 2426 2427 :param dict body: 2428 :param str path: 2429 :param str type: 2430 :param str id: 2431 :param dict initial_headers: 2432 :param dict options: 2433 The request options for the request. 2434 2435 :return: 2436 The created Azure Cosmos resource. 2437 :rtype: 2438 dict 2439 2440 """ 2441 if options is None: 2442 options = {} 2443 2444 initial_headers = initial_headers or self.default_headers 2445 headers = base.GetHeaders(self, 2446 initial_headers, 2447 'post', 2448 path, 2449 id, 2450 type, 2451 options) 2452 # Create will use WriteEndpoint since it uses POST operation 2453 2454 request = request_object._RequestObject(type, documents._OperationType.Create) 2455 result, self.last_response_headers = self.__Post(path, 2456 request, 2457 body, 2458 headers) 2459 2460 # update session for write request 2461 self._UpdateSessionIfRequired(headers, result, self.last_response_headers) 2462 return result 2463 2464 def Upsert(self, body, path, type, id, initial_headers, options=None): 2465 """Upserts a Azure Cosmos resource and returns it. 2466 2467 :param dict body: 2468 :param str path: 2469 :param str type: 2470 :param str id: 2471 :param dict initial_headers: 2472 :param dict options: 2473 The request options for the request. 2474 2475 :return: 2476 The upserted Azure Cosmos resource. 2477 :rtype: 2478 dict 2479 2480 """ 2481 if options is None: 2482 options = {} 2483 2484 initial_headers = initial_headers or self.default_headers 2485 headers = base.GetHeaders(self, 2486 initial_headers, 2487 'post', 2488 path, 2489 id, 2490 type, 2491 options) 2492 2493 headers[http_constants.HttpHeaders.IsUpsert] = True 2494 2495 # Upsert will use WriteEndpoint since it uses POST operation 2496 request = request_object._RequestObject(type, documents._OperationType.Upsert) 2497 result, self.last_response_headers = self.__Post(path, 2498 request, 2499 body, 2500 headers) 2501 # update session for write request 2502 self._UpdateSessionIfRequired(headers, result, self.last_response_headers) 2503 return result 2504 2505 def Replace(self, resource, path, type, id, initial_headers, options=None): 2506 """Replaces a Azure Cosmos resource and returns it. 2507 2508 :param dict resource: 2509 :param str path: 2510 :param str type: 2511 :param str id: 2512 :param dict initial_headers: 2513 :param dict options: 2514 The request options for the request. 2515 2516 :return: 2517 The new Azure Cosmos resource. 2518 :rtype: 2519 dict 2520 2521 """ 2522 if options is None: 2523 options = {} 2524 2525 initial_headers = initial_headers or self.default_headers 2526 headers = base.GetHeaders(self, 2527 initial_headers, 2528 'put', 2529 path, 2530 id, 2531 type, 2532 options) 2533 # Replace will use WriteEndpoint since it uses PUT operation 2534 request = request_object._RequestObject(type, documents._OperationType.Replace) 2535 result, self.last_response_headers = self.__Put(path, 2536 request, 2537 resource, 2538 headers) 2539 2540 # update session for request mutates data on server side 2541 self._UpdateSessionIfRequired(headers, result, self.last_response_headers) 2542 return result 2543 2544 def Read(self, path, type, id, initial_headers, options=None): 2545 """Reads a Azure Cosmos resource and returns it. 2546 2547 :param str path: 2548 :param str type: 2549 :param str id: 2550 :param dict initial_headers: 2551 :param dict options: 2552 The request options for the request. 2553 2554 :return: 2555 The upserted Azure Cosmos resource. 2556 :rtype: 2557 dict 2558 2559 """ 2560 if options is None: 2561 options = {} 2562 2563 initial_headers = initial_headers or self.default_headers 2564 headers = base.GetHeaders(self, 2565 initial_headers, 2566 'get', 2567 path, 2568 id, 2569 type, 2570 options) 2571 # Read will use ReadEndpoint since it uses GET operation 2572 request = request_object._RequestObject(type, documents._OperationType.Read) 2573 result, self.last_response_headers = self.__Get(path, 2574 request, 2575 headers) 2576 return result 2577 2578 def DeleteResource(self, path, type, id, initial_headers, options=None): 2579 """Deletes a Azure Cosmos resource and returns it. 2580 2581 :param str path: 2582 :param str type: 2583 :param str id: 2584 :param dict initial_headers: 2585 :param dict options: 2586 The request options for the request. 2587 2588 :return: 2589 The deleted Azure Cosmos resource. 2590 :rtype: 2591 dict 2592 2593 """ 2594 if options is None: 2595 options = {} 2596 2597 initial_headers = initial_headers or self.default_headers 2598 headers = base.GetHeaders(self, 2599 initial_headers, 2600 'delete', 2601 path, 2602 id, 2603 type, 2604 options) 2605 # Delete will use WriteEndpoint since it uses DELETE operation 2606 request = request_object._RequestObject(type, documents._OperationType.Delete) 2607 result, self.last_response_headers = self.__Delete(path, 2608 request, 2609 headers) 2610 2611 # update session for request mutates data on server side 2612 self._UpdateSessionIfRequired(headers, result, self.last_response_headers) 2613 2614 2615 return result 2616 2617 def __Get(self, path, request, headers): 2618 """Azure Cosmos 'GET' http request. 2619 2620 :params str url: 2621 :params str path: 2622 :params dict headers: 2623 2624 :return: 2625 Tuple of (result, headers). 2626 :rtype: 2627 tuple of (dict, dict) 2628 2629 """ 2630 return synchronized_request.SynchronizedRequest(self, 2631 request, 2632 self._global_endpoint_manager, 2633 self.connection_policy, 2634 self._requests_session, 2635 'GET', 2636 path, 2637 None, 2638 None, 2639 headers) 2640 2641 def __Post(self, path, request, body, headers): 2642 """Azure Cosmos 'POST' http request. 2643 2644 :params str url: 2645 :params str path: 2646 :params (str, unicode, dict) body: 2647 :params dict headers: 2648 2649 :return: 2650 Tuple of (result, headers). 2651 :rtype: 2652 tuple of (dict, dict) 2653 2654 """ 2655 return synchronized_request.SynchronizedRequest(self, 2656 request, 2657 self._global_endpoint_manager, 2658 self.connection_policy, 2659 self._requests_session, 2660 'POST', 2661 path, 2662 body, 2663 query_params=None, 2664 headers=headers) 2665 2666 def __Put(self, path, request, body, headers): 2667 """Azure Cosmos 'PUT' http request. 2668 2669 :params str url: 2670 :params str path: 2671 :params (str, unicode, dict) body: 2672 :params dict headers: 2673 2674 :return: 2675 Tuple of (result, headers). 2676 :rtype: 2677 tuple of (dict, dict) 2678 2679 """ 2680 return synchronized_request.SynchronizedRequest(self, 2681 request, 2682 self._global_endpoint_manager, 2683 self.connection_policy, 2684 self._requests_session, 2685 'PUT', 2686 path, 2687 body, 2688 query_params=None, 2689 headers=headers) 2690 2691 def __Delete(self, path, request, headers): 2692 """Azure Cosmos 'DELETE' http request. 2693 2694 :params str url: 2695 :params str path: 2696 :params dict headers: 2697 2698 :return: 2699 Tuple of (result, headers). 2700 :rtype: 2701 tuple of (dict, dict) 2702 2703 """ 2704 return synchronized_request.SynchronizedRequest(self, 2705 request, 2706 self._global_endpoint_manager, 2707 self.connection_policy, 2708 self._requests_session, 2709 'DELETE', 2710 path, 2711 request_data=None, 2712 query_params=None, 2713 headers=headers) 2714 2715 def QueryFeed(self, path, collection_id, query, options, partition_key_range_id = None): 2716 """Query Feed for Document Collection resource. 2717 2718 :param str path: 2719 Path to the document collection. 2720 :param str collection_id: 2721 Id of the document collection. 2722 :param (str or dict) query: 2723 :param dict options: 2724 The request options for the request. 2725 :param str partition_key_range_id: 2726 Partition key range id. 2727 :rtype: 2728 tuple 2729 2730 """ 2731 return self.__QueryFeed(path, 2732 'docs', 2733 collection_id, 2734 lambda r: r['Documents'], 2735 lambda _, b: b, 2736 query, 2737 options, 2738 partition_key_range_id), self.last_response_headers 2739 2740 def __QueryFeed(self, 2741 path, 2742 type, 2743 id, 2744 result_fn, 2745 create_fn, 2746 query, 2747 options=None, 2748 partition_key_range_id=None): 2749 """Query for more than one Azure Cosmos resources. 2750 2751 :param str path: 2752 :param str type: 2753 :param str id: 2754 :param function result_fn: 2755 :param function create_fn: 2756 :param (str or dict) query: 2757 :param dict options: 2758 The request options for the request. 2759 :param str partition_key_range_id: 2760 Specifies partition key range id. 2761 2762 :rtype: 2763 list 2764 2765 :raises SystemError: If the query compatibility mode is undefined. 2766 2767 """ 2768 if options is None: 2769 options = {} 2770 2771 if query: 2772 __GetBodiesFromQueryResult = result_fn 2773 else: 2774 def __GetBodiesFromQueryResult(result): 2775 if result is not None: 2776 return [create_fn(self, body) for body in result_fn(result)] 2777 else: 2778 # If there is no change feed, the result data is empty and result is None. 2779 # This case should be interpreted as an empty array. 2780 return [] 2781 2782 2783 initial_headers = self.default_headers.copy() 2784 # Copy to make sure that default_headers won't be changed. 2785 if query is None: 2786 # Query operations will use ReadEndpoint even though it uses GET(for feed requests) 2787 request = request_object._RequestObject(type, documents._OperationType.ReadFeed) 2788 headers = base.GetHeaders(self, 2789 initial_headers, 2790 'get', 2791 path, 2792 id, 2793 type, 2794 options, 2795 partition_key_range_id) 2796 result, self.last_response_headers = self.__Get(path, 2797 request, 2798 headers) 2799 return __GetBodiesFromQueryResult(result) 2800 else: 2801 query = self.__CheckAndUnifyQueryFormat(query) 2802 2803 initial_headers[http_constants.HttpHeaders.IsQuery] = 'true' 2804 if (self._query_compatibility_mode == CosmosClient._QueryCompatibilityMode.Default or 2805 self._query_compatibility_mode == CosmosClient._QueryCompatibilityMode.Query): 2806 initial_headers[http_constants.HttpHeaders.ContentType] = runtime_constants.MediaTypes.QueryJson 2807 elif self._query_compatibility_mode == CosmosClient._QueryCompatibilityMode.SqlQuery: 2808 initial_headers[http_constants.HttpHeaders.ContentType] = runtime_constants.MediaTypes.SQL 2809 else: 2810 raise SystemError('Unexpected query compatibility mode.') 2811 2812 # Query operations will use ReadEndpoint even though it uses POST(for regular query operations) 2813 request = request_object._RequestObject(type, documents._OperationType.SqlQuery) 2814 headers = base.GetHeaders(self, 2815 initial_headers, 2816 'post', 2817 path, 2818 id, 2819 type, 2820 options, 2821 partition_key_range_id) 2822 result, self.last_response_headers = self.__Post(path, 2823 request, 2824 query, 2825 headers) 2826 return __GetBodiesFromQueryResult(result) 2827 2828 def __CheckAndUnifyQueryFormat(self, query_body): 2829 """Checks and unifies the format of the query body. 2830 2831 :raises TypeError: If query_body is not of expected type (depending on the query compatibility mode). 2832 :raises ValueError: If query_body is a dict but doesn\'t have valid query text. 2833 :raises SystemError: If the query compatibility mode is undefined. 2834 2835 :param (str or dict) query_body: 2836 2837 :return: 2838 The formatted query body. 2839 :rtype: 2840 dict or string 2841 """ 2842 if (self._query_compatibility_mode == CosmosClient._QueryCompatibilityMode.Default or 2843 self._query_compatibility_mode == CosmosClient._QueryCompatibilityMode.Query): 2844 if not isinstance(query_body, dict) and not isinstance(query_body, six.string_types): 2845 raise TypeError('query body must be a dict or string.') 2846 if isinstance(query_body, dict) and not query_body.get('query'): 2847 raise ValueError('query body must have valid query text with key "query".') 2848 if isinstance(query_body, six.string_types): 2849 return {'query': query_body} 2850 elif (self._query_compatibility_mode == CosmosClient._QueryCompatibilityMode.SqlQuery and 2851 not isinstance(query_body, six.string_types)): 2852 raise TypeError('query body must be a string.') 2853 else: 2854 raise SystemError('Unexpected query compatibility mode.') 2855 2856 return query_body 2857 2858 @staticmethod 2859 def __ValidateResource(resource): 2860 id = resource.get('id') 2861 if id: 2862 if id.find('/') != -1 or id.find('\\') != -1 or id.find('?') != -1 or id.find('#') != -1: 2863 raise ValueError('Id contains illegal chars.') 2864 2865 if id[-1] == ' ': 2866 raise ValueError('Id ends with a space.') 2867 2868 # Adds the partition key to options 2869 def _AddPartitionKey(self, collection_link, document, options): 2870 collection_link = base.TrimBeginningAndEndingSlashes(collection_link) 2871 2872 #TODO: Refresh the cache if partition is extracted automatically and we get a 400.1001 2873 2874 # If the document collection link is present in the cache, then use the cached partitionkey definition 2875 if collection_link in self.partition_key_definition_cache: 2876 partitionKeyDefinition = self.partition_key_definition_cache.get(collection_link) 2877 # Else read the collection from backend and add it to the cache 2878 else: 2879 collection = self.ReadContainer(collection_link) 2880 partitionKeyDefinition = collection.get('partitionKey') 2881 self.partition_key_definition_cache[collection_link] = partitionKeyDefinition 2882 2883 # If the collection doesn't have a partition key definition, skip it as it's a legacy collection 2884 if partitionKeyDefinition: 2885 # If the user has passed in the partitionKey in options use that elase extract it from the document 2886 if('partitionKey' not in options): 2887 partitionKeyValue = self._ExtractPartitionKey(partitionKeyDefinition, document) 2888 options['partitionKey'] = partitionKeyValue 2889 2890 return options 2891 2892 # Extracts the partition key from the document using the partitionKey definition 2893 def _ExtractPartitionKey(self, partitionKeyDefinition, document): 2894 2895 # Parses the paths into a list of token each representing a property 2896 partition_key_parts = base.ParsePaths(partitionKeyDefinition.get('paths')) 2897 2898 # Navigates the document to retrieve the partitionKey specified in the paths 2899 return self._RetrievePartitionKey(partition_key_parts, document) 2900 2901 # Navigates the document to retrieve the partitionKey specified in the partition key parts 2902 def _RetrievePartitionKey(self, partition_key_parts, document): 2903 expected_matchCount = len(partition_key_parts) 2904 matchCount = 0 2905 partitionKey = document 2906 2907 for part in partition_key_parts: 2908 # At any point if we don't find the value of a sub-property in the document, we return as Undefined 2909 if part not in partitionKey: 2910 return documents.Undefined 2911 else: 2912 partitionKey = partitionKey.get(part) 2913 matchCount += 1 2914 # Once we reach the "leaf" value(not a dict), we break from loop 2915 if not isinstance(partitionKey, dict): 2916 break 2917 2918 # Match the count of hops we did to get the partitionKey with the length of partition key parts and validate that it's not a dict at that level 2919 if ((matchCount != expected_matchCount) or isinstance(partitionKey, dict)): 2920 return documents.Undefined 2921 2922 return partitionKey 2923 2924 def _UpdateSessionIfRequired(self, request_headers, response_result, response_headers): 2925 """ 2926 Updates session if necessary. 2927 2928 :param dict response_result: 2929 :param dict response_headers: 2930 :param dict response_headers 2931 2932 :return: 2933 None, but updates the client session if necessary. 2934 2935 """ 2936 2937 '''if this request was made with consistency level as session, then update 2938 the session''' 2939 2940 if response_result is None or response_headers is None: 2941 return 2942 2943 is_session_consistency = False 2944 if http_constants.HttpHeaders.ConsistencyLevel in request_headers: 2945 if documents.ConsistencyLevel.Session == request_headers[http_constants.HttpHeaders.ConsistencyLevel]: 2946 is_session_consistency = True 2947 2948 if is_session_consistency: 2949 # update session 2950 self.session.update_session(response_result, response_headers) 2951