1# coding=utf-8
2# --------------------------------------------------------------------------
3# Copyright (c) Microsoft Corporation. All rights reserved.
4# Licensed under the MIT License. See License.txt in the project root for license information.
5# Code generated by Microsoft (R) AutoRest Code Generator.
6# Changes may cause incorrect behavior and will be lost if the code is regenerated.
7# --------------------------------------------------------------------------
8from typing import Any, AsyncIterable, Callable, Dict, Generic, Optional, TypeVar, Union
9import warnings
10
11from azure.core.async_paging import AsyncItemPaged, AsyncList
12from azure.core.exceptions import ClientAuthenticationError, HttpResponseError, ResourceExistsError, ResourceNotFoundError, map_error
13from azure.core.pipeline import PipelineResponse
14from azure.core.pipeline.transport import AsyncHttpResponse, HttpRequest
15from azure.core.polling import AsyncLROPoller, AsyncNoPolling, AsyncPollingMethod
16from azure.mgmt.core.exceptions import ARMErrorFormat
17from azure.mgmt.core.polling.async_arm_polling import AsyncARMPolling
18
19from ... import models as _models
20
# Type variable standing for the second positional argument handed to a ``cls`` callback.
T = TypeVar('T')
# Type of the optional ``cls`` keyword accepted by the operations below: either None or a callable
# taking (pipeline response, value of type T, dict) and returning anything.
ClsType = Optional[Callable[[PipelineResponse[HttpRequest, AsyncHttpResponse], T, Dict[str, Any]], Any]]
23
class FirewallPoliciesOperations:
    """FirewallPoliciesOperations async operations.

    You should not instantiate this class directly. Instead, you should create a Client instance that
    instantiates it for you and attaches it as an attribute.

    :ivar models: Alias to model classes used in this operation group.
    :type models: ~azure.mgmt.network.v2020_03_01.models
    :param client: Client for service requests.
    :param config: Configuration of service client.
    :param serializer: An object model serializer.
    :param deserializer: An object model deserializer.
    """

    models = _models

    def __init__(self, client, config, serializer, deserializer) -> None:
        """Keep references to the collaborators supplied by the owning client.

        :param client: Client for service requests.
        :param config: Configuration of service client.
        :param serializer: An object model serializer.
        :param deserializer: An object model deserializer.
        """
        self._client = client
        self._serialize = serializer
        self._deserialize = deserializer
        self._config = config

    async def _delete_initial(
        self,
        resource_group_name: str,
        firewall_policy_name: str,
        **kwargs
    ) -> None:
        """Send the DELETE request that starts the delete long-running operation.

        :param resource_group_name: The name of the resource group.
        :type resource_group_name: str
        :param firewall_policy_name: The name of the Firewall Policy.
        :type firewall_policy_name: str
        :keyword callable cls: Optional callback invoked as ``cls(pipeline_response, None, {})``;
         when given, its result is returned.
        :keyword dict error_map: Optional status-code to exception-class entries merged over the
         defaults for 401, 404 and 409. ``None`` means "no extra entries".
        :return: None, or the result of cls(response)
        :rtype: None
        :raises ~azure.core.exceptions.HttpResponseError: when the status code is not 200, 202 or
         204 and ``map_error`` did not already raise for it.
        """
        cls = kwargs.pop('cls', None)  # type: ClsType[None]
        error_map = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        # ``or {}`` tolerates an explicit ``error_map=None`` instead of failing in dict.update().
        error_map.update(kwargs.pop('error_map', None) or {})
        api_version = "2020-03-01"
        accept = "application/json"

        # Construct URL
        url = self._delete_initial.metadata['url']  # type: ignore
        path_format_arguments = {
            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
            'firewallPolicyName': self._serialize.url("firewall_policy_name", firewall_policy_name, 'str'),
            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
        }
        url = self._client.format_url(url, **path_format_arguments)

        # Construct parameters
        query_parameters = {}  # type: Dict[str, Any]
        query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')

        # Construct headers
        header_parameters = {}  # type: Dict[str, Any]
        header_parameters['Accept'] = self._serialize.header("accept", accept, 'str')

        request = self._client.delete(url, query_parameters, header_parameters)
        pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs)
        response = pipeline_response.http_response

        if response.status_code not in [200, 202, 204]:
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            raise HttpResponseError(response=response, error_format=ARMErrorFormat)

        if cls:
            return cls(pipeline_response, None, {})

    _delete_initial.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/firewallPolicies/{firewallPolicyName}'}  # type: ignore

    async def begin_delete(
        self,
        resource_group_name: str,
        firewall_policy_name: str,
        **kwargs
    ) -> AsyncLROPoller[None]:
        """Deletes the specified Firewall Policy.

        :param resource_group_name: The name of the resource group.
        :type resource_group_name: str
        :param firewall_policy_name: The name of the Firewall Policy.
        :type firewall_policy_name: str
        :keyword callable cls: A custom type or function that will be passed the direct response
        :keyword str continuation_token: A continuation token to restart a poller from a saved state.
         When a token is supplied, no new delete request is sent.
        :keyword polling: Pass in True if you'd like the AsyncARMPolling polling method,
         False for no polling, or your own initialized polling object for a personal polling strategy.
        :paramtype polling: bool or ~azure.core.polling.AsyncPollingMethod
        :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present.
        :return: An instance of AsyncLROPoller that returns either None or the result of cls(response)
        :rtype: ~azure.core.polling.AsyncLROPoller[None]
        :raises ~azure.core.exceptions.HttpResponseError:
        """
        polling = kwargs.pop('polling', True)  # type: Union[bool, AsyncPollingMethod]
        cls = kwargs.pop('cls', None)  # type: ClsType[None]
        lro_delay = kwargs.pop(
            'polling_interval',
            self._config.polling_interval
        )
        cont_token = kwargs.pop('continuation_token', None)  # type: Optional[str]
        # Decide once whether we resume from a token. Using the same test for both the initial
        # request and the poller construction guarantees ``raw_result`` is bound whenever it is used.
        resuming = cont_token is not None
        if not resuming:
            raw_result = await self._delete_initial(
                resource_group_name=resource_group_name,
                firewall_policy_name=firewall_policy_name,
                cls=lambda x, y, z: x,  # hand back the raw pipeline response for the poller
                **kwargs
            )

        # These keywords only concern the initial request; keep them away from the polling method.
        kwargs.pop('error_map', None)
        kwargs.pop('content_type', None)

        def get_long_running_output(pipeline_response):
            # Delete has no body to deserialize; only the optional callback produces a value.
            if cls:
                return cls(pipeline_response, None, {})

        path_format_arguments = {
            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
            'firewallPolicyName': self._serialize.url("firewall_policy_name", firewall_policy_name, 'str'),
            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
        }

        if polling is True:
            polling_method = AsyncARMPolling(
                lro_delay,
                lro_options={'final-state-via': 'location'},
                path_format_arguments=path_format_arguments,
                **kwargs
            )
        elif polling is False:
            polling_method = AsyncNoPolling()
        else:
            polling_method = polling

        if resuming:
            return AsyncLROPoller.from_continuation_token(
                polling_method=polling_method,
                continuation_token=cont_token,
                client=self._client,
                deserialization_callback=get_long_running_output
            )
        return AsyncLROPoller(self._client, raw_result, get_long_running_output, polling_method)
    begin_delete.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/firewallPolicies/{firewallPolicyName}'}  # type: ignore

    async def get(
        self,
        resource_group_name: str,
        firewall_policy_name: str,
        expand: Optional[str] = None,
        **kwargs
    ) -> "_models.FirewallPolicy":
        """Gets the specified Firewall Policy.

        :param resource_group_name: The name of the resource group.
        :type resource_group_name: str
        :param firewall_policy_name: The name of the Firewall Policy.
        :type firewall_policy_name: str
        :param expand: Expands referenced resources. Sent as the ``$expand`` query parameter only
         when it is not None.
        :type expand: str
        :keyword callable cls: A custom type or function that will be passed the direct response
        :keyword dict error_map: Optional status-code to exception-class entries merged over the
         defaults for 401, 404 and 409. ``None`` means "no extra entries".
        :return: FirewallPolicy, or the result of cls(response)
        :rtype: ~azure.mgmt.network.v2020_03_01.models.FirewallPolicy
        :raises ~azure.core.exceptions.HttpResponseError: when the status code is not 200 and
         ``map_error`` did not already raise for it.
        """
        cls = kwargs.pop('cls', None)  # type: ClsType["_models.FirewallPolicy"]
        error_map = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        # ``or {}`` tolerates an explicit ``error_map=None`` instead of failing in dict.update().
        error_map.update(kwargs.pop('error_map', None) or {})
        api_version = "2020-03-01"
        accept = "application/json"

        # Construct URL
        url = self.get.metadata['url']  # type: ignore
        path_format_arguments = {
            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
            'firewallPolicyName': self._serialize.url("firewall_policy_name", firewall_policy_name, 'str'),
            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
        }
        url = self._client.format_url(url, **path_format_arguments)

        # Construct parameters
        query_parameters = {}  # type: Dict[str, Any]
        query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')
        if expand is not None:
            query_parameters['$expand'] = self._serialize.query("expand", expand, 'str')

        # Construct headers
        header_parameters = {}  # type: Dict[str, Any]
        header_parameters['Accept'] = self._serialize.header("accept", accept, 'str')

        request = self._client.get(url, query_parameters, header_parameters)
        pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs)
        response = pipeline_response.http_response

        if response.status_code not in [200]:
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            raise HttpResponseError(response=response, error_format=ARMErrorFormat)

        deserialized = self._deserialize('FirewallPolicy', pipeline_response)

        if cls:
            return cls(pipeline_response, deserialized, {})

        return deserialized
    get.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/firewallPolicies/{firewallPolicyName}'}  # type: ignore

    async def _create_or_update_initial(
        self,
        resource_group_name: str,
        firewall_policy_name: str,
        parameters: "_models.FirewallPolicy",
        **kwargs
    ) -> "_models.FirewallPolicy":
        """Send the PUT request that starts the create-or-update long-running operation.

        :param resource_group_name: The name of the resource group.
        :type resource_group_name: str
        :param firewall_policy_name: The name of the Firewall Policy.
        :type firewall_policy_name: str
        :param parameters: Model serialized as ``FirewallPolicy`` into the request body.
        :type parameters: ~azure.mgmt.network.v2020_03_01.models.FirewallPolicy
        :keyword callable cls: Optional callback invoked as
         ``cls(pipeline_response, deserialized, {})``; when given, its result is returned.
        :keyword dict error_map: Optional status-code to exception-class entries merged over the
         defaults for 401, 404 and 409. ``None`` means "no extra entries".
        :keyword str content_type: Value of the ``Content-Type`` header; defaults to
         ``application/json``.
        :return: FirewallPolicy, or the result of cls(response)
        :rtype: ~azure.mgmt.network.v2020_03_01.models.FirewallPolicy
        :raises ~azure.core.exceptions.HttpResponseError: when the status code is not 200 or 201
         and ``map_error`` did not already raise for it.
        """
        cls = kwargs.pop('cls', None)  # type: ClsType["_models.FirewallPolicy"]
        error_map = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        # ``or {}`` tolerates an explicit ``error_map=None`` instead of failing in dict.update().
        error_map.update(kwargs.pop('error_map', None) or {})
        api_version = "2020-03-01"
        content_type = kwargs.pop("content_type", "application/json")
        accept = "application/json"

        # Construct URL
        url = self._create_or_update_initial.metadata['url']  # type: ignore
        path_format_arguments = {
            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
            'firewallPolicyName': self._serialize.url("firewall_policy_name", firewall_policy_name, 'str'),
            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
        }
        url = self._client.format_url(url, **path_format_arguments)

        # Construct parameters
        query_parameters = {}  # type: Dict[str, Any]
        query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')

        # Construct headers
        header_parameters = {}  # type: Dict[str, Any]
        header_parameters['Content-Type'] = self._serialize.header("content_type", content_type, 'str')
        header_parameters['Accept'] = self._serialize.header("accept", accept, 'str')

        body_content_kwargs = {}  # type: Dict[str, Any]
        body_content = self._serialize.body(parameters, 'FirewallPolicy')
        body_content_kwargs['content'] = body_content
        request = self._client.put(url, query_parameters, header_parameters, **body_content_kwargs)
        pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs)
        response = pipeline_response.http_response

        if response.status_code not in [200, 201]:
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            raise HttpResponseError(response=response, error_format=ARMErrorFormat)

        # Only 200 and 201 reach this point and both carry the same payload type, so a single
        # deserialization replaces the two identical per-status branches.
        deserialized = self._deserialize('FirewallPolicy', pipeline_response)

        if cls:
            return cls(pipeline_response, deserialized, {})

        return deserialized
    _create_or_update_initial.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/firewallPolicies/{firewallPolicyName}'}  # type: ignore

    async def begin_create_or_update(
        self,
        resource_group_name: str,
        firewall_policy_name: str,
        parameters: "_models.FirewallPolicy",
        **kwargs
    ) -> AsyncLROPoller["_models.FirewallPolicy"]:
        """Creates or updates the specified Firewall Policy.

        :param resource_group_name: The name of the resource group.
        :type resource_group_name: str
        :param firewall_policy_name: The name of the Firewall Policy.
        :type firewall_policy_name: str
        :param parameters: Parameters supplied to the create or update Firewall Policy operation.
        :type parameters: ~azure.mgmt.network.v2020_03_01.models.FirewallPolicy
        :keyword callable cls: A custom type or function that will be passed the direct response
        :keyword str continuation_token: A continuation token to restart a poller from a saved state.
         When a token is supplied, no new create-or-update request is sent.
        :keyword polling: Pass in True if you'd like the AsyncARMPolling polling method,
         False for no polling, or your own initialized polling object for a personal polling strategy.
        :paramtype polling: bool or ~azure.core.polling.AsyncPollingMethod
        :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present.
        :return: An instance of AsyncLROPoller that returns either FirewallPolicy or the result of cls(response)
        :rtype: ~azure.core.polling.AsyncLROPoller[~azure.mgmt.network.v2020_03_01.models.FirewallPolicy]
        :raises ~azure.core.exceptions.HttpResponseError:
        """
        polling = kwargs.pop('polling', True)  # type: Union[bool, AsyncPollingMethod]
        cls = kwargs.pop('cls', None)  # type: ClsType["_models.FirewallPolicy"]
        lro_delay = kwargs.pop(
            'polling_interval',
            self._config.polling_interval
        )
        cont_token = kwargs.pop('continuation_token', None)  # type: Optional[str]
        # Decide once whether we resume from a token. Using the same test for both the initial
        # request and the poller construction guarantees ``raw_result`` is bound whenever it is used.
        resuming = cont_token is not None
        if not resuming:
            raw_result = await self._create_or_update_initial(
                resource_group_name=resource_group_name,
                firewall_policy_name=firewall_policy_name,
                parameters=parameters,
                cls=lambda x, y, z: x,  # hand back the raw pipeline response for the poller
                **kwargs
            )

        # These keywords only concern the initial request; keep them away from the polling method.
        kwargs.pop('error_map', None)
        kwargs.pop('content_type', None)

        def get_long_running_output(pipeline_response):
            # Turn the final pipeline response into the FirewallPolicy result (or cls's result).
            deserialized = self._deserialize('FirewallPolicy', pipeline_response)

            if cls:
                return cls(pipeline_response, deserialized, {})
            return deserialized

        path_format_arguments = {
            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
            'firewallPolicyName': self._serialize.url("firewall_policy_name", firewall_policy_name, 'str'),
            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
        }

        if polling is True:
            polling_method = AsyncARMPolling(
                lro_delay,
                lro_options={'final-state-via': 'azure-async-operation'},
                path_format_arguments=path_format_arguments,
                **kwargs
            )
        elif polling is False:
            polling_method = AsyncNoPolling()
        else:
            polling_method = polling

        if resuming:
            return AsyncLROPoller.from_continuation_token(
                polling_method=polling_method,
                continuation_token=cont_token,
                client=self._client,
                deserialization_callback=get_long_running_output
            )
        return AsyncLROPoller(self._client, raw_result, get_long_running_output, polling_method)
    begin_create_or_update.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/firewallPolicies/{firewallPolicyName}'}  # type: ignore

    def list(
        self,
        resource_group_name: str,
        **kwargs
    ) -> AsyncIterable["_models.FirewallPolicyListResult"]:
        """Lists all Firewall Policies in a resource group.

        :param resource_group_name: The name of the resource group.
        :type resource_group_name: str
        :keyword callable cls: A custom type or function that will be passed the list of elements
         (the ``value`` attribute) of each deserialized page; its result replaces that list.
        :keyword dict error_map: Optional status-code to exception-class entries merged over the
         defaults for 401, 404 and 409. ``None`` means "no extra entries".
        :return: An iterator like instance of either FirewallPolicyListResult or the result of cls(response)
        :rtype: ~azure.core.async_paging.AsyncItemPaged[~azure.mgmt.network.v2020_03_01.models.FirewallPolicyListResult]
        :raises ~azure.core.exceptions.HttpResponseError: when a page request returns a status code
         other than 200 and ``map_error`` did not already raise for it.
        """
        cls = kwargs.pop('cls', None)  # type: ClsType["_models.FirewallPolicyListResult"]
        error_map = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        # ``or {}`` tolerates an explicit ``error_map=None`` instead of failing in dict.update().
        error_map.update(kwargs.pop('error_map', None) or {})
        api_version = "2020-03-01"
        accept = "application/json"

        def prepare_request(next_link=None):
            # Construct headers
            header_parameters = {}  # type: Dict[str, Any]
            header_parameters['Accept'] = self._serialize.header("accept", accept, 'str')

            if not next_link:
                # First page: build the URL from the template and add the api-version.
                url = self.list.metadata['url']  # type: ignore
                path_format_arguments = {
                    'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
                    'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
                }
                url = self._client.format_url(url, **path_format_arguments)
                # Construct parameters
                query_parameters = {}  # type: Dict[str, Any]
                query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')
            else:
                # Following pages: the link is used as-is, without extra query parameters.
                url = next_link
                query_parameters = {}  # type: Dict[str, Any]
            return self._client.get(url, query_parameters, header_parameters)

        async def extract_data(pipeline_response):
            # Split one page into (link to the next page or None, elements of this page).
            deserialized = self._deserialize('FirewallPolicyListResult', pipeline_response)
            list_of_elem = deserialized.value
            if cls:
                list_of_elem = cls(list_of_elem)
            return deserialized.next_link or None, AsyncList(list_of_elem)

        async def get_next(next_link=None):
            # Fetch one page and fail on anything but HTTP 200.
            request = prepare_request(next_link)

            pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs)
            response = pipeline_response.http_response

            if response.status_code not in [200]:
                map_error(status_code=response.status_code, response=response, error_map=error_map)
                raise HttpResponseError(response=response, error_format=ARMErrorFormat)

            return pipeline_response

        return AsyncItemPaged(
            get_next, extract_data
        )
    list.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/firewallPolicies'}  # type: ignore

    def list_all(
        self,
        **kwargs
    ) -> AsyncIterable["_models.FirewallPolicyListResult"]:
        """Gets all the Firewall Policies in a subscription.

        :keyword callable cls: A custom type or function that will be passed the list of elements
         (the ``value`` attribute) of each deserialized page; its result replaces that list.
        :keyword dict error_map: Optional status-code to exception-class entries merged over the
         defaults for 401, 404 and 409. ``None`` means "no extra entries".
        :return: An iterator like instance of either FirewallPolicyListResult or the result of cls(response)
        :rtype: ~azure.core.async_paging.AsyncItemPaged[~azure.mgmt.network.v2020_03_01.models.FirewallPolicyListResult]
        :raises ~azure.core.exceptions.HttpResponseError: when a page request returns a status code
         other than 200 and ``map_error`` did not already raise for it.
        """
        cls = kwargs.pop('cls', None)  # type: ClsType["_models.FirewallPolicyListResult"]
        error_map = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        # ``or {}`` tolerates an explicit ``error_map=None`` instead of failing in dict.update().
        error_map.update(kwargs.pop('error_map', None) or {})
        api_version = "2020-03-01"
        accept = "application/json"

        def prepare_request(next_link=None):
            # Construct headers
            header_parameters = {}  # type: Dict[str, Any]
            header_parameters['Accept'] = self._serialize.header("accept", accept, 'str')

            if not next_link:
                # First page: build the URL from the template and add the api-version.
                url = self.list_all.metadata['url']  # type: ignore
                path_format_arguments = {
                    'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
                }
                url = self._client.format_url(url, **path_format_arguments)
                # Construct parameters
                query_parameters = {}  # type: Dict[str, Any]
                query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')
            else:
                # Following pages: the link is used as-is, without extra query parameters.
                url = next_link
                query_parameters = {}  # type: Dict[str, Any]
            return self._client.get(url, query_parameters, header_parameters)

        async def extract_data(pipeline_response):
            # Split one page into (link to the next page or None, elements of this page).
            deserialized = self._deserialize('FirewallPolicyListResult', pipeline_response)
            list_of_elem = deserialized.value
            if cls:
                list_of_elem = cls(list_of_elem)
            return deserialized.next_link or None, AsyncList(list_of_elem)

        async def get_next(next_link=None):
            # Fetch one page and fail on anything but HTTP 200.
            request = prepare_request(next_link)

            pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs)
            response = pipeline_response.http_response

            if response.status_code not in [200]:
                map_error(status_code=response.status_code, response=response, error_map=error_map)
                raise HttpResponseError(response=response, error_format=ARMErrorFormat)

            return pipeline_response

        return AsyncItemPaged(
            get_next, extract_data
        )
    list_all.metadata = {'url': '/subscriptions/{subscriptionId}/providers/Microsoft.Network/firewallPolicies'}  # type: ignore
480