1# coding=utf-8
2# --------------------------------------------------------------------------
3# Copyright (c) Microsoft Corporation. All rights reserved.
4# Licensed under the MIT License. See License.txt in the project root for license information.
5# Code generated by Microsoft (R) AutoRest Code Generator.
6# Changes may cause incorrect behavior and will be lost if the code is regenerated.
7# --------------------------------------------------------------------------
8from typing import Any, AsyncIterable, Callable, Dict, Generic, Optional, TypeVar, Union
9import warnings
10
11from azure.core.async_paging import AsyncItemPaged, AsyncList
12from azure.core.exceptions import ClientAuthenticationError, HttpResponseError, ResourceExistsError, ResourceNotFoundError, map_error
13from azure.core.pipeline import PipelineResponse
14from azure.core.pipeline.transport import AsyncHttpResponse, HttpRequest
15from azure.core.polling import AsyncLROPoller, AsyncNoPolling, AsyncPollingMethod
16from azure.mgmt.core.exceptions import ARMErrorFormat
17from azure.mgmt.core.polling.async_arm_polling import AsyncARMPolling
18
19from ... import models as _models
20
21T = TypeVar('T')
22ClsType = Optional[Callable[[PipelineResponse[HttpRequest, AsyncHttpResponse], T, Dict[str, Any]], Any]]
23
24class DisksOperations:
25    """DisksOperations async operations.
26
27    You should not instantiate this class directly. Instead, you should create a Client instance that
28    instantiates it for you and attaches it as an attribute.
29
30    :ivar models: Alias to model classes used in this operation group.
31    :type models: ~azure.mgmt.compute.v2018_04_01.models
32    :param client: Client for service requests.
33    :param config: Configuration of service client.
34    :param serializer: An object model serializer.
35    :param deserializer: An object model deserializer.
36    """
37
38    models = _models
39
40    def __init__(self, client, config, serializer, deserializer) -> None:
41        self._client = client
42        self._serialize = serializer
43        self._deserialize = deserializer
44        self._config = config
45
46    async def _create_or_update_initial(
47        self,
48        resource_group_name: str,
49        disk_name: str,
50        disk: "_models.Disk",
51        **kwargs: Any
52    ) -> "_models.Disk":
53        cls = kwargs.pop('cls', None)  # type: ClsType["_models.Disk"]
54        error_map = {
55            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
56        }
57        error_map.update(kwargs.pop('error_map', {}))
58        api_version = "2018-04-01"
59        content_type = kwargs.pop("content_type", "application/json")
60        accept = "application/json"
61
62        # Construct URL
63        url = self._create_or_update_initial.metadata['url']  # type: ignore
64        path_format_arguments = {
65            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
66            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
67            'diskName': self._serialize.url("disk_name", disk_name, 'str'),
68        }
69        url = self._client.format_url(url, **path_format_arguments)
70
71        # Construct parameters
72        query_parameters = {}  # type: Dict[str, Any]
73        query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')
74
75        # Construct headers
76        header_parameters = {}  # type: Dict[str, Any]
77        header_parameters['Content-Type'] = self._serialize.header("content_type", content_type, 'str')
78        header_parameters['Accept'] = self._serialize.header("accept", accept, 'str')
79
80        body_content_kwargs = {}  # type: Dict[str, Any]
81        body_content = self._serialize.body(disk, 'Disk')
82        body_content_kwargs['content'] = body_content
83        request = self._client.put(url, query_parameters, header_parameters, **body_content_kwargs)
84        pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs)
85        response = pipeline_response.http_response
86
87        if response.status_code not in [200, 202]:
88            map_error(status_code=response.status_code, response=response, error_map=error_map)
89            raise HttpResponseError(response=response, error_format=ARMErrorFormat)
90
91        if response.status_code == 200:
92            deserialized = self._deserialize('Disk', pipeline_response)
93
94        if response.status_code == 202:
95            deserialized = self._deserialize('Disk', pipeline_response)
96
97        if cls:
98            return cls(pipeline_response, deserialized, {})
99
100        return deserialized
101    _create_or_update_initial.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Compute/disks/{diskName}'}  # type: ignore
102
103    async def begin_create_or_update(
104        self,
105        resource_group_name: str,
106        disk_name: str,
107        disk: "_models.Disk",
108        **kwargs: Any
109    ) -> AsyncLROPoller["_models.Disk"]:
110        """Creates or updates a disk.
111
112        :param resource_group_name: The name of the resource group.
113        :type resource_group_name: str
114        :param disk_name: The name of the managed disk that is being created. The name can't be changed
115         after the disk is created. Supported characters for the name are a-z, A-Z, 0-9 and _. The
116         maximum name length is 80 characters.
117        :type disk_name: str
118        :param disk: Disk object supplied in the body of the Put disk operation.
119        :type disk: ~azure.mgmt.compute.v2018_04_01.models.Disk
120        :keyword callable cls: A custom type or function that will be passed the direct response
121        :keyword str continuation_token: A continuation token to restart a poller from a saved state.
122        :keyword polling: By default, your polling method will be AsyncARMPolling.
123         Pass in False for this operation to not poll, or pass in your own initialized polling object for a personal polling strategy.
124        :paramtype polling: bool or ~azure.core.polling.AsyncPollingMethod
125        :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present.
126        :return: An instance of AsyncLROPoller that returns either Disk or the result of cls(response)
127        :rtype: ~azure.core.polling.AsyncLROPoller[~azure.mgmt.compute.v2018_04_01.models.Disk]
128        :raises ~azure.core.exceptions.HttpResponseError:
129        """
130        polling = kwargs.pop('polling', True)  # type: Union[bool, AsyncPollingMethod]
131        cls = kwargs.pop('cls', None)  # type: ClsType["_models.Disk"]
132        lro_delay = kwargs.pop(
133            'polling_interval',
134            self._config.polling_interval
135        )
136        cont_token = kwargs.pop('continuation_token', None)  # type: Optional[str]
137        if cont_token is None:
138            raw_result = await self._create_or_update_initial(
139                resource_group_name=resource_group_name,
140                disk_name=disk_name,
141                disk=disk,
142                cls=lambda x,y,z: x,
143                **kwargs
144            )
145
146        kwargs.pop('error_map', None)
147        kwargs.pop('content_type', None)
148
149        def get_long_running_output(pipeline_response):
150            deserialized = self._deserialize('Disk', pipeline_response)
151
152            if cls:
153                return cls(pipeline_response, deserialized, {})
154            return deserialized
155
156        path_format_arguments = {
157            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
158            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
159            'diskName': self._serialize.url("disk_name", disk_name, 'str'),
160        }
161
162        if polling is True: polling_method = AsyncARMPolling(lro_delay, path_format_arguments=path_format_arguments,  **kwargs)
163        elif polling is False: polling_method = AsyncNoPolling()
164        else: polling_method = polling
165        if cont_token:
166            return AsyncLROPoller.from_continuation_token(
167                polling_method=polling_method,
168                continuation_token=cont_token,
169                client=self._client,
170                deserialization_callback=get_long_running_output
171            )
172        else:
173            return AsyncLROPoller(self._client, raw_result, get_long_running_output, polling_method)
174    begin_create_or_update.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Compute/disks/{diskName}'}  # type: ignore
175
176    async def _update_initial(
177        self,
178        resource_group_name: str,
179        disk_name: str,
180        disk: "_models.DiskUpdate",
181        **kwargs: Any
182    ) -> "_models.Disk":
183        cls = kwargs.pop('cls', None)  # type: ClsType["_models.Disk"]
184        error_map = {
185            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
186        }
187        error_map.update(kwargs.pop('error_map', {}))
188        api_version = "2018-04-01"
189        content_type = kwargs.pop("content_type", "application/json")
190        accept = "application/json"
191
192        # Construct URL
193        url = self._update_initial.metadata['url']  # type: ignore
194        path_format_arguments = {
195            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
196            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
197            'diskName': self._serialize.url("disk_name", disk_name, 'str'),
198        }
199        url = self._client.format_url(url, **path_format_arguments)
200
201        # Construct parameters
202        query_parameters = {}  # type: Dict[str, Any]
203        query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')
204
205        # Construct headers
206        header_parameters = {}  # type: Dict[str, Any]
207        header_parameters['Content-Type'] = self._serialize.header("content_type", content_type, 'str')
208        header_parameters['Accept'] = self._serialize.header("accept", accept, 'str')
209
210        body_content_kwargs = {}  # type: Dict[str, Any]
211        body_content = self._serialize.body(disk, 'DiskUpdate')
212        body_content_kwargs['content'] = body_content
213        request = self._client.patch(url, query_parameters, header_parameters, **body_content_kwargs)
214        pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs)
215        response = pipeline_response.http_response
216
217        if response.status_code not in [200, 202]:
218            map_error(status_code=response.status_code, response=response, error_map=error_map)
219            raise HttpResponseError(response=response, error_format=ARMErrorFormat)
220
221        if response.status_code == 200:
222            deserialized = self._deserialize('Disk', pipeline_response)
223
224        if response.status_code == 202:
225            deserialized = self._deserialize('Disk', pipeline_response)
226
227        if cls:
228            return cls(pipeline_response, deserialized, {})
229
230        return deserialized
231    _update_initial.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Compute/disks/{diskName}'}  # type: ignore
232
233    async def begin_update(
234        self,
235        resource_group_name: str,
236        disk_name: str,
237        disk: "_models.DiskUpdate",
238        **kwargs: Any
239    ) -> AsyncLROPoller["_models.Disk"]:
240        """Updates (patches) a disk.
241
242        :param resource_group_name: The name of the resource group.
243        :type resource_group_name: str
244        :param disk_name: The name of the managed disk that is being created. The name can't be changed
245         after the disk is created. Supported characters for the name are a-z, A-Z, 0-9 and _. The
246         maximum name length is 80 characters.
247        :type disk_name: str
248        :param disk: Disk object supplied in the body of the Patch disk operation.
249        :type disk: ~azure.mgmt.compute.v2018_04_01.models.DiskUpdate
250        :keyword callable cls: A custom type or function that will be passed the direct response
251        :keyword str continuation_token: A continuation token to restart a poller from a saved state.
252        :keyword polling: By default, your polling method will be AsyncARMPolling.
253         Pass in False for this operation to not poll, or pass in your own initialized polling object for a personal polling strategy.
254        :paramtype polling: bool or ~azure.core.polling.AsyncPollingMethod
255        :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present.
256        :return: An instance of AsyncLROPoller that returns either Disk or the result of cls(response)
257        :rtype: ~azure.core.polling.AsyncLROPoller[~azure.mgmt.compute.v2018_04_01.models.Disk]
258        :raises ~azure.core.exceptions.HttpResponseError:
259        """
260        polling = kwargs.pop('polling', True)  # type: Union[bool, AsyncPollingMethod]
261        cls = kwargs.pop('cls', None)  # type: ClsType["_models.Disk"]
262        lro_delay = kwargs.pop(
263            'polling_interval',
264            self._config.polling_interval
265        )
266        cont_token = kwargs.pop('continuation_token', None)  # type: Optional[str]
267        if cont_token is None:
268            raw_result = await self._update_initial(
269                resource_group_name=resource_group_name,
270                disk_name=disk_name,
271                disk=disk,
272                cls=lambda x,y,z: x,
273                **kwargs
274            )
275
276        kwargs.pop('error_map', None)
277        kwargs.pop('content_type', None)
278
279        def get_long_running_output(pipeline_response):
280            deserialized = self._deserialize('Disk', pipeline_response)
281
282            if cls:
283                return cls(pipeline_response, deserialized, {})
284            return deserialized
285
286        path_format_arguments = {
287            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
288            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
289            'diskName': self._serialize.url("disk_name", disk_name, 'str'),
290        }
291
292        if polling is True: polling_method = AsyncARMPolling(lro_delay, path_format_arguments=path_format_arguments,  **kwargs)
293        elif polling is False: polling_method = AsyncNoPolling()
294        else: polling_method = polling
295        if cont_token:
296            return AsyncLROPoller.from_continuation_token(
297                polling_method=polling_method,
298                continuation_token=cont_token,
299                client=self._client,
300                deserialization_callback=get_long_running_output
301            )
302        else:
303            return AsyncLROPoller(self._client, raw_result, get_long_running_output, polling_method)
304    begin_update.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Compute/disks/{diskName}'}  # type: ignore
305
306    async def get(
307        self,
308        resource_group_name: str,
309        disk_name: str,
310        **kwargs: Any
311    ) -> "_models.Disk":
312        """Gets information about a disk.
313
314        :param resource_group_name: The name of the resource group.
315        :type resource_group_name: str
316        :param disk_name: The name of the managed disk that is being created. The name can't be changed
317         after the disk is created. Supported characters for the name are a-z, A-Z, 0-9 and _. The
318         maximum name length is 80 characters.
319        :type disk_name: str
320        :keyword callable cls: A custom type or function that will be passed the direct response
321        :return: Disk, or the result of cls(response)
322        :rtype: ~azure.mgmt.compute.v2018_04_01.models.Disk
323        :raises: ~azure.core.exceptions.HttpResponseError
324        """
325        cls = kwargs.pop('cls', None)  # type: ClsType["_models.Disk"]
326        error_map = {
327            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
328        }
329        error_map.update(kwargs.pop('error_map', {}))
330        api_version = "2018-04-01"
331        accept = "application/json"
332
333        # Construct URL
334        url = self.get.metadata['url']  # type: ignore
335        path_format_arguments = {
336            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
337            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
338            'diskName': self._serialize.url("disk_name", disk_name, 'str'),
339        }
340        url = self._client.format_url(url, **path_format_arguments)
341
342        # Construct parameters
343        query_parameters = {}  # type: Dict[str, Any]
344        query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')
345
346        # Construct headers
347        header_parameters = {}  # type: Dict[str, Any]
348        header_parameters['Accept'] = self._serialize.header("accept", accept, 'str')
349
350        request = self._client.get(url, query_parameters, header_parameters)
351        pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs)
352        response = pipeline_response.http_response
353
354        if response.status_code not in [200]:
355            map_error(status_code=response.status_code, response=response, error_map=error_map)
356            raise HttpResponseError(response=response, error_format=ARMErrorFormat)
357
358        deserialized = self._deserialize('Disk', pipeline_response)
359
360        if cls:
361            return cls(pipeline_response, deserialized, {})
362
363        return deserialized
364    get.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Compute/disks/{diskName}'}  # type: ignore
365
366    async def _delete_initial(
367        self,
368        resource_group_name: str,
369        disk_name: str,
370        **kwargs: Any
371    ) -> None:
372        cls = kwargs.pop('cls', None)  # type: ClsType[None]
373        error_map = {
374            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
375        }
376        error_map.update(kwargs.pop('error_map', {}))
377        api_version = "2018-04-01"
378
379        # Construct URL
380        url = self._delete_initial.metadata['url']  # type: ignore
381        path_format_arguments = {
382            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
383            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
384            'diskName': self._serialize.url("disk_name", disk_name, 'str'),
385        }
386        url = self._client.format_url(url, **path_format_arguments)
387
388        # Construct parameters
389        query_parameters = {}  # type: Dict[str, Any]
390        query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')
391
392        # Construct headers
393        header_parameters = {}  # type: Dict[str, Any]
394
395        request = self._client.delete(url, query_parameters, header_parameters)
396        pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs)
397        response = pipeline_response.http_response
398
399        if response.status_code not in [200, 202, 204]:
400            map_error(status_code=response.status_code, response=response, error_map=error_map)
401            raise HttpResponseError(response=response, error_format=ARMErrorFormat)
402
403        if cls:
404            return cls(pipeline_response, None, {})
405
406    _delete_initial.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Compute/disks/{diskName}'}  # type: ignore
407
408    async def begin_delete(
409        self,
410        resource_group_name: str,
411        disk_name: str,
412        **kwargs: Any
413    ) -> AsyncLROPoller[None]:
414        """Deletes a disk.
415
416        :param resource_group_name: The name of the resource group.
417        :type resource_group_name: str
418        :param disk_name: The name of the managed disk that is being created. The name can't be changed
419         after the disk is created. Supported characters for the name are a-z, A-Z, 0-9 and _. The
420         maximum name length is 80 characters.
421        :type disk_name: str
422        :keyword callable cls: A custom type or function that will be passed the direct response
423        :keyword str continuation_token: A continuation token to restart a poller from a saved state.
424        :keyword polling: By default, your polling method will be AsyncARMPolling.
425         Pass in False for this operation to not poll, or pass in your own initialized polling object for a personal polling strategy.
426        :paramtype polling: bool or ~azure.core.polling.AsyncPollingMethod
427        :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present.
428        :return: An instance of AsyncLROPoller that returns either None or the result of cls(response)
429        :rtype: ~azure.core.polling.AsyncLROPoller[None]
430        :raises ~azure.core.exceptions.HttpResponseError:
431        """
432        polling = kwargs.pop('polling', True)  # type: Union[bool, AsyncPollingMethod]
433        cls = kwargs.pop('cls', None)  # type: ClsType[None]
434        lro_delay = kwargs.pop(
435            'polling_interval',
436            self._config.polling_interval
437        )
438        cont_token = kwargs.pop('continuation_token', None)  # type: Optional[str]
439        if cont_token is None:
440            raw_result = await self._delete_initial(
441                resource_group_name=resource_group_name,
442                disk_name=disk_name,
443                cls=lambda x,y,z: x,
444                **kwargs
445            )
446
447        kwargs.pop('error_map', None)
448        kwargs.pop('content_type', None)
449
450        def get_long_running_output(pipeline_response):
451            if cls:
452                return cls(pipeline_response, None, {})
453
454        path_format_arguments = {
455            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
456            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
457            'diskName': self._serialize.url("disk_name", disk_name, 'str'),
458        }
459
460        if polling is True: polling_method = AsyncARMPolling(lro_delay, path_format_arguments=path_format_arguments,  **kwargs)
461        elif polling is False: polling_method = AsyncNoPolling()
462        else: polling_method = polling
463        if cont_token:
464            return AsyncLROPoller.from_continuation_token(
465                polling_method=polling_method,
466                continuation_token=cont_token,
467                client=self._client,
468                deserialization_callback=get_long_running_output
469            )
470        else:
471            return AsyncLROPoller(self._client, raw_result, get_long_running_output, polling_method)
472    begin_delete.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Compute/disks/{diskName}'}  # type: ignore
473
474    def list_by_resource_group(
475        self,
476        resource_group_name: str,
477        **kwargs: Any
478    ) -> AsyncIterable["_models.DiskList"]:
479        """Lists all the disks under a resource group.
480
481        :param resource_group_name: The name of the resource group.
482        :type resource_group_name: str
483        :keyword callable cls: A custom type or function that will be passed the direct response
484        :return: An iterator like instance of either DiskList or the result of cls(response)
485        :rtype: ~azure.core.async_paging.AsyncItemPaged[~azure.mgmt.compute.v2018_04_01.models.DiskList]
486        :raises: ~azure.core.exceptions.HttpResponseError
487        """
488        cls = kwargs.pop('cls', None)  # type: ClsType["_models.DiskList"]
489        error_map = {
490            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
491        }
492        error_map.update(kwargs.pop('error_map', {}))
493        api_version = "2018-04-01"
494        accept = "application/json"
495
496        def prepare_request(next_link=None):
497            # Construct headers
498            header_parameters = {}  # type: Dict[str, Any]
499            header_parameters['Accept'] = self._serialize.header("accept", accept, 'str')
500
501            if not next_link:
502                # Construct URL
503                url = self.list_by_resource_group.metadata['url']  # type: ignore
504                path_format_arguments = {
505                    'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
506                    'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
507                }
508                url = self._client.format_url(url, **path_format_arguments)
509                # Construct parameters
510                query_parameters = {}  # type: Dict[str, Any]
511                query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')
512
513                request = self._client.get(url, query_parameters, header_parameters)
514            else:
515                url = next_link
516                query_parameters = {}  # type: Dict[str, Any]
517                request = self._client.get(url, query_parameters, header_parameters)
518            return request
519
520        async def extract_data(pipeline_response):
521            deserialized = self._deserialize('DiskList', pipeline_response)
522            list_of_elem = deserialized.value
523            if cls:
524                list_of_elem = cls(list_of_elem)
525            return deserialized.next_link or None, AsyncList(list_of_elem)
526
527        async def get_next(next_link=None):
528            request = prepare_request(next_link)
529
530            pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs)
531            response = pipeline_response.http_response
532
533            if response.status_code not in [200]:
534                map_error(status_code=response.status_code, response=response, error_map=error_map)
535                raise HttpResponseError(response=response, error_format=ARMErrorFormat)
536
537            return pipeline_response
538
539        return AsyncItemPaged(
540            get_next, extract_data
541        )
542    list_by_resource_group.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Compute/disks'}  # type: ignore
543
544    def list(
545        self,
546        **kwargs: Any
547    ) -> AsyncIterable["_models.DiskList"]:
548        """Lists all the disks under a subscription.
549
550        :keyword callable cls: A custom type or function that will be passed the direct response
551        :return: An iterator like instance of either DiskList or the result of cls(response)
552        :rtype: ~azure.core.async_paging.AsyncItemPaged[~azure.mgmt.compute.v2018_04_01.models.DiskList]
553        :raises: ~azure.core.exceptions.HttpResponseError
554        """
555        cls = kwargs.pop('cls', None)  # type: ClsType["_models.DiskList"]
556        error_map = {
557            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
558        }
559        error_map.update(kwargs.pop('error_map', {}))
560        api_version = "2018-04-01"
561        accept = "application/json"
562
563        def prepare_request(next_link=None):
564            # Construct headers
565            header_parameters = {}  # type: Dict[str, Any]
566            header_parameters['Accept'] = self._serialize.header("accept", accept, 'str')
567
568            if not next_link:
569                # Construct URL
570                url = self.list.metadata['url']  # type: ignore
571                path_format_arguments = {
572                    'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
573                }
574                url = self._client.format_url(url, **path_format_arguments)
575                # Construct parameters
576                query_parameters = {}  # type: Dict[str, Any]
577                query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')
578
579                request = self._client.get(url, query_parameters, header_parameters)
580            else:
581                url = next_link
582                query_parameters = {}  # type: Dict[str, Any]
583                request = self._client.get(url, query_parameters, header_parameters)
584            return request
585
586        async def extract_data(pipeline_response):
587            deserialized = self._deserialize('DiskList', pipeline_response)
588            list_of_elem = deserialized.value
589            if cls:
590                list_of_elem = cls(list_of_elem)
591            return deserialized.next_link or None, AsyncList(list_of_elem)
592
593        async def get_next(next_link=None):
594            request = prepare_request(next_link)
595
596            pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs)
597            response = pipeline_response.http_response
598
599            if response.status_code not in [200]:
600                map_error(status_code=response.status_code, response=response, error_map=error_map)
601                raise HttpResponseError(response=response, error_format=ARMErrorFormat)
602
603            return pipeline_response
604
605        return AsyncItemPaged(
606            get_next, extract_data
607        )
608    list.metadata = {'url': '/subscriptions/{subscriptionId}/providers/Microsoft.Compute/disks'}  # type: ignore
609
610    async def _grant_access_initial(
611        self,
612        resource_group_name: str,
613        disk_name: str,
614        grant_access_data: "_models.GrantAccessData",
615        **kwargs: Any
616    ) -> Optional["_models.AccessUri"]:
617        cls = kwargs.pop('cls', None)  # type: ClsType[Optional["_models.AccessUri"]]
618        error_map = {
619            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
620        }
621        error_map.update(kwargs.pop('error_map', {}))
622        api_version = "2018-04-01"
623        content_type = kwargs.pop("content_type", "application/json")
624        accept = "application/json"
625
626        # Construct URL
627        url = self._grant_access_initial.metadata['url']  # type: ignore
628        path_format_arguments = {
629            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
630            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
631            'diskName': self._serialize.url("disk_name", disk_name, 'str'),
632        }
633        url = self._client.format_url(url, **path_format_arguments)
634
635        # Construct parameters
636        query_parameters = {}  # type: Dict[str, Any]
637        query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')
638
639        # Construct headers
640        header_parameters = {}  # type: Dict[str, Any]
641        header_parameters['Content-Type'] = self._serialize.header("content_type", content_type, 'str')
642        header_parameters['Accept'] = self._serialize.header("accept", accept, 'str')
643
644        body_content_kwargs = {}  # type: Dict[str, Any]
645        body_content = self._serialize.body(grant_access_data, 'GrantAccessData')
646        body_content_kwargs['content'] = body_content
647        request = self._client.post(url, query_parameters, header_parameters, **body_content_kwargs)
648        pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs)
649        response = pipeline_response.http_response
650
651        if response.status_code not in [200, 202]:
652            map_error(status_code=response.status_code, response=response, error_map=error_map)
653            raise HttpResponseError(response=response, error_format=ARMErrorFormat)
654
655        deserialized = None
656        if response.status_code == 200:
657            deserialized = self._deserialize('AccessUri', pipeline_response)
658
659        if cls:
660            return cls(pipeline_response, deserialized, {})
661
662        return deserialized
663    _grant_access_initial.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Compute/disks/{diskName}/beginGetAccess'}  # type: ignore
664
665    async def begin_grant_access(
666        self,
667        resource_group_name: str,
668        disk_name: str,
669        grant_access_data: "_models.GrantAccessData",
670        **kwargs: Any
671    ) -> AsyncLROPoller["_models.AccessUri"]:
672        """Grants access to a disk.
673
674        :param resource_group_name: The name of the resource group.
675        :type resource_group_name: str
676        :param disk_name: The name of the managed disk that is being created. The name can't be changed
677         after the disk is created. Supported characters for the name are a-z, A-Z, 0-9 and _. The
678         maximum name length is 80 characters.
679        :type disk_name: str
680        :param grant_access_data: Access data object supplied in the body of the get disk access
681         operation.
682        :type grant_access_data: ~azure.mgmt.compute.v2018_04_01.models.GrantAccessData
683        :keyword callable cls: A custom type or function that will be passed the direct response
684        :keyword str continuation_token: A continuation token to restart a poller from a saved state.
685        :keyword polling: By default, your polling method will be AsyncARMPolling.
686         Pass in False for this operation to not poll, or pass in your own initialized polling object for a personal polling strategy.
687        :paramtype polling: bool or ~azure.core.polling.AsyncPollingMethod
688        :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present.
689        :return: An instance of AsyncLROPoller that returns either AccessUri or the result of cls(response)
690        :rtype: ~azure.core.polling.AsyncLROPoller[~azure.mgmt.compute.v2018_04_01.models.AccessUri]
691        :raises ~azure.core.exceptions.HttpResponseError:
692        """
693        polling = kwargs.pop('polling', True)  # type: Union[bool, AsyncPollingMethod]
694        cls = kwargs.pop('cls', None)  # type: ClsType["_models.AccessUri"]
695        lro_delay = kwargs.pop(
696            'polling_interval',
697            self._config.polling_interval
698        )
699        cont_token = kwargs.pop('continuation_token', None)  # type: Optional[str]
700        if cont_token is None:
701            raw_result = await self._grant_access_initial(
702                resource_group_name=resource_group_name,
703                disk_name=disk_name,
704                grant_access_data=grant_access_data,
705                cls=lambda x,y,z: x,
706                **kwargs
707            )
708
709        kwargs.pop('error_map', None)
710        kwargs.pop('content_type', None)
711
712        def get_long_running_output(pipeline_response):
713            deserialized = self._deserialize('AccessUri', pipeline_response)
714
715            if cls:
716                return cls(pipeline_response, deserialized, {})
717            return deserialized
718
719        path_format_arguments = {
720            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
721            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
722            'diskName': self._serialize.url("disk_name", disk_name, 'str'),
723        }
724
725        if polling is True: polling_method = AsyncARMPolling(lro_delay, lro_options={'final-state-via': 'location'}, path_format_arguments=path_format_arguments,  **kwargs)
726        elif polling is False: polling_method = AsyncNoPolling()
727        else: polling_method = polling
728        if cont_token:
729            return AsyncLROPoller.from_continuation_token(
730                polling_method=polling_method,
731                continuation_token=cont_token,
732                client=self._client,
733                deserialization_callback=get_long_running_output
734            )
735        else:
736            return AsyncLROPoller(self._client, raw_result, get_long_running_output, polling_method)
737    begin_grant_access.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Compute/disks/{diskName}/beginGetAccess'}  # type: ignore
738
739    async def _revoke_access_initial(
740        self,
741        resource_group_name: str,
742        disk_name: str,
743        **kwargs: Any
744    ) -> None:
745        cls = kwargs.pop('cls', None)  # type: ClsType[None]
746        error_map = {
747            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
748        }
749        error_map.update(kwargs.pop('error_map', {}))
750        api_version = "2018-04-01"
751
752        # Construct URL
753        url = self._revoke_access_initial.metadata['url']  # type: ignore
754        path_format_arguments = {
755            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
756            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
757            'diskName': self._serialize.url("disk_name", disk_name, 'str'),
758        }
759        url = self._client.format_url(url, **path_format_arguments)
760
761        # Construct parameters
762        query_parameters = {}  # type: Dict[str, Any]
763        query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')
764
765        # Construct headers
766        header_parameters = {}  # type: Dict[str, Any]
767
768        request = self._client.post(url, query_parameters, header_parameters)
769        pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs)
770        response = pipeline_response.http_response
771
772        if response.status_code not in [200, 202]:
773            map_error(status_code=response.status_code, response=response, error_map=error_map)
774            raise HttpResponseError(response=response, error_format=ARMErrorFormat)
775
776        if cls:
777            return cls(pipeline_response, None, {})
778
779    _revoke_access_initial.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Compute/disks/{diskName}/endGetAccess'}  # type: ignore
780
781    async def begin_revoke_access(
782        self,
783        resource_group_name: str,
784        disk_name: str,
785        **kwargs: Any
786    ) -> AsyncLROPoller[None]:
787        """Revokes access to a disk.
788
789        :param resource_group_name: The name of the resource group.
790        :type resource_group_name: str
791        :param disk_name: The name of the managed disk that is being created. The name can't be changed
792         after the disk is created. Supported characters for the name are a-z, A-Z, 0-9 and _. The
793         maximum name length is 80 characters.
794        :type disk_name: str
795        :keyword callable cls: A custom type or function that will be passed the direct response
796        :keyword str continuation_token: A continuation token to restart a poller from a saved state.
797        :keyword polling: By default, your polling method will be AsyncARMPolling.
798         Pass in False for this operation to not poll, or pass in your own initialized polling object for a personal polling strategy.
799        :paramtype polling: bool or ~azure.core.polling.AsyncPollingMethod
800        :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present.
801        :return: An instance of AsyncLROPoller that returns either None or the result of cls(response)
802        :rtype: ~azure.core.polling.AsyncLROPoller[None]
803        :raises ~azure.core.exceptions.HttpResponseError:
804        """
805        polling = kwargs.pop('polling', True)  # type: Union[bool, AsyncPollingMethod]
806        cls = kwargs.pop('cls', None)  # type: ClsType[None]
807        lro_delay = kwargs.pop(
808            'polling_interval',
809            self._config.polling_interval
810        )
811        cont_token = kwargs.pop('continuation_token', None)  # type: Optional[str]
812        if cont_token is None:
813            raw_result = await self._revoke_access_initial(
814                resource_group_name=resource_group_name,
815                disk_name=disk_name,
816                cls=lambda x,y,z: x,
817                **kwargs
818            )
819
820        kwargs.pop('error_map', None)
821        kwargs.pop('content_type', None)
822
823        def get_long_running_output(pipeline_response):
824            if cls:
825                return cls(pipeline_response, None, {})
826
827        path_format_arguments = {
828            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
829            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
830            'diskName': self._serialize.url("disk_name", disk_name, 'str'),
831        }
832
833        if polling is True: polling_method = AsyncARMPolling(lro_delay, lro_options={'final-state-via': 'location'}, path_format_arguments=path_format_arguments,  **kwargs)
834        elif polling is False: polling_method = AsyncNoPolling()
835        else: polling_method = polling
836        if cont_token:
837            return AsyncLROPoller.from_continuation_token(
838                polling_method=polling_method,
839                continuation_token=cont_token,
840                client=self._client,
841                deserialization_callback=get_long_running_output
842            )
843        else:
844            return AsyncLROPoller(self._client, raw_result, get_long_running_output, polling_method)
845    begin_revoke_access.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Compute/disks/{diskName}/endGetAccess'}  # type: ignore
846