1# coding=utf-8
2# --------------------------------------------------------------------------
3# Copyright (c) Microsoft Corporation. All rights reserved.
4# Licensed under the MIT License. See License.txt in the project root for license information.
5# Code generated by Microsoft (R) AutoRest Code Generator.
6# Changes may cause incorrect behavior and will be lost if the code is regenerated.
7# --------------------------------------------------------------------------
8from typing import TYPE_CHECKING
9import warnings
10
11from azure.core.exceptions import ClientAuthenticationError, HttpResponseError, ResourceExistsError, ResourceNotFoundError, map_error
12from azure.core.paging import ItemPaged
13from azure.core.pipeline import PipelineResponse
14from azure.core.pipeline.transport import HttpRequest, HttpResponse
15from azure.core.polling import LROPoller, NoPolling, PollingMethod
16from azure.mgmt.core.exceptions import ARMErrorFormat
17from azure.mgmt.core.polling.arm_polling import ARMPolling
18
19from .. import models as _models
20
21if TYPE_CHECKING:
22    # pylint: disable=unused-import,ungrouped-imports
23    from typing import Any, Callable, Dict, Generic, Iterable, Optional, TypeVar, Union
24
25    T = TypeVar('T')
26    ClsType = Optional[Callable[[PipelineResponse[HttpRequest, HttpResponse], T, Dict[str, Any]], Any]]
27
28class SnapshotsOperations(object):
29    """SnapshotsOperations operations.
30
31    You should not instantiate this class directly. Instead, you should create a Client instance that
32    instantiates it for you and attaches it as an attribute.
33
34    :ivar models: Alias to model classes used in this operation group.
35    :type models: ~azure.mgmt.compute.v2016_04_30_preview.models
36    :param client: Client for service requests.
37    :param config: Configuration of service client.
38    :param serializer: An object model serializer.
39    :param deserializer: An object model deserializer.
40    """
41
42    models = _models
43
44    def __init__(self, client, config, serializer, deserializer):
45        self._client = client
46        self._serialize = serializer
47        self._deserialize = deserializer
48        self._config = config
49
50    def _create_or_update_initial(
51        self,
52        resource_group_name,  # type: str
53        snapshot_name,  # type: str
54        snapshot,  # type: "_models.Snapshot"
55        **kwargs  # type: Any
56    ):
57        # type: (...) -> "_models.Snapshot"
58        cls = kwargs.pop('cls', None)  # type: ClsType["_models.Snapshot"]
59        error_map = {
60            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
61        }
62        error_map.update(kwargs.pop('error_map', {}))
63        api_version = "2016-04-30-preview"
64        content_type = kwargs.pop("content_type", "application/json")
65        accept = "application/json"
66
67        # Construct URL
68        url = self._create_or_update_initial.metadata['url']  # type: ignore
69        path_format_arguments = {
70            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
71            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
72            'snapshotName': self._serialize.url("snapshot_name", snapshot_name, 'str'),
73        }
74        url = self._client.format_url(url, **path_format_arguments)
75
76        # Construct parameters
77        query_parameters = {}  # type: Dict[str, Any]
78        query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')
79
80        # Construct headers
81        header_parameters = {}  # type: Dict[str, Any]
82        header_parameters['Content-Type'] = self._serialize.header("content_type", content_type, 'str')
83        header_parameters['Accept'] = self._serialize.header("accept", accept, 'str')
84
85        body_content_kwargs = {}  # type: Dict[str, Any]
86        body_content = self._serialize.body(snapshot, 'Snapshot')
87        body_content_kwargs['content'] = body_content
88        request = self._client.put(url, query_parameters, header_parameters, **body_content_kwargs)
89        pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs)
90        response = pipeline_response.http_response
91
92        if response.status_code not in [200, 202]:
93            map_error(status_code=response.status_code, response=response, error_map=error_map)
94            raise HttpResponseError(response=response, error_format=ARMErrorFormat)
95
96        if response.status_code == 200:
97            deserialized = self._deserialize('Snapshot', pipeline_response)
98
99        if response.status_code == 202:
100            deserialized = self._deserialize('Snapshot', pipeline_response)
101
102        if cls:
103            return cls(pipeline_response, deserialized, {})
104
105        return deserialized
106    _create_or_update_initial.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Compute/snapshots/{snapshotName}'}  # type: ignore
107
108    def begin_create_or_update(
109        self,
110        resource_group_name,  # type: str
111        snapshot_name,  # type: str
112        snapshot,  # type: "_models.Snapshot"
113        **kwargs  # type: Any
114    ):
115        # type: (...) -> LROPoller["_models.Snapshot"]
116        """Creates or updates a snapshot.
117
118        :param resource_group_name: The name of the resource group.
119        :type resource_group_name: str
120        :param snapshot_name: The name of the snapshot within the given subscription and resource
121         group.
122        :type snapshot_name: str
123        :param snapshot: Snapshot object supplied in the body of the Put disk operation.
124        :type snapshot: ~azure.mgmt.compute.v2016_04_30_preview.models.Snapshot
125        :keyword callable cls: A custom type or function that will be passed the direct response
126        :keyword str continuation_token: A continuation token to restart a poller from a saved state.
127        :keyword polling: By default, your polling method will be ARMPolling.
128         Pass in False for this operation to not poll, or pass in your own initialized polling object for a personal polling strategy.
129        :paramtype polling: bool or ~azure.core.polling.PollingMethod
130        :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present.
131        :return: An instance of LROPoller that returns either Snapshot or the result of cls(response)
132        :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.compute.v2016_04_30_preview.models.Snapshot]
133        :raises ~azure.core.exceptions.HttpResponseError:
134        """
135        polling = kwargs.pop('polling', True)  # type: Union[bool, PollingMethod]
136        cls = kwargs.pop('cls', None)  # type: ClsType["_models.Snapshot"]
137        lro_delay = kwargs.pop(
138            'polling_interval',
139            self._config.polling_interval
140        )
141        cont_token = kwargs.pop('continuation_token', None)  # type: Optional[str]
142        if cont_token is None:
143            raw_result = self._create_or_update_initial(
144                resource_group_name=resource_group_name,
145                snapshot_name=snapshot_name,
146                snapshot=snapshot,
147                cls=lambda x,y,z: x,
148                **kwargs
149            )
150
151        kwargs.pop('error_map', None)
152        kwargs.pop('content_type', None)
153
154        def get_long_running_output(pipeline_response):
155            deserialized = self._deserialize('Snapshot', pipeline_response)
156
157            if cls:
158                return cls(pipeline_response, deserialized, {})
159            return deserialized
160
161        path_format_arguments = {
162            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
163            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
164            'snapshotName': self._serialize.url("snapshot_name", snapshot_name, 'str'),
165        }
166
167        if polling is True: polling_method = ARMPolling(lro_delay, path_format_arguments=path_format_arguments,  **kwargs)
168        elif polling is False: polling_method = NoPolling()
169        else: polling_method = polling
170        if cont_token:
171            return LROPoller.from_continuation_token(
172                polling_method=polling_method,
173                continuation_token=cont_token,
174                client=self._client,
175                deserialization_callback=get_long_running_output
176            )
177        else:
178            return LROPoller(self._client, raw_result, get_long_running_output, polling_method)
179    begin_create_or_update.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Compute/snapshots/{snapshotName}'}  # type: ignore
180
181    def _update_initial(
182        self,
183        resource_group_name,  # type: str
184        snapshot_name,  # type: str
185        snapshot,  # type: "_models.SnapshotUpdate"
186        **kwargs  # type: Any
187    ):
188        # type: (...) -> "_models.Snapshot"
189        cls = kwargs.pop('cls', None)  # type: ClsType["_models.Snapshot"]
190        error_map = {
191            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
192        }
193        error_map.update(kwargs.pop('error_map', {}))
194        api_version = "2016-04-30-preview"
195        content_type = kwargs.pop("content_type", "application/json")
196        accept = "application/json"
197
198        # Construct URL
199        url = self._update_initial.metadata['url']  # type: ignore
200        path_format_arguments = {
201            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
202            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
203            'snapshotName': self._serialize.url("snapshot_name", snapshot_name, 'str'),
204        }
205        url = self._client.format_url(url, **path_format_arguments)
206
207        # Construct parameters
208        query_parameters = {}  # type: Dict[str, Any]
209        query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')
210
211        # Construct headers
212        header_parameters = {}  # type: Dict[str, Any]
213        header_parameters['Content-Type'] = self._serialize.header("content_type", content_type, 'str')
214        header_parameters['Accept'] = self._serialize.header("accept", accept, 'str')
215
216        body_content_kwargs = {}  # type: Dict[str, Any]
217        body_content = self._serialize.body(snapshot, 'SnapshotUpdate')
218        body_content_kwargs['content'] = body_content
219        request = self._client.patch(url, query_parameters, header_parameters, **body_content_kwargs)
220        pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs)
221        response = pipeline_response.http_response
222
223        if response.status_code not in [200, 202]:
224            map_error(status_code=response.status_code, response=response, error_map=error_map)
225            raise HttpResponseError(response=response, error_format=ARMErrorFormat)
226
227        if response.status_code == 200:
228            deserialized = self._deserialize('Snapshot', pipeline_response)
229
230        if response.status_code == 202:
231            deserialized = self._deserialize('Snapshot', pipeline_response)
232
233        if cls:
234            return cls(pipeline_response, deserialized, {})
235
236        return deserialized
237    _update_initial.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Compute/snapshots/{snapshotName}'}  # type: ignore
238
239    def begin_update(
240        self,
241        resource_group_name,  # type: str
242        snapshot_name,  # type: str
243        snapshot,  # type: "_models.SnapshotUpdate"
244        **kwargs  # type: Any
245    ):
246        # type: (...) -> LROPoller["_models.Snapshot"]
247        """Updates (patches) a snapshot.
248
249        :param resource_group_name: The name of the resource group.
250        :type resource_group_name: str
251        :param snapshot_name: The name of the snapshot within the given subscription and resource
252         group.
253        :type snapshot_name: str
254        :param snapshot: Snapshot object supplied in the body of the Patch snapshot operation.
255        :type snapshot: ~azure.mgmt.compute.v2016_04_30_preview.models.SnapshotUpdate
256        :keyword callable cls: A custom type or function that will be passed the direct response
257        :keyword str continuation_token: A continuation token to restart a poller from a saved state.
258        :keyword polling: By default, your polling method will be ARMPolling.
259         Pass in False for this operation to not poll, or pass in your own initialized polling object for a personal polling strategy.
260        :paramtype polling: bool or ~azure.core.polling.PollingMethod
261        :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present.
262        :return: An instance of LROPoller that returns either Snapshot or the result of cls(response)
263        :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.compute.v2016_04_30_preview.models.Snapshot]
264        :raises ~azure.core.exceptions.HttpResponseError:
265        """
266        polling = kwargs.pop('polling', True)  # type: Union[bool, PollingMethod]
267        cls = kwargs.pop('cls', None)  # type: ClsType["_models.Snapshot"]
268        lro_delay = kwargs.pop(
269            'polling_interval',
270            self._config.polling_interval
271        )
272        cont_token = kwargs.pop('continuation_token', None)  # type: Optional[str]
273        if cont_token is None:
274            raw_result = self._update_initial(
275                resource_group_name=resource_group_name,
276                snapshot_name=snapshot_name,
277                snapshot=snapshot,
278                cls=lambda x,y,z: x,
279                **kwargs
280            )
281
282        kwargs.pop('error_map', None)
283        kwargs.pop('content_type', None)
284
285        def get_long_running_output(pipeline_response):
286            deserialized = self._deserialize('Snapshot', pipeline_response)
287
288            if cls:
289                return cls(pipeline_response, deserialized, {})
290            return deserialized
291
292        path_format_arguments = {
293            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
294            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
295            'snapshotName': self._serialize.url("snapshot_name", snapshot_name, 'str'),
296        }
297
298        if polling is True: polling_method = ARMPolling(lro_delay, path_format_arguments=path_format_arguments,  **kwargs)
299        elif polling is False: polling_method = NoPolling()
300        else: polling_method = polling
301        if cont_token:
302            return LROPoller.from_continuation_token(
303                polling_method=polling_method,
304                continuation_token=cont_token,
305                client=self._client,
306                deserialization_callback=get_long_running_output
307            )
308        else:
309            return LROPoller(self._client, raw_result, get_long_running_output, polling_method)
310    begin_update.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Compute/snapshots/{snapshotName}'}  # type: ignore
311
312    def get(
313        self,
314        resource_group_name,  # type: str
315        snapshot_name,  # type: str
316        **kwargs  # type: Any
317    ):
318        # type: (...) -> "_models.Snapshot"
319        """Gets information about a snapshot.
320
321        :param resource_group_name: The name of the resource group.
322        :type resource_group_name: str
323        :param snapshot_name: The name of the snapshot within the given subscription and resource
324         group.
325        :type snapshot_name: str
326        :keyword callable cls: A custom type or function that will be passed the direct response
327        :return: Snapshot, or the result of cls(response)
328        :rtype: ~azure.mgmt.compute.v2016_04_30_preview.models.Snapshot
329        :raises: ~azure.core.exceptions.HttpResponseError
330        """
331        cls = kwargs.pop('cls', None)  # type: ClsType["_models.Snapshot"]
332        error_map = {
333            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
334        }
335        error_map.update(kwargs.pop('error_map', {}))
336        api_version = "2016-04-30-preview"
337        accept = "application/json"
338
339        # Construct URL
340        url = self.get.metadata['url']  # type: ignore
341        path_format_arguments = {
342            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
343            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
344            'snapshotName': self._serialize.url("snapshot_name", snapshot_name, 'str'),
345        }
346        url = self._client.format_url(url, **path_format_arguments)
347
348        # Construct parameters
349        query_parameters = {}  # type: Dict[str, Any]
350        query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')
351
352        # Construct headers
353        header_parameters = {}  # type: Dict[str, Any]
354        header_parameters['Accept'] = self._serialize.header("accept", accept, 'str')
355
356        request = self._client.get(url, query_parameters, header_parameters)
357        pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs)
358        response = pipeline_response.http_response
359
360        if response.status_code not in [200]:
361            map_error(status_code=response.status_code, response=response, error_map=error_map)
362            raise HttpResponseError(response=response, error_format=ARMErrorFormat)
363
364        deserialized = self._deserialize('Snapshot', pipeline_response)
365
366        if cls:
367            return cls(pipeline_response, deserialized, {})
368
369        return deserialized
370    get.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Compute/snapshots/{snapshotName}'}  # type: ignore
371
372    def _delete_initial(
373        self,
374        resource_group_name,  # type: str
375        snapshot_name,  # type: str
376        **kwargs  # type: Any
377    ):
378        # type: (...) -> Optional["_models.OperationStatusResponse"]
379        cls = kwargs.pop('cls', None)  # type: ClsType[Optional["_models.OperationStatusResponse"]]
380        error_map = {
381            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
382        }
383        error_map.update(kwargs.pop('error_map', {}))
384        api_version = "2016-04-30-preview"
385        accept = "application/json"
386
387        # Construct URL
388        url = self._delete_initial.metadata['url']  # type: ignore
389        path_format_arguments = {
390            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
391            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
392            'snapshotName': self._serialize.url("snapshot_name", snapshot_name, 'str'),
393        }
394        url = self._client.format_url(url, **path_format_arguments)
395
396        # Construct parameters
397        query_parameters = {}  # type: Dict[str, Any]
398        query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')
399
400        # Construct headers
401        header_parameters = {}  # type: Dict[str, Any]
402        header_parameters['Accept'] = self._serialize.header("accept", accept, 'str')
403
404        request = self._client.delete(url, query_parameters, header_parameters)
405        pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs)
406        response = pipeline_response.http_response
407
408        if response.status_code not in [200, 202, 204]:
409            map_error(status_code=response.status_code, response=response, error_map=error_map)
410            raise HttpResponseError(response=response, error_format=ARMErrorFormat)
411
412        deserialized = None
413        if response.status_code == 200:
414            deserialized = self._deserialize('OperationStatusResponse', pipeline_response)
415
416        if cls:
417            return cls(pipeline_response, deserialized, {})
418
419        return deserialized
420    _delete_initial.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Compute/snapshots/{snapshotName}'}  # type: ignore
421
422    def begin_delete(
423        self,
424        resource_group_name,  # type: str
425        snapshot_name,  # type: str
426        **kwargs  # type: Any
427    ):
428        # type: (...) -> LROPoller["_models.OperationStatusResponse"]
429        """Deletes a snapshot.
430
431        :param resource_group_name: The name of the resource group.
432        :type resource_group_name: str
433        :param snapshot_name: The name of the snapshot within the given subscription and resource
434         group.
435        :type snapshot_name: str
436        :keyword callable cls: A custom type or function that will be passed the direct response
437        :keyword str continuation_token: A continuation token to restart a poller from a saved state.
438        :keyword polling: By default, your polling method will be ARMPolling.
439         Pass in False for this operation to not poll, or pass in your own initialized polling object for a personal polling strategy.
440        :paramtype polling: bool or ~azure.core.polling.PollingMethod
441        :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present.
442        :return: An instance of LROPoller that returns either OperationStatusResponse or the result of cls(response)
443        :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.compute.v2016_04_30_preview.models.OperationStatusResponse]
444        :raises ~azure.core.exceptions.HttpResponseError:
445        """
446        polling = kwargs.pop('polling', True)  # type: Union[bool, PollingMethod]
447        cls = kwargs.pop('cls', None)  # type: ClsType["_models.OperationStatusResponse"]
448        lro_delay = kwargs.pop(
449            'polling_interval',
450            self._config.polling_interval
451        )
452        cont_token = kwargs.pop('continuation_token', None)  # type: Optional[str]
453        if cont_token is None:
454            raw_result = self._delete_initial(
455                resource_group_name=resource_group_name,
456                snapshot_name=snapshot_name,
457                cls=lambda x,y,z: x,
458                **kwargs
459            )
460
461        kwargs.pop('error_map', None)
462        kwargs.pop('content_type', None)
463
464        def get_long_running_output(pipeline_response):
465            deserialized = self._deserialize('OperationStatusResponse', pipeline_response)
466
467            if cls:
468                return cls(pipeline_response, deserialized, {})
469            return deserialized
470
471        path_format_arguments = {
472            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
473            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
474            'snapshotName': self._serialize.url("snapshot_name", snapshot_name, 'str'),
475        }
476
477        if polling is True: polling_method = ARMPolling(lro_delay, path_format_arguments=path_format_arguments,  **kwargs)
478        elif polling is False: polling_method = NoPolling()
479        else: polling_method = polling
480        if cont_token:
481            return LROPoller.from_continuation_token(
482                polling_method=polling_method,
483                continuation_token=cont_token,
484                client=self._client,
485                deserialization_callback=get_long_running_output
486            )
487        else:
488            return LROPoller(self._client, raw_result, get_long_running_output, polling_method)
489    begin_delete.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Compute/snapshots/{snapshotName}'}  # type: ignore
490
491    def list_by_resource_group(
492        self,
493        resource_group_name,  # type: str
494        **kwargs  # type: Any
495    ):
496        # type: (...) -> Iterable["_models.SnapshotList"]
497        """Lists snapshots under a resource group.
498
499        :param resource_group_name: The name of the resource group.
500        :type resource_group_name: str
501        :keyword callable cls: A custom type or function that will be passed the direct response
502        :return: An iterator like instance of either SnapshotList or the result of cls(response)
503        :rtype: ~azure.core.paging.ItemPaged[~azure.mgmt.compute.v2016_04_30_preview.models.SnapshotList]
504        :raises: ~azure.core.exceptions.HttpResponseError
505        """
506        cls = kwargs.pop('cls', None)  # type: ClsType["_models.SnapshotList"]
507        error_map = {
508            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
509        }
510        error_map.update(kwargs.pop('error_map', {}))
511        api_version = "2016-04-30-preview"
512        accept = "application/json"
513
514        def prepare_request(next_link=None):
515            # Construct headers
516            header_parameters = {}  # type: Dict[str, Any]
517            header_parameters['Accept'] = self._serialize.header("accept", accept, 'str')
518
519            if not next_link:
520                # Construct URL
521                url = self.list_by_resource_group.metadata['url']  # type: ignore
522                path_format_arguments = {
523                    'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
524                    'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
525                }
526                url = self._client.format_url(url, **path_format_arguments)
527                # Construct parameters
528                query_parameters = {}  # type: Dict[str, Any]
529                query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')
530
531                request = self._client.get(url, query_parameters, header_parameters)
532            else:
533                url = next_link
534                query_parameters = {}  # type: Dict[str, Any]
535                request = self._client.get(url, query_parameters, header_parameters)
536            return request
537
538        def extract_data(pipeline_response):
539            deserialized = self._deserialize('SnapshotList', pipeline_response)
540            list_of_elem = deserialized.value
541            if cls:
542                list_of_elem = cls(list_of_elem)
543            return deserialized.next_link or None, iter(list_of_elem)
544
545        def get_next(next_link=None):
546            request = prepare_request(next_link)
547
548            pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs)
549            response = pipeline_response.http_response
550
551            if response.status_code not in [200]:
552                map_error(status_code=response.status_code, response=response, error_map=error_map)
553                raise HttpResponseError(response=response, error_format=ARMErrorFormat)
554
555            return pipeline_response
556
557        return ItemPaged(
558            get_next, extract_data
559        )
560    list_by_resource_group.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Compute/snapshots'}  # type: ignore
561
562    def list(
563        self,
564        **kwargs  # type: Any
565    ):
566        # type: (...) -> Iterable["_models.SnapshotList"]
567        """Lists snapshots under a subscription.
568
569        :keyword callable cls: A custom type or function that will be passed the direct response
570        :return: An iterator like instance of either SnapshotList or the result of cls(response)
571        :rtype: ~azure.core.paging.ItemPaged[~azure.mgmt.compute.v2016_04_30_preview.models.SnapshotList]
572        :raises: ~azure.core.exceptions.HttpResponseError
573        """
574        cls = kwargs.pop('cls', None)  # type: ClsType["_models.SnapshotList"]
575        error_map = {
576            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
577        }
578        error_map.update(kwargs.pop('error_map', {}))
579        api_version = "2016-04-30-preview"
580        accept = "application/json"
581
582        def prepare_request(next_link=None):
583            # Construct headers
584            header_parameters = {}  # type: Dict[str, Any]
585            header_parameters['Accept'] = self._serialize.header("accept", accept, 'str')
586
587            if not next_link:
588                # Construct URL
589                url = self.list.metadata['url']  # type: ignore
590                path_format_arguments = {
591                    'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
592                }
593                url = self._client.format_url(url, **path_format_arguments)
594                # Construct parameters
595                query_parameters = {}  # type: Dict[str, Any]
596                query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')
597
598                request = self._client.get(url, query_parameters, header_parameters)
599            else:
600                url = next_link
601                query_parameters = {}  # type: Dict[str, Any]
602                request = self._client.get(url, query_parameters, header_parameters)
603            return request
604
605        def extract_data(pipeline_response):
606            deserialized = self._deserialize('SnapshotList', pipeline_response)
607            list_of_elem = deserialized.value
608            if cls:
609                list_of_elem = cls(list_of_elem)
610            return deserialized.next_link or None, iter(list_of_elem)
611
612        def get_next(next_link=None):
613            request = prepare_request(next_link)
614
615            pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs)
616            response = pipeline_response.http_response
617
618            if response.status_code not in [200]:
619                map_error(status_code=response.status_code, response=response, error_map=error_map)
620                raise HttpResponseError(response=response, error_format=ARMErrorFormat)
621
622            return pipeline_response
623
624        return ItemPaged(
625            get_next, extract_data
626        )
627    list.metadata = {'url': '/subscriptions/{subscriptionId}/providers/Microsoft.Compute/snapshots'}  # type: ignore
628
629    def _grant_access_initial(
630        self,
631        resource_group_name,  # type: str
632        snapshot_name,  # type: str
633        grant_access_data,  # type: "_models.GrantAccessData"
634        **kwargs  # type: Any
635    ):
636        # type: (...) -> Optional["_models.AccessUri"]
637        cls = kwargs.pop('cls', None)  # type: ClsType[Optional["_models.AccessUri"]]
638        error_map = {
639            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
640        }
641        error_map.update(kwargs.pop('error_map', {}))
642        api_version = "2016-04-30-preview"
643        content_type = kwargs.pop("content_type", "application/json")
644        accept = "application/json"
645
646        # Construct URL
647        url = self._grant_access_initial.metadata['url']  # type: ignore
648        path_format_arguments = {
649            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
650            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
651            'snapshotName': self._serialize.url("snapshot_name", snapshot_name, 'str'),
652        }
653        url = self._client.format_url(url, **path_format_arguments)
654
655        # Construct parameters
656        query_parameters = {}  # type: Dict[str, Any]
657        query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')
658
659        # Construct headers
660        header_parameters = {}  # type: Dict[str, Any]
661        header_parameters['Content-Type'] = self._serialize.header("content_type", content_type, 'str')
662        header_parameters['Accept'] = self._serialize.header("accept", accept, 'str')
663
664        body_content_kwargs = {}  # type: Dict[str, Any]
665        body_content = self._serialize.body(grant_access_data, 'GrantAccessData')
666        body_content_kwargs['content'] = body_content
667        request = self._client.post(url, query_parameters, header_parameters, **body_content_kwargs)
668        pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs)
669        response = pipeline_response.http_response
670
671        if response.status_code not in [200, 202]:
672            map_error(status_code=response.status_code, response=response, error_map=error_map)
673            raise HttpResponseError(response=response, error_format=ARMErrorFormat)
674
675        deserialized = None
676        if response.status_code == 200:
677            deserialized = self._deserialize('AccessUri', pipeline_response)
678
679        if cls:
680            return cls(pipeline_response, deserialized, {})
681
682        return deserialized
683    _grant_access_initial.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Compute/snapshots/{snapshotName}/beginGetAccess'}  # type: ignore
684
685    def begin_grant_access(
686        self,
687        resource_group_name,  # type: str
688        snapshot_name,  # type: str
689        grant_access_data,  # type: "_models.GrantAccessData"
690        **kwargs  # type: Any
691    ):
692        # type: (...) -> LROPoller["_models.AccessUri"]
693        """Grants access to a snapshot.
694
695        :param resource_group_name: The name of the resource group.
696        :type resource_group_name: str
697        :param snapshot_name: The name of the snapshot within the given subscription and resource
698         group.
699        :type snapshot_name: str
700        :param grant_access_data: Access data object supplied in the body of the get snapshot access
701         operation.
702        :type grant_access_data: ~azure.mgmt.compute.v2016_04_30_preview.models.GrantAccessData
703        :keyword callable cls: A custom type or function that will be passed the direct response
704        :keyword str continuation_token: A continuation token to restart a poller from a saved state.
705        :keyword polling: By default, your polling method will be ARMPolling.
706         Pass in False for this operation to not poll, or pass in your own initialized polling object for a personal polling strategy.
707        :paramtype polling: bool or ~azure.core.polling.PollingMethod
708        :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present.
709        :return: An instance of LROPoller that returns either AccessUri or the result of cls(response)
710        :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.compute.v2016_04_30_preview.models.AccessUri]
711        :raises ~azure.core.exceptions.HttpResponseError:
712        """
713        polling = kwargs.pop('polling', True)  # type: Union[bool, PollingMethod]
714        cls = kwargs.pop('cls', None)  # type: ClsType["_models.AccessUri"]
715        lro_delay = kwargs.pop(
716            'polling_interval',
717            self._config.polling_interval
718        )
719        cont_token = kwargs.pop('continuation_token', None)  # type: Optional[str]
720        if cont_token is None:
721            raw_result = self._grant_access_initial(
722                resource_group_name=resource_group_name,
723                snapshot_name=snapshot_name,
724                grant_access_data=grant_access_data,
725                cls=lambda x,y,z: x,
726                **kwargs
727            )
728
729        kwargs.pop('error_map', None)
730        kwargs.pop('content_type', None)
731
732        def get_long_running_output(pipeline_response):
733            deserialized = self._deserialize('AccessUri', pipeline_response)
734
735            if cls:
736                return cls(pipeline_response, deserialized, {})
737            return deserialized
738
739        path_format_arguments = {
740            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
741            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
742            'snapshotName': self._serialize.url("snapshot_name", snapshot_name, 'str'),
743        }
744
745        if polling is True: polling_method = ARMPolling(lro_delay, path_format_arguments=path_format_arguments,  **kwargs)
746        elif polling is False: polling_method = NoPolling()
747        else: polling_method = polling
748        if cont_token:
749            return LROPoller.from_continuation_token(
750                polling_method=polling_method,
751                continuation_token=cont_token,
752                client=self._client,
753                deserialization_callback=get_long_running_output
754            )
755        else:
756            return LROPoller(self._client, raw_result, get_long_running_output, polling_method)
757    begin_grant_access.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Compute/snapshots/{snapshotName}/beginGetAccess'}  # type: ignore
758
759    def _revoke_access_initial(
760        self,
761        resource_group_name,  # type: str
762        snapshot_name,  # type: str
763        **kwargs  # type: Any
764    ):
765        # type: (...) -> Optional["_models.OperationStatusResponse"]
766        cls = kwargs.pop('cls', None)  # type: ClsType[Optional["_models.OperationStatusResponse"]]
767        error_map = {
768            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
769        }
770        error_map.update(kwargs.pop('error_map', {}))
771        api_version = "2016-04-30-preview"
772        accept = "application/json"
773
774        # Construct URL
775        url = self._revoke_access_initial.metadata['url']  # type: ignore
776        path_format_arguments = {
777            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
778            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
779            'snapshotName': self._serialize.url("snapshot_name", snapshot_name, 'str'),
780        }
781        url = self._client.format_url(url, **path_format_arguments)
782
783        # Construct parameters
784        query_parameters = {}  # type: Dict[str, Any]
785        query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')
786
787        # Construct headers
788        header_parameters = {}  # type: Dict[str, Any]
789        header_parameters['Accept'] = self._serialize.header("accept", accept, 'str')
790
791        request = self._client.post(url, query_parameters, header_parameters)
792        pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs)
793        response = pipeline_response.http_response
794
795        if response.status_code not in [200, 202]:
796            map_error(status_code=response.status_code, response=response, error_map=error_map)
797            raise HttpResponseError(response=response, error_format=ARMErrorFormat)
798
799        deserialized = None
800        if response.status_code == 200:
801            deserialized = self._deserialize('OperationStatusResponse', pipeline_response)
802
803        if cls:
804            return cls(pipeline_response, deserialized, {})
805
806        return deserialized
807    _revoke_access_initial.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Compute/snapshots/{snapshotName}/endGetAccess'}  # type: ignore
808
809    def begin_revoke_access(
810        self,
811        resource_group_name,  # type: str
812        snapshot_name,  # type: str
813        **kwargs  # type: Any
814    ):
815        # type: (...) -> LROPoller["_models.OperationStatusResponse"]
816        """Revokes access to a snapshot.
817
818        :param resource_group_name: The name of the resource group.
819        :type resource_group_name: str
820        :param snapshot_name: The name of the snapshot within the given subscription and resource
821         group.
822        :type snapshot_name: str
823        :keyword callable cls: A custom type or function that will be passed the direct response
824        :keyword str continuation_token: A continuation token to restart a poller from a saved state.
825        :keyword polling: By default, your polling method will be ARMPolling.
826         Pass in False for this operation to not poll, or pass in your own initialized polling object for a personal polling strategy.
827        :paramtype polling: bool or ~azure.core.polling.PollingMethod
828        :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present.
829        :return: An instance of LROPoller that returns either OperationStatusResponse or the result of cls(response)
830        :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.compute.v2016_04_30_preview.models.OperationStatusResponse]
831        :raises ~azure.core.exceptions.HttpResponseError:
832        """
833        polling = kwargs.pop('polling', True)  # type: Union[bool, PollingMethod]
834        cls = kwargs.pop('cls', None)  # type: ClsType["_models.OperationStatusResponse"]
835        lro_delay = kwargs.pop(
836            'polling_interval',
837            self._config.polling_interval
838        )
839        cont_token = kwargs.pop('continuation_token', None)  # type: Optional[str]
840        if cont_token is None:
841            raw_result = self._revoke_access_initial(
842                resource_group_name=resource_group_name,
843                snapshot_name=snapshot_name,
844                cls=lambda x,y,z: x,
845                **kwargs
846            )
847
848        kwargs.pop('error_map', None)
849        kwargs.pop('content_type', None)
850
851        def get_long_running_output(pipeline_response):
852            deserialized = self._deserialize('OperationStatusResponse', pipeline_response)
853
854            if cls:
855                return cls(pipeline_response, deserialized, {})
856            return deserialized
857
858        path_format_arguments = {
859            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
860            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
861            'snapshotName': self._serialize.url("snapshot_name", snapshot_name, 'str'),
862        }
863
864        if polling is True: polling_method = ARMPolling(lro_delay, lro_options={'final-state-via': 'azure-async-operation'}, path_format_arguments=path_format_arguments,  **kwargs)
865        elif polling is False: polling_method = NoPolling()
866        else: polling_method = polling
867        if cont_token:
868            return LROPoller.from_continuation_token(
869                polling_method=polling_method,
870                continuation_token=cont_token,
871                client=self._client,
872                deserialization_callback=get_long_running_output
873            )
874        else:
875            return LROPoller(self._client, raw_result, get_long_running_output, polling_method)
876    begin_revoke_access.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Compute/snapshots/{snapshotName}/endGetAccess'}  # type: ignore
877