# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
from typing import TYPE_CHECKING
import warnings

from azure.core.exceptions import ClientAuthenticationError, HttpResponseError, ResourceExistsError, ResourceNotFoundError, map_error
from azure.core.pipeline import PipelineResponse
from azure.core.pipeline.transport import HttpRequest, HttpResponse
from azure.core.polling import LROPoller, NoPolling, PollingMethod
from azure.mgmt.core.exceptions import ARMErrorFormat
from azure.mgmt.core.polling.arm_polling import ARMPolling

from .. import models as _models

if TYPE_CHECKING:
    # pylint: disable=unused-import,ungrouped-imports
    from typing import Any, Callable, Dict, Generic, Optional, TypeVar, Union

    T = TypeVar('T')
    ClsType = Optional[Callable[[PipelineResponse[HttpRequest, HttpResponse], T, Dict[str, Any]], Any]]


class DdosCustomPoliciesOperations(object):
    """DdosCustomPoliciesOperations operations.

    You should not instantiate this class directly. Instead, you should create a Client instance that
    instantiates it for you and attaches it as an attribute.

    :ivar models: Alias to model classes used in this operation group.
    :type models: ~azure.mgmt.network.v2019_11_01.models
    :param client: Client for service requests.
    :param config: Configuration of service client.
    :param serializer: An object model serializer.
    :param deserializer: An object model deserializer.
    """

    models = _models

    def __init__(self, client, config, serializer, deserializer):
        self._client = client
        self._serialize = serializer
        self._deserialize = deserializer
        self._config = config

    # ------------------------------------------------------------------
    # Private helpers shared by every operation in this group
    # ------------------------------------------------------------------

    @staticmethod
    def _build_error_map(kwargs):
        # type: (Dict[str, Any]) -> Dict[int, Any]
        """Return the default status-code-to-exception map, updated with the caller's entries.

        The ``error_map`` entry is popped from ``kwargs`` so that it is not
        forwarded to the pipeline together with the remaining keyword arguments.

        :param kwargs: The keyword arguments of the calling operation (mutated in place).
        :return: A new dict holding the defaults overridden by ``kwargs['error_map']``, if given.
        """
        error_map = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        error_map.update(kwargs.pop('error_map', {}))
        return error_map

    def _path_format_arguments(self, resource_group_name, ddos_custom_policy_name):
        # type: (str, str) -> Dict[str, Any]
        """Serialize the URL path parameters used by every operation of this group.

        :param resource_group_name: The name of the resource group.
        :param ddos_custom_policy_name: The name of the DDoS custom policy.
        :return: A dict keyed by the placeholder names of the ``metadata['url']`` templates.
        """
        return {
            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
            'ddosCustomPolicyName': self._serialize.url("ddos_custom_policy_name", ddos_custom_policy_name, 'str'),
            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
        }

    @staticmethod
    def _raise_for_unexpected_status(response, expected_status_codes, error_map):
        # type: (HttpResponse, Any, Dict[int, Any]) -> None
        """Raise when ``response.status_code`` is not one of ``expected_status_codes``.

        ``map_error`` is called first with ``error_map``; if it returns, an
        ``HttpResponseError`` formatted with ``ARMErrorFormat`` is raised.

        :param response: The HTTP response taken from the pipeline response.
        :param expected_status_codes: The status codes that count as success.
        :param error_map: The status-code-to-exception map handed to ``map_error``.
        """
        if response.status_code not in expected_status_codes:
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            raise HttpResponseError(response=response, error_format=ARMErrorFormat)

    # ------------------------------------------------------------------
    # Operations
    # ------------------------------------------------------------------

    def _delete_initial(
        self,
        resource_group_name,  # type: str
        ddos_custom_policy_name,  # type: str
        **kwargs  # type: Any
    ):
        # type: (...) -> None
        """Send the initial DELETE request used by :meth:`begin_delete`.

        Status codes 200, 202 and 204 are accepted; any other status code raises.

        :param resource_group_name: The name of the resource group.
        :type resource_group_name: str
        :param ddos_custom_policy_name: The name of the DDoS custom policy.
        :type ddos_custom_policy_name: str
        :keyword callable cls: A custom type or function that will be passed the direct response
        :return: None, or the result of cls(response)
        :rtype: None
        :raises ~azure.core.exceptions.HttpResponseError:
        """
        cls = kwargs.pop('cls', None)  # type: ClsType[None]
        error_map = self._build_error_map(kwargs)
        api_version = "2019-11-01"
        accept = "application/json"

        # Construct URL
        url = self._delete_initial.metadata['url']  # type: ignore
        path_format_arguments = self._path_format_arguments(resource_group_name, ddos_custom_policy_name)
        url = self._client.format_url(url, **path_format_arguments)

        # Construct parameters
        query_parameters = {}  # type: Dict[str, Any]
        query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')

        # Construct headers
        header_parameters = {}  # type: Dict[str, Any]
        header_parameters['Accept'] = self._serialize.header("accept", accept, 'str')

        request = self._client.delete(url, query_parameters, header_parameters)
        pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs)
        response = pipeline_response.http_response

        self._raise_for_unexpected_status(response, [200, 202, 204], error_map)

        if cls:
            return cls(pipeline_response, None, {})

    _delete_initial.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/ddosCustomPolicies/{ddosCustomPolicyName}'}  # type: ignore

    def begin_delete(
        self,
        resource_group_name,  # type: str
        ddos_custom_policy_name,  # type: str
        **kwargs  # type: Any
    ):
        # type: (...) -> LROPoller[None]
        """Deletes the specified DDoS custom policy.

        :param resource_group_name: The name of the resource group.
        :type resource_group_name: str
        :param ddos_custom_policy_name: The name of the DDoS custom policy.
        :type ddos_custom_policy_name: str
        :keyword callable cls: A custom type or function that will be passed the direct response
        :keyword str continuation_token: A continuation token to restart a poller from a saved state.
        :keyword polling: Pass in True if you'd like the ARMPolling polling method,
         False for no polling, or your own initialized polling object for a personal polling strategy.
        :paramtype polling: bool or ~azure.core.polling.PollingMethod
        :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present.
        :return: An instance of LROPoller that returns either None or the result of cls(response)
        :rtype: ~azure.core.polling.LROPoller[None]
        :raises ~azure.core.exceptions.HttpResponseError:
        """
        polling = kwargs.pop('polling', True)  # type: Union[bool, PollingMethod]
        cls = kwargs.pop('cls', None)  # type: ClsType[None]
        lro_delay = kwargs.pop(
            'polling_interval',
            self._config.polling_interval
        )
        cont_token = kwargs.pop('continuation_token', None)  # type: Optional[str]

        # The initial request is only sent when no continuation token was
        # supplied.  ``raw_result`` is always bound so that the poller
        # construction below can never hit an unbound local.
        raw_result = None
        if cont_token is None:
            raw_result = self._delete_initial(
                resource_group_name=resource_group_name,
                ddos_custom_policy_name=ddos_custom_policy_name,
                cls=lambda x, y, z: x,  # hand the raw pipeline response to the poller
                **kwargs
            )

        # These only concern the initial request; drop them before the
        # remaining kwargs are forwarded to the polling method.
        kwargs.pop('error_map', None)
        kwargs.pop('content_type', None)

        def get_long_running_output(pipeline_response):
            if cls:
                return cls(pipeline_response, None, {})

        path_format_arguments = self._path_format_arguments(resource_group_name, ddos_custom_policy_name)

        if polling is True:
            polling_method = ARMPolling(
                lro_delay,
                lro_options={'final-state-via': 'location'},
                path_format_arguments=path_format_arguments,
                **kwargs
            )
        elif polling is False:
            polling_method = NoPolling()
        else:
            polling_method = polling

        # Use the same ``is None`` test as above: a falsy but non-None token
        # (e.g. "") skipped the initial request, so it must not fall through
        # to the branch that needs ``raw_result``.
        if cont_token is not None:
            return LROPoller.from_continuation_token(
                polling_method=polling_method,
                continuation_token=cont_token,
                client=self._client,
                deserialization_callback=get_long_running_output
            )
        return LROPoller(self._client, raw_result, get_long_running_output, polling_method)

    begin_delete.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/ddosCustomPolicies/{ddosCustomPolicyName}'}  # type: ignore

    def get(
        self,
        resource_group_name,  # type: str
        ddos_custom_policy_name,  # type: str
        **kwargs  # type: Any
    ):
        # type: (...) -> "_models.DdosCustomPolicy"
        """Gets information about the specified DDoS custom policy.

        :param resource_group_name: The name of the resource group.
        :type resource_group_name: str
        :param ddos_custom_policy_name: The name of the DDoS custom policy.
        :type ddos_custom_policy_name: str
        :keyword callable cls: A custom type or function that will be passed the direct response
        :return: DdosCustomPolicy, or the result of cls(response)
        :rtype: ~azure.mgmt.network.v2019_11_01.models.DdosCustomPolicy
        :raises ~azure.core.exceptions.HttpResponseError:
        """
        cls = kwargs.pop('cls', None)  # type: ClsType["_models.DdosCustomPolicy"]
        error_map = self._build_error_map(kwargs)
        api_version = "2019-11-01"
        accept = "application/json"

        # Construct URL
        url = self.get.metadata['url']  # type: ignore
        path_format_arguments = self._path_format_arguments(resource_group_name, ddos_custom_policy_name)
        url = self._client.format_url(url, **path_format_arguments)

        # Construct parameters
        query_parameters = {}  # type: Dict[str, Any]
        query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')

        # Construct headers
        header_parameters = {}  # type: Dict[str, Any]
        header_parameters['Accept'] = self._serialize.header("accept", accept, 'str')

        request = self._client.get(url, query_parameters, header_parameters)
        pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs)
        response = pipeline_response.http_response

        self._raise_for_unexpected_status(response, [200], error_map)

        deserialized = self._deserialize('DdosCustomPolicy', pipeline_response)

        if cls:
            return cls(pipeline_response, deserialized, {})

        return deserialized

    get.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/ddosCustomPolicies/{ddosCustomPolicyName}'}  # type: ignore

    def _create_or_update_initial(
        self,
        resource_group_name,  # type: str
        ddos_custom_policy_name,  # type: str
        parameters,  # type: "_models.DdosCustomPolicy"
        **kwargs  # type: Any
    ):
        # type: (...) -> "_models.DdosCustomPolicy"
        """Send the initial PUT request used by :meth:`begin_create_or_update`.

        Status codes 200 and 201 are accepted; any other status code raises.

        :param resource_group_name: The name of the resource group.
        :type resource_group_name: str
        :param ddos_custom_policy_name: The name of the DDoS custom policy.
        :type ddos_custom_policy_name: str
        :param parameters: Parameters supplied to the create or update operation.
        :type parameters: ~azure.mgmt.network.v2019_11_01.models.DdosCustomPolicy
        :keyword callable cls: A custom type or function that will be passed the direct response
        :keyword str content_type: Value of the Content-Type header; defaults to "application/json".
        :return: DdosCustomPolicy, or the result of cls(response)
        :rtype: ~azure.mgmt.network.v2019_11_01.models.DdosCustomPolicy
        :raises ~azure.core.exceptions.HttpResponseError:
        """
        cls = kwargs.pop('cls', None)  # type: ClsType["_models.DdosCustomPolicy"]
        error_map = self._build_error_map(kwargs)
        api_version = "2019-11-01"
        content_type = kwargs.pop("content_type", "application/json")
        accept = "application/json"

        # Construct URL
        url = self._create_or_update_initial.metadata['url']  # type: ignore
        path_format_arguments = self._path_format_arguments(resource_group_name, ddos_custom_policy_name)
        url = self._client.format_url(url, **path_format_arguments)

        # Construct parameters
        query_parameters = {}  # type: Dict[str, Any]
        query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')

        # Construct headers
        header_parameters = {}  # type: Dict[str, Any]
        header_parameters['Content-Type'] = self._serialize.header("content_type", content_type, 'str')
        header_parameters['Accept'] = self._serialize.header("accept", accept, 'str')

        body_content_kwargs = {}  # type: Dict[str, Any]
        body_content = self._serialize.body(parameters, 'DdosCustomPolicy')
        body_content_kwargs['content'] = body_content
        request = self._client.put(url, query_parameters, header_parameters, **body_content_kwargs)
        pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs)
        response = pipeline_response.http_response

        self._raise_for_unexpected_status(response, [200, 201], error_map)

        # Both accepted status codes (200 and 201) carry the same model, so a
        # single deserialization call covers them.
        deserialized = self._deserialize('DdosCustomPolicy', pipeline_response)

        if cls:
            return cls(pipeline_response, deserialized, {})

        return deserialized

    _create_or_update_initial.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/ddosCustomPolicies/{ddosCustomPolicyName}'}  # type: ignore

    def begin_create_or_update(
        self,
        resource_group_name,  # type: str
        ddos_custom_policy_name,  # type: str
        parameters,  # type: "_models.DdosCustomPolicy"
        **kwargs  # type: Any
    ):
        # type: (...) -> LROPoller["_models.DdosCustomPolicy"]
        """Creates or updates a DDoS custom policy.

        :param resource_group_name: The name of the resource group.
        :type resource_group_name: str
        :param ddos_custom_policy_name: The name of the DDoS custom policy.
        :type ddos_custom_policy_name: str
        :param parameters: Parameters supplied to the create or update operation.
        :type parameters: ~azure.mgmt.network.v2019_11_01.models.DdosCustomPolicy
        :keyword callable cls: A custom type or function that will be passed the direct response
        :keyword str continuation_token: A continuation token to restart a poller from a saved state.
        :keyword polling: Pass in True if you'd like the ARMPolling polling method,
         False for no polling, or your own initialized polling object for a personal polling strategy.
        :paramtype polling: bool or ~azure.core.polling.PollingMethod
        :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present.
        :return: An instance of LROPoller that returns either DdosCustomPolicy or the result of cls(response)
        :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.network.v2019_11_01.models.DdosCustomPolicy]
        :raises ~azure.core.exceptions.HttpResponseError:
        """
        polling = kwargs.pop('polling', True)  # type: Union[bool, PollingMethod]
        cls = kwargs.pop('cls', None)  # type: ClsType["_models.DdosCustomPolicy"]
        lro_delay = kwargs.pop(
            'polling_interval',
            self._config.polling_interval
        )
        cont_token = kwargs.pop('continuation_token', None)  # type: Optional[str]

        # The initial request is only sent when no continuation token was
        # supplied.  ``raw_result`` is always bound so that the poller
        # construction below can never hit an unbound local.
        raw_result = None
        if cont_token is None:
            raw_result = self._create_or_update_initial(
                resource_group_name=resource_group_name,
                ddos_custom_policy_name=ddos_custom_policy_name,
                parameters=parameters,
                cls=lambda x, y, z: x,  # hand the raw pipeline response to the poller
                **kwargs
            )

        # These only concern the initial request; drop them before the
        # remaining kwargs are forwarded to the polling method.
        kwargs.pop('error_map', None)
        kwargs.pop('content_type', None)

        def get_long_running_output(pipeline_response):
            deserialized = self._deserialize('DdosCustomPolicy', pipeline_response)

            if cls:
                return cls(pipeline_response, deserialized, {})
            return deserialized

        path_format_arguments = self._path_format_arguments(resource_group_name, ddos_custom_policy_name)

        if polling is True:
            polling_method = ARMPolling(
                lro_delay,
                lro_options={'final-state-via': 'azure-async-operation'},
                path_format_arguments=path_format_arguments,
                **kwargs
            )
        elif polling is False:
            polling_method = NoPolling()
        else:
            polling_method = polling

        # Use the same ``is None`` test as above: a falsy but non-None token
        # (e.g. "") skipped the initial request, so it must not fall through
        # to the branch that needs ``raw_result``.
        if cont_token is not None:
            return LROPoller.from_continuation_token(
                polling_method=polling_method,
                continuation_token=cont_token,
                client=self._client,
                deserialization_callback=get_long_running_output
            )
        return LROPoller(self._client, raw_result, get_long_running_output, polling_method)

    begin_create_or_update.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/ddosCustomPolicies/{ddosCustomPolicyName}'}  # type: ignore

    def update_tags(
        self,
        resource_group_name,  # type: str
        ddos_custom_policy_name,  # type: str
        parameters,  # type: "_models.TagsObject"
        **kwargs  # type: Any
    ):
        # type: (...) -> "_models.DdosCustomPolicy"
        """Update a DDoS custom policy tags.

        :param resource_group_name: The name of the resource group.
        :type resource_group_name: str
        :param ddos_custom_policy_name: The name of the DDoS custom policy.
        :type ddos_custom_policy_name: str
        :param parameters: Parameters supplied to update DDoS custom policy resource tags.
        :type parameters: ~azure.mgmt.network.v2019_11_01.models.TagsObject
        :keyword callable cls: A custom type or function that will be passed the direct response
        :return: DdosCustomPolicy, or the result of cls(response)
        :rtype: ~azure.mgmt.network.v2019_11_01.models.DdosCustomPolicy
        :raises ~azure.core.exceptions.HttpResponseError:
        """
        cls = kwargs.pop('cls', None)  # type: ClsType["_models.DdosCustomPolicy"]
        error_map = self._build_error_map(kwargs)
        api_version = "2019-11-01"
        content_type = kwargs.pop("content_type", "application/json")
        accept = "application/json"

        # Construct URL
        url = self.update_tags.metadata['url']  # type: ignore
        path_format_arguments = self._path_format_arguments(resource_group_name, ddos_custom_policy_name)
        url = self._client.format_url(url, **path_format_arguments)

        # Construct parameters
        query_parameters = {}  # type: Dict[str, Any]
        query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')

        # Construct headers
        header_parameters = {}  # type: Dict[str, Any]
        header_parameters['Content-Type'] = self._serialize.header("content_type", content_type, 'str')
        header_parameters['Accept'] = self._serialize.header("accept", accept, 'str')

        body_content_kwargs = {}  # type: Dict[str, Any]
        body_content = self._serialize.body(parameters, 'TagsObject')
        body_content_kwargs['content'] = body_content
        request = self._client.patch(url, query_parameters, header_parameters, **body_content_kwargs)
        pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs)
        response = pipeline_response.http_response

        self._raise_for_unexpected_status(response, [200], error_map)

        deserialized = self._deserialize('DdosCustomPolicy', pipeline_response)

        if cls:
            return cls(pipeline_response, deserialized, {})

        return deserialized

    update_tags.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/ddosCustomPolicies/{ddosCustomPolicyName}'}  # type: ignore