1#The MIT License (MIT)
2#Copyright (c) 2014 Microsoft Corporation
3
4#Permission is hereby granted, free of charge, to any person obtaining a copy
5#of this software and associated documentation files (the "Software"), to deal
6#in the Software without restriction, including without limitation the rights
7#to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8#copies of the Software, and to permit persons to whom the Software is
9#furnished to do so, subject to the following conditions:
10
11#The above copyright notice and this permission notice shall be included in all
12#copies or substantial portions of the Software.
13
14#THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15#IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16#FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17#AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18#LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19#OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20#SOFTWARE.
21
22"""Document client class for the Azure Cosmos database service.
23"""
24
25import requests
26from requests.adapters import HTTPAdapter
27
28import six
29import azure.cosmos.base as base
30import azure.cosmos.documents as documents
31import azure.cosmos.constants as constants
32import azure.cosmos.http_constants as http_constants
33import azure.cosmos.query_iterable as query_iterable
34import azure.cosmos.runtime_constants as runtime_constants
35import azure.cosmos.request_object as request_object
36import azure.cosmos.synchronized_request as synchronized_request
37import azure.cosmos.global_endpoint_manager as global_endpoint_manager
38import azure.cosmos.routing.routing_map_provider as routing_map_provider
39import azure.cosmos.session as session
40import azure.cosmos.utils as utils
41import os
42
43class CosmosClient(object):
44    """Represents a document client.
45
46    Provides a client-side logical representation of the Azure Cosmos
47    service. This client is used to configure and execute requests against the
48    service.
49
50    The service client encapsulates the endpoint and credentials used to access
51    the Azure Cosmos service.
52    """
53
54    class _QueryCompatibilityMode:
55        Default = 0
56        Query = 1
57        SqlQuery = 2
58
59    # default number precisions
60    _DefaultNumberHashPrecision = 3
61    _DefaultNumberRangePrecision = -1
62
63    # default string precision
64    _DefaultStringHashPrecision = 3
65    _DefaultStringRangePrecision = -1
66
67    def __init__(self,
68                 url_connection,
69                 auth,
70                 connection_policy=None,
71                 consistency_level=documents.ConsistencyLevel.Session):
72        """
73        :param str url_connection:
74            The URL for connecting to the DB server.
75        :param dict auth:
76            Contains 'masterKey' or 'resourceTokens', where
77            auth['masterKey'] is the default authorization key to use to
78            create the client, and auth['resourceTokens'] is the alternative
79            authorization key.
80        :param documents.ConnectionPolicy connection_policy:
81            The connection policy for the client.
82        :param documents.ConsistencyLevel consistency_level:
83            The default consistency policy for client operations.
84
85        if url_connection and auth are not provided,
86            COSMOS_ENDPOINT and COSMOS_KEY environment variables will be used.
87        """
88
89        self.url_connection = url_connection or os.environ.get('COSMOS_ENDPOINT')
90
91        self.master_key = None
92        self.resource_tokens = None
93        if auth is not None:
94            self.master_key = auth.get('masterKey')
95            self.resource_tokens = auth.get('resourceTokens')
96
97            if auth.get('permissionFeed'):
98                self.resource_tokens = {}
99                for permission_feed in auth['permissionFeed']:
100                    resource_parts = permission_feed['resource'].split('/')
101                    id = resource_parts[-1]
102                    self.resource_tokens[id] = permission_feed['_token']
103        else:
104            self.master_key = os.environ.get('COSMOS_KEY')
105
106        self.connection_policy = (connection_policy or
107                                  documents.ConnectionPolicy())
108
109        self.partition_resolvers = {}
110
111        self.partition_key_definition_cache = {}
112
113        self.default_headers = {
114            http_constants.HttpHeaders.CacheControl: 'no-cache',
115            http_constants.HttpHeaders.Version:
116                http_constants.Versions.CurrentVersion,
117            http_constants.HttpHeaders.UserAgent:
118                utils._get_user_agent(),
119            # For single partition query with aggregate functions we would try to accumulate the results on the SDK.
120            # We need to set continuation as not expected.
121            http_constants.HttpHeaders.IsContinuationExpected: False
122        }
123
124        if consistency_level != None:
125            self.default_headers[
126                http_constants.HttpHeaders.ConsistencyLevel] = consistency_level
127
128        # Keeps the latest response headers from server.
129        self.last_response_headers = None
130
131        if consistency_level == documents.ConsistencyLevel.Session:
132            '''create a session - this is maintained only if the default consistency level
133            on the client is set to session, or if the user explicitly sets it as a property
134            via setter'''
135            self.session = session.Session(self.url_connection)
136        else:
137            self.session = None
138
139        self._useMultipleWriteLocations = False
140        self._global_endpoint_manager = global_endpoint_manager._GlobalEndpointManager(self)
141
142        # creating a requests session used for connection pooling and re-used by all requests
143        self._requests_session = requests.Session()
144
145        if self.connection_policy.ConnectionRetryConfiguration is not None:
146            adapter = HTTPAdapter(max_retries=self.connection_policy.ConnectionRetryConfiguration)
147            self._requests_session.mount('http://', adapter)
148            self._requests_session.mount('https://', adapter)
149
150        if self.connection_policy.ProxyConfiguration and self.connection_policy.ProxyConfiguration.Host:
151            host = connection_policy.ProxyConfiguration.Host
152            url = six.moves.urllib.parse.urlparse(host)
153            proxy = host if url.port else host + ":" + str(connection_policy.ProxyConfiguration.Port)
154            proxyDict = {url.scheme : proxy}
155            self._requests_session.proxies.update(proxyDict)
156
157        # Query compatibility mode.
158        # Allows to specify compatibility mode used by client when making query requests. Should be removed when
159        # application/sql is no longer supported.
160        self._query_compatibility_mode = CosmosClient._QueryCompatibilityMode.Default
161
162        # Routing map provider
163        self._routing_map_provider = routing_map_provider._SmartRoutingMapProvider(self)
164
165        database_account = self._global_endpoint_manager._GetDatabaseAccount()
166        self._global_endpoint_manager.force_refresh(database_account)
167
168    @property
169    def Session(self):
170        """ Gets the session object from the client """
171        return self.session
172
173    @Session.setter
174    def Session(self, session):
175        """ Sets a session object on the document client
176            This will override the existing session
177        """
178        self.session = session
179
180    @property
181    def WriteEndpoint(self):
182        """Gets the curent write endpoint for a geo-replicated database account.
183        """
184        return self._global_endpoint_manager.get_write_endpoint()
185
186    @property
187    def ReadEndpoint(self):
188        """Gets the curent read endpoint for a geo-replicated database account.
189        """
190        return self._global_endpoint_manager.get_read_endpoint()
191
192    def RegisterPartitionResolver(self, database_link, partition_resolver):
193        """Registers the partition resolver associated with the database link
194
195        :param str database_link:
196            Database Self Link or ID based link.
197        :param object partition_resolver:
198            An instance of PartitionResolver.
199
200        """
201        if not database_link:
202            raise ValueError("database_link is None or empty.")
203
204        if partition_resolver is None:
205            raise ValueError("partition_resolver is None.")
206
207        self.partition_resolvers = {base.TrimBeginningAndEndingSlashes(database_link): partition_resolver}
208
209
210    def GetPartitionResolver(self, database_link):
211        """Gets the partition resolver associated with the database link
212
213        :param str database_link:
214            Database self link or ID based link.
215
216        :return:
217            An instance of PartitionResolver.
218        :rtype: object
219
220        """
221        if not database_link:
222            raise ValueError("database_link is None or empty.")
223
224        return self.partition_resolvers.get(base.TrimBeginningAndEndingSlashes(database_link))
225
226
227    def CreateDatabase(self, database, options=None):
228        """Creates a database.
229
230        :param dict database:
231            The Azure Cosmos database to create.
232        :param dict options:
233            The request options for the request.
234
235        :return:
236            The Database that was created.
237        :rtype: dict
238
239        """
240        if options is None:
241            options = {}
242
243        CosmosClient.__ValidateResource(database)
244        path = '/dbs'
245        return self.Create(database, path, 'dbs', None, None, options)
246
247    def ReadDatabase(self, database_link, options=None):
248        """Reads a database.
249
250        :param str database_link:
251            The link to the database.
252        :param dict options:
253            The request options for the request.
254
255        :return:
256            The Database that was read.
257        :rtype: dict
258
259        """
260        if options is None:
261            options = {}
262
263        path = base.GetPathFromLink(database_link)
264        database_id = base.GetResourceIdOrFullNameFromLink(database_link)
265        return self.Read(path, 'dbs', database_id, None, options)
266
267    def ReadDatabases(self, options=None):
268        """Reads all databases.
269
270        :param dict options:
271            The request options for the request.
272
273        :return:
274            Query Iterable of Databases.
275        :rtype:
276            query_iterable.QueryIterable
277
278        """
279        if options is None:
280            options = {}
281
282        return self.QueryDatabases(None, options)
283
284    def QueryDatabases(self, query, options=None):
285        """Queries databases.
286
287        :param (str or dict) query:
288        :param dict options:
289            The request options for the request.
290
291        :return: Query Iterable of Databases.
292        :rtype:
293            query_iterable.QueryIterable
294
295        """
296        if options is None:
297            options = {}
298
299        def fetch_fn(options):
300            return self.__QueryFeed('/dbs',
301                                    'dbs',
302                                    '',
303                                    lambda r: r['Databases'],
304                                    lambda _, b: b,
305                                    query,
306                                    options), self.last_response_headers
307        return query_iterable.QueryIterable(self, query, options, fetch_fn)
308
309    def ReadContainers(self, database_link, options=None):
310        """Reads all collections in a database.
311
312        :param str database_link:
313            The link to the database.
314        :param dict options:
315            The request options for the request.
316
317        :return: Query Iterable of Collections.
318        :rtype:
319            query_iterable.QueryIterable
320
321        """
322        if options is None:
323            options = {}
324
325        return self.QueryContainers(database_link, None, options)
326
327    def QueryContainers(self, database_link, query, options=None):
328        """Queries collections in a database.
329
330        :param str database_link:
331            The link to the database.
332        :param (str or dict) query:
333        :param dict options:
334            The request options for the request.
335
336        :return: Query Iterable of Collections.
337        :rtype:
338            query_iterable.QueryIterable
339
340        """
341        if options is None:
342            options = {}
343
344        path = base.GetPathFromLink(database_link, 'colls')
345        database_id = base.GetResourceIdOrFullNameFromLink(database_link)
346        def fetch_fn(options):
347            return self.__QueryFeed(path,
348                                    'colls',
349                                    database_id,
350                                    lambda r: r['DocumentCollections'],
351                                    lambda _, body: body,
352                                    query,
353                                    options), self.last_response_headers
354        return query_iterable.QueryIterable(self, query, options, fetch_fn)
355
356    def CreateContainer(self, database_link, collection, options=None):
357        """Creates a collection in a database.
358
359        :param str database_link:
360            The link to the database.
361        :param dict collection:
362            The Azure Cosmos collection to create.
363        :param dict options:
364            The request options for the request.
365
366        :return: The Collection that was created.
367        :rtype: dict
368
369        """
370        if options is None:
371            options = {}
372
373        CosmosClient.__ValidateResource(collection)
374        path = base.GetPathFromLink(database_link, 'colls')
375        database_id = base.GetResourceIdOrFullNameFromLink(database_link)
376        return self.Create(collection,
377                           path,
378                           'colls',
379                           database_id,
380                           None,
381                           options)
382
383    def ReplaceContainer(self, collection_link, collection, options=None):
384        """Replaces a collection and return it.
385
386        :param str collection_link:
387            The link to the collection entity.
388        :param dict collection:
389            The collection to be used.
390        :param dict options:
391            The request options for the request.
392
393        :return:
394            The new Collection.
395        :rtype:
396            dict
397
398        """
399        if options is None:
400            options = {}
401
402        CosmosClient.__ValidateResource(collection)
403        path = base.GetPathFromLink(collection_link)
404        collection_id = base.GetResourceIdOrFullNameFromLink(collection_link)
405        return self.Replace(collection,
406                            path,
407                            'colls',
408                            collection_id,
409                            None,
410                            options)
411
412    def ReadContainer(self, collection_link, options=None):
413        """Reads a collection.
414
415        :param str collection_link:
416            The link to the document collection.
417        :param dict options:
418            The request options for the request.
419
420        :return:
421            The read Collection.
422        :rtype:
423            dict
424
425        """
426        if options is None:
427            options = {}
428
429        path = base.GetPathFromLink(collection_link)
430        collection_id = base.GetResourceIdOrFullNameFromLink(collection_link)
431        return self.Read(path,
432                         'colls',
433                         collection_id,
434                         None,
435                         options)
436
437    def CreateUser(self, database_link, user, options=None):
438        """Creates a user.
439
440        :param str database_link:
441            The link to the database.
442        :param dict user:
443            The Azure Cosmos user to create.
444        :param dict options:
445            The request options for the request.
446
447        :return:
448            The created User.
449        :rtype:
450            dict
451
452        """
453        if options is None:
454            options = {}
455
456        database_id, path = self._GetDatabaseIdWithPathForUser(database_link, user)
457        return self.Create(user,
458                           path,
459                           'users',
460                           database_id,
461                           None,
462                           options)
463
464    def UpsertUser(self, database_link, user, options=None):
465        """Upserts a user.
466
467        :param str database_link:
468            The link to the database.
469        :param dict user:
470            The Azure Cosmos user to upsert.
471        :param dict options:
472            The request options for the request.
473
474        :return:
475            The upserted User.
476        :rtype: dict
477        """
478        if options is None:
479            options = {}
480
481        database_id, path = self._GetDatabaseIdWithPathForUser(database_link, user)
482        return self.Upsert(user,
483                           path,
484                           'users',
485                           database_id,
486                           None,
487                           options)
488
489    def _GetDatabaseIdWithPathForUser(self, database_link, user):
490        CosmosClient.__ValidateResource(user)
491        path = base.GetPathFromLink(database_link, 'users')
492        database_id = base.GetResourceIdOrFullNameFromLink(database_link)
493        return database_id, path
494
495
496    def ReadUser(self, user_link, options=None):
497        """Reads a user.
498
499        :param str user_link:
500            The link to the user entity.
501        :param dict options:
502            The request options for the request.
503
504        :return:
505            The read User.
506        :rtype:
507            dict
508
509        """
510        if options is None:
511            options = {}
512
513        path = base.GetPathFromLink(user_link)
514        user_id = base.GetResourceIdOrFullNameFromLink(user_link)
515        return self.Read(path, 'users', user_id, None, options)
516
517    def ReadUsers(self, database_link, options=None):
518        """Reads all users in a database.
519
520        :params str database_link:
521            The link to the database.
522        :params dict options:
523            The request options for the request.
524        :return:
525            Query iterable of Users.
526        :rtype:
527            query_iterable.QueryIterable
528
529        """
530        if options is None:
531            options = {}
532
533        return self.QueryUsers(database_link, None, options)
534
535    def QueryUsers(self, database_link, query, options=None):
536        """Queries users in a database.
537
538        :param str database_link:
539            The link to the database.
540        :param (str or dict) query:
541        :param dict options:
542            The request options for the request.
543
544        :return:
545            Query Iterable of Users.
546        :rtype:
547            query_iterable.QueryIterable
548
549        """
550        if options is None:
551            options = {}
552
553        path = base.GetPathFromLink(database_link, 'users')
554        database_id = base.GetResourceIdOrFullNameFromLink(database_link)
555        def fetch_fn(options):
556            return self.__QueryFeed(path,
557                                    'users',
558                                    database_id,
559                                    lambda r: r['Users'],
560                                    lambda _, b: b,
561                                    query,
562                                    options), self.last_response_headers
563        return query_iterable.QueryIterable(self, query, options, fetch_fn)
564
565    def DeleteDatabase(self, database_link, options=None):
566        """Deletes a database.
567
568        :param str database_link:
569            The link to the database.
570        :param dict options:
571            The request options for the request.
572
573        :return:
574            The deleted Database.
575        :rtype:
576            dict
577
578        """
579        if options is None:
580            options = {}
581
582        path = base.GetPathFromLink(database_link)
583        database_id = base.GetResourceIdOrFullNameFromLink(database_link)
584        return self.DeleteResource(path,
585                                   'dbs',
586                                   database_id,
587                                   None,
588                                   options)
589
590    def CreatePermission(self, user_link, permission, options=None):
591        """Creates a permission for a user.
592
593        :param str user_link:
594            The link to the user entity.
595        :param dict permission:
596            The Azure Cosmos user permission to create.
597        :param dict options:
598            The request options for the request.
599
600        :return:
601            The created Permission.
602        :rtype:
603            dict
604
605        """
606        if options is None:
607            options = {}
608
609        path, user_id = self._GetUserIdWithPathForPermission(permission, user_link)
610        return self.Create(permission,
611                           path,
612                           'permissions',
613                           user_id,
614                           None,
615                           options)
616
617    def UpsertPermission(self, user_link, permission, options=None):
618        """Upserts a permission for a user.
619
620        :param str user_link:
621            The link to the user entity.
622        :param dict permission:
623            The Azure Cosmos user permission to upsert.
624        :param dict options:
625            The request options for the request.
626
627        :return:
628            The upserted permission.
629        :rtype:
630            dict
631
632        """
633        if options is None:
634            options = {}
635
636        path, user_id = self._GetUserIdWithPathForPermission(permission, user_link)
637        return self.Upsert(permission,
638                            path,
639                            'permissions',
640                            user_id,
641                            None,
642                            options)
643
644    def _GetUserIdWithPathForPermission(self, permission, user_link):
645        CosmosClient.__ValidateResource(permission)
646        path = base.GetPathFromLink(user_link, 'permissions')
647        user_id = base.GetResourceIdOrFullNameFromLink(user_link)
648        return path, user_id
649
650
651    def ReadPermission(self, permission_link, options=None):
652        """Reads a permission.
653
654        :param str permission_link:
655            The link to the permission.
656        :param dict options:
657            The request options for the request.
658
659        :return:
660            The read permission.
661        :rtype:
662            dict
663
664        """
665        if options is None:
666            options = {}
667
668        path = base.GetPathFromLink(permission_link)
669        permission_id = base.GetResourceIdOrFullNameFromLink(permission_link)
670        return self.Read(path,
671                         'permissions',
672                          permission_id,
673                          None,
674                          options)
675
676    def ReadPermissions(self, user_link, options=None):
677        """Reads all permissions for a user.
678
679        :param str user_link:
680            The link to the user entity.
681        :param dict options:
682            The request options for the request.
683
684        :return:
685            Query Iterable of Permissions.
686        :rtype:
687            query_iterable.QueryIterable
688
689        """
690        if options is None:
691            options = {}
692
693        return self.QueryPermissions(user_link, None, options)
694
695    def QueryPermissions(self, user_link, query, options=None):
696        """Queries permissions for a user.
697
698        :param str user_link:
699            The link to the user entity.
700        :param (str or dict) query:
701        :param dict options:
702            The request options for the request.
703
704        :return:
705            Query Iterable of Permissions.
706        :rtype:
707            query_iterable.QueryIterable
708
709        """
710        if options is None:
711            options = {}
712
713        path = base.GetPathFromLink(user_link, 'permissions')
714        user_id = base.GetResourceIdOrFullNameFromLink(user_link)
715        def fetch_fn(options):
716            return self.__QueryFeed(path,
717                                    'permissions',
718                                    user_id,
719                                    lambda r: r['Permissions'],
720                                    lambda _, b: b,
721                                    query,
722                                    options), self.last_response_headers
723        return query_iterable.QueryIterable(self, query, options, fetch_fn)
724
725    def ReplaceUser(self, user_link, user, options=None):
726        """Replaces a user and return it.
727
728        :param str user_link:
729            The link to the user entity.
730        :param dict user:
731        :param dict options:
732            The request options for the request.
733
734        :return:
735            The new User.
736        :rtype:
737            dict
738
739        """
740        if options is None:
741            options = {}
742
743        CosmosClient.__ValidateResource(user)
744        path = base.GetPathFromLink(user_link)
745        user_id = base.GetResourceIdOrFullNameFromLink(user_link)
746        return self.Replace(user,
747                            path,
748                            'users',
749                            user_id,
750                            None,
751                            options)
752
753    def DeleteUser(self, user_link, options=None):
754        """Deletes a user.
755
756        :param str user_link:
757            The link to the user entity.
758        :param dict options:
759            The request options for the request.
760
761        :return:
762            The deleted user.
763        :rtype:
764            dict
765
766        """
767        if options is None:
768            options = {}
769
770        path = base.GetPathFromLink(user_link)
771        user_id = base.GetResourceIdOrFullNameFromLink(user_link)
772        return self.DeleteResource(path,
773                                   'users',
774                                   user_id,
775                                   None,
776                                   options)
777
778    def ReplacePermission(self, permission_link, permission, options=None):
779        """Replaces a permission and return it.
780
781        :param str permission_link:
782            The link to the permission.
783        :param dict permission:
784        :param dict options:
785            The request options for the request.
786
787        :return:
788            The new Permission.
789        :rtype:
790            dict
791
792        """
793        if options is None:
794            options = {}
795
796        CosmosClient.__ValidateResource(permission)
797        path = base.GetPathFromLink(permission_link)
798        permission_id = base.GetResourceIdOrFullNameFromLink(permission_link)
799        return self.Replace(permission,
800                            path,
801                            'permissions',
802                            permission_id,
803                            None,
804                            options)
805
806    def DeletePermission(self, permission_link, options=None):
807        """Deletes a permission.
808
809        :param str permission_link:
810            The link to the permission.
811        :param dict options:
812            The request options for the request.
813
814        :return:
815            The deleted Permission.
816        :rtype:
817            dict
818
819        """
820        if options is None:
821            options = {}
822
823        path = base.GetPathFromLink(permission_link)
824        permission_id = base.GetResourceIdOrFullNameFromLink(permission_link)
825        return self.DeleteResource(path,
826                                   'permissions',
827                                   permission_id,
828                                   None,
829                                   options)
830
831    def ReadItems(self, collection_link, feed_options=None):
832        """Reads all documents in a collection.
833
834        :param str collection_link:
835            The link to the document collection.
836        :param dict feed_options:
837
838        :return:
839            Query Iterable of Documents.
840        :rtype:
841            query_iterable.QueryIterable
842
843        """
844        if feed_options is None:
845            feed_options = {}
846
847        return self.QueryItems(collection_link, None, feed_options)
848
849    def QueryItems(self, database_or_Container_link, query, options=None, partition_key=None):
850        """Queries documents in a collection.
851
852        :param str database_or_Container_link:
853            The link to the database when using partitioning, otherwise link to the document collection.
854        :param (str or dict) query:
855        :param dict options:
856            The request options for the request.
857        :param str partition_key:
858            Partition key for the query(default value None)
859
860        :return:
861            Query Iterable of Documents.
862        :rtype:
863            query_iterable.QueryIterable
864
865        """
866        database_or_Container_link = base.TrimBeginningAndEndingSlashes(database_or_Container_link)
867
868        if options is None:
869            options = {}
870
871        if(base.IsDatabaseLink(database_or_Container_link)):
872            # Python doesn't have a good way of specifying an overloaded constructor, and this is how it's generally overloaded constructors are specified(by calling a @classmethod) and returning the 'self' instance
873            return query_iterable.QueryIterable.PartitioningQueryIterable(self, query, options, database_or_Container_link, partition_key)
874        else:
875            path = base.GetPathFromLink(database_or_Container_link, 'docs')
876            collection_id = base.GetResourceIdOrFullNameFromLink(database_or_Container_link)
877            def fetch_fn(options):
878                return self.__QueryFeed(path,
879                                        'docs',
880                                        collection_id,
881                                        lambda r: r['Documents'],
882                                        lambda _, b: b,
883                                        query,
884                                        options), self.last_response_headers
885            return query_iterable.QueryIterable(self, query, options, fetch_fn, database_or_Container_link)
886
887    def QueryItemsChangeFeed(self, collection_link, options=None):
888        """Queries documents change feed in a collection.
889
890        :param str collection_link:
891            The link to the document collection.
892        :param dict options:
893            The request options for the request.
894            options may also specify partition key range id.
895
896        :return:
897            Query Iterable of Documents.
898        :rtype:
899            query_iterable.QueryIterable
900
901        """
902
903        partition_key_range_id = None
904        if options is not None and 'partitionKeyRangeId' in options:
905            partition_key_range_id = options['partitionKeyRangeId']
906
907        return self._QueryChangeFeed(collection_link, "Documents" , options, partition_key_range_id)
908
909    def _QueryChangeFeed(self, collection_link, resource_type, options=None, partition_key_range_id=None):
910        """Queries change feed of a resource in a collection.
911
912        :param str collection_link:
913            The link to the document collection.
914        :param str resource_type:
915            The type of the resource.
916        :param dict options:
917            The request options for the request.
918        :param str partition_key_range_id:
919            Specifies partition key range id.
920
921        :return:
922            Query Iterable of Documents.
923        :rtype:
924            query_iterable.QueryIterable
925
926        """
927        if options is None:
928            options = {}
929        options['changeFeed'] = True
930
931        resource_key_map = {'Documents' : 'docs'}
932
933        # For now, change feed only supports Documents and Partition Key Range resouce type
934        if resource_type not in resource_key_map:
935            raise NotImplementedError(resource_type + " change feed query is not supported.")
936
937        resource_key = resource_key_map[resource_type]
938        path = base.GetPathFromLink(collection_link, resource_key)
939        collection_id = base.GetResourceIdOrFullNameFromLink(collection_link)
940        def fetch_fn(options):
941            return self.__QueryFeed(path,
942                                    resource_key,
943                                    collection_id,
944                                    lambda r: r[resource_type],
945                                    lambda _, b: b,
946                                    None,
947                                    options,
948                                    partition_key_range_id), self.last_response_headers
949        return query_iterable.QueryIterable(self, None, options, fetch_fn, collection_link)
950
951    def _ReadPartitionKeyRanges(self, collection_link, feed_options=None):
952        """Reads Partition Key Ranges.
953
954        :param str collection_link:
955            The link to the document collection.
956        :param dict feed_options:
957
958        :return:
959            Query Iterable of PartitionKeyRanges.
960        :rtype:
961            query_iterable.QueryIterable
962
963        """
964        if feed_options is None:
965            feed_options = {}
966
967        return self._QueryPartitionKeyRanges(collection_link, None, feed_options)
968
969    def _QueryPartitionKeyRanges(self, collection_link, query, options=None):
970        """Queries Partition Key Ranges in a collection.
971
972        :param str collection_link:
973            The link to the document collection.
974        :param (str or dict) query:
975        :param dict options:
976            The request options for the request.
977
978        :return:
979            Query Iterable of PartitionKeyRanges.
980        :rtype:
981            query_iterable.QueryIterable
982
983        """
984        if options is None:
985            options = {}
986
987        path = base.GetPathFromLink(collection_link, 'pkranges')
988        collection_id = base.GetResourceIdOrFullNameFromLink(collection_link)
989        def fetch_fn(options):
990            return self.__QueryFeed(path,
991                                    'pkranges',
992                                    collection_id,
993                                    lambda r: r['PartitionKeyRanges'],
994                                    lambda _, b: b,
995                                    query,
996                                    options), self.last_response_headers
997        return query_iterable.QueryIterable(self, query, options, fetch_fn)
998
999    def CreateItem(self, database_or_Container_link, document, options=None):
1000        """Creates a document in a collection.
1001
1002        :param str database_or_Container_link:
1003            The link to the database when using partitioning, otherwise link to the document collection.
1004        :param dict document:
1005            The Azure Cosmos document to create.
1006        :param dict options:
1007            The request options for the request.
1008        :param bool options['disableAutomaticIdGeneration']:
1009            Disables the automatic id generation. If id is missing in the body and this
1010            option is true, an error will be returned.
1011
1012        :return:
1013            The created Document.
1014        :rtype:
1015            dict
1016
1017        """
1018        # Python's default arguments are evaluated once when the function is defined, not each time the function is called (like it is in say, Ruby).
1019        # This means that if you use a mutable default argument and mutate it, you will and have mutated that object for all future calls to the function as well.
1020        # So, using a non-mutable deafult in this case(None) and assigning an empty dict(mutable) inside the method
1021        # For more details on this gotcha, please refer http://docs.python-guide.org/en/latest/writing/gotchas/
1022        if options is None:
1023            options = {}
1024
1025        # We check the link to be document collection link since it can be database link in case of client side partitioning
1026        if(base.IsItemContainerLink(database_or_Container_link)):
1027            options = self._AddPartitionKey(database_or_Container_link, document, options)
1028
1029        collection_id, document, path = self._GetContainerIdWithPathForItem(database_or_Container_link, document, options)
1030        return self.Create(document,
1031                           path,
1032                           'docs',
1033                           collection_id,
1034                           None,
1035                           options)
1036
1037    def UpsertItem(self, database_or_Container_link, document, options=None):
1038        """Upserts a document in a collection.
1039
1040        :param str database_or_Container_link:
1041            The link to the database when using partitioning, otherwise link to the document collection.
1042        :param dict document:
1043            The Azure Cosmos document to upsert.
1044        :param dict options:
1045            The request options for the request.
1046        :param bool options['disableAutomaticIdGeneration']:
1047            Disables the automatic id generation. If id is missing in the body and this
1048            option is true, an error will be returned.
1049
1050        :return:
1051            The upserted Document.
1052        :rtype:
1053            dict
1054
1055        """
1056        # Python's default arguments are evaluated once when the function is defined, not each time the function is called (like it is in say, Ruby).
1057        # This means that if you use a mutable default argument and mutate it, you will and have mutated that object for all future calls to the function as well.
1058        # So, using a non-mutable deafult in this case(None) and assigning an empty dict(mutable) inside the method
1059        # For more details on this gotcha, please refer http://docs.python-guide.org/en/latest/writing/gotchas/
1060        if options is None:
1061            options = {}
1062
1063        # We check the link to be document collection link since it can be database link in case of client side partitioning
1064        if(base.IsItemContainerLink(database_or_Container_link)):
1065            options = self._AddPartitionKey(database_or_Container_link, document, options)
1066
1067        collection_id, document, path = self._GetContainerIdWithPathForItem(database_or_Container_link, document, options)
1068        return self.Upsert(document,
1069                           path,
1070                           'docs',
1071                           collection_id,
1072                           None,
1073                           options)
1074
1075    PartitionResolverErrorMessage = "Couldn't find any partition resolvers for the database link provided. Ensure that the link you used when registering the partition resolvers matches the link provided or you need to register both types of database link(self link as well as ID based link)."
1076
1077    # Gets the collection id and path for the document
1078    def _GetContainerIdWithPathForItem(self, database_or_Container_link, document, options):
1079
1080        if not database_or_Container_link:
1081            raise ValueError("database_or_Container_link is None or empty.")
1082
1083        if document is None:
1084            raise ValueError("document is None.")
1085
1086        CosmosClient.__ValidateResource(document)
1087        document = document.copy()
1088        if (not document.get('id') and
1089            not options.get('disableAutomaticIdGeneration')):
1090            document['id'] = base.GenerateGuidId()
1091
1092        collection_link = database_or_Container_link
1093
1094        if(base.IsDatabaseLink(database_or_Container_link)):
1095            partition_resolver = self.GetPartitionResolver(database_or_Container_link)
1096
1097            if(partition_resolver != None):
1098                collection_link = partition_resolver.ResolveForCreate(document)
1099            else:
1100                raise ValueError(CosmosClient.PartitionResolverErrorMessage)
1101
1102        path = base.GetPathFromLink(collection_link, 'docs')
1103        collection_id = base.GetResourceIdOrFullNameFromLink(collection_link)
1104        return collection_id, document, path
1105
1106    def ReadItem(self, document_link, options=None):
1107        """Reads a document.
1108
1109        :param str document_link:
1110            The link to the document.
1111        :param dict options:
1112            The request options for the request.
1113
1114        :return:
1115            The read Document.
1116        :rtype:
1117            dict
1118
1119        """
1120        if options is None:
1121            options = {}
1122
1123        path = base.GetPathFromLink(document_link)
1124        document_id = base.GetResourceIdOrFullNameFromLink(document_link)
1125        return self.Read(path,
1126                         'docs',
1127                         document_id,
1128                         None,
1129                         options)
1130
1131    def ReadTriggers(self, collection_link, options=None):
1132        """Reads all triggers in a collection.
1133
1134        :param str collection_link:
1135            The link to the document collection.
1136        :param dict options:
1137            The request options for the request.
1138
1139        :return:
1140            Query Iterable of Triggers.
1141        :rtype:
1142            query_iterable.QueryIterable
1143
1144        """
1145        if options is None:
1146            options = {}
1147
1148        return self.QueryTriggers(collection_link, None, options)
1149
1150    def QueryTriggers(self, collection_link, query, options=None):
1151        """Queries triggers in a collection.
1152
1153        :param str collection_link:
1154            The link to the document collection.
1155        :param (str or dict) query:
1156        :param dict options:
1157            The request options for the request.
1158
1159        :return:
1160            Query Iterable of Triggers.
1161        :rtype:
1162            query_iterable.QueryIterable
1163
1164        """
1165        if options is None:
1166            options = {}
1167
1168        path = base.GetPathFromLink(collection_link, 'triggers')
1169        collection_id = base.GetResourceIdOrFullNameFromLink(collection_link)
1170        def fetch_fn(options):
1171            return self.__QueryFeed(path,
1172                                    'triggers',
1173                                    collection_id,
1174                                    lambda r: r['Triggers'],
1175                                    lambda _, b: b,
1176                                    query,
1177                                    options), self.last_response_headers
1178        return query_iterable.QueryIterable(self, query, options, fetch_fn)
1179
1180    def CreateTrigger(self, collection_link, trigger, options=None):
1181        """Creates a trigger in a collection.
1182
1183        :param str collection_link:
1184            The link to the document collection.
1185        :param dict trigger:
1186        :param dict options:
1187            The request options for the request.
1188
1189        :return:
1190            The created Trigger.
1191        :rtype:
1192            dict
1193
1194        """
1195        if options is None:
1196            options = {}
1197
1198        collection_id, path, trigger = self._GetContainerIdWithPathForTrigger(collection_link, trigger)
1199        return self.Create(trigger,
1200                           path,
1201                           'triggers',
1202                           collection_id,
1203                           None,
1204                           options)
1205
1206    def UpsertTrigger(self, collection_link, trigger, options=None):
1207        """Upserts a trigger in a collection.
1208
1209        :param str collection_link:
1210            The link to the document collection.
1211        :param dict trigger:
1212        :param dict options:
1213            The request options for the request.
1214
1215        :return:
1216            The upserted Trigger.
1217        :rtype:
1218            dict
1219
1220        """
1221        if options is None:
1222            options = {}
1223
1224        collection_id, path, trigger = self._GetContainerIdWithPathForTrigger(collection_link, trigger)
1225        return self.Upsert(trigger,
1226                           path,
1227                           'triggers',
1228                           collection_id,
1229                           None,
1230                           options)
1231
1232    def _GetContainerIdWithPathForTrigger(self, collection_link, trigger):
1233        CosmosClient.__ValidateResource(trigger)
1234        trigger = trigger.copy()
1235        if  trigger.get('serverScript'):
1236            trigger['body'] = str(trigger.pop('serverScript', ''))
1237        elif trigger.get('body'):
1238            trigger['body'] = str(trigger['body'])
1239
1240        path = base.GetPathFromLink(collection_link, 'triggers')
1241        collection_id = base.GetResourceIdOrFullNameFromLink(collection_link)
1242        return collection_id, path, trigger
1243
1244
1245    def ReadTrigger(self, trigger_link, options=None):
1246        """Reads a trigger.
1247
1248        :param str trigger_link:
1249            The link to the trigger.
1250        :param dict options:
1251            The request options for the request.
1252
1253        :return:
1254            The read Trigger.
1255        :rtype:
1256            dict
1257
1258        """
1259        if options is None:
1260            options = {}
1261
1262        path = base.GetPathFromLink(trigger_link)
1263        trigger_id = base.GetResourceIdOrFullNameFromLink(trigger_link)
1264        return self.Read(path, 'triggers', trigger_id, None, options)
1265
1266    def ReadUserDefinedFunctions(self, collection_link, options=None):
1267        """Reads all user defined functions in a collection.
1268
1269        :param str collection_link:
1270            The link to the document collection.
1271        :param dict options:
1272            The request options for the request.
1273
1274        :return:
1275            Query Iterable of UDFs.
1276        :rtype:
1277            query_iterable.QueryIterable
1278
1279        """
1280        if options is None:
1281            options = {}
1282
1283        return self.QueryUserDefinedFunctions(collection_link, None, options)
1284
1285    def QueryUserDefinedFunctions(self, collection_link, query, options=None):
1286        """Queries user defined functions in a collection.
1287
1288        :param str collection_link:
1289            The link to the collection.
1290        :param (str or dict) query:
1291        :param dict options:
1292            The request options for the request.
1293
1294        :return:
1295            Query Iterable of UDFs.
1296        :rtype:
1297            query_iterable.QueryIterable
1298
1299        """
1300        if options is None:
1301            options = {}
1302
1303        path = base.GetPathFromLink(collection_link, 'udfs')
1304        collection_id = base.GetResourceIdOrFullNameFromLink(collection_link)
1305        def fetch_fn(options):
1306            return self.__QueryFeed(path,
1307                                    'udfs',
1308                                    collection_id,
1309                                    lambda r: r['UserDefinedFunctions'],
1310                                    lambda _, b: b,
1311                                    query,
1312                                    options), self.last_response_headers
1313        return query_iterable.QueryIterable(self, query, options, fetch_fn)
1314
1315    def CreateUserDefinedFunction(self, collection_link, udf, options=None):
1316        """Creates a user defined function in a collection.
1317
1318        :param str collection_link:
1319            The link to the collection.
1320        :param str udf:
1321        :param dict options:
1322            The request options for the request.
1323
1324        :return:
1325            The created UDF.
1326        :rtype:
1327            dict
1328
1329        """
1330        if options is None:
1331            options = {}
1332
1333        collection_id, path, udf = self._GetContainerIdWithPathForUDF(collection_link, udf)
1334        return self.Create(udf,
1335                           path,
1336                           'udfs',
1337                           collection_id,
1338                           None,
1339                           options)
1340
1341    def UpsertUserDefinedFunction(self, collection_link, udf, options=None):
1342        """Upserts a user defined function in a collection.
1343
1344        :param str collection_link:
1345            The link to the collection.
1346        :param str udf:
1347        :param dict options:
1348            The request options for the request.
1349
1350        :return:
1351            The upserted UDF.
1352        :rtype:
1353            dict
1354
1355        """
1356        if options is None:
1357            options = {}
1358
1359        collection_id, path, udf = self._GetContainerIdWithPathForUDF(collection_link, udf)
1360        return self.Upsert(udf,
1361                           path,
1362                           'udfs',
1363                           collection_id,
1364                           None,
1365                           options)
1366
1367    def _GetContainerIdWithPathForUDF(self, collection_link, udf):
1368        CosmosClient.__ValidateResource(udf)
1369        udf = udf.copy()
1370        if udf.get('serverScript'):
1371            udf['body'] = str(udf.pop('serverScript', ''))
1372        elif udf.get('body'):
1373            udf['body'] = str(udf['body'])
1374
1375        path = base.GetPathFromLink(collection_link, 'udfs')
1376        collection_id = base.GetResourceIdOrFullNameFromLink(collection_link)
1377        return collection_id, path, udf
1378
1379
1380    def ReadUserDefinedFunction(self, udf_link, options=None):
1381        """Reads a user defined function.
1382
1383        :param str udf_link:
1384            The link to the user defined function.
1385        :param dict options:
1386            The request options for the request.
1387
1388        :return:
1389            The read UDF.
1390        :rtype:
1391            dict
1392
1393        """
1394        if options is None:
1395            options = {}
1396
1397        path = base.GetPathFromLink(udf_link)
1398        udf_id = base.GetResourceIdOrFullNameFromLink(udf_link)
1399        return self.Read(path, 'udfs', udf_id, None, options)
1400
1401    def ReadStoredProcedures(self, collection_link, options=None):
1402        """Reads all store procedures in a collection.
1403
1404        :param str collection_link:
1405            The link to the document collection.
1406        :param dict options:
1407            The request options for the request.
1408
1409        :return:
1410            Query Iterable of Stored Procedures.
1411        :rtype:
1412            query_iterable.QueryIterable
1413
1414        """
1415        if options is None:
1416            options = {}
1417
1418        return self.QueryStoredProcedures(collection_link, None, options)
1419
1420    def QueryStoredProcedures(self, collection_link, query, options=None):
1421        """Queries stored procedures in a collection.
1422
1423        :param str collection_link:
1424            The link to the document collection.
1425        :param (str or dict) query:
1426        :param dict options:
1427            The request options for the request.
1428
1429        :return:
1430            Query Iterable of Stored Procedures.
1431        :rtype:
1432            query_iterable.QueryIterable
1433
1434        """
1435        if options is None:
1436            options = {}
1437
1438        path = base.GetPathFromLink(collection_link, 'sprocs')
1439        collection_id = base.GetResourceIdOrFullNameFromLink(collection_link)
1440        def fetch_fn(options):
1441            return self.__QueryFeed(path,
1442                                    'sprocs',
1443                                    collection_id,
1444                                    lambda r: r['StoredProcedures'],
1445                                    lambda _, b: b,
1446                                    query,
1447                                    options), self.last_response_headers
1448        return query_iterable.QueryIterable(self, query, options, fetch_fn)
1449
1450    def CreateStoredProcedure(self, collection_link, sproc, options=None):
1451        """Creates a stored procedure in a collection.
1452
1453        :param str collection_link:
1454            The link to the document collection.
1455        :param str sproc:
1456        :param dict options:
1457            The request options for the request.
1458
1459        :return:
1460            The created Stored Procedure.
1461        :rtype:
1462            dict
1463
1464        """
1465        if options is None:
1466            options = {}
1467
1468        collection_id, path, sproc = self._GetContainerIdWithPathForSproc(collection_link, sproc)
1469        return self.Create(sproc,
1470                           path,
1471                           'sprocs',
1472                           collection_id,
1473                           None,
1474                           options)
1475
1476    def UpsertStoredProcedure(self, collection_link, sproc, options=None):
1477        """Upserts a stored procedure in a collection.
1478
1479        :param str collection_link:
1480            The link to the document collection.
1481        :param str sproc:
1482        :param dict options:
1483            The request options for the request.
1484
1485        :return:
1486            The upserted Stored Procedure.
1487        :rtype:
1488            dict
1489
1490        """
1491        if options is None:
1492            options = {}
1493
1494        collection_id, path, sproc = self._GetContainerIdWithPathForSproc(collection_link, sproc)
1495        return self.Upsert(sproc,
1496                           path,
1497                           'sprocs',
1498                           collection_id,
1499                           None,
1500                           options)
1501
1502    def _GetContainerIdWithPathForSproc(self, collection_link, sproc):
1503        CosmosClient.__ValidateResource(sproc)
1504        sproc = sproc.copy()
1505        if sproc.get('serverScript'):
1506            sproc['body'] = str(sproc.pop('serverScript', ''))
1507        elif sproc.get('body'):
1508            sproc['body'] = str(sproc['body'])
1509        path = base.GetPathFromLink(collection_link, 'sprocs')
1510        collection_id = base.GetResourceIdOrFullNameFromLink(collection_link)
1511        return collection_id, path, sproc
1512
1513
1514    def ReadStoredProcedure(self, sproc_link, options=None):
1515        """Reads a stored procedure.
1516
1517        :param str sproc_link:
1518            The link to the stored procedure.
1519        :param dict options:
1520            The request options for the request.
1521
1522        :return:
1523            The read Stored Procedure.
1524        :rtype:
1525            dict
1526
1527        """
1528        if options is None:
1529            options = {}
1530
1531        path = base.GetPathFromLink(sproc_link)
1532        sproc_id = base.GetResourceIdOrFullNameFromLink(sproc_link)
1533        return self.Read(path, 'sprocs', sproc_id, None, options)
1534
1535    def ReadConflicts(self, collection_link, feed_options=None):
1536        """Reads conflicts.
1537
1538        :param str collection_link:
1539            The link to the document collection.
1540        :param dict feed_options:
1541
1542        :return:
1543            Query Iterable of Conflicts.
1544        :rtype:
1545            query_iterable.QueryIterable
1546
1547        """
1548        if feed_options is None:
1549            feed_options = {}
1550
1551        return self.QueryConflicts(collection_link, None, feed_options)
1552
1553    def QueryConflicts(self, collection_link, query, options=None):
1554        """Queries conflicts in a collection.
1555
1556        :param str collection_link:
1557            The link to the document collection.
1558        :param (str or dict) query:
1559        :param dict options:
1560            The request options for the request.
1561
1562        :return:
1563            Query Iterable of Conflicts.
1564        :rtype:
1565            query_iterable.QueryIterable
1566
1567        """
1568        if options is None:
1569            options = {}
1570
1571        path = base.GetPathFromLink(collection_link, 'conflicts')
1572        collection_id = base.GetResourceIdOrFullNameFromLink(collection_link)
1573        def fetch_fn(options):
1574            return self.__QueryFeed(path,
1575                                    'conflicts',
1576                                    collection_id,
1577                                    lambda r: r['Conflicts'],
1578                                    lambda _, b: b,
1579                                    query,
1580                                    options), self.last_response_headers
1581        return query_iterable.QueryIterable(self, query, options, fetch_fn)
1582
1583    def ReadConflict(self, conflict_link, options=None):
1584        """Reads a conflict.
1585
1586        :param str conflict_link:
1587            The link to the conflict.
1588        :param dict options:
1589
1590        :return:
1591            The read Conflict.
1592        :rtype:
1593            dict
1594
1595        """
1596        if options is None:
1597            options = {}
1598
1599        path = base.GetPathFromLink(conflict_link)
1600        conflict_id = base.GetResourceIdOrFullNameFromLink(conflict_link)
1601        return self.Read(path,
1602                         'conflicts',
1603                         conflict_id,
1604                         None,
1605                         options)
1606
1607    def DeleteContainer(self, collection_link, options=None):
1608        """Deletes a collection.
1609
1610        :param str collection_link:
1611            The link to the document collection.
1612        :param dict options:
1613            The request options for the request.
1614
1615        :return:
1616            The deleted Collection.
1617        :rtype:
1618            dict
1619
1620        """
1621        if options is None:
1622            options = {}
1623
1624        path = base.GetPathFromLink(collection_link)
1625        collection_id = base.GetResourceIdOrFullNameFromLink(collection_link)
1626        return self.DeleteResource(path,
1627                                   'colls',
1628                                   collection_id,
1629                                   None,
1630                                   options)
1631
1632    def ReplaceItem(self, document_link, new_document, options=None):
1633        """Replaces a document and returns it.
1634
1635        :param str document_link:
1636            The link to the document.
1637        :param dict new_document:
1638        :param dict options:
1639            The request options for the request.
1640
1641        :return:
1642            The new Document.
1643        :rtype:
1644            dict
1645
1646        """
1647        CosmosClient.__ValidateResource(new_document)
1648        path = base.GetPathFromLink(document_link)
1649        document_id = base.GetResourceIdOrFullNameFromLink(document_link)
1650
1651        # Python's default arguments are evaluated once when the function is defined, not each time the function is called (like it is in say, Ruby).
1652        # This means that if you use a mutable default argument and mutate it, you will and have mutated that object for all future calls to the function as well.
1653        # So, using a non-mutable deafult in this case(None) and assigning an empty dict(mutable) inside the function so that it remains local
1654        # For more details on this gotcha, please refer http://docs.python-guide.org/en/latest/writing/gotchas/
1655        if options is None:
1656            options = {}
1657
1658        # Extract the document collection link and add the partition key to options
1659        collection_link = base.GetItemContainerLink(document_link)
1660        options = self._AddPartitionKey(collection_link, new_document, options)
1661
1662        return self.Replace(new_document,
1663                            path,
1664                            'docs',
1665                            document_id,
1666                            None,
1667                            options)
1668
1669    def DeleteItem(self, document_link, options=None):
1670        """Deletes a document.
1671
1672        :param str document_link:
1673            The link to the document.
1674        :param dict options:
1675            The request options for the request.
1676
1677        :return:
1678            The deleted Document.
1679        :rtype:
1680            dict
1681
1682        """
1683        if options is None:
1684            options = {}
1685
1686        path = base.GetPathFromLink(document_link)
1687        document_id = base.GetResourceIdOrFullNameFromLink(document_link)
1688        return self.DeleteResource(path,
1689                                   'docs',
1690                                   document_id,
1691                                   None,
1692                                   options)
1693
1694    def CreateAttachment(self, document_link, attachment, options=None):
1695        """Creates an attachment in a document.
1696
1697        :param str document_link:
1698            The link to the document.
1699        :param dict attachment:
1700            The Azure Cosmos attachment to create.
1701        :param dict options:
1702            The request options for the request.
1703
1704        :return:
1705            The created Attachment.
1706        :rtype:
1707            dict
1708
1709        """
1710        if options is None:
1711            options = {}
1712
1713        document_id, path = self._GetItemIdWithPathForAttachment(attachment, document_link)
1714        return self.Create(attachment,
1715                           path,
1716                           'attachments',
1717                           document_id,
1718                           None,
1719                           options)
1720
1721    def UpsertAttachment(self, document_link, attachment, options=None):
1722        """Upserts an attachment in a document.
1723
1724        :param str document_link:
1725            The link to the document.
1726        :param dict attachment:
1727            The Azure Cosmos attachment to upsert.
1728        :param dict options:
1729            The request options for the request.
1730
1731        :return:
1732            The upserted Attachment.
1733        :rtype:
1734            dict
1735
1736        """
1737        if options is None:
1738            options = {}
1739
1740        document_id, path = self._GetItemIdWithPathForAttachment(attachment, document_link)
1741        return self.Upsert(attachment,
1742                           path,
1743                           'attachments',
1744                           document_id,
1745                           None,
1746                           options)
1747
1748    def _GetItemIdWithPathForAttachment(self, attachment, document_link):
1749        CosmosClient.__ValidateResource(attachment)
1750        path = base.GetPathFromLink(document_link, 'attachments')
1751        document_id = base.GetResourceIdOrFullNameFromLink(document_link)
1752        return document_id, path
1753
1754    def CreateAttachmentAndUploadMedia(self,
1755                                       document_link,
1756                                       readable_stream,
1757                                       options=None):
1758        """Creates an attachment and upload media.
1759
1760        :param str document_link:
1761            The link to the document.
1762        :param (file-like stream object) readable_stream:
1763        :param dict options:
1764            The request options for the request.
1765
1766        :return:
1767            The created Attachment.
1768        :rtype:
1769            dict
1770
1771        """
1772        if options is None:
1773            options = {}
1774
1775        document_id, initial_headers, path = self._GetItemIdWithPathForAttachmentMedia(document_link, options)
1776        return self.Create(readable_stream,
1777                           path,
1778                           'attachments',
1779                           document_id,
1780                           initial_headers,
1781                           options)
1782
1783    def UpsertAttachmentAndUploadMedia(self,
1784                                       document_link,
1785                                       readable_stream,
1786                                       options=None):
1787        """Upserts an attachment and upload media.
1788
1789        :param str document_link:
1790            The link to the document.
1791        :param (file-like stream object) readable_stream:
1792        :param dict options:
1793            The request options for the request.
1794
1795        :return:
1796            The upserted Attachment.
1797        :rtype:
1798            dict
1799
1800        """
1801        if options is None:
1802            options = {}
1803
1804        document_id, initial_headers, path = self._GetItemIdWithPathForAttachmentMedia(document_link, options)
1805        return self.Upsert(readable_stream,
1806                           path,
1807                           'attachments',
1808                           document_id,
1809                           initial_headers,
1810                           options)
1811
1812    def _GetItemIdWithPathForAttachmentMedia(self, document_link, options):
1813        initial_headers = dict(self.default_headers)
1814
1815        # Add required headers slug and content-type.
1816        if options.get('slug'):
1817            initial_headers[http_constants.HttpHeaders.Slug] = options['slug']
1818
1819        if options.get('contentType'):
1820            initial_headers[http_constants.HttpHeaders.ContentType] = (
1821                options['contentType'])
1822        else:
1823            initial_headers[http_constants.HttpHeaders.ContentType] = (
1824                runtime_constants.MediaTypes.OctetStream)
1825
1826        path = base.GetPathFromLink(document_link, 'attachments')
1827        document_id = base.GetResourceIdOrFullNameFromLink(document_link)
1828        return document_id, initial_headers, path
1829
1830
1831    def ReadAttachment(self, attachment_link, options=None):
1832        """Reads an attachment.
1833
1834        :param str attachment_link:
1835            The link to the attachment.
1836        :param dict options:
1837            The request options for the request.
1838
1839        :return:
1840            The read Attachment.
1841        :rtype:
1842            dict
1843
1844        """
1845        if options is None:
1846            options = {}
1847
1848        path = base.GetPathFromLink(attachment_link)
1849        attachment_id = base.GetResourceIdOrFullNameFromLink(attachment_link)
1850        return self.Read(path,
1851                         'attachments',
1852                         attachment_id,
1853                         None,
1854                         options)
1855
1856    def ReadAttachments(self, document_link, options=None):
1857        """Reads all attachments in a document.
1858
1859        :param str document_link:
1860            The link to the document.
1861        :param dict options:
1862            The request options for the request.
1863
1864        :return:
1865            Query Iterable of Attachments.
1866        :rtype:
1867            query_iterable.QueryIterable
1868
1869        """
1870        if options is None:
1871            options = {}
1872
1873        return self.QueryAttachments(document_link, None, options)
1874
1875    def QueryAttachments(self, document_link, query, options=None):
1876        """Queries attachments in a document.
1877
1878        :param str document_link:
1879            The link to the document.
1880        :param (str or dict) query:
1881        :param dict options:
1882            The request options for the request.
1883
1884        :return:
1885            Query Iterable of Attachments.
1886        :rtype:
1887            query_iterable.QueryIterable
1888
1889        """
1890        if options is None:
1891            options = {}
1892
1893        path = base.GetPathFromLink(document_link, 'attachments')
1894        document_id = base.GetResourceIdOrFullNameFromLink(document_link)
1895
1896        def fetch_fn(options):
1897            return self.__QueryFeed(path,
1898                                    'attachments',
1899                                    document_id,
1900                                    lambda r: r['Attachments'],
1901                                    lambda _, b: b,
1902                                    query,
1903                                    options), self.last_response_headers
1904        return query_iterable.QueryIterable(self, query, options, fetch_fn)
1905
1906
1907    def ReadMedia(self, media_link):
1908        """Reads a media.
1909
1910        When self.connection_policy.MediaReadMode ==
1911        documents.MediaReadMode.Streamed, returns a file-like stream object;
1912        otherwise, returns a str.
1913
1914        :param str media_link:
1915            The link to the media.
1916
1917        :return:
1918            The read Media.
1919        :rtype:
1920            str or file-like stream object
1921
1922        """
1923        default_headers = self.default_headers
1924
1925        path = base.GetPathFromLink(media_link)
1926        media_id = base.GetResourceIdOrFullNameFromLink(media_link)
1927        attachment_id = base.GetAttachmentIdFromMediaId(media_id)
1928        headers = base.GetHeaders(self,
1929                                  default_headers,
1930                                  'get',
1931                                  path,
1932                                  attachment_id,
1933                                  'media',
1934                                  {})
1935
1936        # ReadMedia will always use WriteEndpoint since it's not replicated in readable Geo regions
1937        request = request_object._RequestObject('media', documents._OperationType.Read)
1938        result, self.last_response_headers = self.__Get(path,
1939                                                        request,
1940                                                        headers)
1941        return result
1942
1943    def UpdateMedia(self, media_link, readable_stream, options=None):
1944        """Updates a media and returns it.
1945
1946        :param str media_link:
1947            The link to the media.
1948        :param (file-like stream object) readable_stream:
1949        :param dict options:
1950            The request options for the request.
1951
1952        :return:
1953            The updated Media.
1954        :rtype:
1955            str or file-like stream object
1956
1957        """
1958        if options is None:
1959            options = {}
1960
1961        initial_headers = dict(self.default_headers)
1962
1963        # Add required headers slug and content-type in case the body is a stream
1964        if options.get('slug'):
1965            initial_headers[http_constants.HttpHeaders.Slug] = options['slug']
1966
1967        if options.get('contentType'):
1968            initial_headers[http_constants.HttpHeaders.ContentType] = (
1969                options['contentType'])
1970        else:
1971            initial_headers[http_constants.HttpHeaders.ContentType] = (
1972                runtime_constants.MediaTypes.OctetStream)
1973
1974        path = base.GetPathFromLink(media_link)
1975        media_id = base.GetResourceIdOrFullNameFromLink(media_link)
1976        attachment_id = base.GetAttachmentIdFromMediaId(media_id)
1977        headers = base.GetHeaders(self,
1978                                  initial_headers,
1979                                  'put',
1980                                  path,
1981                                  attachment_id,
1982                                  'media',
1983                                  options)
1984
1985        # UpdateMedia will use WriteEndpoint since it uses PUT operation
1986        request = request_object._RequestObject('media', documents._OperationType.Update)
1987        result, self.last_response_headers = self.__Put(path,
1988                                                        request,
1989                                                        readable_stream,
1990                                                        headers)
1991
1992        self._UpdateSessionIfRequired(headers, result, self.last_response_headers)
1993        return result
1994
1995    def ReplaceAttachment(self, attachment_link, attachment, options=None):
1996        """Replaces an attachment and returns it.
1997
1998        :param str attachment_link:
1999            The link to the attachment.
2000        :param dict attachment:
2001        :param dict options:
2002            The request options for the request.
2003
2004        :return:
2005            The replaced Attachment
2006        :rtype:
2007            dict
2008
2009        """
2010        if options is None:
2011            options = {}
2012
2013        CosmosClient.__ValidateResource(attachment)
2014        path = base.GetPathFromLink(attachment_link)
2015        attachment_id = base.GetResourceIdOrFullNameFromLink(attachment_link)
2016        return self.Replace(attachment,
2017                            path,
2018                            'attachments',
2019                            attachment_id,
2020                            None,
2021                            options)
2022
2023    def DeleteAttachment(self, attachment_link, options=None):
2024        """Deletes an attachment.
2025
2026        :param str attachment_link:
2027            The link to the attachment.
2028        :param dict options:
2029            The request options for the request.
2030
2031        :return:
2032            The deleted Attachment.
2033        :rtype:
2034            dict
2035
2036        """
2037        if options is None:
2038            options = {}
2039
2040        path = base.GetPathFromLink(attachment_link)
2041        attachment_id = base.GetResourceIdOrFullNameFromLink(attachment_link)
2042        return self.DeleteResource(path,
2043                                   'attachments',
2044                                   attachment_id,
2045                                   None,
2046                                   options)
2047
2048    def ReplaceTrigger(self, trigger_link, trigger, options=None):
2049        """Replaces a trigger and returns it.
2050
2051        :param str trigger_link:
2052            The link to the trigger.
2053        :param dict trigger:
2054        :param dict options:
2055            The request options for the request.
2056
2057        :return:
2058            The replaced Trigger.
2059        :rtype:
2060            dict
2061
2062        """
2063        if options is None:
2064            options = {}
2065
2066        CosmosClient.__ValidateResource(trigger)
2067        trigger = trigger.copy()
2068        if trigger.get('serverScript'):
2069            trigger['body'] = str(trigger['serverScript'])
2070        elif trigger.get('body'):
2071            trigger['body'] = str(trigger['body'])
2072
2073        path = base.GetPathFromLink(trigger_link)
2074        trigger_id = base.GetResourceIdOrFullNameFromLink(trigger_link)
2075        return self.Replace(trigger,
2076                            path,
2077                            'triggers',
2078                            trigger_id,
2079                            None,
2080                            options)
2081
2082    def DeleteTrigger(self, trigger_link, options=None):
2083        """Deletes a trigger.
2084
2085        :param str trigger_link:
2086            The link to the trigger.
2087        :param dict options:
2088            The request options for the request.
2089
2090        :return:
2091            The deleted Trigger.
2092        :rtype:
2093            dict
2094
2095        """
2096        if options is None:
2097            options = {}
2098
2099        path = base.GetPathFromLink(trigger_link)
2100        trigger_id = base.GetResourceIdOrFullNameFromLink(trigger_link)
2101        return self.DeleteResource(path,
2102                                   'triggers',
2103                                   trigger_id,
2104                                   None,
2105                                   options)
2106
2107    def ReplaceUserDefinedFunction(self, udf_link, udf, options=None):
2108        """Replaces a user defined function and returns it.
2109
2110        :param str udf_link:
2111            The link to the user defined function.
2112        :param dict udf:
2113        :param dict options:
2114            The request options for the request.
2115
2116        :return:
2117            The new UDF.
2118        :rtype:
2119            dict
2120
2121        """
2122        if options is None:
2123            options = {}
2124
2125        CosmosClient.__ValidateResource(udf)
2126        udf = udf.copy()
2127        if udf.get('serverScript'):
2128            udf['body'] = str(udf['serverScript'])
2129        elif udf.get('body'):
2130            udf['body'] = str(udf['body'])
2131
2132        path = base.GetPathFromLink(udf_link)
2133        udf_id = base.GetResourceIdOrFullNameFromLink(udf_link)
2134        return self.Replace(udf,
2135                            path,
2136                            'udfs',
2137                            udf_id,
2138                            None,
2139                            options)
2140
2141    def DeleteUserDefinedFunction(self, udf_link, options=None):
2142        """Deletes a user defined function.
2143
2144        :param str udf_link:
2145            The link to the user defined function.
2146        :param dict options:
2147            The request options for the request.
2148
2149        :return:
2150            The deleted UDF.
2151        :rtype:
2152            dict
2153
2154        """
2155        if options is None:
2156            options = {}
2157
2158        path = base.GetPathFromLink(udf_link)
2159        udf_id = base.GetResourceIdOrFullNameFromLink(udf_link)
2160        return self.DeleteResource(path,
2161                                   'udfs',
2162                                   udf_id,
2163                                   None,
2164                                   options)
2165
2166    def ExecuteStoredProcedure(self, sproc_link, params, options=None):
2167        """Executes a store procedure.
2168
2169        :param str sproc_link:
2170            The link to the stored procedure.
2171        :param dict params:
2172            List or None
2173        :param dict options:
2174            The request options for the request.
2175
2176        :return:
2177            The Stored Procedure response.
2178        :rtype:
2179            dict
2180
2181        """
2182        if options is None:
2183            options = {}
2184
2185        initial_headers = dict(self.default_headers)
2186        initial_headers.update({
2187            http_constants.HttpHeaders.Accept: (
2188                runtime_constants.MediaTypes.Json)
2189        })
2190
2191        if params and not type(params) is list:
2192            params = [params]
2193
2194        path = base.GetPathFromLink(sproc_link)
2195        sproc_id = base.GetResourceIdOrFullNameFromLink(sproc_link)
2196        headers = base.GetHeaders(self,
2197                                  initial_headers,
2198                                  'post',
2199                                  path,
2200                                  sproc_id,
2201                                  'sprocs',
2202                                  options)
2203
2204        # ExecuteStoredProcedure will use WriteEndpoint since it uses POST operation
2205        request = request_object._RequestObject('sprocs', documents._OperationType.ExecuteJavaScript)
2206        result, self.last_response_headers = self.__Post(path,
2207                                                         request,
2208                                                         params,
2209                                                         headers)
2210        return result
2211
2212    def ReplaceStoredProcedure(self, sproc_link, sproc, options=None):
2213        """Replaces a stored procedure and returns it.
2214
2215        :param str sproc_link:
2216            The link to the stored procedure.
2217        :param dict sproc:
2218        :param dict options:
2219            The request options for the request.
2220
2221        :return:
2222            The replaced Stored Procedure.
2223        :rtype:
2224            dict
2225
2226        """
2227        if options is None:
2228            options = {}
2229
2230        CosmosClient.__ValidateResource(sproc)
2231        sproc = sproc.copy()
2232        if sproc.get('serverScript'):
2233            sproc['body'] = str(sproc['serverScript'])
2234        elif sproc.get('body'):
2235            sproc['body'] = str(sproc['body'])
2236
2237        path = base.GetPathFromLink(sproc_link)
2238        sproc_id = base.GetResourceIdOrFullNameFromLink(sproc_link)
2239        return self.Replace(sproc,
2240                            path,
2241                            'sprocs',
2242                            sproc_id,
2243                            None,
2244                            options)
2245
2246    def DeleteStoredProcedure(self, sproc_link, options=None):
2247        """Deletes a stored procedure.
2248
2249        :param str sproc_link:
2250            The link to the stored procedure.
2251        :param dict options:
2252            The request options for the request.
2253
2254        :return:
2255            The deleted Stored Procedure.
2256        :rtype:
2257            dict
2258
2259        """
2260        if options is None:
2261            options = {}
2262
2263        path = base.GetPathFromLink(sproc_link)
2264        sproc_id = base.GetResourceIdOrFullNameFromLink(sproc_link)
2265        return self.DeleteResource(path,
2266                                   'sprocs',
2267                                   sproc_id,
2268                                   None,
2269                                   options)
2270
2271    def DeleteConflict(self, conflict_link, options=None):
2272        """Deletes a conflict.
2273
2274        :param str conflict_link:
2275            The link to the conflict.
2276        :param dict options:
2277            The request options for the request.
2278
2279        :return:
2280            The deleted Conflict.
2281        :rtype:
2282            dict
2283
2284        """
2285        if options is None:
2286            options = {}
2287
2288        path = base.GetPathFromLink(conflict_link)
2289        conflict_id = base.GetResourceIdOrFullNameFromLink(conflict_link)
2290        return self.DeleteResource(path,
2291                                   'conflicts',
2292                                   conflict_id,
2293                                   None,
2294                                   options)
2295
2296    def ReplaceOffer(self, offer_link, offer):
2297        """Replaces an offer and returns it.
2298
2299        :param str offer_link:
2300            The link to the offer.
2301        :param dict offer:
2302
2303        :return:
2304            The replaced Offer.
2305        :rtype:
2306            dict
2307
2308        """
2309        CosmosClient.__ValidateResource(offer)
2310        path = base.GetPathFromLink(offer_link)
2311        offer_id = base.GetResourceIdOrFullNameFromLink(offer_link)
2312        return self.Replace(offer, path, 'offers', offer_id, None, None)
2313
2314    def ReadOffer(self, offer_link):
2315        """Reads an offer.
2316
2317        :param str offer_link:
2318            The link to the offer.
2319
2320        :return:
2321            The read Offer.
2322        :rtype:
2323            dict
2324
2325        """
2326        path = base.GetPathFromLink(offer_link)
2327        offer_id = base.GetResourceIdOrFullNameFromLink(offer_link)
2328        return self.Read(path, 'offers', offer_id, None, {})
2329
2330    def ReadOffers(self, options=None):
2331        """Reads all offers.
2332
2333        :param dict options:
2334            The request options for the request
2335
2336        :return:
2337            Query Iterable of Offers.
2338        :rtype:
2339            query_iterable.QueryIterable
2340
2341        """
2342        if options is None:
2343            options = {}
2344
2345        return self.QueryOffers(None, options)
2346
2347    def QueryOffers(self, query, options=None):
2348        """Query for all offers.
2349
2350        :param (str or dict) query:
2351        :param dict options:
2352            The request options for the request
2353
2354        :return:
2355            Query Iterable of Offers.
2356        :rtype:
2357            query_iterable.QueryIterable
2358
2359        """
2360        if options is None:
2361            options = {}
2362
2363        def fetch_fn(options):
2364            return self.__QueryFeed('/offers',
2365                                    'offers',
2366                                    '',
2367                                    lambda r: r['Offers'],
2368                                    lambda _, b: b,
2369                                    query,
2370                                    options), self.last_response_headers
2371        return query_iterable.QueryIterable(self, query, options, fetch_fn)
2372
2373    def GetDatabaseAccount(self, url_connection=None):
2374        """Gets database account info.
2375
2376        :return:
2377            The Database Account.
2378        :rtype:
2379            documents.DatabaseAccount
2380
2381        """
2382        if url_connection is None:
2383            url_connection = self.url_connection
2384
2385        initial_headers = dict(self.default_headers)
2386        headers = base.GetHeaders(self,
2387                                  initial_headers,
2388                                  'get',
2389                                  '',  # path
2390                                  '',  # id
2391                                  '',  # type
2392                                  {})
2393
2394        request = request_object._RequestObject('databaseaccount', documents._OperationType.Read, url_connection)
2395        result, self.last_response_headers = self.__Get('',
2396                                                        request,
2397                                                        headers)
2398        database_account = documents.DatabaseAccount()
2399        database_account.DatabasesLink = '/dbs/'
2400        database_account.MediaLink = '/media/'
2401        if (http_constants.HttpHeaders.MaxMediaStorageUsageInMB in
2402            self.last_response_headers):
2403            database_account.MaxMediaStorageUsageInMB = (
2404                self.last_response_headers[
2405                    http_constants.HttpHeaders.MaxMediaStorageUsageInMB])
2406        if (http_constants.HttpHeaders.CurrentMediaStorageUsageInMB in
2407            self.last_response_headers):
2408            database_account.CurrentMediaStorageUsageInMB = (
2409                self.last_response_headers[
2410                    http_constants.HttpHeaders.CurrentMediaStorageUsageInMB])
2411        database_account.ConsistencyPolicy = result.get(constants._Constants.UserConsistencyPolicy)
2412
2413        # WritableLocations and ReadableLocations fields will be available only for geo-replicated database accounts
2414        if constants._Constants.WritableLocations in result:
2415            database_account._WritableLocations = result[constants._Constants.WritableLocations]
2416        if constants._Constants.ReadableLocations in result:
2417            database_account._ReadableLocations = result[constants._Constants.ReadableLocations]
2418        if constants._Constants.EnableMultipleWritableLocations in result:
2419            database_account._EnableMultipleWritableLocations = result[constants._Constants.EnableMultipleWritableLocations]
2420
2421        self._useMultipleWriteLocations = self.connection_policy.UseMultipleWriteLocations and database_account._EnableMultipleWritableLocations
2422        return database_account
2423
2424    def Create(self, body, path, type, id, initial_headers, options=None):
2425        """Creates a Azure Cosmos resource and returns it.
2426
2427        :param dict body:
2428        :param str path:
2429        :param str type:
2430        :param str id:
2431        :param dict initial_headers:
2432        :param dict options:
2433            The request options for the request.
2434
2435        :return:
2436            The created Azure Cosmos resource.
2437        :rtype:
2438            dict
2439
2440        """
2441        if options is None:
2442            options = {}
2443
2444        initial_headers = initial_headers or self.default_headers
2445        headers = base.GetHeaders(self,
2446                                  initial_headers,
2447                                  'post',
2448                                  path,
2449                                  id,
2450                                  type,
2451                                  options)
2452        # Create will use WriteEndpoint since it uses POST operation
2453
2454        request = request_object._RequestObject(type, documents._OperationType.Create)
2455        result, self.last_response_headers = self.__Post(path,
2456                                                         request,
2457                                                         body,
2458                                                         headers)
2459
2460        # update session for write request
2461        self._UpdateSessionIfRequired(headers, result, self.last_response_headers)
2462        return result
2463
2464    def Upsert(self, body, path, type, id, initial_headers, options=None):
2465        """Upserts a Azure Cosmos resource and returns it.
2466
2467        :param dict body:
2468        :param str path:
2469        :param str type:
2470        :param str id:
2471        :param dict initial_headers:
2472        :param dict options:
2473            The request options for the request.
2474
2475        :return:
2476            The upserted Azure Cosmos resource.
2477        :rtype:
2478            dict
2479
2480        """
2481        if options is None:
2482            options = {}
2483
2484        initial_headers = initial_headers or self.default_headers
2485        headers = base.GetHeaders(self,
2486                                  initial_headers,
2487                                  'post',
2488                                  path,
2489                                  id,
2490                                  type,
2491                                  options)
2492
2493        headers[http_constants.HttpHeaders.IsUpsert] = True
2494
2495        # Upsert will use WriteEndpoint since it uses POST operation
2496        request = request_object._RequestObject(type, documents._OperationType.Upsert)
2497        result, self.last_response_headers = self.__Post(path,
2498                                                         request,
2499                                                         body,
2500                                                         headers)
2501        # update session for write request
2502        self._UpdateSessionIfRequired(headers, result, self.last_response_headers)
2503        return result
2504
2505    def Replace(self, resource, path, type, id, initial_headers, options=None):
2506        """Replaces a Azure Cosmos resource and returns it.
2507
2508        :param dict resource:
2509        :param str path:
2510        :param str type:
2511        :param str id:
2512        :param dict initial_headers:
2513        :param dict options:
2514            The request options for the request.
2515
2516        :return:
2517            The new Azure Cosmos resource.
2518        :rtype:
2519            dict
2520
2521        """
2522        if options is None:
2523            options = {}
2524
2525        initial_headers = initial_headers or self.default_headers
2526        headers = base.GetHeaders(self,
2527                                  initial_headers,
2528                                  'put',
2529                                  path,
2530                                  id,
2531                                  type,
2532                                  options)
2533        # Replace will use WriteEndpoint since it uses PUT operation
2534        request = request_object._RequestObject(type, documents._OperationType.Replace)
2535        result, self.last_response_headers = self.__Put(path,
2536                                                        request,
2537                                                        resource,
2538                                                        headers)
2539
2540        # update session for request mutates data on server side
2541        self._UpdateSessionIfRequired(headers, result, self.last_response_headers)
2542        return result
2543
2544    def Read(self, path, type, id, initial_headers, options=None):
2545        """Reads a Azure Cosmos resource and returns it.
2546
2547        :param str path:
2548        :param str type:
2549        :param str id:
2550        :param dict initial_headers:
2551        :param dict options:
2552            The request options for the request.
2553
2554        :return:
2555            The upserted Azure Cosmos resource.
2556        :rtype:
2557            dict
2558
2559        """
2560        if options is None:
2561            options = {}
2562
2563        initial_headers = initial_headers or self.default_headers
2564        headers = base.GetHeaders(self,
2565                                  initial_headers,
2566                                  'get',
2567                                  path,
2568                                  id,
2569                                  type,
2570                                  options)
2571        # Read will use ReadEndpoint since it uses GET operation
2572        request = request_object._RequestObject(type, documents._OperationType.Read)
2573        result, self.last_response_headers = self.__Get(path,
2574                                                        request,
2575                                                        headers)
2576        return result
2577
2578    def DeleteResource(self, path, type, id, initial_headers, options=None):
2579        """Deletes a Azure Cosmos resource and returns it.
2580
2581        :param str path:
2582        :param str type:
2583        :param str id:
2584        :param dict initial_headers:
2585        :param dict options:
2586            The request options for the request.
2587
2588        :return:
2589            The deleted Azure Cosmos resource.
2590        :rtype:
2591            dict
2592
2593        """
2594        if options is None:
2595            options = {}
2596
2597        initial_headers = initial_headers or self.default_headers
2598        headers = base.GetHeaders(self,
2599                                  initial_headers,
2600                                  'delete',
2601                                  path,
2602                                  id,
2603                                  type,
2604                                  options)
2605        # Delete will use WriteEndpoint since it uses DELETE operation
2606        request = request_object._RequestObject(type, documents._OperationType.Delete)
2607        result, self.last_response_headers = self.__Delete(path,
2608                                                           request,
2609                                                           headers)
2610
2611        # update session for request mutates data on server side
2612        self._UpdateSessionIfRequired(headers, result, self.last_response_headers)
2613
2614
2615        return result
2616
2617    def __Get(self, path, request, headers):
2618        """Azure Cosmos 'GET' http request.
2619
2620        :params str url:
2621        :params str path:
2622        :params dict headers:
2623
2624        :return:
2625            Tuple of (result, headers).
2626        :rtype:
2627            tuple of (dict, dict)
2628
2629        """
2630        return synchronized_request.SynchronizedRequest(self,
2631                                                        request,
2632                                                        self._global_endpoint_manager,
2633                                                        self.connection_policy,
2634                                                        self._requests_session,
2635                                                        'GET',
2636                                                        path,
2637                                                        None,
2638                                                        None,
2639                                                        headers)
2640
2641    def __Post(self, path, request, body, headers):
2642        """Azure Cosmos 'POST' http request.
2643
2644        :params str url:
2645        :params str path:
2646        :params (str, unicode, dict) body:
2647        :params dict headers:
2648
2649        :return:
2650            Tuple of (result, headers).
2651        :rtype:
2652            tuple of (dict, dict)
2653
2654        """
2655        return synchronized_request.SynchronizedRequest(self,
2656                                                        request,
2657                                                        self._global_endpoint_manager,
2658                                                        self.connection_policy,
2659                                                        self._requests_session,
2660                                                        'POST',
2661                                                        path,
2662                                                        body,
2663                                                        query_params=None,
2664                                                        headers=headers)
2665
2666    def __Put(self, path, request, body, headers):
2667        """Azure Cosmos 'PUT' http request.
2668
2669        :params str url:
2670        :params str path:
2671        :params (str, unicode, dict) body:
2672        :params dict headers:
2673
2674        :return:
2675            Tuple of (result, headers).
2676        :rtype:
2677            tuple of (dict, dict)
2678
2679        """
2680        return synchronized_request.SynchronizedRequest(self,
2681                                                        request,
2682                                                        self._global_endpoint_manager,
2683                                                        self.connection_policy,
2684                                                        self._requests_session,
2685                                                        'PUT',
2686                                                        path,
2687                                                        body,
2688                                                        query_params=None,
2689                                                        headers=headers)
2690
2691    def __Delete(self, path, request, headers):
2692        """Azure Cosmos 'DELETE' http request.
2693
2694        :params str url:
2695        :params str path:
2696        :params dict headers:
2697
2698        :return:
2699            Tuple of (result, headers).
2700        :rtype:
2701            tuple of (dict, dict)
2702
2703        """
2704        return synchronized_request.SynchronizedRequest(self,
2705                                                        request,
2706                                                        self._global_endpoint_manager,
2707                                                        self.connection_policy,
2708                                                        self._requests_session,
2709                                                        'DELETE',
2710                                                        path,
2711                                                        request_data=None,
2712                                                        query_params=None,
2713                                                        headers=headers)
2714
2715    def QueryFeed(self, path, collection_id, query, options, partition_key_range_id = None):
2716        """Query Feed for Document Collection resource.
2717
2718        :param str path:
2719            Path to the document collection.
2720        :param str collection_id:
2721            Id of the document collection.
2722        :param (str or dict) query:
2723        :param dict options:
2724            The request options for the request.
2725        :param str partition_key_range_id:
2726            Partition key range id.
2727        :rtype:
2728            tuple
2729
2730        """
2731        return self.__QueryFeed(path,
2732                                'docs',
2733                                collection_id,
2734                                lambda r: r['Documents'],
2735                                lambda _, b: b,
2736                                query,
2737                                options,
2738                                partition_key_range_id), self.last_response_headers
2739
2740    def __QueryFeed(self,
2741                    path,
2742                    type,
2743                    id,
2744                    result_fn,
2745                    create_fn,
2746                    query,
2747                    options=None,
2748                    partition_key_range_id=None):
2749        """Query for more than one Azure Cosmos resources.
2750
2751        :param str path:
2752        :param str type:
2753        :param str id:
2754        :param function result_fn:
2755        :param function create_fn:
2756        :param (str or dict) query:
2757        :param dict options:
2758            The request options for the request.
2759        :param str partition_key_range_id:
2760            Specifies partition key range id.
2761
2762        :rtype:
2763            list
2764
2765        :raises SystemError: If the query compatibility mode is undefined.
2766
2767        """
2768        if options is None:
2769            options = {}
2770
2771        if query:
2772            __GetBodiesFromQueryResult = result_fn
2773        else:
2774            def __GetBodiesFromQueryResult(result):
2775                if result is not None:
2776                    return [create_fn(self, body) for body in result_fn(result)]
2777                else:
2778                    # If there is no change feed, the result data is empty and result is None.
2779                    # This case should be interpreted as an empty array.
2780                    return []
2781
2782
2783        initial_headers = self.default_headers.copy()
2784        # Copy to make sure that default_headers won't be changed.
2785        if query is None:
2786            # Query operations will use ReadEndpoint even though it uses GET(for feed requests)
2787            request = request_object._RequestObject(type, documents._OperationType.ReadFeed)
2788            headers = base.GetHeaders(self,
2789                                      initial_headers,
2790                                      'get',
2791                                      path,
2792                                      id,
2793                                      type,
2794                                      options,
2795                                      partition_key_range_id)
2796            result, self.last_response_headers = self.__Get(path,
2797                                                            request,
2798                                                            headers)
2799            return __GetBodiesFromQueryResult(result)
2800        else:
2801            query = self.__CheckAndUnifyQueryFormat(query)
2802
2803            initial_headers[http_constants.HttpHeaders.IsQuery] = 'true'
2804            if (self._query_compatibility_mode == CosmosClient._QueryCompatibilityMode.Default or
2805                    self._query_compatibility_mode == CosmosClient._QueryCompatibilityMode.Query):
2806                initial_headers[http_constants.HttpHeaders.ContentType] = runtime_constants.MediaTypes.QueryJson
2807            elif self._query_compatibility_mode == CosmosClient._QueryCompatibilityMode.SqlQuery:
2808                initial_headers[http_constants.HttpHeaders.ContentType] = runtime_constants.MediaTypes.SQL
2809            else:
2810                raise SystemError('Unexpected query compatibility mode.')
2811
2812            # Query operations will use ReadEndpoint even though it uses POST(for regular query operations)
2813            request = request_object._RequestObject(type, documents._OperationType.SqlQuery)
2814            headers = base.GetHeaders(self,
2815                                      initial_headers,
2816                                      'post',
2817                                      path,
2818                                      id,
2819                                      type,
2820                                      options,
2821                                      partition_key_range_id)
2822            result, self.last_response_headers = self.__Post(path,
2823                                                             request,
2824                                                             query,
2825                                                             headers)
2826            return __GetBodiesFromQueryResult(result)
2827
2828    def __CheckAndUnifyQueryFormat(self, query_body):
2829        """Checks and unifies the format of the query body.
2830
2831        :raises TypeError: If query_body is not of expected type (depending on the query compatibility mode).
2832        :raises ValueError: If query_body is a dict but doesn\'t have valid query text.
2833        :raises SystemError: If the query compatibility mode is undefined.
2834
2835        :param (str or dict) query_body:
2836
2837        :return:
2838            The formatted query body.
2839        :rtype:
2840            dict or string
2841        """
2842        if (self._query_compatibility_mode == CosmosClient._QueryCompatibilityMode.Default or
2843               self._query_compatibility_mode == CosmosClient._QueryCompatibilityMode.Query):
2844            if not isinstance(query_body, dict) and not isinstance(query_body, six.string_types):
2845                raise TypeError('query body must be a dict or string.')
2846            if isinstance(query_body, dict) and not query_body.get('query'):
2847                raise ValueError('query body must have valid query text with key "query".')
2848            if isinstance(query_body, six.string_types):
2849                return {'query': query_body}
2850        elif (self._query_compatibility_mode == CosmosClient._QueryCompatibilityMode.SqlQuery and
2851              not isinstance(query_body, six.string_types)):
2852            raise TypeError('query body must be a string.')
2853        else:
2854            raise SystemError('Unexpected query compatibility mode.')
2855
2856        return query_body
2857
2858    @staticmethod
2859    def __ValidateResource(resource):
2860        id = resource.get('id')
2861        if id:
2862            if id.find('/') != -1 or id.find('\\') != -1 or id.find('?') != -1 or id.find('#') != -1:
2863                raise ValueError('Id contains illegal chars.')
2864
2865            if id[-1] == ' ':
2866                raise ValueError('Id ends with a space.')
2867
2868    # Adds the partition key to options
2869    def _AddPartitionKey(self, collection_link, document, options):
2870        collection_link = base.TrimBeginningAndEndingSlashes(collection_link)
2871
2872        #TODO: Refresh the cache if partition is extracted automatically and we get a 400.1001
2873
2874        # If the document collection link is present in the cache, then use the cached partitionkey definition
2875        if collection_link in self.partition_key_definition_cache:
2876            partitionKeyDefinition = self.partition_key_definition_cache.get(collection_link)
2877        # Else read the collection from backend and add it to the cache
2878        else:
2879            collection = self.ReadContainer(collection_link)
2880            partitionKeyDefinition = collection.get('partitionKey')
2881            self.partition_key_definition_cache[collection_link] = partitionKeyDefinition
2882
2883        # If the collection doesn't have a partition key definition, skip it as it's a legacy collection
2884        if partitionKeyDefinition:
2885            # If the user has passed in the partitionKey in options use that elase extract it from the document
2886            if('partitionKey' not in options):
2887                partitionKeyValue = self._ExtractPartitionKey(partitionKeyDefinition, document)
2888                options['partitionKey'] = partitionKeyValue
2889
2890        return options
2891
2892    # Extracts the partition key from the document using the partitionKey definition
2893    def _ExtractPartitionKey(self, partitionKeyDefinition, document):
2894
2895        # Parses the paths into a list of token each representing a property
2896        partition_key_parts = base.ParsePaths(partitionKeyDefinition.get('paths'))
2897
2898        # Navigates the document to retrieve the partitionKey specified in the paths
2899        return self._RetrievePartitionKey(partition_key_parts, document)
2900
2901    # Navigates the document to retrieve the partitionKey specified in the partition key parts
2902    def _RetrievePartitionKey(self, partition_key_parts, document):
2903        expected_matchCount = len(partition_key_parts)
2904        matchCount = 0
2905        partitionKey = document
2906
2907        for part in partition_key_parts:
2908            # At any point if we don't find the value of a sub-property in the document, we return as Undefined
2909            if part not in partitionKey:
2910                return documents.Undefined
2911            else:
2912                partitionKey = partitionKey.get(part)
2913                matchCount += 1
2914                # Once we reach the "leaf" value(not a dict), we break from loop
2915                if not isinstance(partitionKey, dict):
2916                    break
2917
2918        # Match the count of hops we did to get the partitionKey with the length of partition key parts and validate that it's not a dict at that level
2919        if ((matchCount != expected_matchCount) or isinstance(partitionKey, dict)):
2920            return documents.Undefined
2921
2922        return partitionKey
2923
2924    def _UpdateSessionIfRequired(self, request_headers, response_result, response_headers):
2925        """
2926        Updates session if necessary.
2927
2928        :param dict response_result:
2929        :param dict response_headers:
2930        :param dict response_headers
2931
2932        :return:
2933            None, but updates the client session if necessary.
2934
2935        """
2936
2937        '''if this request was made with consistency level as session, then update
2938        the session'''
2939
2940        if response_result is None or response_headers is None:
2941            return
2942
2943        is_session_consistency = False
2944        if http_constants.HttpHeaders.ConsistencyLevel in request_headers:
2945            if documents.ConsistencyLevel.Session == request_headers[http_constants.HttpHeaders.ConsistencyLevel]:
2946                is_session_consistency = True
2947
2948        if is_session_consistency:
2949            # update session
2950            self.session.update_session(response_result, response_headers)
2951