1# coding=utf-8 2# -------------------------------------------------------------------------- 3# Copyright (c) Microsoft Corporation. All rights reserved. 4# Licensed under the MIT License. See License.txt in the project root for license information. 5# Code generated by Microsoft (R) AutoRest Code Generator. 6# Changes may cause incorrect behavior and will be lost if the code is regenerated. 7# -------------------------------------------------------------------------- 8from typing import Any, AsyncIterable, Callable, Dict, Generic, Optional, TypeVar, Union 9import warnings 10 11from azure.core.async_paging import AsyncItemPaged, AsyncList 12from azure.core.exceptions import ClientAuthenticationError, HttpResponseError, ResourceExistsError, ResourceNotFoundError, map_error 13from azure.core.pipeline import PipelineResponse 14from azure.core.pipeline.transport import AsyncHttpResponse, HttpRequest 15from azure.core.polling import AsyncLROPoller, AsyncNoPolling, AsyncPollingMethod 16from azure.mgmt.core.exceptions import ARMErrorFormat 17from azure.mgmt.core.polling.async_arm_polling import AsyncARMPolling 18 19from ... import models as _models 20 21T = TypeVar('T') 22ClsType = Optional[Callable[[PipelineResponse[HttpRequest, AsyncHttpResponse], T, Dict[str, Any]], Any]] 23 24class PacketCapturesOperations: 25 """PacketCapturesOperations async operations. 26 27 You should not instantiate this class directly. Instead, you should create a Client instance that 28 instantiates it for you and attaches it as an attribute. 29 30 :ivar models: Alias to model classes used in this operation group. 31 :type models: ~azure.mgmt.network.v2020_03_01.models 32 :param client: Client for service requests. 33 :param config: Configuration of service client. 34 :param serializer: An object model serializer. 35 :param deserializer: An object model deserializer. 36 """ 37 38 models = _models 39 40 def __init__(self, client, config, serializer, deserializer) -> None: 41 self._client = client 42 self._serialize = serializer 43 self._deserialize = deserializer 44 self._config = config 45 46 async def _create_initial( 47 self, 48 resource_group_name: str, 49 network_watcher_name: str, 50 packet_capture_name: str, 51 parameters: "_models.PacketCapture", 52 **kwargs 53 ) -> "_models.PacketCaptureResult": 54 cls = kwargs.pop('cls', None) # type: ClsType["_models.PacketCaptureResult"] 55 error_map = { 56 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError 57 } 58 error_map.update(kwargs.pop('error_map', {})) 59 api_version = "2020-03-01" 60 content_type = kwargs.pop("content_type", "application/json") 61 accept = "application/json" 62 63 # Construct URL 64 url = self._create_initial.metadata['url'] # type: ignore 65 path_format_arguments = { 66 'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'), 67 'networkWatcherName': self._serialize.url("network_watcher_name", network_watcher_name, 'str'), 68 'packetCaptureName': self._serialize.url("packet_capture_name", packet_capture_name, 'str'), 69 'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'), 70 } 71 url = self._client.format_url(url, **path_format_arguments) 72 73 # Construct parameters 74 query_parameters = {} # type: Dict[str, Any] 75 query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str') 76 77 # Construct headers 78 header_parameters = {} # type: Dict[str, Any] 79 header_parameters['Content-Type'] = self._serialize.header("content_type", content_type, 'str') 80 header_parameters['Accept'] = self._serialize.header("accept", accept, 'str') 81 82 body_content_kwargs = {} # type: Dict[str, Any] 83 body_content = self._serialize.body(parameters, 'PacketCapture') 84 body_content_kwargs['content'] = body_content 85 request = self._client.put(url, query_parameters, header_parameters, **body_content_kwargs) 86 pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs) 87 response = pipeline_response.http_response 88 89 if response.status_code not in [201]: 90 map_error(status_code=response.status_code, response=response, error_map=error_map) 91 error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, response) 92 raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) 93 94 deserialized = self._deserialize('PacketCaptureResult', pipeline_response) 95 96 if cls: 97 return cls(pipeline_response, deserialized, {}) 98 99 return deserialized 100 _create_initial.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/networkWatchers/{networkWatcherName}/packetCaptures/{packetCaptureName}'} # type: ignore 101 102 async def begin_create( 103 self, 104 resource_group_name: str, 105 network_watcher_name: str, 106 packet_capture_name: str, 107 parameters: "_models.PacketCapture", 108 **kwargs 109 ) -> AsyncLROPoller["_models.PacketCaptureResult"]: 110 """Create and start a packet capture on the specified VM. 111 112 :param resource_group_name: The name of the resource group. 113 :type resource_group_name: str 114 :param network_watcher_name: The name of the network watcher. 115 :type network_watcher_name: str 116 :param packet_capture_name: The name of the packet capture session. 117 :type packet_capture_name: str 118 :param parameters: Parameters that define the create packet capture operation. 119 :type parameters: ~azure.mgmt.network.v2020_03_01.models.PacketCapture 120 :keyword callable cls: A custom type or function that will be passed the direct response 121 :keyword str continuation_token: A continuation token to restart a poller from a saved state. 122 :keyword polling: Pass in True if you'd like the AsyncARMPolling polling method, 123 False for no polling, or your own initialized polling object for a personal polling strategy. 124 :paramtype polling: bool or ~azure.core.polling.AsyncPollingMethod 125 :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present. 126 :return: An instance of AsyncLROPoller that returns either PacketCaptureResult or the result of cls(response) 127 :rtype: ~azure.core.polling.AsyncLROPoller[~azure.mgmt.network.v2020_03_01.models.PacketCaptureResult] 128 :raises ~azure.core.exceptions.HttpResponseError: 129 """ 130 polling = kwargs.pop('polling', True) # type: Union[bool, AsyncPollingMethod] 131 cls = kwargs.pop('cls', None) # type: ClsType["_models.PacketCaptureResult"] 132 lro_delay = kwargs.pop( 133 'polling_interval', 134 self._config.polling_interval 135 ) 136 cont_token = kwargs.pop('continuation_token', None) # type: Optional[str] 137 if cont_token is None: 138 raw_result = await self._create_initial( 139 resource_group_name=resource_group_name, 140 network_watcher_name=network_watcher_name, 141 packet_capture_name=packet_capture_name, 142 parameters=parameters, 143 cls=lambda x,y,z: x, 144 **kwargs 145 ) 146 147 kwargs.pop('error_map', None) 148 kwargs.pop('content_type', None) 149 150 def get_long_running_output(pipeline_response): 151 deserialized = self._deserialize('PacketCaptureResult', pipeline_response) 152 153 if cls: 154 return cls(pipeline_response, deserialized, {}) 155 return deserialized 156 157 path_format_arguments = { 158 'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'), 159 'networkWatcherName': self._serialize.url("network_watcher_name", network_watcher_name, 'str'), 160 'packetCaptureName': self._serialize.url("packet_capture_name", packet_capture_name, 'str'), 161 'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'), 162 } 163 164 if polling is True: polling_method = AsyncARMPolling(lro_delay, lro_options={'final-state-via': 'azure-async-operation'}, path_format_arguments=path_format_arguments, **kwargs) 165 elif polling is False: polling_method = AsyncNoPolling() 166 else: polling_method = polling 167 if cont_token: 168 return AsyncLROPoller.from_continuation_token( 169 polling_method=polling_method, 170 continuation_token=cont_token, 171 client=self._client, 172 deserialization_callback=get_long_running_output 173 ) 174 else: 175 return AsyncLROPoller(self._client, raw_result, get_long_running_output, polling_method) 176 begin_create.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/networkWatchers/{networkWatcherName}/packetCaptures/{packetCaptureName}'} # type: ignore 177 178 async def get( 179 self, 180 resource_group_name: str, 181 network_watcher_name: str, 182 packet_capture_name: str, 183 **kwargs 184 ) -> "_models.PacketCaptureResult": 185 """Gets a packet capture session by name. 186 187 :param resource_group_name: The name of the resource group. 188 :type resource_group_name: str 189 :param network_watcher_name: The name of the network watcher. 190 :type network_watcher_name: str 191 :param packet_capture_name: The name of the packet capture session. 192 :type packet_capture_name: str 193 :keyword callable cls: A custom type or function that will be passed the direct response 194 :return: PacketCaptureResult, or the result of cls(response) 195 :rtype: ~azure.mgmt.network.v2020_03_01.models.PacketCaptureResult 196 :raises: ~azure.core.exceptions.HttpResponseError 197 """ 198 cls = kwargs.pop('cls', None) # type: ClsType["_models.PacketCaptureResult"] 199 error_map = { 200 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError 201 } 202 error_map.update(kwargs.pop('error_map', {})) 203 api_version = "2020-03-01" 204 accept = "application/json" 205 206 # Construct URL 207 url = self.get.metadata['url'] # type: ignore 208 path_format_arguments = { 209 'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'), 210 'networkWatcherName': self._serialize.url("network_watcher_name", network_watcher_name, 'str'), 211 'packetCaptureName': self._serialize.url("packet_capture_name", packet_capture_name, 'str'), 212 'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'), 213 } 214 url = self._client.format_url(url, **path_format_arguments) 215 216 # Construct parameters 217 query_parameters = {} # type: Dict[str, Any] 218 query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str') 219 220 # Construct headers 221 header_parameters = {} # type: Dict[str, Any] 222 header_parameters['Accept'] = self._serialize.header("accept", accept, 'str') 223 224 request = self._client.get(url, query_parameters, header_parameters) 225 pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs) 226 response = pipeline_response.http_response 227 228 if response.status_code not in [200]: 229 map_error(status_code=response.status_code, response=response, error_map=error_map) 230 error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, response) 231 raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) 232 233 deserialized = self._deserialize('PacketCaptureResult', pipeline_response) 234 235 if cls: 236 return cls(pipeline_response, deserialized, {}) 237 238 return deserialized 239 get.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/networkWatchers/{networkWatcherName}/packetCaptures/{packetCaptureName}'} # type: ignore 240 241 async def _delete_initial( 242 self, 243 resource_group_name: str, 244 network_watcher_name: str, 245 packet_capture_name: str, 246 **kwargs 247 ) -> None: 248 cls = kwargs.pop('cls', None) # type: ClsType[None] 249 error_map = { 250 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError 251 } 252 error_map.update(kwargs.pop('error_map', {})) 253 api_version = "2020-03-01" 254 accept = "application/json" 255 256 # Construct URL 257 url = self._delete_initial.metadata['url'] # type: ignore 258 path_format_arguments = { 259 'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'), 260 'networkWatcherName': self._serialize.url("network_watcher_name", network_watcher_name, 'str'), 261 'packetCaptureName': self._serialize.url("packet_capture_name", packet_capture_name, 'str'), 262 'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'), 263 } 264 url = self._client.format_url(url, **path_format_arguments) 265 266 # Construct parameters 267 query_parameters = {} # type: Dict[str, Any] 268 query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str') 269 270 # Construct headers 271 header_parameters = {} # type: Dict[str, Any] 272 header_parameters['Accept'] = self._serialize.header("accept", accept, 'str') 273 274 request = self._client.delete(url, query_parameters, header_parameters) 275 pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs) 276 response = pipeline_response.http_response 277 278 if response.status_code not in [202, 204]: 279 map_error(status_code=response.status_code, response=response, error_map=error_map) 280 error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, response) 281 raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) 282 283 if cls: 284 return cls(pipeline_response, None, {}) 285 286 _delete_initial.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/networkWatchers/{networkWatcherName}/packetCaptures/{packetCaptureName}'} # type: ignore 287 288 async def begin_delete( 289 self, 290 resource_group_name: str, 291 network_watcher_name: str, 292 packet_capture_name: str, 293 **kwargs 294 ) -> AsyncLROPoller[None]: 295 """Deletes the specified packet capture session. 296 297 :param resource_group_name: The name of the resource group. 298 :type resource_group_name: str 299 :param network_watcher_name: The name of the network watcher. 300 :type network_watcher_name: str 301 :param packet_capture_name: The name of the packet capture session. 302 :type packet_capture_name: str 303 :keyword callable cls: A custom type or function that will be passed the direct response 304 :keyword str continuation_token: A continuation token to restart a poller from a saved state. 305 :keyword polling: Pass in True if you'd like the AsyncARMPolling polling method, 306 False for no polling, or your own initialized polling object for a personal polling strategy. 307 :paramtype polling: bool or ~azure.core.polling.AsyncPollingMethod 308 :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present. 309 :return: An instance of AsyncLROPoller that returns either None or the result of cls(response) 310 :rtype: ~azure.core.polling.AsyncLROPoller[None] 311 :raises ~azure.core.exceptions.HttpResponseError: 312 """ 313 polling = kwargs.pop('polling', True) # type: Union[bool, AsyncPollingMethod] 314 cls = kwargs.pop('cls', None) # type: ClsType[None] 315 lro_delay = kwargs.pop( 316 'polling_interval', 317 self._config.polling_interval 318 ) 319 cont_token = kwargs.pop('continuation_token', None) # type: Optional[str] 320 if cont_token is None: 321 raw_result = await self._delete_initial( 322 resource_group_name=resource_group_name, 323 network_watcher_name=network_watcher_name, 324 packet_capture_name=packet_capture_name, 325 cls=lambda x,y,z: x, 326 **kwargs 327 ) 328 329 kwargs.pop('error_map', None) 330 kwargs.pop('content_type', None) 331 332 def get_long_running_output(pipeline_response): 333 if cls: 334 return cls(pipeline_response, None, {}) 335 336 path_format_arguments = { 337 'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'), 338 'networkWatcherName': self._serialize.url("network_watcher_name", network_watcher_name, 'str'), 339 'packetCaptureName': self._serialize.url("packet_capture_name", packet_capture_name, 'str'), 340 'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'), 341 } 342 343 if polling is True: polling_method = AsyncARMPolling(lro_delay, lro_options={'final-state-via': 'location'}, path_format_arguments=path_format_arguments, **kwargs) 344 elif polling is False: polling_method = AsyncNoPolling() 345 else: polling_method = polling 346 if cont_token: 347 return AsyncLROPoller.from_continuation_token( 348 polling_method=polling_method, 349 continuation_token=cont_token, 350 client=self._client, 351 deserialization_callback=get_long_running_output 352 ) 353 else: 354 return AsyncLROPoller(self._client, raw_result, get_long_running_output, polling_method) 355 begin_delete.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/networkWatchers/{networkWatcherName}/packetCaptures/{packetCaptureName}'} # type: ignore 356 357 async def _stop_initial( 358 self, 359 resource_group_name: str, 360 network_watcher_name: str, 361 packet_capture_name: str, 362 **kwargs 363 ) -> None: 364 cls = kwargs.pop('cls', None) # type: ClsType[None] 365 error_map = { 366 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError 367 } 368 error_map.update(kwargs.pop('error_map', {})) 369 api_version = "2020-03-01" 370 accept = "application/json" 371 372 # Construct URL 373 url = self._stop_initial.metadata['url'] # type: ignore 374 path_format_arguments = { 375 'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'), 376 'networkWatcherName': self._serialize.url("network_watcher_name", network_watcher_name, 'str'), 377 'packetCaptureName': self._serialize.url("packet_capture_name", packet_capture_name, 'str'), 378 'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'), 379 } 380 url = self._client.format_url(url, **path_format_arguments) 381 382 # Construct parameters 383 query_parameters = {} # type: Dict[str, Any] 384 query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str') 385 386 # Construct headers 387 header_parameters = {} # type: Dict[str, Any] 388 header_parameters['Accept'] = self._serialize.header("accept", accept, 'str') 389 390 request = self._client.post(url, query_parameters, header_parameters) 391 pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs) 392 response = pipeline_response.http_response 393 394 if response.status_code not in [200, 202]: 395 map_error(status_code=response.status_code, response=response, error_map=error_map) 396 error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, response) 397 raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) 398 399 if cls: 400 return cls(pipeline_response, None, {}) 401 402 _stop_initial.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/networkWatchers/{networkWatcherName}/packetCaptures/{packetCaptureName}/stop'} # type: ignore 403 404 async def begin_stop( 405 self, 406 resource_group_name: str, 407 network_watcher_name: str, 408 packet_capture_name: str, 409 **kwargs 410 ) -> AsyncLROPoller[None]: 411 """Stops a specified packet capture session. 412 413 :param resource_group_name: The name of the resource group. 414 :type resource_group_name: str 415 :param network_watcher_name: The name of the network watcher. 416 :type network_watcher_name: str 417 :param packet_capture_name: The name of the packet capture session. 418 :type packet_capture_name: str 419 :keyword callable cls: A custom type or function that will be passed the direct response 420 :keyword str continuation_token: A continuation token to restart a poller from a saved state. 421 :keyword polling: Pass in True if you'd like the AsyncARMPolling polling method, 422 False for no polling, or your own initialized polling object for a personal polling strategy. 423 :paramtype polling: bool or ~azure.core.polling.AsyncPollingMethod 424 :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present. 425 :return: An instance of AsyncLROPoller that returns either None or the result of cls(response) 426 :rtype: ~azure.core.polling.AsyncLROPoller[None] 427 :raises ~azure.core.exceptions.HttpResponseError: 428 """ 429 polling = kwargs.pop('polling', True) # type: Union[bool, AsyncPollingMethod] 430 cls = kwargs.pop('cls', None) # type: ClsType[None] 431 lro_delay = kwargs.pop( 432 'polling_interval', 433 self._config.polling_interval 434 ) 435 cont_token = kwargs.pop('continuation_token', None) # type: Optional[str] 436 if cont_token is None: 437 raw_result = await self._stop_initial( 438 resource_group_name=resource_group_name, 439 network_watcher_name=network_watcher_name, 440 packet_capture_name=packet_capture_name, 441 cls=lambda x,y,z: x, 442 **kwargs 443 ) 444 445 kwargs.pop('error_map', None) 446 kwargs.pop('content_type', None) 447 448 def get_long_running_output(pipeline_response): 449 if cls: 450 return cls(pipeline_response, None, {}) 451 452 path_format_arguments = { 453 'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'), 454 'networkWatcherName': self._serialize.url("network_watcher_name", network_watcher_name, 'str'), 455 'packetCaptureName': self._serialize.url("packet_capture_name", packet_capture_name, 'str'), 456 'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'), 457 } 458 459 if polling is True: polling_method = AsyncARMPolling(lro_delay, lro_options={'final-state-via': 'location'}, path_format_arguments=path_format_arguments, **kwargs) 460 elif polling is False: polling_method = AsyncNoPolling() 461 else: polling_method = polling 462 if cont_token: 463 return AsyncLROPoller.from_continuation_token( 464 polling_method=polling_method, 465 continuation_token=cont_token, 466 client=self._client, 467 deserialization_callback=get_long_running_output 468 ) 469 else: 470 return AsyncLROPoller(self._client, raw_result, get_long_running_output, polling_method) 471 begin_stop.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/networkWatchers/{networkWatcherName}/packetCaptures/{packetCaptureName}/stop'} # type: ignore 472 473 async def _get_status_initial( 474 self, 475 resource_group_name: str, 476 network_watcher_name: str, 477 packet_capture_name: str, 478 **kwargs 479 ) -> "_models.PacketCaptureQueryStatusResult": 480 cls = kwargs.pop('cls', None) # type: ClsType["_models.PacketCaptureQueryStatusResult"] 481 error_map = { 482 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError 483 } 484 error_map.update(kwargs.pop('error_map', {})) 485 api_version = "2020-03-01" 486 accept = "application/json" 487 488 # Construct URL 489 url = self._get_status_initial.metadata['url'] # type: ignore 490 path_format_arguments = { 491 'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'), 492 'networkWatcherName': self._serialize.url("network_watcher_name", network_watcher_name, 'str'), 493 'packetCaptureName': self._serialize.url("packet_capture_name", packet_capture_name, 'str'), 494 'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'), 495 } 496 url = self._client.format_url(url, **path_format_arguments) 497 498 # Construct parameters 499 query_parameters = {} # type: Dict[str, Any] 500 query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str') 501 502 # Construct headers 503 header_parameters = {} # type: Dict[str, Any] 504 header_parameters['Accept'] = self._serialize.header("accept", accept, 'str') 505 506 request = self._client.post(url, query_parameters, header_parameters) 507 pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs) 508 response = pipeline_response.http_response 509 510 if response.status_code not in [200, 202]: 511 map_error(status_code=response.status_code, response=response, error_map=error_map) 512 error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, response) 513 raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) 514 515 if response.status_code == 200: 516 deserialized = self._deserialize('PacketCaptureQueryStatusResult', pipeline_response) 517 518 if response.status_code == 202: 519 deserialized = self._deserialize('PacketCaptureQueryStatusResult', pipeline_response) 520 521 if cls: 522 return cls(pipeline_response, deserialized, {}) 523 524 return deserialized 525 _get_status_initial.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/networkWatchers/{networkWatcherName}/packetCaptures/{packetCaptureName}/queryStatus'} # type: ignore 526 527 async def begin_get_status( 528 self, 529 resource_group_name: str, 530 network_watcher_name: str, 531 packet_capture_name: str, 532 **kwargs 533 ) -> AsyncLROPoller["_models.PacketCaptureQueryStatusResult"]: 534 """Query the status of a running packet capture session. 535 536 :param resource_group_name: The name of the resource group. 537 :type resource_group_name: str 538 :param network_watcher_name: The name of the Network Watcher resource. 539 :type network_watcher_name: str 540 :param packet_capture_name: The name given to the packet capture session. 541 :type packet_capture_name: str 542 :keyword callable cls: A custom type or function that will be passed the direct response 543 :keyword str continuation_token: A continuation token to restart a poller from a saved state. 544 :keyword polling: Pass in True if you'd like the AsyncARMPolling polling method, 545 False for no polling, or your own initialized polling object for a personal polling strategy. 546 :paramtype polling: bool or ~azure.core.polling.AsyncPollingMethod 547 :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present. 548 :return: An instance of AsyncLROPoller that returns either PacketCaptureQueryStatusResult or the result of cls(response) 549 :rtype: ~azure.core.polling.AsyncLROPoller[~azure.mgmt.network.v2020_03_01.models.PacketCaptureQueryStatusResult] 550 :raises ~azure.core.exceptions.HttpResponseError: 551 """ 552 polling = kwargs.pop('polling', True) # type: Union[bool, AsyncPollingMethod] 553 cls = kwargs.pop('cls', None) # type: ClsType["_models.PacketCaptureQueryStatusResult"] 554 lro_delay = kwargs.pop( 555 'polling_interval', 556 self._config.polling_interval 557 ) 558 cont_token = kwargs.pop('continuation_token', None) # type: Optional[str] 559 if cont_token is None: 560 raw_result = await self._get_status_initial( 561 resource_group_name=resource_group_name, 562 network_watcher_name=network_watcher_name, 563 packet_capture_name=packet_capture_name, 564 cls=lambda x,y,z: x, 565 **kwargs 566 ) 567 568 kwargs.pop('error_map', None) 569 kwargs.pop('content_type', None) 570 571 def get_long_running_output(pipeline_response): 572 deserialized = self._deserialize('PacketCaptureQueryStatusResult', pipeline_response) 573 574 if cls: 575 return cls(pipeline_response, deserialized, {}) 576 return deserialized 577 578 path_format_arguments = { 579 'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'), 580 'networkWatcherName': self._serialize.url("network_watcher_name", network_watcher_name, 'str'), 581 'packetCaptureName': self._serialize.url("packet_capture_name", packet_capture_name, 'str'), 582 'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'), 583 } 584 585 if polling is True: polling_method = AsyncARMPolling(lro_delay, lro_options={'final-state-via': 'location'}, path_format_arguments=path_format_arguments, **kwargs) 586 elif polling is False: polling_method = AsyncNoPolling() 587 else: polling_method = polling 588 if cont_token: 589 return AsyncLROPoller.from_continuation_token( 590 polling_method=polling_method, 591 continuation_token=cont_token, 592 client=self._client, 593 deserialization_callback=get_long_running_output 594 ) 595 else: 596 return AsyncLROPoller(self._client, raw_result, get_long_running_output, polling_method) 597 begin_get_status.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/networkWatchers/{networkWatcherName}/packetCaptures/{packetCaptureName}/queryStatus'} # type: ignore 598 599 def list( 600 self, 601 resource_group_name: str, 602 network_watcher_name: str, 603 **kwargs 604 ) -> AsyncIterable["_models.PacketCaptureListResult"]: 605 """Lists all packet capture sessions within the specified resource group. 606 607 :param resource_group_name: The name of the resource group. 608 :type resource_group_name: str 609 :param network_watcher_name: The name of the Network Watcher resource. 610 :type network_watcher_name: str 611 :keyword callable cls: A custom type or function that will be passed the direct response 612 :return: An iterator like instance of either PacketCaptureListResult or the result of cls(response) 613 :rtype: ~azure.core.async_paging.AsyncItemPaged[~azure.mgmt.network.v2020_03_01.models.PacketCaptureListResult] 614 :raises: ~azure.core.exceptions.HttpResponseError 615 """ 616 cls = kwargs.pop('cls', None) # type: ClsType["_models.PacketCaptureListResult"] 617 error_map = { 618 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError 619 } 620 error_map.update(kwargs.pop('error_map', {})) 621 api_version = "2020-03-01" 622 accept = "application/json" 623 624 def prepare_request(next_link=None): 625 # Construct headers 626 header_parameters = {} # type: Dict[str, Any] 627 header_parameters['Accept'] = self._serialize.header("accept", accept, 'str') 628 629 if not next_link: 630 # Construct URL 631 url = self.list.metadata['url'] # type: ignore 632 path_format_arguments = { 633 'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'), 634 'networkWatcherName': self._serialize.url("network_watcher_name", network_watcher_name, 'str'), 635 'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'), 636 } 637 url = self._client.format_url(url, **path_format_arguments) 638 # Construct parameters 639 query_parameters = {} # type: Dict[str, Any] 640 query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str') 641 642 request = self._client.get(url, query_parameters, header_parameters) 643 else: 644 url = next_link 645 query_parameters = {} # type: Dict[str, Any] 646 request = self._client.get(url, query_parameters, header_parameters) 647 return request 648 649 async def extract_data(pipeline_response): 650 deserialized = self._deserialize('PacketCaptureListResult', pipeline_response) 651 list_of_elem = deserialized.value 652 if cls: 653 list_of_elem = cls(list_of_elem) 654 return None, AsyncList(list_of_elem) 655 656 async def get_next(next_link=None): 657 request = prepare_request(next_link) 658 659 pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs) 660 response = pipeline_response.http_response 661 662 if response.status_code not in [200]: 663 error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, response) 664 map_error(status_code=response.status_code, response=response, error_map=error_map) 665 raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) 666 667 return pipeline_response 668 669 return AsyncItemPaged( 670 get_next, extract_data 671 ) 672 list.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/networkWatchers/{networkWatcherName}/packetCaptures'} # type: ignore 673