# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
from typing import Any, AsyncIterable, Callable, Dict, Generic, Optional, TypeVar, Union
import warnings

from azure.core.async_paging import AsyncItemPaged, AsyncList
from azure.core.exceptions import ClientAuthenticationError, HttpResponseError, ResourceExistsError, ResourceNotFoundError, map_error
from azure.core.pipeline import PipelineResponse
from azure.core.pipeline.transport import AsyncHttpResponse, HttpRequest
from azure.core.polling import AsyncLROPoller, AsyncNoPolling, AsyncPollingMethod
from azure.mgmt.core.exceptions import ARMErrorFormat
from azure.mgmt.core.polling.async_arm_polling import AsyncARMPolling

from ... import models as _models

T = TypeVar('T')
# Signature of the optional ``cls`` callback: (pipeline response, deserialized value, headers dict) -> Any.
ClsType = Optional[Callable[[PipelineResponse[HttpRequest, AsyncHttpResponse], T, Dict[str, Any]], Any]]

class SnapshotsOperations:
    """SnapshotsOperations async operations.

    You should not instantiate this class directly. Instead, you should create a Client instance that
    instantiates it for you and attaches it as an attribute.

    :ivar models: Alias to model classes used in this operation group.
    :type models: ~azure.mgmt.compute.v2020_06_30.models
    :param client: Client for service requests.
    :param config: Configuration of service client.
    :param serializer: An object model serializer.
    :param deserializer: An object model deserializer.
    """

    models = _models

    def __init__(self, client, config, serializer, deserializer) -> None:
        """Store the client, configuration, serializer and deserializer used by every operation."""
        self._client = client
        self._serialize = serializer
        self._deserialize = deserializer
        self._config = config

    async def _create_or_update_initial(
        self,
        resource_group_name: str,
        snapshot_name: str,
        snapshot: "_models.Snapshot",
        **kwargs: Any
    ) -> "_models.Snapshot":
        """Send the initial PUT request of the create-or-update operation.

        Formats the snapshot URL from ``metadata['url']``, serializes ``snapshot`` as the request
        body and runs the request through the client pipeline. Status codes 200 and 202 are
        accepted and the response is deserialized as ``Snapshot``; any other status code is passed
        to ``map_error`` and then raised as ``HttpResponseError``.

        :keyword callable cls: If given, it is called with (pipeline response, deserialized model, {})
         and its result is returned instead of the deserialized model.
        :keyword dict error_map: Extra status-code-to-exception entries merged over the defaults.
        :keyword str content_type: Value for the Content-Type header; defaults to "application/json".
        :raises ~azure.core.exceptions.HttpResponseError:
        """
        cls = kwargs.pop('cls', None)  # type: ClsType["_models.Snapshot"]
        error_map = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        error_map.update(kwargs.pop('error_map', {}))
        api_version = "2020-06-30"
        content_type = kwargs.pop("content_type", "application/json")
        accept = "application/json"

        # Construct URL
        url = self._create_or_update_initial.metadata['url']  # type: ignore
        path_format_arguments = {
            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
            'snapshotName': self._serialize.url("snapshot_name", snapshot_name, 'str'),
        }
        url = self._client.format_url(url, **path_format_arguments)

        # Construct parameters
        query_parameters = {}  # type: Dict[str, Any]
        query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')

        # Construct headers
        header_parameters = {}  # type: Dict[str, Any]
        header_parameters['Content-Type'] = self._serialize.header("content_type", content_type, 'str')
        header_parameters['Accept'] = self._serialize.header("accept", accept, 'str')

        body_content_kwargs = {}  # type: Dict[str, Any]
        body_content = self._serialize.body(snapshot, 'Snapshot')
        body_content_kwargs['content'] = body_content
        request = self._client.put(url, query_parameters, header_parameters, **body_content_kwargs)
        pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs)
        response = pipeline_response.http_response

        if response.status_code not in [200, 202]:
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            raise HttpResponseError(response=response, error_format=ARMErrorFormat)

        # Both accepted status codes deserialize the body the same way.
        if response.status_code == 200:
            deserialized = self._deserialize('Snapshot', pipeline_response)

        if response.status_code == 202:
            deserialized = self._deserialize('Snapshot', pipeline_response)

        if cls:
            return cls(pipeline_response, deserialized, {})

        return deserialized
    _create_or_update_initial.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Compute/snapshots/{snapshotName}'}  # type: ignore

    async def begin_create_or_update(
        self,
        resource_group_name: str,
        snapshot_name: str,
        snapshot: "_models.Snapshot",
        **kwargs: Any
    ) -> AsyncLROPoller["_models.Snapshot"]:
        """Creates or updates a snapshot.

        :param resource_group_name: The name of the resource group.
        :type resource_group_name: str
        :param snapshot_name: The name of the snapshot that is being created. The name can't be changed
         after the snapshot is created. Supported characters for the name are a-z, A-Z, 0-9 and _. The
         max name length is 80 characters.
        :type snapshot_name: str
        :param snapshot: Snapshot object supplied in the body of the Put snapshot operation.
        :type snapshot: ~azure.mgmt.compute.v2020_06_30.models.Snapshot
        :keyword callable cls: A custom type or function that will be passed the direct response
        :keyword str continuation_token: A continuation token to restart a poller from a saved state.
        :keyword polling: By default, your polling method will be AsyncARMPolling.
         Pass in False for this operation to not poll, or pass in your own initialized polling object for a personal polling strategy.
        :paramtype polling: bool or ~azure.core.polling.AsyncPollingMethod
        :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present.
        :return: An instance of AsyncLROPoller that returns either Snapshot or the result of cls(response)
        :rtype: ~azure.core.polling.AsyncLROPoller[~azure.mgmt.compute.v2020_06_30.models.Snapshot]
        :raises ~azure.core.exceptions.HttpResponseError:
        """
        polling = kwargs.pop('polling', True)  # type: Union[bool, AsyncPollingMethod]
        cls = kwargs.pop('cls', None)  # type: ClsType["_models.Snapshot"]
        lro_delay = kwargs.pop(
            'polling_interval',
            self._config.polling_interval
        )
        cont_token = kwargs.pop('continuation_token', None)  # type: Optional[str]
        # The initial request is only sent when no continuation token was supplied. The lambda
        # makes the initial call return its first callback argument, the raw pipeline response.
        if cont_token is None:
            raw_result = await self._create_or_update_initial(
                resource_group_name=resource_group_name,
                snapshot_name=snapshot_name,
                snapshot=snapshot,
                cls=lambda x,y,z: x,
                **kwargs
            )

        # Drop these keys before the remaining kwargs are passed to AsyncARMPolling below.
        kwargs.pop('error_map', None)
        kwargs.pop('content_type', None)

        def get_long_running_output(pipeline_response):
            # Deserialization callback handed to the poller; applies the caller's cls if given.
            deserialized = self._deserialize('Snapshot', pipeline_response)

            if cls:
                return cls(pipeline_response, deserialized, {})
            return deserialized

        path_format_arguments = {
            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
            'snapshotName': self._serialize.url("snapshot_name", snapshot_name, 'str'),
        }

        # polling=True -> default ARM polling, polling=False -> no polling, anything else is
        # used as the polling method object itself.
        if polling is True: polling_method = AsyncARMPolling(lro_delay, path_format_arguments=path_format_arguments, **kwargs)
        elif polling is False: polling_method = AsyncNoPolling()
        else: polling_method = polling
        if cont_token:
            return AsyncLROPoller.from_continuation_token(
                polling_method=polling_method,
                continuation_token=cont_token,
                client=self._client,
                deserialization_callback=get_long_running_output
            )
        else:
            return AsyncLROPoller(self._client, raw_result, get_long_running_output, polling_method)
    begin_create_or_update.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Compute/snapshots/{snapshotName}'}  # type: ignore

    async def _update_initial(
        self,
        resource_group_name: str,
        snapshot_name: str,
        snapshot: "_models.SnapshotUpdate",
        **kwargs: Any
    ) -> "_models.Snapshot":
        """Send the initial PATCH request of the update operation.

        Formats the snapshot URL from ``metadata['url']``, serializes ``snapshot`` as a
        ``SnapshotUpdate`` request body and runs the request through the client pipeline. Status
        codes 200 and 202 are accepted and the response is deserialized as ``Snapshot``; any other
        status code is passed to ``map_error`` and then raised as ``HttpResponseError``.

        :keyword callable cls: If given, it is called with (pipeline response, deserialized model, {})
         and its result is returned instead of the deserialized model.
        :keyword dict error_map: Extra status-code-to-exception entries merged over the defaults.
        :keyword str content_type: Value for the Content-Type header; defaults to "application/json".
        :raises ~azure.core.exceptions.HttpResponseError:
        """
        cls = kwargs.pop('cls', None)  # type: ClsType["_models.Snapshot"]
        error_map = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        error_map.update(kwargs.pop('error_map', {}))
        api_version = "2020-06-30"
        content_type = kwargs.pop("content_type", "application/json")
        accept = "application/json"

        # Construct URL
        url = self._update_initial.metadata['url']  # type: ignore
        path_format_arguments = {
            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
            'snapshotName': self._serialize.url("snapshot_name", snapshot_name, 'str'),
        }
        url = self._client.format_url(url, **path_format_arguments)

        # Construct parameters
        query_parameters = {}  # type: Dict[str, Any]
        query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')

        # Construct headers
        header_parameters = {}  # type: Dict[str, Any]
        header_parameters['Content-Type'] = self._serialize.header("content_type", content_type, 'str')
        header_parameters['Accept'] = self._serialize.header("accept", accept, 'str')

        body_content_kwargs = {}  # type: Dict[str, Any]
        body_content = self._serialize.body(snapshot, 'SnapshotUpdate')
        body_content_kwargs['content'] = body_content
        request = self._client.patch(url, query_parameters, header_parameters, **body_content_kwargs)
        pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs)
        response = pipeline_response.http_response

        if response.status_code not in [200, 202]:
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            raise HttpResponseError(response=response, error_format=ARMErrorFormat)

        # Both accepted status codes deserialize the body the same way.
        if response.status_code == 200:
            deserialized = self._deserialize('Snapshot', pipeline_response)

        if response.status_code == 202:
            deserialized = self._deserialize('Snapshot', pipeline_response)

        if cls:
            return cls(pipeline_response, deserialized, {})

        return deserialized
    _update_initial.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Compute/snapshots/{snapshotName}'}  # type: ignore

    async def begin_update(
        self,
        resource_group_name: str,
        snapshot_name: str,
        snapshot: "_models.SnapshotUpdate",
        **kwargs: Any
    ) -> AsyncLROPoller["_models.Snapshot"]:
        """Updates (patches) a snapshot.

        :param resource_group_name: The name of the resource group.
        :type resource_group_name: str
        :param snapshot_name: The name of the snapshot that is being created. The name can't be changed
         after the snapshot is created. Supported characters for the name are a-z, A-Z, 0-9 and _. The
         max name length is 80 characters.
        :type snapshot_name: str
        :param snapshot: Snapshot object supplied in the body of the Patch snapshot operation.
        :type snapshot: ~azure.mgmt.compute.v2020_06_30.models.SnapshotUpdate
        :keyword callable cls: A custom type or function that will be passed the direct response
        :keyword str continuation_token: A continuation token to restart a poller from a saved state.
        :keyword polling: By default, your polling method will be AsyncARMPolling.
         Pass in False for this operation to not poll, or pass in your own initialized polling object for a personal polling strategy.
        :paramtype polling: bool or ~azure.core.polling.AsyncPollingMethod
        :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present.
        :return: An instance of AsyncLROPoller that returns either Snapshot or the result of cls(response)
        :rtype: ~azure.core.polling.AsyncLROPoller[~azure.mgmt.compute.v2020_06_30.models.Snapshot]
        :raises ~azure.core.exceptions.HttpResponseError:
        """
        polling = kwargs.pop('polling', True)  # type: Union[bool, AsyncPollingMethod]
        cls = kwargs.pop('cls', None)  # type: ClsType["_models.Snapshot"]
        lro_delay = kwargs.pop(
            'polling_interval',
            self._config.polling_interval
        )
        cont_token = kwargs.pop('continuation_token', None)  # type: Optional[str]
        # The initial request is only sent when no continuation token was supplied. The lambda
        # makes the initial call return its first callback argument, the raw pipeline response.
        if cont_token is None:
            raw_result = await self._update_initial(
                resource_group_name=resource_group_name,
                snapshot_name=snapshot_name,
                snapshot=snapshot,
                cls=lambda x,y,z: x,
                **kwargs
            )

        # Drop these keys before the remaining kwargs are passed to AsyncARMPolling below.
        kwargs.pop('error_map', None)
        kwargs.pop('content_type', None)

        def get_long_running_output(pipeline_response):
            # Deserialization callback handed to the poller; applies the caller's cls if given.
            deserialized = self._deserialize('Snapshot', pipeline_response)

            if cls:
                return cls(pipeline_response, deserialized, {})
            return deserialized

        path_format_arguments = {
            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
            'snapshotName': self._serialize.url("snapshot_name", snapshot_name, 'str'),
        }

        # polling=True -> default ARM polling, polling=False -> no polling, anything else is
        # used as the polling method object itself.
        if polling is True: polling_method = AsyncARMPolling(lro_delay, path_format_arguments=path_format_arguments, **kwargs)
        elif polling is False: polling_method = AsyncNoPolling()
        else: polling_method = polling
        if cont_token:
            return AsyncLROPoller.from_continuation_token(
                polling_method=polling_method,
                continuation_token=cont_token,
                client=self._client,
                deserialization_callback=get_long_running_output
            )
        else:
            return AsyncLROPoller(self._client, raw_result, get_long_running_output, polling_method)
    begin_update.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Compute/snapshots/{snapshotName}'}  # type: ignore

    async def get(
        self,
        resource_group_name: str,
        snapshot_name: str,
        **kwargs: Any
    ) -> "_models.Snapshot":
        """Gets information about a snapshot.

        :param resource_group_name: The name of the resource group.
        :type resource_group_name: str
        :param snapshot_name: The name of the snapshot that is being created. The name can't be changed
         after the snapshot is created. Supported characters for the name are a-z, A-Z, 0-9 and _. The
         max name length is 80 characters.
        :type snapshot_name: str
        :keyword callable cls: A custom type or function that will be passed the direct response
        :return: Snapshot, or the result of cls(response)
        :rtype: ~azure.mgmt.compute.v2020_06_30.models.Snapshot
        :raises ~azure.core.exceptions.HttpResponseError:
        """
        cls = kwargs.pop('cls', None)  # type: ClsType["_models.Snapshot"]
        error_map = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        error_map.update(kwargs.pop('error_map', {}))
        api_version = "2020-06-30"
        accept = "application/json"

        # Construct URL
        url = self.get.metadata['url']  # type: ignore
        path_format_arguments = {
            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
            'snapshotName': self._serialize.url("snapshot_name", snapshot_name, 'str'),
        }
        url = self._client.format_url(url, **path_format_arguments)

        # Construct parameters
        query_parameters = {}  # type: Dict[str, Any]
        query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')

        # Construct headers
        header_parameters = {}  # type: Dict[str, Any]
        header_parameters['Accept'] = self._serialize.header("accept", accept, 'str')

        request = self._client.get(url, query_parameters, header_parameters)
        pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs)
        response = pipeline_response.http_response

        # Only 200 is accepted; everything else goes through error_map or raises HttpResponseError.
        if response.status_code not in [200]:
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            raise HttpResponseError(response=response, error_format=ARMErrorFormat)

        deserialized = self._deserialize('Snapshot', pipeline_response)

        if cls:
            return cls(pipeline_response, deserialized, {})

        return deserialized
    get.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Compute/snapshots/{snapshotName}'}  # type: ignore

    async def _delete_initial(
        self,
        resource_group_name: str,
        snapshot_name: str,
        **kwargs: Any
    ) -> None:
        """Send the initial DELETE request of the delete operation.

        Formats the snapshot URL from ``metadata['url']`` and runs a body-less DELETE request
        through the client pipeline. Status codes 200, 202 and 204 are accepted; any other status
        code is passed to ``map_error`` and then raised as ``HttpResponseError``.

        :keyword callable cls: If given, it is called with (pipeline response, None, {}) and its
         result is returned; otherwise the method returns None.
        :keyword dict error_map: Extra status-code-to-exception entries merged over the defaults.
        :raises ~azure.core.exceptions.HttpResponseError:
        """
        cls = kwargs.pop('cls', None)  # type: ClsType[None]
        error_map = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        error_map.update(kwargs.pop('error_map', {}))
        api_version = "2020-06-30"

        # Construct URL
        url = self._delete_initial.metadata['url']  # type: ignore
        path_format_arguments = {
            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
            'snapshotName': self._serialize.url("snapshot_name", snapshot_name, 'str'),
        }
        url = self._client.format_url(url, **path_format_arguments)

        # Construct parameters
        query_parameters = {}  # type: Dict[str, Any]
        query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')

        # Construct headers
        header_parameters = {}  # type: Dict[str, Any]

        request = self._client.delete(url, query_parameters, header_parameters)
        pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs)
        response = pipeline_response.http_response

        if response.status_code not in [200, 202, 204]:
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            raise HttpResponseError(response=response, error_format=ARMErrorFormat)

        if cls:
            return cls(pipeline_response, None, {})

    _delete_initial.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Compute/snapshots/{snapshotName}'}  # type: ignore

    async def begin_delete(
        self,
        resource_group_name: str,
        snapshot_name: str,
        **kwargs: Any
    ) -> AsyncLROPoller[None]:
        """Deletes a snapshot.

        :param resource_group_name: The name of the resource group.
        :type resource_group_name: str
        :param snapshot_name: The name of the snapshot that is being created. The name can't be changed
         after the snapshot is created. Supported characters for the name are a-z, A-Z, 0-9 and _. The
         max name length is 80 characters.
        :type snapshot_name: str
        :keyword callable cls: A custom type or function that will be passed the direct response
        :keyword str continuation_token: A continuation token to restart a poller from a saved state.
        :keyword polling: By default, your polling method will be AsyncARMPolling.
         Pass in False for this operation to not poll, or pass in your own initialized polling object for a personal polling strategy.
        :paramtype polling: bool or ~azure.core.polling.AsyncPollingMethod
        :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present.
        :return: An instance of AsyncLROPoller that returns either None or the result of cls(response)
        :rtype: ~azure.core.polling.AsyncLROPoller[None]
        :raises ~azure.core.exceptions.HttpResponseError:
        """
        polling = kwargs.pop('polling', True)  # type: Union[bool, AsyncPollingMethod]
        cls = kwargs.pop('cls', None)  # type: ClsType[None]
        lro_delay = kwargs.pop(
            'polling_interval',
            self._config.polling_interval
        )
        cont_token = kwargs.pop('continuation_token', None)  # type: Optional[str]
        # The initial request is only sent when no continuation token was supplied. The lambda
        # makes the initial call return its first callback argument, the raw pipeline response.
        if cont_token is None:
            raw_result = await self._delete_initial(
                resource_group_name=resource_group_name,
                snapshot_name=snapshot_name,
                cls=lambda x,y,z: x,
                **kwargs
            )

        # Drop these keys before the remaining kwargs are passed to AsyncARMPolling below.
        kwargs.pop('error_map', None)
        kwargs.pop('content_type', None)

        def get_long_running_output(pipeline_response):
            # Nothing to deserialize for delete; only the caller's cls (if any) produces a result.
            if cls:
                return cls(pipeline_response, None, {})

        path_format_arguments = {
            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
            'snapshotName': self._serialize.url("snapshot_name", snapshot_name, 'str'),
        }

        # polling=True -> default ARM polling, polling=False -> no polling, anything else is
        # used as the polling method object itself.
        if polling is True: polling_method = AsyncARMPolling(lro_delay, path_format_arguments=path_format_arguments, **kwargs)
        elif polling is False: polling_method = AsyncNoPolling()
        else: polling_method = polling
        if cont_token:
            return AsyncLROPoller.from_continuation_token(
                polling_method=polling_method,
                continuation_token=cont_token,
                client=self._client,
                deserialization_callback=get_long_running_output
            )
        else:
            return AsyncLROPoller(self._client, raw_result, get_long_running_output, polling_method)
    begin_delete.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Compute/snapshots/{snapshotName}'}  # type: ignore

    def list_by_resource_group(
        self,
        resource_group_name: str,
        **kwargs: Any
    ) -> AsyncIterable["_models.SnapshotList"]:
        """Lists snapshots under a resource group.

        :param resource_group_name: The name of the resource group.
        :type resource_group_name: str
        :keyword callable cls: A custom type or function that will be passed the direct response
        :return: An iterator like instance of either SnapshotList or the result of cls(response)
        :rtype: ~azure.core.async_paging.AsyncItemPaged[~azure.mgmt.compute.v2020_06_30.models.SnapshotList]
        :raises ~azure.core.exceptions.HttpResponseError:
        """
        cls = kwargs.pop('cls', None)  # type: ClsType["_models.SnapshotList"]
        error_map = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        error_map.update(kwargs.pop('error_map', {}))
        api_version = "2020-06-30"
        accept = "application/json"

        def prepare_request(next_link=None):
            # Construct headers
            header_parameters = {}  # type: Dict[str, Any]
            header_parameters['Accept'] = self._serialize.header("accept", accept, 'str')

            if not next_link:
                # First page: build the URL from the metadata template and add api-version.
                # Construct URL
                url = self.list_by_resource_group.metadata['url']  # type: ignore
                path_format_arguments = {
                    'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
                    'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
                }
                url = self._client.format_url(url, **path_format_arguments)
                # Construct parameters
                query_parameters = {}  # type: Dict[str, Any]
                query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')

                request = self._client.get(url, query_parameters, header_parameters)
            else:
                # Later pages: request next_link as given, with no additional query parameters.
                url = next_link
                query_parameters = {}  # type: Dict[str, Any]
                request = self._client.get(url, query_parameters, header_parameters)
            return request

        async def extract_data(pipeline_response):
            # Returns (next link or None, AsyncList wrapping the page's items).
            deserialized = self._deserialize('SnapshotList', pipeline_response)
            list_of_elem = deserialized.value
            if cls:
                list_of_elem = cls(list_of_elem)
            return deserialized.next_link or None, AsyncList(list_of_elem)

        async def get_next(next_link=None):
            # Fetches one page; only status 200 is accepted.
            request = prepare_request(next_link)

            pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs)
            response = pipeline_response.http_response

            if response.status_code not in [200]:
                map_error(status_code=response.status_code, response=response, error_map=error_map)
                raise HttpResponseError(response=response, error_format=ARMErrorFormat)

            return pipeline_response

        return AsyncItemPaged(
            get_next, extract_data
        )
    list_by_resource_group.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Compute/snapshots'}  # type: ignore

    def list(
        self,
        **kwargs: Any
    ) -> AsyncIterable["_models.SnapshotList"]:
        """Lists snapshots under a subscription.

        :keyword callable cls: A custom type or function that will be passed the direct response
        :return: An iterator like instance of either SnapshotList or the result of cls(response)
        :rtype: ~azure.core.async_paging.AsyncItemPaged[~azure.mgmt.compute.v2020_06_30.models.SnapshotList]
        :raises ~azure.core.exceptions.HttpResponseError:
        """
        cls = kwargs.pop('cls', None)  # type: ClsType["_models.SnapshotList"]
        error_map = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        error_map.update(kwargs.pop('error_map', {}))
        api_version = "2020-06-30"
        accept = "application/json"

        def prepare_request(next_link=None):
            # Construct headers
            header_parameters = {}  # type: Dict[str, Any]
            header_parameters['Accept'] = self._serialize.header("accept", accept, 'str')

            if not next_link:
                # First page: build the URL from the metadata template and add api-version.
                # Construct URL
                url = self.list.metadata['url']  # type: ignore
                path_format_arguments = {
                    'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
                }
                url = self._client.format_url(url, **path_format_arguments)
                # Construct parameters
                query_parameters = {}  # type: Dict[str, Any]
                query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')

                request = self._client.get(url, query_parameters, header_parameters)
            else:
                # Later pages: request next_link as given, with no additional query parameters.
                url = next_link
                query_parameters = {}  # type: Dict[str, Any]
                request = self._client.get(url, query_parameters, header_parameters)
            return request

        async def extract_data(pipeline_response):
            # Returns (next link or None, AsyncList wrapping the page's items).
            deserialized = self._deserialize('SnapshotList', pipeline_response)
            list_of_elem = deserialized.value
            if cls:
                list_of_elem = cls(list_of_elem)
            return deserialized.next_link or None, AsyncList(list_of_elem)

        async def get_next(next_link=None):
            # Fetches one page; only status 200 is accepted.
            request = prepare_request(next_link)

            pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs)
            response = pipeline_response.http_response

            if response.status_code not in [200]:
                map_error(status_code=response.status_code, response=response, error_map=error_map)
                raise HttpResponseError(response=response, error_format=ARMErrorFormat)

            return pipeline_response

        return AsyncItemPaged(
            get_next, extract_data
        )
    list.metadata = {'url': '/subscriptions/{subscriptionId}/providers/Microsoft.Compute/snapshots'}  # type: ignore

    async def _grant_access_initial(
        self,
        resource_group_name: str,
        snapshot_name: str,
        grant_access_data: "_models.GrantAccessData",
        **kwargs: Any
    ) -> Optional["_models.AccessUri"]:
        """Send the initial POST request of the grant-access operation.

        Formats the ``beginGetAccess`` URL from ``metadata['url']``, serializes
        ``grant_access_data`` as the request body and runs the request through the client
        pipeline. Status codes 200 and 202 are accepted; the body is deserialized as ``AccessUri``
        only for 200, so a 202 response yields None. Any other status code is passed to
        ``map_error`` and then raised as ``HttpResponseError``.

        :keyword callable cls: If given, it is called with (pipeline response, deserialized value, {})
         and its result is returned instead of the deserialized value.
        :keyword dict error_map: Extra status-code-to-exception entries merged over the defaults.
        :keyword str content_type: Value for the Content-Type header; defaults to "application/json".
        :raises ~azure.core.exceptions.HttpResponseError:
        """
        cls = kwargs.pop('cls', None)  # type: ClsType[Optional["_models.AccessUri"]]
        error_map = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        error_map.update(kwargs.pop('error_map', {}))
        api_version = "2020-06-30"
        content_type = kwargs.pop("content_type", "application/json")
        accept = "application/json"

        # Construct URL
        url = self._grant_access_initial.metadata['url']  # type: ignore
        path_format_arguments = {
            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
            'snapshotName': self._serialize.url("snapshot_name", snapshot_name, 'str'),
        }
        url = self._client.format_url(url, **path_format_arguments)

        # Construct parameters
        query_parameters = {}  # type: Dict[str, Any]
        query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')

        # Construct headers
        header_parameters = {}  # type: Dict[str, Any]
        header_parameters['Content-Type'] = self._serialize.header("content_type", content_type, 'str')
        header_parameters['Accept'] = self._serialize.header("accept", accept, 'str')

        body_content_kwargs = {}  # type: Dict[str, Any]
        body_content = self._serialize.body(grant_access_data, 'GrantAccessData')
        body_content_kwargs['content'] = body_content
        request = self._client.post(url, query_parameters, header_parameters, **body_content_kwargs)
        pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs)
        response = pipeline_response.http_response

        if response.status_code not in [200, 202]:
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            raise HttpResponseError(response=response, error_format=ARMErrorFormat)

        # Stays None for a 202 response; only a 200 body is deserialized.
        deserialized = None
        if response.status_code == 200:
            deserialized = self._deserialize('AccessUri', pipeline_response)

        if cls:
            return cls(pipeline_response, deserialized, {})

        return deserialized
    _grant_access_initial.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Compute/snapshots/{snapshotName}/beginGetAccess'}  # type: ignore

    async def begin_grant_access(
        self,
        resource_group_name: str,
        snapshot_name: str,
        grant_access_data: "_models.GrantAccessData",
        **kwargs: Any
    ) -> AsyncLROPoller["_models.AccessUri"]:
        """Grants access to a snapshot.

        :param resource_group_name: The name of the resource group.
        :type resource_group_name: str
        :param snapshot_name: The name of the snapshot that is being created. The name can't be changed
         after the snapshot is created. Supported characters for the name are a-z, A-Z, 0-9 and _. The
         max name length is 80 characters.
        :type snapshot_name: str
        :param grant_access_data: Access data object supplied in the body of the get snapshot access
         operation.
        :type grant_access_data: ~azure.mgmt.compute.v2020_06_30.models.GrantAccessData
        :keyword callable cls: A custom type or function that will be passed the direct response
        :keyword str continuation_token: A continuation token to restart a poller from a saved state.
        :keyword polling: By default, your polling method will be AsyncARMPolling.
         Pass in False for this operation to not poll, or pass in your own initialized polling object for a personal polling strategy.
        :paramtype polling: bool or ~azure.core.polling.AsyncPollingMethod
        :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present.
        :return: An instance of AsyncLROPoller that returns either AccessUri or the result of cls(response)
        :rtype: ~azure.core.polling.AsyncLROPoller[~azure.mgmt.compute.v2020_06_30.models.AccessUri]
        :raises ~azure.core.exceptions.HttpResponseError:
        """
        polling = kwargs.pop('polling', True)  # type: Union[bool, AsyncPollingMethod]
        cls = kwargs.pop('cls', None)  # type: ClsType["_models.AccessUri"]
        lro_delay = kwargs.pop(
            'polling_interval',
            self._config.polling_interval
        )
        cont_token = kwargs.pop('continuation_token', None)  # type: Optional[str]
        # The initial request is only sent when no continuation token was supplied. The lambda
        # makes the initial call return its first callback argument, the raw pipeline response.
        if cont_token is None:
            raw_result = await self._grant_access_initial(
                resource_group_name=resource_group_name,
                snapshot_name=snapshot_name,
                grant_access_data=grant_access_data,
                cls=lambda x,y,z: x,
                **kwargs
            )

        # Drop these keys before the remaining kwargs are passed to AsyncARMPolling below.
        kwargs.pop('error_map', None)
        kwargs.pop('content_type', None)

        def get_long_running_output(pipeline_response):
            # Deserialization callback handed to the poller; applies the caller's cls if given.
            deserialized = self._deserialize('AccessUri', pipeline_response)

            if cls:
                return cls(pipeline_response, deserialized, {})
            return deserialized

        path_format_arguments = {
            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
            'snapshotName': self._serialize.url("snapshot_name", snapshot_name, 'str'),
        }

        # polling=True -> default ARM polling (here with the 'final-state-via': 'location' LRO
        # option), polling=False -> no polling, anything else is used as the polling method itself.
        if polling is True: polling_method = AsyncARMPolling(lro_delay, lro_options={'final-state-via': 'location'}, path_format_arguments=path_format_arguments, **kwargs)
        elif polling is False: polling_method = AsyncNoPolling()
        else: polling_method = polling
        if cont_token:
            return AsyncLROPoller.from_continuation_token(
                polling_method=polling_method,
                continuation_token=cont_token,
                client=self._client,
                deserialization_callback=get_long_running_output
            )
        else:
            return AsyncLROPoller(self._client, raw_result, get_long_running_output, polling_method)
    begin_grant_access.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Compute/snapshots/{snapshotName}/beginGetAccess'}  # type: ignore

    async def _revoke_access_initial(
        self,
        resource_group_name: str,
        snapshot_name: str,
        **kwargs: Any
    ) -> None:
        """Send the initial POST request of the revoke-access operation.

        Formats the ``endGetAccess`` URL from ``metadata['url']`` and runs a body-less POST request
        through the client pipeline. Status codes 200 and 202 are accepted; any other status code
        is passed to ``map_error`` and then raised as ``HttpResponseError``.

        :keyword callable cls: If given, it is called with (pipeline response, None, {}) and its
         result is returned; otherwise the method returns None.
        :keyword dict error_map: Extra status-code-to-exception entries merged over the defaults.
        :raises ~azure.core.exceptions.HttpResponseError:
        """
        cls = kwargs.pop('cls', None)  # type: ClsType[None]
        error_map = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        error_map.update(kwargs.pop('error_map', {}))
        api_version = "2020-06-30"

        # Construct URL
        url = self._revoke_access_initial.metadata['url']  # type: ignore
        path_format_arguments = {
            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
            'snapshotName': self._serialize.url("snapshot_name", snapshot_name, 'str'),
        }
        url = self._client.format_url(url, **path_format_arguments)

        # Construct parameters
        query_parameters = {}  # type: Dict[str, Any]
        query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')

        # Construct headers
        header_parameters = {}  # type: Dict[str, Any]

        request = self._client.post(url, query_parameters, header_parameters)
        pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs)
        response = pipeline_response.http_response

        if response.status_code not in [200, 202]:
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            raise HttpResponseError(response=response, error_format=ARMErrorFormat)

        if cls:
            return cls(pipeline_response, None, {})

    _revoke_access_initial.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Compute/snapshots/{snapshotName}/endGetAccess'}  # type: ignore

    async def begin_revoke_access(
        self,
        resource_group_name: str,
        snapshot_name: str,
        **kwargs: Any
    ) -> AsyncLROPoller[None]:
        """Revokes access to a snapshot.

        :param resource_group_name: The name of the resource group.
        :type resource_group_name: str
        :param snapshot_name: The name of the snapshot that is being created. The name can't be changed
         after the snapshot is created. Supported characters for the name are a-z, A-Z, 0-9 and _. The
         max name length is 80 characters.
        :type snapshot_name: str
        :keyword callable cls: A custom type or function that will be passed the direct response
        :keyword str continuation_token: A continuation token to restart a poller from a saved state.
        :keyword polling: By default, your polling method will be AsyncARMPolling.
         Pass in False for this operation to not poll, or pass in your own initialized polling object for a personal polling strategy.
        :paramtype polling: bool or ~azure.core.polling.AsyncPollingMethod
        :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present.
801 :return: An instance of AsyncLROPoller that returns either None or the result of cls(response) 802 :rtype: ~azure.core.polling.AsyncLROPoller[None] 803 :raises ~azure.core.exceptions.HttpResponseError: 804 """ 805 polling = kwargs.pop('polling', True) # type: Union[bool, AsyncPollingMethod] 806 cls = kwargs.pop('cls', None) # type: ClsType[None] 807 lro_delay = kwargs.pop( 808 'polling_interval', 809 self._config.polling_interval 810 ) 811 cont_token = kwargs.pop('continuation_token', None) # type: Optional[str] 812 if cont_token is None: 813 raw_result = await self._revoke_access_initial( 814 resource_group_name=resource_group_name, 815 snapshot_name=snapshot_name, 816 cls=lambda x,y,z: x, 817 **kwargs 818 ) 819 820 kwargs.pop('error_map', None) 821 kwargs.pop('content_type', None) 822 823 def get_long_running_output(pipeline_response): 824 if cls: 825 return cls(pipeline_response, None, {}) 826 827 path_format_arguments = { 828 'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'), 829 'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'), 830 'snapshotName': self._serialize.url("snapshot_name", snapshot_name, 'str'), 831 } 832 833 if polling is True: polling_method = AsyncARMPolling(lro_delay, lro_options={'final-state-via': 'location'}, path_format_arguments=path_format_arguments, **kwargs) 834 elif polling is False: polling_method = AsyncNoPolling() 835 else: polling_method = polling 836 if cont_token: 837 return AsyncLROPoller.from_continuation_token( 838 polling_method=polling_method, 839 continuation_token=cont_token, 840 client=self._client, 841 deserialization_callback=get_long_running_output 842 ) 843 else: 844 return AsyncLROPoller(self._client, raw_result, get_long_running_output, polling_method) 845 begin_revoke_access.metadata = {'url': 
'/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Compute/snapshots/{snapshotName}/endGetAccess'} # type: ignore 846