1# coding=utf-8 2# -------------------------------------------------------------------------- 3# Copyright (c) Microsoft Corporation. All rights reserved. 4# Licensed under the MIT License. See License.txt in the project root for license information. 5# Code generated by Microsoft (R) AutoRest Code Generator. 6# Changes may cause incorrect behavior and will be lost if the code is regenerated. 7# -------------------------------------------------------------------------- 8from typing import TYPE_CHECKING 9import warnings 10 11from azure.core.exceptions import ClientAuthenticationError, HttpResponseError, ResourceExistsError, ResourceNotFoundError, map_error 12from azure.core.paging import ItemPaged 13from azure.core.pipeline import PipelineResponse 14from azure.core.pipeline.transport import HttpRequest, HttpResponse 15from azure.core.polling import LROPoller, NoPolling, PollingMethod 16from azure.mgmt.core.exceptions import ARMErrorFormat 17from azure.mgmt.core.polling.arm_polling import ARMPolling 18 19from .. import models as _models 20 21if TYPE_CHECKING: 22 # pylint: disable=unused-import,ungrouped-imports 23 from typing import Any, Callable, Dict, Generic, Iterable, Optional, TypeVar, Union 24 25 T = TypeVar('T') 26 ClsType = Optional[Callable[[PipelineResponse[HttpRequest, HttpResponse], T, Dict[str, Any]], Any]] 27 28class FirewallPoliciesOperations(object): 29 """FirewallPoliciesOperations operations. 30 31 You should not instantiate this class directly. Instead, you should create a Client instance that 32 instantiates it for you and attaches it as an attribute. 33 34 :ivar models: Alias to model classes used in this operation group. 35 :type models: ~azure.mgmt.network.v2019_11_01.models 36 :param client: Client for service requests. 37 :param config: Configuration of service client. 38 :param serializer: An object model serializer. 39 :param deserializer: An object model deserializer. 40 """ 41 42 models = _models 43 44 def __init__(self, client, config, serializer, deserializer): 45 self._client = client 46 self._serialize = serializer 47 self._deserialize = deserializer 48 self._config = config 49 50 def _delete_initial( 51 self, 52 resource_group_name, # type: str 53 firewall_policy_name, # type: str 54 **kwargs # type: Any 55 ): 56 # type: (...) -> None 57 cls = kwargs.pop('cls', None) # type: ClsType[None] 58 error_map = { 59 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError 60 } 61 error_map.update(kwargs.pop('error_map', {})) 62 api_version = "2019-11-01" 63 accept = "application/json" 64 65 # Construct URL 66 url = self._delete_initial.metadata['url'] # type: ignore 67 path_format_arguments = { 68 'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'), 69 'firewallPolicyName': self._serialize.url("firewall_policy_name", firewall_policy_name, 'str'), 70 'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'), 71 } 72 url = self._client.format_url(url, **path_format_arguments) 73 74 # Construct parameters 75 query_parameters = {} # type: Dict[str, Any] 76 query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str') 77 78 # Construct headers 79 header_parameters = {} # type: Dict[str, Any] 80 header_parameters['Accept'] = self._serialize.header("accept", accept, 'str') 81 82 request = self._client.delete(url, query_parameters, header_parameters) 83 pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs) 84 response = pipeline_response.http_response 85 86 if response.status_code not in [200, 202, 204]: 87 map_error(status_code=response.status_code, response=response, error_map=error_map) 88 raise HttpResponseError(response=response, error_format=ARMErrorFormat) 89 90 if cls: 91 return cls(pipeline_response, None, {}) 92 93 _delete_initial.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/firewallPolicies/{firewallPolicyName}'} # type: ignore 94 95 def begin_delete( 96 self, 97 resource_group_name, # type: str 98 firewall_policy_name, # type: str 99 **kwargs # type: Any 100 ): 101 # type: (...) -> LROPoller[None] 102 """Deletes the specified Firewall Policy. 103 104 :param resource_group_name: The name of the resource group. 105 :type resource_group_name: str 106 :param firewall_policy_name: The name of the Firewall Policy. 107 :type firewall_policy_name: str 108 :keyword callable cls: A custom type or function that will be passed the direct response 109 :keyword str continuation_token: A continuation token to restart a poller from a saved state. 110 :keyword polling: Pass in True if you'd like the ARMPolling polling method, 111 False for no polling, or your own initialized polling object for a personal polling strategy. 112 :paramtype polling: bool or ~azure.core.polling.PollingMethod 113 :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present. 114 :return: An instance of LROPoller that returns either None or the result of cls(response) 115 :rtype: ~azure.core.polling.LROPoller[None] 116 :raises ~azure.core.exceptions.HttpResponseError: 117 """ 118 polling = kwargs.pop('polling', True) # type: Union[bool, PollingMethod] 119 cls = kwargs.pop('cls', None) # type: ClsType[None] 120 lro_delay = kwargs.pop( 121 'polling_interval', 122 self._config.polling_interval 123 ) 124 cont_token = kwargs.pop('continuation_token', None) # type: Optional[str] 125 if cont_token is None: 126 raw_result = self._delete_initial( 127 resource_group_name=resource_group_name, 128 firewall_policy_name=firewall_policy_name, 129 cls=lambda x,y,z: x, 130 **kwargs 131 ) 132 133 kwargs.pop('error_map', None) 134 kwargs.pop('content_type', None) 135 136 def get_long_running_output(pipeline_response): 137 if cls: 138 return cls(pipeline_response, None, {}) 139 140 path_format_arguments = { 141 'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'), 142 'firewallPolicyName': self._serialize.url("firewall_policy_name", firewall_policy_name, 'str'), 143 'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'), 144 } 145 146 if polling is True: polling_method = ARMPolling(lro_delay, lro_options={'final-state-via': 'location'}, path_format_arguments=path_format_arguments, **kwargs) 147 elif polling is False: polling_method = NoPolling() 148 else: polling_method = polling 149 if cont_token: 150 return LROPoller.from_continuation_token( 151 polling_method=polling_method, 152 continuation_token=cont_token, 153 client=self._client, 154 deserialization_callback=get_long_running_output 155 ) 156 else: 157 return LROPoller(self._client, raw_result, get_long_running_output, polling_method) 158 begin_delete.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/firewallPolicies/{firewallPolicyName}'} # type: ignore 159 160 def get( 161 self, 162 resource_group_name, # type: str 163 firewall_policy_name, # type: str 164 expand=None, # type: Optional[str] 165 **kwargs # type: Any 166 ): 167 # type: (...) -> "_models.FirewallPolicy" 168 """Gets the specified Firewall Policy. 169 170 :param resource_group_name: The name of the resource group. 171 :type resource_group_name: str 172 :param firewall_policy_name: The name of the Firewall Policy. 173 :type firewall_policy_name: str 174 :param expand: Expands referenced resources. 175 :type expand: str 176 :keyword callable cls: A custom type or function that will be passed the direct response 177 :return: FirewallPolicy, or the result of cls(response) 178 :rtype: ~azure.mgmt.network.v2019_11_01.models.FirewallPolicy 179 :raises: ~azure.core.exceptions.HttpResponseError 180 """ 181 cls = kwargs.pop('cls', None) # type: ClsType["_models.FirewallPolicy"] 182 error_map = { 183 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError 184 } 185 error_map.update(kwargs.pop('error_map', {})) 186 api_version = "2019-11-01" 187 accept = "application/json" 188 189 # Construct URL 190 url = self.get.metadata['url'] # type: ignore 191 path_format_arguments = { 192 'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'), 193 'firewallPolicyName': self._serialize.url("firewall_policy_name", firewall_policy_name, 'str'), 194 'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'), 195 } 196 url = self._client.format_url(url, **path_format_arguments) 197 198 # Construct parameters 199 query_parameters = {} # type: Dict[str, Any] 200 query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str') 201 if expand is not None: 202 query_parameters['$expand'] = self._serialize.query("expand", expand, 'str') 203 204 # Construct headers 205 header_parameters = {} # type: Dict[str, Any] 206 header_parameters['Accept'] = self._serialize.header("accept", accept, 'str') 207 208 request = self._client.get(url, query_parameters, header_parameters) 209 pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs) 210 response = pipeline_response.http_response 211 212 if response.status_code not in [200]: 213 map_error(status_code=response.status_code, response=response, error_map=error_map) 214 raise HttpResponseError(response=response, error_format=ARMErrorFormat) 215 216 deserialized = self._deserialize('FirewallPolicy', pipeline_response) 217 218 if cls: 219 return cls(pipeline_response, deserialized, {}) 220 221 return deserialized 222 get.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/firewallPolicies/{firewallPolicyName}'} # type: ignore 223 224 def _create_or_update_initial( 225 self, 226 resource_group_name, # type: str 227 firewall_policy_name, # type: str 228 parameters, # type: "_models.FirewallPolicy" 229 **kwargs # type: Any 230 ): 231 # type: (...) -> "_models.FirewallPolicy" 232 cls = kwargs.pop('cls', None) # type: ClsType["_models.FirewallPolicy"] 233 error_map = { 234 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError 235 } 236 error_map.update(kwargs.pop('error_map', {})) 237 api_version = "2019-11-01" 238 content_type = kwargs.pop("content_type", "application/json") 239 accept = "application/json" 240 241 # Construct URL 242 url = self._create_or_update_initial.metadata['url'] # type: ignore 243 path_format_arguments = { 244 'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'), 245 'firewallPolicyName': self._serialize.url("firewall_policy_name", firewall_policy_name, 'str'), 246 'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'), 247 } 248 url = self._client.format_url(url, **path_format_arguments) 249 250 # Construct parameters 251 query_parameters = {} # type: Dict[str, Any] 252 query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str') 253 254 # Construct headers 255 header_parameters = {} # type: Dict[str, Any] 256 header_parameters['Content-Type'] = self._serialize.header("content_type", content_type, 'str') 257 header_parameters['Accept'] = self._serialize.header("accept", accept, 'str') 258 259 body_content_kwargs = {} # type: Dict[str, Any] 260 body_content = self._serialize.body(parameters, 'FirewallPolicy') 261 body_content_kwargs['content'] = body_content 262 request = self._client.put(url, query_parameters, header_parameters, **body_content_kwargs) 263 pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs) 264 response = pipeline_response.http_response 265 266 if response.status_code not in [200, 201]: 267 map_error(status_code=response.status_code, response=response, error_map=error_map) 268 raise HttpResponseError(response=response, error_format=ARMErrorFormat) 269 270 if response.status_code == 200: 271 deserialized = self._deserialize('FirewallPolicy', pipeline_response) 272 273 if response.status_code == 201: 274 deserialized = self._deserialize('FirewallPolicy', pipeline_response) 275 276 if cls: 277 return cls(pipeline_response, deserialized, {}) 278 279 return deserialized 280 _create_or_update_initial.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/firewallPolicies/{firewallPolicyName}'} # type: ignore 281 282 def begin_create_or_update( 283 self, 284 resource_group_name, # type: str 285 firewall_policy_name, # type: str 286 parameters, # type: "_models.FirewallPolicy" 287 **kwargs # type: Any 288 ): 289 # type: (...) -> LROPoller["_models.FirewallPolicy"] 290 """Creates or updates the specified Firewall Policy. 291 292 :param resource_group_name: The name of the resource group. 293 :type resource_group_name: str 294 :param firewall_policy_name: The name of the Firewall Policy. 295 :type firewall_policy_name: str 296 :param parameters: Parameters supplied to the create or update Firewall Policy operation. 297 :type parameters: ~azure.mgmt.network.v2019_11_01.models.FirewallPolicy 298 :keyword callable cls: A custom type or function that will be passed the direct response 299 :keyword str continuation_token: A continuation token to restart a poller from a saved state. 300 :keyword polling: Pass in True if you'd like the ARMPolling polling method, 301 False for no polling, or your own initialized polling object for a personal polling strategy. 302 :paramtype polling: bool or ~azure.core.polling.PollingMethod 303 :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present. 304 :return: An instance of LROPoller that returns either FirewallPolicy or the result of cls(response) 305 :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.network.v2019_11_01.models.FirewallPolicy] 306 :raises ~azure.core.exceptions.HttpResponseError: 307 """ 308 polling = kwargs.pop('polling', True) # type: Union[bool, PollingMethod] 309 cls = kwargs.pop('cls', None) # type: ClsType["_models.FirewallPolicy"] 310 lro_delay = kwargs.pop( 311 'polling_interval', 312 self._config.polling_interval 313 ) 314 cont_token = kwargs.pop('continuation_token', None) # type: Optional[str] 315 if cont_token is None: 316 raw_result = self._create_or_update_initial( 317 resource_group_name=resource_group_name, 318 firewall_policy_name=firewall_policy_name, 319 parameters=parameters, 320 cls=lambda x,y,z: x, 321 **kwargs 322 ) 323 324 kwargs.pop('error_map', None) 325 kwargs.pop('content_type', None) 326 327 def get_long_running_output(pipeline_response): 328 deserialized = self._deserialize('FirewallPolicy', pipeline_response) 329 330 if cls: 331 return cls(pipeline_response, deserialized, {}) 332 return deserialized 333 334 path_format_arguments = { 335 'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'), 336 'firewallPolicyName': self._serialize.url("firewall_policy_name", firewall_policy_name, 'str'), 337 'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'), 338 } 339 340 if polling is True: polling_method = ARMPolling(lro_delay, lro_options={'final-state-via': 'azure-async-operation'}, path_format_arguments=path_format_arguments, **kwargs) 341 elif polling is False: polling_method = NoPolling() 342 else: polling_method = polling 343 if cont_token: 344 return LROPoller.from_continuation_token( 345 polling_method=polling_method, 346 continuation_token=cont_token, 347 client=self._client, 348 deserialization_callback=get_long_running_output 349 ) 350 else: 351 return LROPoller(self._client, raw_result, get_long_running_output, polling_method) 352 begin_create_or_update.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/firewallPolicies/{firewallPolicyName}'} # type: ignore 353 354 def list( 355 self, 356 resource_group_name, # type: str 357 **kwargs # type: Any 358 ): 359 # type: (...) -> Iterable["_models.FirewallPolicyListResult"] 360 """Lists all Firewall Policies in a resource group. 361 362 :param resource_group_name: The name of the resource group. 363 :type resource_group_name: str 364 :keyword callable cls: A custom type or function that will be passed the direct response 365 :return: An iterator like instance of either FirewallPolicyListResult or the result of cls(response) 366 :rtype: ~azure.core.paging.ItemPaged[~azure.mgmt.network.v2019_11_01.models.FirewallPolicyListResult] 367 :raises: ~azure.core.exceptions.HttpResponseError 368 """ 369 cls = kwargs.pop('cls', None) # type: ClsType["_models.FirewallPolicyListResult"] 370 error_map = { 371 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError 372 } 373 error_map.update(kwargs.pop('error_map', {})) 374 api_version = "2019-11-01" 375 accept = "application/json" 376 377 def prepare_request(next_link=None): 378 # Construct headers 379 header_parameters = {} # type: Dict[str, Any] 380 header_parameters['Accept'] = self._serialize.header("accept", accept, 'str') 381 382 if not next_link: 383 # Construct URL 384 url = self.list.metadata['url'] # type: ignore 385 path_format_arguments = { 386 'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'), 387 'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'), 388 } 389 url = self._client.format_url(url, **path_format_arguments) 390 # Construct parameters 391 query_parameters = {} # type: Dict[str, Any] 392 query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str') 393 394 request = self._client.get(url, query_parameters, header_parameters) 395 else: 396 url = next_link 397 query_parameters = {} # type: Dict[str, Any] 398 request = self._client.get(url, query_parameters, header_parameters) 399 return request 400 401 def extract_data(pipeline_response): 402 deserialized = self._deserialize('FirewallPolicyListResult', pipeline_response) 403 list_of_elem = deserialized.value 404 if cls: 405 list_of_elem = cls(list_of_elem) 406 return deserialized.next_link or None, iter(list_of_elem) 407 408 def get_next(next_link=None): 409 request = prepare_request(next_link) 410 411 pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs) 412 response = pipeline_response.http_response 413 414 if response.status_code not in [200]: 415 map_error(status_code=response.status_code, response=response, error_map=error_map) 416 raise HttpResponseError(response=response, error_format=ARMErrorFormat) 417 418 return pipeline_response 419 420 return ItemPaged( 421 get_next, extract_data 422 ) 423 list.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/firewallPolicies'} # type: ignore 424 425 def list_all( 426 self, 427 **kwargs # type: Any 428 ): 429 # type: (...) -> Iterable["_models.FirewallPolicyListResult"] 430 """Gets all the Firewall Policies in a subscription. 431 432 :keyword callable cls: A custom type or function that will be passed the direct response 433 :return: An iterator like instance of either FirewallPolicyListResult or the result of cls(response) 434 :rtype: ~azure.core.paging.ItemPaged[~azure.mgmt.network.v2019_11_01.models.FirewallPolicyListResult] 435 :raises: ~azure.core.exceptions.HttpResponseError 436 """ 437 cls = kwargs.pop('cls', None) # type: ClsType["_models.FirewallPolicyListResult"] 438 error_map = { 439 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError 440 } 441 error_map.update(kwargs.pop('error_map', {})) 442 api_version = "2019-11-01" 443 accept = "application/json" 444 445 def prepare_request(next_link=None): 446 # Construct headers 447 header_parameters = {} # type: Dict[str, Any] 448 header_parameters['Accept'] = self._serialize.header("accept", accept, 'str') 449 450 if not next_link: 451 # Construct URL 452 url = self.list_all.metadata['url'] # type: ignore 453 path_format_arguments = { 454 'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'), 455 } 456 url = self._client.format_url(url, **path_format_arguments) 457 # Construct parameters 458 query_parameters = {} # type: Dict[str, Any] 459 query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str') 460 461 request = self._client.get(url, query_parameters, header_parameters) 462 else: 463 url = next_link 464 query_parameters = {} # type: Dict[str, Any] 465 request = self._client.get(url, query_parameters, header_parameters) 466 return request 467 468 def extract_data(pipeline_response): 469 deserialized = self._deserialize('FirewallPolicyListResult', pipeline_response) 470 list_of_elem = deserialized.value 471 if cls: 472 list_of_elem = cls(list_of_elem) 473 return deserialized.next_link or None, iter(list_of_elem) 474 475 def get_next(next_link=None): 476 request = prepare_request(next_link) 477 478 pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs) 479 response = pipeline_response.http_response 480 481 if response.status_code not in [200]: 482 map_error(status_code=response.status_code, response=response, error_map=error_map) 483 raise HttpResponseError(response=response, error_format=ARMErrorFormat) 484 485 return pipeline_response 486 487 return ItemPaged( 488 get_next, extract_data 489 ) 490 list_all.metadata = {'url': '/subscriptions/{subscriptionId}/providers/Microsoft.Network/firewallPolicies'} # type: ignore 491