1# coding=utf-8 2# -------------------------------------------------------------------------- 3# Copyright (c) Microsoft Corporation. All rights reserved. 4# Licensed under the MIT License. See License.txt in the project root for license information. 5# Code generated by Microsoft (R) AutoRest Code Generator. 6# Changes may cause incorrect behavior and will be lost if the code is regenerated. 7# -------------------------------------------------------------------------- 8from typing import TYPE_CHECKING 9import warnings 10 11from azure.core.exceptions import ClientAuthenticationError, HttpResponseError, ResourceExistsError, ResourceNotFoundError, map_error 12from azure.core.paging import ItemPaged 13from azure.core.pipeline import PipelineResponse 14from azure.core.pipeline.transport import HttpRequest, HttpResponse 15from azure.core.polling import LROPoller, NoPolling, PollingMethod 16from azure.mgmt.core.exceptions import ARMErrorFormat 17from azure.mgmt.core.polling.arm_polling import ARMPolling 18 19from .. import models as _models 20 21if TYPE_CHECKING: 22 # pylint: disable=unused-import,ungrouped-imports 23 from typing import Any, Callable, Dict, Generic, Iterable, Optional, TypeVar, Union 24 25 T = TypeVar('T') 26 ClsType = Optional[Callable[[PipelineResponse[HttpRequest, HttpResponse], T, Dict[str, Any]], Any]] 27 28class SnapshotsOperations(object): 29 """SnapshotsOperations operations. 30 31 You should not instantiate this class directly. Instead, you should create a Client instance that 32 instantiates it for you and attaches it as an attribute. 33 34 :ivar models: Alias to model classes used in this operation group. 35 :type models: ~azure.mgmt.compute.v2020_06_30.models 36 :param client: Client for service requests. 37 :param config: Configuration of service client. 38 :param serializer: An object model serializer. 39 :param deserializer: An object model deserializer. 40 """ 41 42 models = _models 43 44 def __init__(self, client, config, serializer, deserializer): 45 self._client = client 46 self._serialize = serializer 47 self._deserialize = deserializer 48 self._config = config 49 50 def _create_or_update_initial( 51 self, 52 resource_group_name, # type: str 53 snapshot_name, # type: str 54 snapshot, # type: "_models.Snapshot" 55 **kwargs # type: Any 56 ): 57 # type: (...) -> "_models.Snapshot" 58 cls = kwargs.pop('cls', None) # type: ClsType["_models.Snapshot"] 59 error_map = { 60 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError 61 } 62 error_map.update(kwargs.pop('error_map', {})) 63 api_version = "2020-06-30" 64 content_type = kwargs.pop("content_type", "application/json") 65 accept = "application/json" 66 67 # Construct URL 68 url = self._create_or_update_initial.metadata['url'] # type: ignore 69 path_format_arguments = { 70 'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'), 71 'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'), 72 'snapshotName': self._serialize.url("snapshot_name", snapshot_name, 'str'), 73 } 74 url = self._client.format_url(url, **path_format_arguments) 75 76 # Construct parameters 77 query_parameters = {} # type: Dict[str, Any] 78 query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str') 79 80 # Construct headers 81 header_parameters = {} # type: Dict[str, Any] 82 header_parameters['Content-Type'] = self._serialize.header("content_type", content_type, 'str') 83 header_parameters['Accept'] = self._serialize.header("accept", accept, 'str') 84 85 body_content_kwargs = {} # type: Dict[str, Any] 86 body_content = self._serialize.body(snapshot, 'Snapshot') 87 body_content_kwargs['content'] = body_content 88 request = self._client.put(url, query_parameters, header_parameters, **body_content_kwargs) 89 pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs) 90 response = pipeline_response.http_response 91 92 if response.status_code not in [200, 202]: 93 map_error(status_code=response.status_code, response=response, error_map=error_map) 94 raise HttpResponseError(response=response, error_format=ARMErrorFormat) 95 96 if response.status_code == 200: 97 deserialized = self._deserialize('Snapshot', pipeline_response) 98 99 if response.status_code == 202: 100 deserialized = self._deserialize('Snapshot', pipeline_response) 101 102 if cls: 103 return cls(pipeline_response, deserialized, {}) 104 105 return deserialized 106 _create_or_update_initial.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Compute/snapshots/{snapshotName}'} # type: ignore 107 108 def begin_create_or_update( 109 self, 110 resource_group_name, # type: str 111 snapshot_name, # type: str 112 snapshot, # type: "_models.Snapshot" 113 **kwargs # type: Any 114 ): 115 # type: (...) -> LROPoller["_models.Snapshot"] 116 """Creates or updates a snapshot. 117 118 :param resource_group_name: The name of the resource group. 119 :type resource_group_name: str 120 :param snapshot_name: The name of the snapshot that is being created. The name can't be changed 121 after the snapshot is created. Supported characters for the name are a-z, A-Z, 0-9 and _. The 122 max name length is 80 characters. 123 :type snapshot_name: str 124 :param snapshot: Snapshot object supplied in the body of the Put disk operation. 125 :type snapshot: ~azure.mgmt.compute.v2020_06_30.models.Snapshot 126 :keyword callable cls: A custom type or function that will be passed the direct response 127 :keyword str continuation_token: A continuation token to restart a poller from a saved state. 128 :keyword polling: By default, your polling method will be ARMPolling. 129 Pass in False for this operation to not poll, or pass in your own initialized polling object for a personal polling strategy. 130 :paramtype polling: bool or ~azure.core.polling.PollingMethod 131 :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present. 132 :return: An instance of LROPoller that returns either Snapshot or the result of cls(response) 133 :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.compute.v2020_06_30.models.Snapshot] 134 :raises ~azure.core.exceptions.HttpResponseError: 135 """ 136 polling = kwargs.pop('polling', True) # type: Union[bool, PollingMethod] 137 cls = kwargs.pop('cls', None) # type: ClsType["_models.Snapshot"] 138 lro_delay = kwargs.pop( 139 'polling_interval', 140 self._config.polling_interval 141 ) 142 cont_token = kwargs.pop('continuation_token', None) # type: Optional[str] 143 if cont_token is None: 144 raw_result = self._create_or_update_initial( 145 resource_group_name=resource_group_name, 146 snapshot_name=snapshot_name, 147 snapshot=snapshot, 148 cls=lambda x,y,z: x, 149 **kwargs 150 ) 151 152 kwargs.pop('error_map', None) 153 kwargs.pop('content_type', None) 154 155 def get_long_running_output(pipeline_response): 156 deserialized = self._deserialize('Snapshot', pipeline_response) 157 158 if cls: 159 return cls(pipeline_response, deserialized, {}) 160 return deserialized 161 162 path_format_arguments = { 163 'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'), 164 'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'), 165 'snapshotName': self._serialize.url("snapshot_name", snapshot_name, 'str'), 166 } 167 168 if polling is True: polling_method = ARMPolling(lro_delay, path_format_arguments=path_format_arguments, **kwargs) 169 elif polling is False: polling_method = NoPolling() 170 else: polling_method = polling 171 if cont_token: 172 return LROPoller.from_continuation_token( 173 polling_method=polling_method, 174 continuation_token=cont_token, 175 client=self._client, 176 deserialization_callback=get_long_running_output 177 ) 178 else: 179 return LROPoller(self._client, raw_result, get_long_running_output, polling_method) 180 begin_create_or_update.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Compute/snapshots/{snapshotName}'} # type: ignore 181 182 def _update_initial( 183 self, 184 resource_group_name, # type: str 185 snapshot_name, # type: str 186 snapshot, # type: "_models.SnapshotUpdate" 187 **kwargs # type: Any 188 ): 189 # type: (...) -> "_models.Snapshot" 190 cls = kwargs.pop('cls', None) # type: ClsType["_models.Snapshot"] 191 error_map = { 192 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError 193 } 194 error_map.update(kwargs.pop('error_map', {})) 195 api_version = "2020-06-30" 196 content_type = kwargs.pop("content_type", "application/json") 197 accept = "application/json" 198 199 # Construct URL 200 url = self._update_initial.metadata['url'] # type: ignore 201 path_format_arguments = { 202 'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'), 203 'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'), 204 'snapshotName': self._serialize.url("snapshot_name", snapshot_name, 'str'), 205 } 206 url = self._client.format_url(url, **path_format_arguments) 207 208 # Construct parameters 209 query_parameters = {} # type: Dict[str, Any] 210 query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str') 211 212 # Construct headers 213 header_parameters = {} # type: Dict[str, Any] 214 header_parameters['Content-Type'] = self._serialize.header("content_type", content_type, 'str') 215 header_parameters['Accept'] = self._serialize.header("accept", accept, 'str') 216 217 body_content_kwargs = {} # type: Dict[str, Any] 218 body_content = self._serialize.body(snapshot, 'SnapshotUpdate') 219 body_content_kwargs['content'] = body_content 220 request = self._client.patch(url, query_parameters, header_parameters, **body_content_kwargs) 221 pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs) 222 response = pipeline_response.http_response 223 224 if response.status_code not in [200, 202]: 225 map_error(status_code=response.status_code, response=response, error_map=error_map) 226 raise HttpResponseError(response=response, error_format=ARMErrorFormat) 227 228 if response.status_code == 200: 229 deserialized = self._deserialize('Snapshot', pipeline_response) 230 231 if response.status_code == 202: 232 deserialized = self._deserialize('Snapshot', pipeline_response) 233 234 if cls: 235 return cls(pipeline_response, deserialized, {}) 236 237 return deserialized 238 _update_initial.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Compute/snapshots/{snapshotName}'} # type: ignore 239 240 def begin_update( 241 self, 242 resource_group_name, # type: str 243 snapshot_name, # type: str 244 snapshot, # type: "_models.SnapshotUpdate" 245 **kwargs # type: Any 246 ): 247 # type: (...) -> LROPoller["_models.Snapshot"] 248 """Updates (patches) a snapshot. 249 250 :param resource_group_name: The name of the resource group. 251 :type resource_group_name: str 252 :param snapshot_name: The name of the snapshot that is being created. The name can't be changed 253 after the snapshot is created. Supported characters for the name are a-z, A-Z, 0-9 and _. The 254 max name length is 80 characters. 255 :type snapshot_name: str 256 :param snapshot: Snapshot object supplied in the body of the Patch snapshot operation. 257 :type snapshot: ~azure.mgmt.compute.v2020_06_30.models.SnapshotUpdate 258 :keyword callable cls: A custom type or function that will be passed the direct response 259 :keyword str continuation_token: A continuation token to restart a poller from a saved state. 260 :keyword polling: By default, your polling method will be ARMPolling. 261 Pass in False for this operation to not poll, or pass in your own initialized polling object for a personal polling strategy. 262 :paramtype polling: bool or ~azure.core.polling.PollingMethod 263 :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present. 264 :return: An instance of LROPoller that returns either Snapshot or the result of cls(response) 265 :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.compute.v2020_06_30.models.Snapshot] 266 :raises ~azure.core.exceptions.HttpResponseError: 267 """ 268 polling = kwargs.pop('polling', True) # type: Union[bool, PollingMethod] 269 cls = kwargs.pop('cls', None) # type: ClsType["_models.Snapshot"] 270 lro_delay = kwargs.pop( 271 'polling_interval', 272 self._config.polling_interval 273 ) 274 cont_token = kwargs.pop('continuation_token', None) # type: Optional[str] 275 if cont_token is None: 276 raw_result = self._update_initial( 277 resource_group_name=resource_group_name, 278 snapshot_name=snapshot_name, 279 snapshot=snapshot, 280 cls=lambda x,y,z: x, 281 **kwargs 282 ) 283 284 kwargs.pop('error_map', None) 285 kwargs.pop('content_type', None) 286 287 def get_long_running_output(pipeline_response): 288 deserialized = self._deserialize('Snapshot', pipeline_response) 289 290 if cls: 291 return cls(pipeline_response, deserialized, {}) 292 return deserialized 293 294 path_format_arguments = { 295 'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'), 296 'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'), 297 'snapshotName': self._serialize.url("snapshot_name", snapshot_name, 'str'), 298 } 299 300 if polling is True: polling_method = ARMPolling(lro_delay, path_format_arguments=path_format_arguments, **kwargs) 301 elif polling is False: polling_method = NoPolling() 302 else: polling_method = polling 303 if cont_token: 304 return LROPoller.from_continuation_token( 305 polling_method=polling_method, 306 continuation_token=cont_token, 307 client=self._client, 308 deserialization_callback=get_long_running_output 309 ) 310 else: 311 return LROPoller(self._client, raw_result, get_long_running_output, polling_method) 312 begin_update.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Compute/snapshots/{snapshotName}'} # type: ignore 313 314 def get( 315 self, 316 resource_group_name, # type: str 317 snapshot_name, # type: str 318 **kwargs # type: Any 319 ): 320 # type: (...) -> "_models.Snapshot" 321 """Gets information about a snapshot. 322 323 :param resource_group_name: The name of the resource group. 324 :type resource_group_name: str 325 :param snapshot_name: The name of the snapshot that is being created. The name can't be changed 326 after the snapshot is created. Supported characters for the name are a-z, A-Z, 0-9 and _. The 327 max name length is 80 characters. 328 :type snapshot_name: str 329 :keyword callable cls: A custom type or function that will be passed the direct response 330 :return: Snapshot, or the result of cls(response) 331 :rtype: ~azure.mgmt.compute.v2020_06_30.models.Snapshot 332 :raises: ~azure.core.exceptions.HttpResponseError 333 """ 334 cls = kwargs.pop('cls', None) # type: ClsType["_models.Snapshot"] 335 error_map = { 336 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError 337 } 338 error_map.update(kwargs.pop('error_map', {})) 339 api_version = "2020-06-30" 340 accept = "application/json" 341 342 # Construct URL 343 url = self.get.metadata['url'] # type: ignore 344 path_format_arguments = { 345 'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'), 346 'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'), 347 'snapshotName': self._serialize.url("snapshot_name", snapshot_name, 'str'), 348 } 349 url = self._client.format_url(url, **path_format_arguments) 350 351 # Construct parameters 352 query_parameters = {} # type: Dict[str, Any] 353 query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str') 354 355 # Construct headers 356 header_parameters = {} # type: Dict[str, Any] 357 header_parameters['Accept'] = self._serialize.header("accept", accept, 'str') 358 359 request = self._client.get(url, query_parameters, header_parameters) 360 pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs) 361 response = pipeline_response.http_response 362 363 if response.status_code not in [200]: 364 map_error(status_code=response.status_code, response=response, error_map=error_map) 365 raise HttpResponseError(response=response, error_format=ARMErrorFormat) 366 367 deserialized = self._deserialize('Snapshot', pipeline_response) 368 369 if cls: 370 return cls(pipeline_response, deserialized, {}) 371 372 return deserialized 373 get.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Compute/snapshots/{snapshotName}'} # type: ignore 374 375 def _delete_initial( 376 self, 377 resource_group_name, # type: str 378 snapshot_name, # type: str 379 **kwargs # type: Any 380 ): 381 # type: (...) -> None 382 cls = kwargs.pop('cls', None) # type: ClsType[None] 383 error_map = { 384 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError 385 } 386 error_map.update(kwargs.pop('error_map', {})) 387 api_version = "2020-06-30" 388 389 # Construct URL 390 url = self._delete_initial.metadata['url'] # type: ignore 391 path_format_arguments = { 392 'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'), 393 'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'), 394 'snapshotName': self._serialize.url("snapshot_name", snapshot_name, 'str'), 395 } 396 url = self._client.format_url(url, **path_format_arguments) 397 398 # Construct parameters 399 query_parameters = {} # type: Dict[str, Any] 400 query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str') 401 402 # Construct headers 403 header_parameters = {} # type: Dict[str, Any] 404 405 request = self._client.delete(url, query_parameters, header_parameters) 406 pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs) 407 response = pipeline_response.http_response 408 409 if response.status_code not in [200, 202, 204]: 410 map_error(status_code=response.status_code, response=response, error_map=error_map) 411 raise HttpResponseError(response=response, error_format=ARMErrorFormat) 412 413 if cls: 414 return cls(pipeline_response, None, {}) 415 416 _delete_initial.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Compute/snapshots/{snapshotName}'} # type: ignore 417 418 def begin_delete( 419 self, 420 resource_group_name, # type: str 421 snapshot_name, # type: str 422 **kwargs # type: Any 423 ): 424 # type: (...) -> LROPoller[None] 425 """Deletes a snapshot. 426 427 :param resource_group_name: The name of the resource group. 428 :type resource_group_name: str 429 :param snapshot_name: The name of the snapshot that is being created. The name can't be changed 430 after the snapshot is created. Supported characters for the name are a-z, A-Z, 0-9 and _. The 431 max name length is 80 characters. 432 :type snapshot_name: str 433 :keyword callable cls: A custom type or function that will be passed the direct response 434 :keyword str continuation_token: A continuation token to restart a poller from a saved state. 435 :keyword polling: By default, your polling method will be ARMPolling. 436 Pass in False for this operation to not poll, or pass in your own initialized polling object for a personal polling strategy. 437 :paramtype polling: bool or ~azure.core.polling.PollingMethod 438 :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present. 439 :return: An instance of LROPoller that returns either None or the result of cls(response) 440 :rtype: ~azure.core.polling.LROPoller[None] 441 :raises ~azure.core.exceptions.HttpResponseError: 442 """ 443 polling = kwargs.pop('polling', True) # type: Union[bool, PollingMethod] 444 cls = kwargs.pop('cls', None) # type: ClsType[None] 445 lro_delay = kwargs.pop( 446 'polling_interval', 447 self._config.polling_interval 448 ) 449 cont_token = kwargs.pop('continuation_token', None) # type: Optional[str] 450 if cont_token is None: 451 raw_result = self._delete_initial( 452 resource_group_name=resource_group_name, 453 snapshot_name=snapshot_name, 454 cls=lambda x,y,z: x, 455 **kwargs 456 ) 457 458 kwargs.pop('error_map', None) 459 kwargs.pop('content_type', None) 460 461 def get_long_running_output(pipeline_response): 462 if cls: 463 return cls(pipeline_response, None, {}) 464 465 path_format_arguments = { 466 'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'), 467 'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'), 468 'snapshotName': self._serialize.url("snapshot_name", snapshot_name, 'str'), 469 } 470 471 if polling is True: polling_method = ARMPolling(lro_delay, path_format_arguments=path_format_arguments, **kwargs) 472 elif polling is False: polling_method = NoPolling() 473 else: polling_method = polling 474 if cont_token: 475 return LROPoller.from_continuation_token( 476 polling_method=polling_method, 477 continuation_token=cont_token, 478 client=self._client, 479 deserialization_callback=get_long_running_output 480 ) 481 else: 482 return LROPoller(self._client, raw_result, get_long_running_output, polling_method) 483 begin_delete.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Compute/snapshots/{snapshotName}'} # type: ignore 484 485 def list_by_resource_group( 486 self, 487 resource_group_name, # type: str 488 **kwargs # type: Any 489 ): 490 # type: (...) -> Iterable["_models.SnapshotList"] 491 """Lists snapshots under a resource group. 492 493 :param resource_group_name: The name of the resource group. 494 :type resource_group_name: str 495 :keyword callable cls: A custom type or function that will be passed the direct response 496 :return: An iterator like instance of either SnapshotList or the result of cls(response) 497 :rtype: ~azure.core.paging.ItemPaged[~azure.mgmt.compute.v2020_06_30.models.SnapshotList] 498 :raises: ~azure.core.exceptions.HttpResponseError 499 """ 500 cls = kwargs.pop('cls', None) # type: ClsType["_models.SnapshotList"] 501 error_map = { 502 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError 503 } 504 error_map.update(kwargs.pop('error_map', {})) 505 api_version = "2020-06-30" 506 accept = "application/json" 507 508 def prepare_request(next_link=None): 509 # Construct headers 510 header_parameters = {} # type: Dict[str, Any] 511 header_parameters['Accept'] = self._serialize.header("accept", accept, 'str') 512 513 if not next_link: 514 # Construct URL 515 url = self.list_by_resource_group.metadata['url'] # type: ignore 516 path_format_arguments = { 517 'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'), 518 'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'), 519 } 520 url = self._client.format_url(url, **path_format_arguments) 521 # Construct parameters 522 query_parameters = {} # type: Dict[str, Any] 523 query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str') 524 525 request = self._client.get(url, query_parameters, header_parameters) 526 else: 527 url = next_link 528 query_parameters = {} # type: Dict[str, Any] 529 request = self._client.get(url, query_parameters, header_parameters) 530 return request 531 532 def extract_data(pipeline_response): 533 deserialized = self._deserialize('SnapshotList', pipeline_response) 534 list_of_elem = deserialized.value 535 if cls: 536 list_of_elem = cls(list_of_elem) 537 return deserialized.next_link or None, iter(list_of_elem) 538 539 def get_next(next_link=None): 540 request = prepare_request(next_link) 541 542 pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs) 543 response = pipeline_response.http_response 544 545 if response.status_code not in [200]: 546 map_error(status_code=response.status_code, response=response, error_map=error_map) 547 raise HttpResponseError(response=response, error_format=ARMErrorFormat) 548 549 return pipeline_response 550 551 return ItemPaged( 552 get_next, extract_data 553 ) 554 list_by_resource_group.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Compute/snapshots'} # type: ignore 555 556 def list( 557 self, 558 **kwargs # type: Any 559 ): 560 # type: (...) -> Iterable["_models.SnapshotList"] 561 """Lists snapshots under a subscription. 562 563 :keyword callable cls: A custom type or function that will be passed the direct response 564 :return: An iterator like instance of either SnapshotList or the result of cls(response) 565 :rtype: ~azure.core.paging.ItemPaged[~azure.mgmt.compute.v2020_06_30.models.SnapshotList] 566 :raises: ~azure.core.exceptions.HttpResponseError 567 """ 568 cls = kwargs.pop('cls', None) # type: ClsType["_models.SnapshotList"] 569 error_map = { 570 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError 571 } 572 error_map.update(kwargs.pop('error_map', {})) 573 api_version = "2020-06-30" 574 accept = "application/json" 575 576 def prepare_request(next_link=None): 577 # Construct headers 578 header_parameters = {} # type: Dict[str, Any] 579 header_parameters['Accept'] = self._serialize.header("accept", accept, 'str') 580 581 if not next_link: 582 # Construct URL 583 url = self.list.metadata['url'] # type: ignore 584 path_format_arguments = { 585 'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'), 586 } 587 url = self._client.format_url(url, **path_format_arguments) 588 # Construct parameters 589 query_parameters = {} # type: Dict[str, Any] 590 query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str') 591 592 request = self._client.get(url, query_parameters, header_parameters) 593 else: 594 url = next_link 595 query_parameters = {} # type: Dict[str, Any] 596 request = self._client.get(url, query_parameters, header_parameters) 597 return request 598 599 def extract_data(pipeline_response): 600 deserialized = self._deserialize('SnapshotList', pipeline_response) 601 list_of_elem = deserialized.value 602 if cls: 603 list_of_elem = cls(list_of_elem) 604 return deserialized.next_link or None, iter(list_of_elem) 605 606 def get_next(next_link=None): 607 request = prepare_request(next_link) 608 609 pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs) 610 response = pipeline_response.http_response 611 612 if response.status_code not in [200]: 613 map_error(status_code=response.status_code, response=response, error_map=error_map) 614 raise HttpResponseError(response=response, error_format=ARMErrorFormat) 615 616 return pipeline_response 617 618 return ItemPaged( 619 get_next, extract_data 620 ) 621 list.metadata = {'url': '/subscriptions/{subscriptionId}/providers/Microsoft.Compute/snapshots'} # type: ignore 622 623 def _grant_access_initial( 624 self, 625 resource_group_name, # type: str 626 snapshot_name, # type: str 627 grant_access_data, # type: "_models.GrantAccessData" 628 **kwargs # type: Any 629 ): 630 # type: (...) -> Optional["_models.AccessUri"] 631 cls = kwargs.pop('cls', None) # type: ClsType[Optional["_models.AccessUri"]] 632 error_map = { 633 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError 634 } 635 error_map.update(kwargs.pop('error_map', {})) 636 api_version = "2020-06-30" 637 content_type = kwargs.pop("content_type", "application/json") 638 accept = "application/json" 639 640 # Construct URL 641 url = self._grant_access_initial.metadata['url'] # type: ignore 642 path_format_arguments = { 643 'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'), 644 'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'), 645 'snapshotName': self._serialize.url("snapshot_name", snapshot_name, 'str'), 646 } 647 url = self._client.format_url(url, **path_format_arguments) 648 649 # Construct parameters 650 query_parameters = {} # type: Dict[str, Any] 651 query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str') 652 653 # Construct headers 654 header_parameters = {} # type: Dict[str, Any] 655 header_parameters['Content-Type'] = self._serialize.header("content_type", content_type, 'str') 656 header_parameters['Accept'] = self._serialize.header("accept", accept, 'str') 657 658 body_content_kwargs = {} # type: Dict[str, Any] 659 body_content = self._serialize.body(grant_access_data, 'GrantAccessData') 660 body_content_kwargs['content'] = body_content 661 request = self._client.post(url, query_parameters, header_parameters, **body_content_kwargs) 662 pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs) 663 response = pipeline_response.http_response 664 665 if response.status_code not in [200, 202]: 666 map_error(status_code=response.status_code, response=response, error_map=error_map) 667 raise HttpResponseError(response=response, error_format=ARMErrorFormat) 668 669 deserialized = None 670 if response.status_code == 200: 671 deserialized = self._deserialize('AccessUri', pipeline_response) 672 673 if cls: 674 return cls(pipeline_response, deserialized, {}) 675 676 return deserialized 677 _grant_access_initial.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Compute/snapshots/{snapshotName}/beginGetAccess'} # type: ignore 678 679 def begin_grant_access( 680 self, 681 resource_group_name, # type: str 682 snapshot_name, # type: str 683 grant_access_data, # type: "_models.GrantAccessData" 684 **kwargs # type: Any 685 ): 686 # type: (...) -> LROPoller["_models.AccessUri"] 687 """Grants access to a snapshot. 688 689 :param resource_group_name: The name of the resource group. 690 :type resource_group_name: str 691 :param snapshot_name: The name of the snapshot that is being created. The name can't be changed 692 after the snapshot is created. Supported characters for the name are a-z, A-Z, 0-9 and _. The 693 max name length is 80 characters. 694 :type snapshot_name: str 695 :param grant_access_data: Access data object supplied in the body of the get snapshot access 696 operation. 697 :type grant_access_data: ~azure.mgmt.compute.v2020_06_30.models.GrantAccessData 698 :keyword callable cls: A custom type or function that will be passed the direct response 699 :keyword str continuation_token: A continuation token to restart a poller from a saved state. 700 :keyword polling: By default, your polling method will be ARMPolling. 701 Pass in False for this operation to not poll, or pass in your own initialized polling object for a personal polling strategy. 702 :paramtype polling: bool or ~azure.core.polling.PollingMethod 703 :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present. 704 :return: An instance of LROPoller that returns either AccessUri or the result of cls(response) 705 :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.compute.v2020_06_30.models.AccessUri] 706 :raises ~azure.core.exceptions.HttpResponseError: 707 """ 708 polling = kwargs.pop('polling', True) # type: Union[bool, PollingMethod] 709 cls = kwargs.pop('cls', None) # type: ClsType["_models.AccessUri"] 710 lro_delay = kwargs.pop( 711 'polling_interval', 712 self._config.polling_interval 713 ) 714 cont_token = kwargs.pop('continuation_token', None) # type: Optional[str] 715 if cont_token is None: 716 raw_result = self._grant_access_initial( 717 resource_group_name=resource_group_name, 718 snapshot_name=snapshot_name, 719 grant_access_data=grant_access_data, 720 cls=lambda x,y,z: x, 721 **kwargs 722 ) 723 724 kwargs.pop('error_map', None) 725 kwargs.pop('content_type', None) 726 727 def get_long_running_output(pipeline_response): 728 deserialized = self._deserialize('AccessUri', pipeline_response) 729 730 if cls: 731 return cls(pipeline_response, deserialized, {}) 732 return deserialized 733 734 path_format_arguments = { 735 'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'), 736 'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'), 737 'snapshotName': self._serialize.url("snapshot_name", snapshot_name, 'str'), 738 } 739 740 if polling is True: polling_method = ARMPolling(lro_delay, lro_options={'final-state-via': 'location'}, path_format_arguments=path_format_arguments, **kwargs) 741 elif polling is False: polling_method = NoPolling() 742 else: polling_method = polling 743 if cont_token: 744 return LROPoller.from_continuation_token( 745 polling_method=polling_method, 746 continuation_token=cont_token, 747 client=self._client, 748 deserialization_callback=get_long_running_output 749 ) 750 else: 751 return LROPoller(self._client, raw_result, get_long_running_output, polling_method) 752 begin_grant_access.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Compute/snapshots/{snapshotName}/beginGetAccess'} # type: ignore 753 754 def _revoke_access_initial( 755 self, 756 resource_group_name, # type: str 757 snapshot_name, # type: str 758 **kwargs # type: Any 759 ): 760 # type: (...) -> None 761 cls = kwargs.pop('cls', None) # type: ClsType[None] 762 error_map = { 763 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError 764 } 765 error_map.update(kwargs.pop('error_map', {})) 766 api_version = "2020-06-30" 767 768 # Construct URL 769 url = self._revoke_access_initial.metadata['url'] # type: ignore 770 path_format_arguments = { 771 'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'), 772 'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'), 773 'snapshotName': self._serialize.url("snapshot_name", snapshot_name, 'str'), 774 } 775 url = self._client.format_url(url, **path_format_arguments) 776 777 # Construct parameters 778 query_parameters = {} # type: Dict[str, Any] 779 query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str') 780 781 # Construct headers 782 header_parameters = {} # type: Dict[str, Any] 783 784 request = self._client.post(url, query_parameters, header_parameters) 785 pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs) 786 response = pipeline_response.http_response 787 788 if response.status_code not in [200, 202]: 789 map_error(status_code=response.status_code, response=response, error_map=error_map) 790 raise HttpResponseError(response=response, error_format=ARMErrorFormat) 791 792 if cls: 793 return cls(pipeline_response, None, {}) 794 795 _revoke_access_initial.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Compute/snapshots/{snapshotName}/endGetAccess'} # type: ignore 796 797 def begin_revoke_access( 798 self, 799 resource_group_name, # type: str 800 snapshot_name, # type: str 801 **kwargs # type: Any 802 ): 803 # type: (...) -> LROPoller[None] 804 """Revokes access to a snapshot. 805 806 :param resource_group_name: The name of the resource group. 807 :type resource_group_name: str 808 :param snapshot_name: The name of the snapshot that is being created. The name can't be changed 809 after the snapshot is created. Supported characters for the name are a-z, A-Z, 0-9 and _. The 810 max name length is 80 characters. 811 :type snapshot_name: str 812 :keyword callable cls: A custom type or function that will be passed the direct response 813 :keyword str continuation_token: A continuation token to restart a poller from a saved state. 814 :keyword polling: By default, your polling method will be ARMPolling. 815 Pass in False for this operation to not poll, or pass in your own initialized polling object for a personal polling strategy. 816 :paramtype polling: bool or ~azure.core.polling.PollingMethod 817 :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present. 818 :return: An instance of LROPoller that returns either None or the result of cls(response) 819 :rtype: ~azure.core.polling.LROPoller[None] 820 :raises ~azure.core.exceptions.HttpResponseError: 821 """ 822 polling = kwargs.pop('polling', True) # type: Union[bool, PollingMethod] 823 cls = kwargs.pop('cls', None) # type: ClsType[None] 824 lro_delay = kwargs.pop( 825 'polling_interval', 826 self._config.polling_interval 827 ) 828 cont_token = kwargs.pop('continuation_token', None) # type: Optional[str] 829 if cont_token is None: 830 raw_result = self._revoke_access_initial( 831 resource_group_name=resource_group_name, 832 snapshot_name=snapshot_name, 833 cls=lambda x,y,z: x, 834 **kwargs 835 ) 836 837 kwargs.pop('error_map', None) 838 kwargs.pop('content_type', None) 839 840 def get_long_running_output(pipeline_response): 841 if cls: 842 return cls(pipeline_response, None, {}) 843 844 path_format_arguments = { 845 'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'), 846 'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'), 847 'snapshotName': self._serialize.url("snapshot_name", snapshot_name, 'str'), 848 } 849 850 if polling is True: polling_method = ARMPolling(lro_delay, lro_options={'final-state-via': 'location'}, path_format_arguments=path_format_arguments, **kwargs) 851 elif polling is False: polling_method = NoPolling() 852 else: polling_method = polling 853 if cont_token: 854 return LROPoller.from_continuation_token( 855 polling_method=polling_method, 856 continuation_token=cont_token, 857 client=self._client, 858 deserialization_callback=get_long_running_output 859 ) 860 else: 861 return LROPoller(self._client, raw_result, get_long_running_output, polling_method) 862 begin_revoke_access.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Compute/snapshots/{snapshotName}/endGetAccess'} # type: ignore 863