1# coding=utf-8
2# --------------------------------------------------------------------------
3# Copyright (c) Microsoft Corporation. All rights reserved.
4# Licensed under the MIT License. See License.txt in the project root for license information.
5# Code generated by Microsoft (R) AutoRest Code Generator.
6# Changes may cause incorrect behavior and will be lost if the code is regenerated.
7# --------------------------------------------------------------------------
8from typing import Any, Callable, Dict, Generic, Optional, TypeVar, Union
9import warnings
10
11from azure.core.exceptions import ClientAuthenticationError, HttpResponseError, ResourceExistsError, ResourceNotFoundError, map_error
12from azure.core.pipeline import PipelineResponse
13from azure.core.pipeline.transport import AsyncHttpResponse, HttpRequest
14from azure.core.polling import AsyncLROPoller, AsyncNoPolling, AsyncPollingMethod
15from azure.mgmt.core.exceptions import ARMErrorFormat
16from azure.mgmt.core.polling.async_arm_polling import AsyncARMPolling
17
18from ... import models as _models
19
20T = TypeVar('T')
21ClsType = Optional[Callable[[PipelineResponse[HttpRequest, AsyncHttpResponse], T, Dict[str, Any]], Any]]
22
23class DdosCustomPoliciesOperations:
24    """DdosCustomPoliciesOperations async operations.
25
26    You should not instantiate this class directly. Instead, you should create a Client instance that
27    instantiates it for you and attaches it as an attribute.
28
29    :ivar models: Alias to model classes used in this operation group.
30    :type models: ~azure.mgmt.network.v2019_08_01.models
31    :param client: Client for service requests.
32    :param config: Configuration of service client.
33    :param serializer: An object model serializer.
34    :param deserializer: An object model deserializer.
35    """
36
37    models = _models
38
39    def __init__(self, client, config, serializer, deserializer) -> None:
40        self._client = client
41        self._serialize = serializer
42        self._deserialize = deserializer
43        self._config = config
44
45    async def _delete_initial(
46        self,
47        resource_group_name: str,
48        ddos_custom_policy_name: str,
49        **kwargs
50    ) -> None:
51        cls = kwargs.pop('cls', None)  # type: ClsType[None]
52        error_map = {
53            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
54        }
55        error_map.update(kwargs.pop('error_map', {}))
56        api_version = "2019-08-01"
57        accept = "application/json"
58
59        # Construct URL
60        url = self._delete_initial.metadata['url']  # type: ignore
61        path_format_arguments = {
62            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
63            'ddosCustomPolicyName': self._serialize.url("ddos_custom_policy_name", ddos_custom_policy_name, 'str'),
64            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
65        }
66        url = self._client.format_url(url, **path_format_arguments)
67
68        # Construct parameters
69        query_parameters = {}  # type: Dict[str, Any]
70        query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')
71
72        # Construct headers
73        header_parameters = {}  # type: Dict[str, Any]
74        header_parameters['Accept'] = self._serialize.header("accept", accept, 'str')
75
76        request = self._client.delete(url, query_parameters, header_parameters)
77        pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs)
78        response = pipeline_response.http_response
79
80        if response.status_code not in [200, 202, 204]:
81            map_error(status_code=response.status_code, response=response, error_map=error_map)
82            raise HttpResponseError(response=response, error_format=ARMErrorFormat)
83
84        if cls:
85            return cls(pipeline_response, None, {})
86
87    _delete_initial.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/ddosCustomPolicies/{ddosCustomPolicyName}'}  # type: ignore
88
89    async def begin_delete(
90        self,
91        resource_group_name: str,
92        ddos_custom_policy_name: str,
93        **kwargs
94    ) -> AsyncLROPoller[None]:
95        """Deletes the specified DDoS custom policy.
96
97        :param resource_group_name: The name of the resource group.
98        :type resource_group_name: str
99        :param ddos_custom_policy_name: The name of the DDoS custom policy.
100        :type ddos_custom_policy_name: str
101        :keyword callable cls: A custom type or function that will be passed the direct response
102        :keyword str continuation_token: A continuation token to restart a poller from a saved state.
103        :keyword polling: Pass in True if you'd like the AsyncARMPolling polling method,
104         False for no polling, or your own initialized polling object for a personal polling strategy.
105        :paramtype polling: bool or ~azure.core.polling.AsyncPollingMethod
106        :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present.
107        :return: An instance of AsyncLROPoller that returns either None or the result of cls(response)
108        :rtype: ~azure.core.polling.AsyncLROPoller[None]
109        :raises ~azure.core.exceptions.HttpResponseError:
110        """
111        polling = kwargs.pop('polling', True)  # type: Union[bool, AsyncPollingMethod]
112        cls = kwargs.pop('cls', None)  # type: ClsType[None]
113        lro_delay = kwargs.pop(
114            'polling_interval',
115            self._config.polling_interval
116        )
117        cont_token = kwargs.pop('continuation_token', None)  # type: Optional[str]
118        if cont_token is None:
119            raw_result = await self._delete_initial(
120                resource_group_name=resource_group_name,
121                ddos_custom_policy_name=ddos_custom_policy_name,
122                cls=lambda x,y,z: x,
123                **kwargs
124            )
125
126        kwargs.pop('error_map', None)
127        kwargs.pop('content_type', None)
128
129        def get_long_running_output(pipeline_response):
130            if cls:
131                return cls(pipeline_response, None, {})
132
133        path_format_arguments = {
134            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
135            'ddosCustomPolicyName': self._serialize.url("ddos_custom_policy_name", ddos_custom_policy_name, 'str'),
136            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
137        }
138
139        if polling is True: polling_method = AsyncARMPolling(lro_delay, lro_options={'final-state-via': 'location'}, path_format_arguments=path_format_arguments,  **kwargs)
140        elif polling is False: polling_method = AsyncNoPolling()
141        else: polling_method = polling
142        if cont_token:
143            return AsyncLROPoller.from_continuation_token(
144                polling_method=polling_method,
145                continuation_token=cont_token,
146                client=self._client,
147                deserialization_callback=get_long_running_output
148            )
149        else:
150            return AsyncLROPoller(self._client, raw_result, get_long_running_output, polling_method)
151    begin_delete.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/ddosCustomPolicies/{ddosCustomPolicyName}'}  # type: ignore
152
153    async def get(
154        self,
155        resource_group_name: str,
156        ddos_custom_policy_name: str,
157        **kwargs
158    ) -> "_models.DdosCustomPolicy":
159        """Gets information about the specified DDoS custom policy.
160
161        :param resource_group_name: The name of the resource group.
162        :type resource_group_name: str
163        :param ddos_custom_policy_name: The name of the DDoS custom policy.
164        :type ddos_custom_policy_name: str
165        :keyword callable cls: A custom type or function that will be passed the direct response
166        :return: DdosCustomPolicy, or the result of cls(response)
167        :rtype: ~azure.mgmt.network.v2019_08_01.models.DdosCustomPolicy
168        :raises: ~azure.core.exceptions.HttpResponseError
169        """
170        cls = kwargs.pop('cls', None)  # type: ClsType["_models.DdosCustomPolicy"]
171        error_map = {
172            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
173        }
174        error_map.update(kwargs.pop('error_map', {}))
175        api_version = "2019-08-01"
176        accept = "application/json"
177
178        # Construct URL
179        url = self.get.metadata['url']  # type: ignore
180        path_format_arguments = {
181            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
182            'ddosCustomPolicyName': self._serialize.url("ddos_custom_policy_name", ddos_custom_policy_name, 'str'),
183            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
184        }
185        url = self._client.format_url(url, **path_format_arguments)
186
187        # Construct parameters
188        query_parameters = {}  # type: Dict[str, Any]
189        query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')
190
191        # Construct headers
192        header_parameters = {}  # type: Dict[str, Any]
193        header_parameters['Accept'] = self._serialize.header("accept", accept, 'str')
194
195        request = self._client.get(url, query_parameters, header_parameters)
196        pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs)
197        response = pipeline_response.http_response
198
199        if response.status_code not in [200]:
200            map_error(status_code=response.status_code, response=response, error_map=error_map)
201            raise HttpResponseError(response=response, error_format=ARMErrorFormat)
202
203        deserialized = self._deserialize('DdosCustomPolicy', pipeline_response)
204
205        if cls:
206            return cls(pipeline_response, deserialized, {})
207
208        return deserialized
209    get.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/ddosCustomPolicies/{ddosCustomPolicyName}'}  # type: ignore
210
211    async def _create_or_update_initial(
212        self,
213        resource_group_name: str,
214        ddos_custom_policy_name: str,
215        parameters: "_models.DdosCustomPolicy",
216        **kwargs
217    ) -> "_models.DdosCustomPolicy":
218        cls = kwargs.pop('cls', None)  # type: ClsType["_models.DdosCustomPolicy"]
219        error_map = {
220            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
221        }
222        error_map.update(kwargs.pop('error_map', {}))
223        api_version = "2019-08-01"
224        content_type = kwargs.pop("content_type", "application/json")
225        accept = "application/json"
226
227        # Construct URL
228        url = self._create_or_update_initial.metadata['url']  # type: ignore
229        path_format_arguments = {
230            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
231            'ddosCustomPolicyName': self._serialize.url("ddos_custom_policy_name", ddos_custom_policy_name, 'str'),
232            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
233        }
234        url = self._client.format_url(url, **path_format_arguments)
235
236        # Construct parameters
237        query_parameters = {}  # type: Dict[str, Any]
238        query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')
239
240        # Construct headers
241        header_parameters = {}  # type: Dict[str, Any]
242        header_parameters['Content-Type'] = self._serialize.header("content_type", content_type, 'str')
243        header_parameters['Accept'] = self._serialize.header("accept", accept, 'str')
244
245        body_content_kwargs = {}  # type: Dict[str, Any]
246        body_content = self._serialize.body(parameters, 'DdosCustomPolicy')
247        body_content_kwargs['content'] = body_content
248        request = self._client.put(url, query_parameters, header_parameters, **body_content_kwargs)
249        pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs)
250        response = pipeline_response.http_response
251
252        if response.status_code not in [200, 201]:
253            map_error(status_code=response.status_code, response=response, error_map=error_map)
254            raise HttpResponseError(response=response, error_format=ARMErrorFormat)
255
256        if response.status_code == 200:
257            deserialized = self._deserialize('DdosCustomPolicy', pipeline_response)
258
259        if response.status_code == 201:
260            deserialized = self._deserialize('DdosCustomPolicy', pipeline_response)
261
262        if cls:
263            return cls(pipeline_response, deserialized, {})
264
265        return deserialized
266    _create_or_update_initial.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/ddosCustomPolicies/{ddosCustomPolicyName}'}  # type: ignore
267
268    async def begin_create_or_update(
269        self,
270        resource_group_name: str,
271        ddos_custom_policy_name: str,
272        parameters: "_models.DdosCustomPolicy",
273        **kwargs
274    ) -> AsyncLROPoller["_models.DdosCustomPolicy"]:
275        """Creates or updates a DDoS custom policy.
276
277        :param resource_group_name: The name of the resource group.
278        :type resource_group_name: str
279        :param ddos_custom_policy_name: The name of the DDoS custom policy.
280        :type ddos_custom_policy_name: str
281        :param parameters: Parameters supplied to the create or update operation.
282        :type parameters: ~azure.mgmt.network.v2019_08_01.models.DdosCustomPolicy
283        :keyword callable cls: A custom type or function that will be passed the direct response
284        :keyword str continuation_token: A continuation token to restart a poller from a saved state.
285        :keyword polling: Pass in True if you'd like the AsyncARMPolling polling method,
286         False for no polling, or your own initialized polling object for a personal polling strategy.
287        :paramtype polling: bool or ~azure.core.polling.AsyncPollingMethod
288        :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present.
289        :return: An instance of AsyncLROPoller that returns either DdosCustomPolicy or the result of cls(response)
290        :rtype: ~azure.core.polling.AsyncLROPoller[~azure.mgmt.network.v2019_08_01.models.DdosCustomPolicy]
291        :raises ~azure.core.exceptions.HttpResponseError:
292        """
293        polling = kwargs.pop('polling', True)  # type: Union[bool, AsyncPollingMethod]
294        cls = kwargs.pop('cls', None)  # type: ClsType["_models.DdosCustomPolicy"]
295        lro_delay = kwargs.pop(
296            'polling_interval',
297            self._config.polling_interval
298        )
299        cont_token = kwargs.pop('continuation_token', None)  # type: Optional[str]
300        if cont_token is None:
301            raw_result = await self._create_or_update_initial(
302                resource_group_name=resource_group_name,
303                ddos_custom_policy_name=ddos_custom_policy_name,
304                parameters=parameters,
305                cls=lambda x,y,z: x,
306                **kwargs
307            )
308
309        kwargs.pop('error_map', None)
310        kwargs.pop('content_type', None)
311
312        def get_long_running_output(pipeline_response):
313            deserialized = self._deserialize('DdosCustomPolicy', pipeline_response)
314
315            if cls:
316                return cls(pipeline_response, deserialized, {})
317            return deserialized
318
319        path_format_arguments = {
320            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
321            'ddosCustomPolicyName': self._serialize.url("ddos_custom_policy_name", ddos_custom_policy_name, 'str'),
322            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
323        }
324
325        if polling is True: polling_method = AsyncARMPolling(lro_delay, lro_options={'final-state-via': 'azure-async-operation'}, path_format_arguments=path_format_arguments,  **kwargs)
326        elif polling is False: polling_method = AsyncNoPolling()
327        else: polling_method = polling
328        if cont_token:
329            return AsyncLROPoller.from_continuation_token(
330                polling_method=polling_method,
331                continuation_token=cont_token,
332                client=self._client,
333                deserialization_callback=get_long_running_output
334            )
335        else:
336            return AsyncLROPoller(self._client, raw_result, get_long_running_output, polling_method)
337    begin_create_or_update.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/ddosCustomPolicies/{ddosCustomPolicyName}'}  # type: ignore
338
339    async def _update_tags_initial(
340        self,
341        resource_group_name: str,
342        ddos_custom_policy_name: str,
343        parameters: "_models.TagsObject",
344        **kwargs
345    ) -> "_models.DdosCustomPolicy":
346        cls = kwargs.pop('cls', None)  # type: ClsType["_models.DdosCustomPolicy"]
347        error_map = {
348            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
349        }
350        error_map.update(kwargs.pop('error_map', {}))
351        api_version = "2019-08-01"
352        content_type = kwargs.pop("content_type", "application/json")
353        accept = "application/json"
354
355        # Construct URL
356        url = self._update_tags_initial.metadata['url']  # type: ignore
357        path_format_arguments = {
358            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
359            'ddosCustomPolicyName': self._serialize.url("ddos_custom_policy_name", ddos_custom_policy_name, 'str'),
360            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
361        }
362        url = self._client.format_url(url, **path_format_arguments)
363
364        # Construct parameters
365        query_parameters = {}  # type: Dict[str, Any]
366        query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')
367
368        # Construct headers
369        header_parameters = {}  # type: Dict[str, Any]
370        header_parameters['Content-Type'] = self._serialize.header("content_type", content_type, 'str')
371        header_parameters['Accept'] = self._serialize.header("accept", accept, 'str')
372
373        body_content_kwargs = {}  # type: Dict[str, Any]
374        body_content = self._serialize.body(parameters, 'TagsObject')
375        body_content_kwargs['content'] = body_content
376        request = self._client.patch(url, query_parameters, header_parameters, **body_content_kwargs)
377        pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs)
378        response = pipeline_response.http_response
379
380        if response.status_code not in [200]:
381            map_error(status_code=response.status_code, response=response, error_map=error_map)
382            raise HttpResponseError(response=response, error_format=ARMErrorFormat)
383
384        deserialized = self._deserialize('DdosCustomPolicy', pipeline_response)
385
386        if cls:
387            return cls(pipeline_response, deserialized, {})
388
389        return deserialized
390    _update_tags_initial.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/ddosCustomPolicies/{ddosCustomPolicyName}'}  # type: ignore
391
392    async def begin_update_tags(
393        self,
394        resource_group_name: str,
395        ddos_custom_policy_name: str,
396        parameters: "_models.TagsObject",
397        **kwargs
398    ) -> AsyncLROPoller["_models.DdosCustomPolicy"]:
399        """Update a DDoS custom policy tags.
400
401        :param resource_group_name: The name of the resource group.
402        :type resource_group_name: str
403        :param ddos_custom_policy_name: The name of the DDoS custom policy.
404        :type ddos_custom_policy_name: str
405        :param parameters: Parameters supplied to the update DDoS custom policy resource tags.
406        :type parameters: ~azure.mgmt.network.v2019_08_01.models.TagsObject
407        :keyword callable cls: A custom type or function that will be passed the direct response
408        :keyword str continuation_token: A continuation token to restart a poller from a saved state.
409        :keyword polling: Pass in True if you'd like the AsyncARMPolling polling method,
410         False for no polling, or your own initialized polling object for a personal polling strategy.
411        :paramtype polling: bool or ~azure.core.polling.AsyncPollingMethod
412        :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present.
413        :return: An instance of AsyncLROPoller that returns either DdosCustomPolicy or the result of cls(response)
414        :rtype: ~azure.core.polling.AsyncLROPoller[~azure.mgmt.network.v2019_08_01.models.DdosCustomPolicy]
415        :raises ~azure.core.exceptions.HttpResponseError:
416        """
417        polling = kwargs.pop('polling', True)  # type: Union[bool, AsyncPollingMethod]
418        cls = kwargs.pop('cls', None)  # type: ClsType["_models.DdosCustomPolicy"]
419        lro_delay = kwargs.pop(
420            'polling_interval',
421            self._config.polling_interval
422        )
423        cont_token = kwargs.pop('continuation_token', None)  # type: Optional[str]
424        if cont_token is None:
425            raw_result = await self._update_tags_initial(
426                resource_group_name=resource_group_name,
427                ddos_custom_policy_name=ddos_custom_policy_name,
428                parameters=parameters,
429                cls=lambda x,y,z: x,
430                **kwargs
431            )
432
433        kwargs.pop('error_map', None)
434        kwargs.pop('content_type', None)
435
436        def get_long_running_output(pipeline_response):
437            deserialized = self._deserialize('DdosCustomPolicy', pipeline_response)
438
439            if cls:
440                return cls(pipeline_response, deserialized, {})
441            return deserialized
442
443        path_format_arguments = {
444            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
445            'ddosCustomPolicyName': self._serialize.url("ddos_custom_policy_name", ddos_custom_policy_name, 'str'),
446            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
447        }
448
449        if polling is True: polling_method = AsyncARMPolling(lro_delay, path_format_arguments=path_format_arguments,  **kwargs)
450        elif polling is False: polling_method = AsyncNoPolling()
451        else: polling_method = polling
452        if cont_token:
453            return AsyncLROPoller.from_continuation_token(
454                polling_method=polling_method,
455                continuation_token=cont_token,
456                client=self._client,
457                deserialization_callback=get_long_running_output
458            )
459        else:
460            return AsyncLROPoller(self._client, raw_result, get_long_running_output, polling_method)
461    begin_update_tags.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/ddosCustomPolicies/{ddosCustomPolicyName}'}  # type: ignore
462