1# coding=utf-8 2# -------------------------------------------------------------------------- 3# Copyright (c) Microsoft Corporation. All rights reserved. 4# Licensed under the MIT License. See License.txt in the project root for license information. 5# Code generated by Microsoft (R) AutoRest Code Generator. 6# Changes may cause incorrect behavior and will be lost if the code is regenerated. 7# -------------------------------------------------------------------------- 8from typing import TYPE_CHECKING 9import warnings 10 11from azure.core.exceptions import ClientAuthenticationError, HttpResponseError, ResourceExistsError, ResourceNotFoundError, map_error 12from azure.core.paging import ItemPaged 13from azure.core.pipeline import PipelineResponse 14from azure.core.pipeline.transport import HttpRequest, HttpResponse 15from azure.core.polling import LROPoller, NoPolling, PollingMethod 16from azure.mgmt.core.exceptions import ARMErrorFormat 17from azure.mgmt.core.polling.arm_polling import ARMPolling 18 19from .. import models as _models 20 21if TYPE_CHECKING: 22 # pylint: disable=unused-import,ungrouped-imports 23 from typing import Any, Callable, Dict, Generic, Iterable, Optional, TypeVar, Union 24 25 T = TypeVar('T') 26 ClsType = Optional[Callable[[PipelineResponse[HttpRequest, HttpResponse], T, Dict[str, Any]], Any]] 27 28class PacketCapturesOperations(object): 29 """PacketCapturesOperations operations. 30 31 You should not instantiate this class directly. Instead, you should create a Client instance that 32 instantiates it for you and attaches it as an attribute. 33 34 :ivar models: Alias to model classes used in this operation group. 35 :type models: ~azure.mgmt.network.v2018_08_01.models 36 :param client: Client for service requests. 37 :param config: Configuration of service client. 38 :param serializer: An object model serializer. 39 :param deserializer: An object model deserializer. 40 """ 41 42 models = _models 43 44 def __init__(self, client, config, serializer, deserializer): 45 self._client = client 46 self._serialize = serializer 47 self._deserialize = deserializer 48 self._config = config 49 50 def _create_initial( 51 self, 52 resource_group_name, # type: str 53 network_watcher_name, # type: str 54 packet_capture_name, # type: str 55 parameters, # type: "_models.PacketCapture" 56 **kwargs # type: Any 57 ): 58 # type: (...) -> "_models.PacketCaptureResult" 59 cls = kwargs.pop('cls', None) # type: ClsType["_models.PacketCaptureResult"] 60 error_map = { 61 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError 62 } 63 error_map.update(kwargs.pop('error_map', {})) 64 api_version = "2018-08-01" 65 content_type = kwargs.pop("content_type", "application/json") 66 accept = "application/json" 67 68 # Construct URL 69 url = self._create_initial.metadata['url'] # type: ignore 70 path_format_arguments = { 71 'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'), 72 'networkWatcherName': self._serialize.url("network_watcher_name", network_watcher_name, 'str'), 73 'packetCaptureName': self._serialize.url("packet_capture_name", packet_capture_name, 'str'), 74 'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'), 75 } 76 url = self._client.format_url(url, **path_format_arguments) 77 78 # Construct parameters 79 query_parameters = {} # type: Dict[str, Any] 80 query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str') 81 82 # Construct headers 83 header_parameters = {} # type: Dict[str, Any] 84 header_parameters['Content-Type'] = self._serialize.header("content_type", content_type, 'str') 85 header_parameters['Accept'] = self._serialize.header("accept", accept, 'str') 86 87 body_content_kwargs = {} # type: Dict[str, Any] 88 body_content = self._serialize.body(parameters, 'PacketCapture') 89 body_content_kwargs['content'] = body_content 90 request = self._client.put(url, query_parameters, header_parameters, **body_content_kwargs) 91 pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs) 92 response = pipeline_response.http_response 93 94 if response.status_code not in [201]: 95 map_error(status_code=response.status_code, response=response, error_map=error_map) 96 error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, response) 97 raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) 98 99 deserialized = self._deserialize('PacketCaptureResult', pipeline_response) 100 101 if cls: 102 return cls(pipeline_response, deserialized, {}) 103 104 return deserialized 105 _create_initial.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/networkWatchers/{networkWatcherName}/packetCaptures/{packetCaptureName}'} # type: ignore 106 107 def begin_create( 108 self, 109 resource_group_name, # type: str 110 network_watcher_name, # type: str 111 packet_capture_name, # type: str 112 parameters, # type: "_models.PacketCapture" 113 **kwargs # type: Any 114 ): 115 # type: (...) -> LROPoller["_models.PacketCaptureResult"] 116 """Create and start a packet capture on the specified VM. 117 118 :param resource_group_name: The name of the resource group. 119 :type resource_group_name: str 120 :param network_watcher_name: The name of the network watcher. 121 :type network_watcher_name: str 122 :param packet_capture_name: The name of the packet capture session. 123 :type packet_capture_name: str 124 :param parameters: Parameters that define the create packet capture operation. 125 :type parameters: ~azure.mgmt.network.v2018_08_01.models.PacketCapture 126 :keyword callable cls: A custom type or function that will be passed the direct response 127 :keyword str continuation_token: A continuation token to restart a poller from a saved state. 128 :keyword polling: Pass in True if you'd like the ARMPolling polling method, 129 False for no polling, or your own initialized polling object for a personal polling strategy. 130 :paramtype polling: bool or ~azure.core.polling.PollingMethod 131 :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present. 132 :return: An instance of LROPoller that returns either PacketCaptureResult or the result of cls(response) 133 :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.network.v2018_08_01.models.PacketCaptureResult] 134 :raises ~azure.core.exceptions.HttpResponseError: 135 """ 136 polling = kwargs.pop('polling', True) # type: Union[bool, PollingMethod] 137 cls = kwargs.pop('cls', None) # type: ClsType["_models.PacketCaptureResult"] 138 lro_delay = kwargs.pop( 139 'polling_interval', 140 self._config.polling_interval 141 ) 142 cont_token = kwargs.pop('continuation_token', None) # type: Optional[str] 143 if cont_token is None: 144 raw_result = self._create_initial( 145 resource_group_name=resource_group_name, 146 network_watcher_name=network_watcher_name, 147 packet_capture_name=packet_capture_name, 148 parameters=parameters, 149 cls=lambda x,y,z: x, 150 **kwargs 151 ) 152 153 kwargs.pop('error_map', None) 154 kwargs.pop('content_type', None) 155 156 def get_long_running_output(pipeline_response): 157 deserialized = self._deserialize('PacketCaptureResult', pipeline_response) 158 159 if cls: 160 return cls(pipeline_response, deserialized, {}) 161 return deserialized 162 163 path_format_arguments = { 164 'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'), 165 'networkWatcherName': self._serialize.url("network_watcher_name", network_watcher_name, 'str'), 166 'packetCaptureName': self._serialize.url("packet_capture_name", packet_capture_name, 'str'), 167 'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'), 168 } 169 170 if polling is True: polling_method = ARMPolling(lro_delay, path_format_arguments=path_format_arguments, **kwargs) 171 elif polling is False: polling_method = NoPolling() 172 else: polling_method = polling 173 if cont_token: 174 return LROPoller.from_continuation_token( 175 polling_method=polling_method, 176 continuation_token=cont_token, 177 client=self._client, 178 deserialization_callback=get_long_running_output 179 ) 180 else: 181 return LROPoller(self._client, raw_result, get_long_running_output, polling_method) 182 begin_create.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/networkWatchers/{networkWatcherName}/packetCaptures/{packetCaptureName}'} # type: ignore 183 184 def get( 185 self, 186 resource_group_name, # type: str 187 network_watcher_name, # type: str 188 packet_capture_name, # type: str 189 **kwargs # type: Any 190 ): 191 # type: (...) -> "_models.PacketCaptureResult" 192 """Gets a packet capture session by name. 193 194 :param resource_group_name: The name of the resource group. 195 :type resource_group_name: str 196 :param network_watcher_name: The name of the network watcher. 197 :type network_watcher_name: str 198 :param packet_capture_name: The name of the packet capture session. 199 :type packet_capture_name: str 200 :keyword callable cls: A custom type or function that will be passed the direct response 201 :return: PacketCaptureResult, or the result of cls(response) 202 :rtype: ~azure.mgmt.network.v2018_08_01.models.PacketCaptureResult 203 :raises: ~azure.core.exceptions.HttpResponseError 204 """ 205 cls = kwargs.pop('cls', None) # type: ClsType["_models.PacketCaptureResult"] 206 error_map = { 207 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError 208 } 209 error_map.update(kwargs.pop('error_map', {})) 210 api_version = "2018-08-01" 211 accept = "application/json" 212 213 # Construct URL 214 url = self.get.metadata['url'] # type: ignore 215 path_format_arguments = { 216 'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'), 217 'networkWatcherName': self._serialize.url("network_watcher_name", network_watcher_name, 'str'), 218 'packetCaptureName': self._serialize.url("packet_capture_name", packet_capture_name, 'str'), 219 'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'), 220 } 221 url = self._client.format_url(url, **path_format_arguments) 222 223 # Construct parameters 224 query_parameters = {} # type: Dict[str, Any] 225 query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str') 226 227 # Construct headers 228 header_parameters = {} # type: Dict[str, Any] 229 header_parameters['Accept'] = self._serialize.header("accept", accept, 'str') 230 231 request = self._client.get(url, query_parameters, header_parameters) 232 pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs) 233 response = pipeline_response.http_response 234 235 if response.status_code not in [200]: 236 map_error(status_code=response.status_code, response=response, error_map=error_map) 237 error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, response) 238 raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) 239 240 deserialized = self._deserialize('PacketCaptureResult', pipeline_response) 241 242 if cls: 243 return cls(pipeline_response, deserialized, {}) 244 245 return deserialized 246 get.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/networkWatchers/{networkWatcherName}/packetCaptures/{packetCaptureName}'} # type: ignore 247 248 def _delete_initial( 249 self, 250 resource_group_name, # type: str 251 network_watcher_name, # type: str 252 packet_capture_name, # type: str 253 **kwargs # type: Any 254 ): 255 # type: (...) -> None 256 cls = kwargs.pop('cls', None) # type: ClsType[None] 257 error_map = { 258 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError 259 } 260 error_map.update(kwargs.pop('error_map', {})) 261 api_version = "2018-08-01" 262 accept = "application/json" 263 264 # Construct URL 265 url = self._delete_initial.metadata['url'] # type: ignore 266 path_format_arguments = { 267 'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'), 268 'networkWatcherName': self._serialize.url("network_watcher_name", network_watcher_name, 'str'), 269 'packetCaptureName': self._serialize.url("packet_capture_name", packet_capture_name, 'str'), 270 'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'), 271 } 272 url = self._client.format_url(url, **path_format_arguments) 273 274 # Construct parameters 275 query_parameters = {} # type: Dict[str, Any] 276 query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str') 277 278 # Construct headers 279 header_parameters = {} # type: Dict[str, Any] 280 header_parameters['Accept'] = self._serialize.header("accept", accept, 'str') 281 282 request = self._client.delete(url, query_parameters, header_parameters) 283 pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs) 284 response = pipeline_response.http_response 285 286 if response.status_code not in [202, 204]: 287 map_error(status_code=response.status_code, response=response, error_map=error_map) 288 error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, response) 289 raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) 290 291 if cls: 292 return cls(pipeline_response, None, {}) 293 294 _delete_initial.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/networkWatchers/{networkWatcherName}/packetCaptures/{packetCaptureName}'} # type: ignore 295 296 def begin_delete( 297 self, 298 resource_group_name, # type: str 299 network_watcher_name, # type: str 300 packet_capture_name, # type: str 301 **kwargs # type: Any 302 ): 303 # type: (...) -> LROPoller[None] 304 """Deletes the specified packet capture session. 305 306 :param resource_group_name: The name of the resource group. 307 :type resource_group_name: str 308 :param network_watcher_name: The name of the network watcher. 309 :type network_watcher_name: str 310 :param packet_capture_name: The name of the packet capture session. 311 :type packet_capture_name: str 312 :keyword callable cls: A custom type or function that will be passed the direct response 313 :keyword str continuation_token: A continuation token to restart a poller from a saved state. 314 :keyword polling: Pass in True if you'd like the ARMPolling polling method, 315 False for no polling, or your own initialized polling object for a personal polling strategy. 316 :paramtype polling: bool or ~azure.core.polling.PollingMethod 317 :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present. 318 :return: An instance of LROPoller that returns either None or the result of cls(response) 319 :rtype: ~azure.core.polling.LROPoller[None] 320 :raises ~azure.core.exceptions.HttpResponseError: 321 """ 322 polling = kwargs.pop('polling', True) # type: Union[bool, PollingMethod] 323 cls = kwargs.pop('cls', None) # type: ClsType[None] 324 lro_delay = kwargs.pop( 325 'polling_interval', 326 self._config.polling_interval 327 ) 328 cont_token = kwargs.pop('continuation_token', None) # type: Optional[str] 329 if cont_token is None: 330 raw_result = self._delete_initial( 331 resource_group_name=resource_group_name, 332 network_watcher_name=network_watcher_name, 333 packet_capture_name=packet_capture_name, 334 cls=lambda x,y,z: x, 335 **kwargs 336 ) 337 338 kwargs.pop('error_map', None) 339 kwargs.pop('content_type', None) 340 341 def get_long_running_output(pipeline_response): 342 if cls: 343 return cls(pipeline_response, None, {}) 344 345 path_format_arguments = { 346 'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'), 347 'networkWatcherName': self._serialize.url("network_watcher_name", network_watcher_name, 'str'), 348 'packetCaptureName': self._serialize.url("packet_capture_name", packet_capture_name, 'str'), 349 'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'), 350 } 351 352 if polling is True: polling_method = ARMPolling(lro_delay, path_format_arguments=path_format_arguments, **kwargs) 353 elif polling is False: polling_method = NoPolling() 354 else: polling_method = polling 355 if cont_token: 356 return LROPoller.from_continuation_token( 357 polling_method=polling_method, 358 continuation_token=cont_token, 359 client=self._client, 360 deserialization_callback=get_long_running_output 361 ) 362 else: 363 return LROPoller(self._client, raw_result, get_long_running_output, polling_method) 364 begin_delete.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/networkWatchers/{networkWatcherName}/packetCaptures/{packetCaptureName}'} # type: ignore 365 366 def _stop_initial( 367 self, 368 resource_group_name, # type: str 369 network_watcher_name, # type: str 370 packet_capture_name, # type: str 371 **kwargs # type: Any 372 ): 373 # type: (...) -> None 374 cls = kwargs.pop('cls', None) # type: ClsType[None] 375 error_map = { 376 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError 377 } 378 error_map.update(kwargs.pop('error_map', {})) 379 api_version = "2018-08-01" 380 accept = "application/json" 381 382 # Construct URL 383 url = self._stop_initial.metadata['url'] # type: ignore 384 path_format_arguments = { 385 'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'), 386 'networkWatcherName': self._serialize.url("network_watcher_name", network_watcher_name, 'str'), 387 'packetCaptureName': self._serialize.url("packet_capture_name", packet_capture_name, 'str'), 388 'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'), 389 } 390 url = self._client.format_url(url, **path_format_arguments) 391 392 # Construct parameters 393 query_parameters = {} # type: Dict[str, Any] 394 query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str') 395 396 # Construct headers 397 header_parameters = {} # type: Dict[str, Any] 398 header_parameters['Accept'] = self._serialize.header("accept", accept, 'str') 399 400 request = self._client.post(url, query_parameters, header_parameters) 401 pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs) 402 response = pipeline_response.http_response 403 404 if response.status_code not in [200, 202]: 405 map_error(status_code=response.status_code, response=response, error_map=error_map) 406 error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, response) 407 raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) 408 409 if cls: 410 return cls(pipeline_response, None, {}) 411 412 _stop_initial.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/networkWatchers/{networkWatcherName}/packetCaptures/{packetCaptureName}/stop'} # type: ignore 413 414 def begin_stop( 415 self, 416 resource_group_name, # type: str 417 network_watcher_name, # type: str 418 packet_capture_name, # type: str 419 **kwargs # type: Any 420 ): 421 # type: (...) -> LROPoller[None] 422 """Stops a specified packet capture session. 423 424 :param resource_group_name: The name of the resource group. 425 :type resource_group_name: str 426 :param network_watcher_name: The name of the network watcher. 427 :type network_watcher_name: str 428 :param packet_capture_name: The name of the packet capture session. 429 :type packet_capture_name: str 430 :keyword callable cls: A custom type or function that will be passed the direct response 431 :keyword str continuation_token: A continuation token to restart a poller from a saved state. 432 :keyword polling: Pass in True if you'd like the ARMPolling polling method, 433 False for no polling, or your own initialized polling object for a personal polling strategy. 434 :paramtype polling: bool or ~azure.core.polling.PollingMethod 435 :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present. 436 :return: An instance of LROPoller that returns either None or the result of cls(response) 437 :rtype: ~azure.core.polling.LROPoller[None] 438 :raises ~azure.core.exceptions.HttpResponseError: 439 """ 440 polling = kwargs.pop('polling', True) # type: Union[bool, PollingMethod] 441 cls = kwargs.pop('cls', None) # type: ClsType[None] 442 lro_delay = kwargs.pop( 443 'polling_interval', 444 self._config.polling_interval 445 ) 446 cont_token = kwargs.pop('continuation_token', None) # type: Optional[str] 447 if cont_token is None: 448 raw_result = self._stop_initial( 449 resource_group_name=resource_group_name, 450 network_watcher_name=network_watcher_name, 451 packet_capture_name=packet_capture_name, 452 cls=lambda x,y,z: x, 453 **kwargs 454 ) 455 456 kwargs.pop('error_map', None) 457 kwargs.pop('content_type', None) 458 459 def get_long_running_output(pipeline_response): 460 if cls: 461 return cls(pipeline_response, None, {}) 462 463 path_format_arguments = { 464 'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'), 465 'networkWatcherName': self._serialize.url("network_watcher_name", network_watcher_name, 'str'), 466 'packetCaptureName': self._serialize.url("packet_capture_name", packet_capture_name, 'str'), 467 'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'), 468 } 469 470 if polling is True: polling_method = ARMPolling(lro_delay, path_format_arguments=path_format_arguments, **kwargs) 471 elif polling is False: polling_method = NoPolling() 472 else: polling_method = polling 473 if cont_token: 474 return LROPoller.from_continuation_token( 475 polling_method=polling_method, 476 continuation_token=cont_token, 477 client=self._client, 478 deserialization_callback=get_long_running_output 479 ) 480 else: 481 return LROPoller(self._client, raw_result, get_long_running_output, polling_method) 482 begin_stop.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/networkWatchers/{networkWatcherName}/packetCaptures/{packetCaptureName}/stop'} # type: ignore 483 484 def _get_status_initial( 485 self, 486 resource_group_name, # type: str 487 network_watcher_name, # type: str 488 packet_capture_name, # type: str 489 **kwargs # type: Any 490 ): 491 # type: (...) -> "_models.PacketCaptureQueryStatusResult" 492 cls = kwargs.pop('cls', None) # type: ClsType["_models.PacketCaptureQueryStatusResult"] 493 error_map = { 494 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError 495 } 496 error_map.update(kwargs.pop('error_map', {})) 497 api_version = "2018-08-01" 498 accept = "application/json" 499 500 # Construct URL 501 url = self._get_status_initial.metadata['url'] # type: ignore 502 path_format_arguments = { 503 'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'), 504 'networkWatcherName': self._serialize.url("network_watcher_name", network_watcher_name, 'str'), 505 'packetCaptureName': self._serialize.url("packet_capture_name", packet_capture_name, 'str'), 506 'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'), 507 } 508 url = self._client.format_url(url, **path_format_arguments) 509 510 # Construct parameters 511 query_parameters = {} # type: Dict[str, Any] 512 query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str') 513 514 # Construct headers 515 header_parameters = {} # type: Dict[str, Any] 516 header_parameters['Accept'] = self._serialize.header("accept", accept, 'str') 517 518 request = self._client.post(url, query_parameters, header_parameters) 519 pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs) 520 response = pipeline_response.http_response 521 522 if response.status_code not in [200, 202]: 523 map_error(status_code=response.status_code, response=response, error_map=error_map) 524 error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, response) 525 raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) 526 527 if response.status_code == 200: 528 deserialized = self._deserialize('PacketCaptureQueryStatusResult', pipeline_response) 529 530 if response.status_code == 202: 531 deserialized = self._deserialize('PacketCaptureQueryStatusResult', pipeline_response) 532 533 if cls: 534 return cls(pipeline_response, deserialized, {}) 535 536 return deserialized 537 _get_status_initial.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/networkWatchers/{networkWatcherName}/packetCaptures/{packetCaptureName}/queryStatus'} # type: ignore 538 539 def begin_get_status( 540 self, 541 resource_group_name, # type: str 542 network_watcher_name, # type: str 543 packet_capture_name, # type: str 544 **kwargs # type: Any 545 ): 546 # type: (...) -> LROPoller["_models.PacketCaptureQueryStatusResult"] 547 """Query the status of a running packet capture session. 548 549 :param resource_group_name: The name of the resource group. 550 :type resource_group_name: str 551 :param network_watcher_name: The name of the Network Watcher resource. 552 :type network_watcher_name: str 553 :param packet_capture_name: The name given to the packet capture session. 554 :type packet_capture_name: str 555 :keyword callable cls: A custom type or function that will be passed the direct response 556 :keyword str continuation_token: A continuation token to restart a poller from a saved state. 557 :keyword polling: Pass in True if you'd like the ARMPolling polling method, 558 False for no polling, or your own initialized polling object for a personal polling strategy. 559 :paramtype polling: bool or ~azure.core.polling.PollingMethod 560 :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present. 561 :return: An instance of LROPoller that returns either PacketCaptureQueryStatusResult or the result of cls(response) 562 :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.network.v2018_08_01.models.PacketCaptureQueryStatusResult] 563 :raises ~azure.core.exceptions.HttpResponseError: 564 """ 565 polling = kwargs.pop('polling', True) # type: Union[bool, PollingMethod] 566 cls = kwargs.pop('cls', None) # type: ClsType["_models.PacketCaptureQueryStatusResult"] 567 lro_delay = kwargs.pop( 568 'polling_interval', 569 self._config.polling_interval 570 ) 571 cont_token = kwargs.pop('continuation_token', None) # type: Optional[str] 572 if cont_token is None: 573 raw_result = self._get_status_initial( 574 resource_group_name=resource_group_name, 575 network_watcher_name=network_watcher_name, 576 packet_capture_name=packet_capture_name, 577 cls=lambda x,y,z: x, 578 **kwargs 579 ) 580 581 kwargs.pop('error_map', None) 582 kwargs.pop('content_type', None) 583 584 def get_long_running_output(pipeline_response): 585 deserialized = self._deserialize('PacketCaptureQueryStatusResult', pipeline_response) 586 587 if cls: 588 return cls(pipeline_response, deserialized, {}) 589 return deserialized 590 591 path_format_arguments = { 592 'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'), 593 'networkWatcherName': self._serialize.url("network_watcher_name", network_watcher_name, 'str'), 594 'packetCaptureName': self._serialize.url("packet_capture_name", packet_capture_name, 'str'), 595 'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'), 596 } 597 598 if polling is True: polling_method = ARMPolling(lro_delay, lro_options={'final-state-via': 'location'}, path_format_arguments=path_format_arguments, **kwargs) 599 elif polling is False: polling_method = NoPolling() 600 else: polling_method = polling 601 if cont_token: 602 return LROPoller.from_continuation_token( 603 polling_method=polling_method, 604 continuation_token=cont_token, 605 client=self._client, 606 deserialization_callback=get_long_running_output 607 ) 608 else: 609 return LROPoller(self._client, raw_result, get_long_running_output, polling_method) 610 begin_get_status.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/networkWatchers/{networkWatcherName}/packetCaptures/{packetCaptureName}/queryStatus'} # type: ignore 611 612 def list( 613 self, 614 resource_group_name, # type: str 615 network_watcher_name, # type: str 616 **kwargs # type: Any 617 ): 618 # type: (...) -> Iterable["_models.PacketCaptureListResult"] 619 """Lists all packet capture sessions within the specified resource group. 620 621 :param resource_group_name: The name of the resource group. 622 :type resource_group_name: str 623 :param network_watcher_name: The name of the Network Watcher resource. 624 :type network_watcher_name: str 625 :keyword callable cls: A custom type or function that will be passed the direct response 626 :return: An iterator like instance of either PacketCaptureListResult or the result of cls(response) 627 :rtype: ~azure.core.paging.ItemPaged[~azure.mgmt.network.v2018_08_01.models.PacketCaptureListResult] 628 :raises: ~azure.core.exceptions.HttpResponseError 629 """ 630 cls = kwargs.pop('cls', None) # type: ClsType["_models.PacketCaptureListResult"] 631 error_map = { 632 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError 633 } 634 error_map.update(kwargs.pop('error_map', {})) 635 api_version = "2018-08-01" 636 accept = "application/json" 637 638 def prepare_request(next_link=None): 639 # Construct headers 640 header_parameters = {} # type: Dict[str, Any] 641 header_parameters['Accept'] = self._serialize.header("accept", accept, 'str') 642 643 if not next_link: 644 # Construct URL 645 url = self.list.metadata['url'] # type: ignore 646 path_format_arguments = { 647 'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'), 648 'networkWatcherName': self._serialize.url("network_watcher_name", network_watcher_name, 'str'), 649 'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'), 650 } 651 url = self._client.format_url(url, **path_format_arguments) 652 # Construct parameters 653 query_parameters = {} # type: Dict[str, Any] 654 query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str') 655 656 request = self._client.get(url, query_parameters, header_parameters) 657 else: 658 url = next_link 659 query_parameters = {} # type: Dict[str, Any] 660 request = self._client.get(url, query_parameters, header_parameters) 661 return request 662 663 def extract_data(pipeline_response): 664 deserialized = self._deserialize('PacketCaptureListResult', pipeline_response) 665 list_of_elem = deserialized.value 666 if cls: 667 list_of_elem = cls(list_of_elem) 668 return None, iter(list_of_elem) 669 670 def get_next(next_link=None): 671 request = prepare_request(next_link) 672 673 pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs) 674 response = pipeline_response.http_response 675 676 if response.status_code not in [200]: 677 error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, response) 678 map_error(status_code=response.status_code, response=response, error_map=error_map) 679 raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat) 680 681 return pipeline_response 682 683 return ItemPaged( 684 get_next, extract_data 685 ) 686 list.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/networkWatchers/{networkWatcherName}/packetCaptures'} # type: ignore 687