1# coding=utf-8
2# --------------------------------------------------------------------------
3# Copyright (c) Microsoft Corporation. All rights reserved.
4# Licensed under the MIT License. See License.txt in the project root for license information.
5# Code generated by Microsoft (R) AutoRest Code Generator.
6# Changes may cause incorrect behavior and will be lost if the code is regenerated.
7# --------------------------------------------------------------------------
8from typing import TYPE_CHECKING
9import warnings
10
11from azure.core.exceptions import ClientAuthenticationError, HttpResponseError, ResourceExistsError, ResourceNotFoundError, map_error
12from azure.core.paging import ItemPaged
13from azure.core.pipeline import PipelineResponse
14from azure.core.pipeline.transport import HttpRequest, HttpResponse
15from azure.core.polling import LROPoller, NoPolling, PollingMethod
16from azure.mgmt.core.exceptions import ARMErrorFormat
17from azure.mgmt.core.polling.arm_polling import ARMPolling
18
19from .. import models as _models
20
21if TYPE_CHECKING:
22    # pylint: disable=unused-import,ungrouped-imports
23    from typing import Any, Callable, Dict, Generic, Iterable, Optional, TypeVar, Union
24
25    T = TypeVar('T')
26    ClsType = Optional[Callable[[PipelineResponse[HttpRequest, HttpResponse], T, Dict[str, Any]], Any]]
27
28class FlowLogsOperations(object):
29    """FlowLogsOperations operations.
30
31    You should not instantiate this class directly. Instead, you should create a Client instance that
32    instantiates it for you and attaches it as an attribute.
33
34    :ivar models: Alias to model classes used in this operation group.
35    :type models: ~azure.mgmt.network.v2019_11_01.models
36    :param client: Client for service requests.
37    :param config: Configuration of service client.
38    :param serializer: An object model serializer.
39    :param deserializer: An object model deserializer.
40    """
41
42    models = _models
43
44    def __init__(self, client, config, serializer, deserializer):
45        self._client = client
46        self._serialize = serializer
47        self._deserialize = deserializer
48        self._config = config
49
50    def _create_or_update_initial(
51        self,
52        resource_group_name,  # type: str
53        network_watcher_name,  # type: str
54        flow_log_name,  # type: str
55        parameters,  # type: "_models.FlowLog"
56        **kwargs  # type: Any
57    ):
58        # type: (...) -> "_models.FlowLog"
59        cls = kwargs.pop('cls', None)  # type: ClsType["_models.FlowLog"]
60        error_map = {
61            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
62        }
63        error_map.update(kwargs.pop('error_map', {}))
64        api_version = "2019-11-01"
65        content_type = kwargs.pop("content_type", "application/json")
66        accept = "application/json"
67
68        # Construct URL
69        url = self._create_or_update_initial.metadata['url']  # type: ignore
70        path_format_arguments = {
71            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
72            'networkWatcherName': self._serialize.url("network_watcher_name", network_watcher_name, 'str'),
73            'flowLogName': self._serialize.url("flow_log_name", flow_log_name, 'str'),
74            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
75        }
76        url = self._client.format_url(url, **path_format_arguments)
77
78        # Construct parameters
79        query_parameters = {}  # type: Dict[str, Any]
80        query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')
81
82        # Construct headers
83        header_parameters = {}  # type: Dict[str, Any]
84        header_parameters['Content-Type'] = self._serialize.header("content_type", content_type, 'str')
85        header_parameters['Accept'] = self._serialize.header("accept", accept, 'str')
86
87        body_content_kwargs = {}  # type: Dict[str, Any]
88        body_content = self._serialize.body(parameters, 'FlowLog')
89        body_content_kwargs['content'] = body_content
90        request = self._client.put(url, query_parameters, header_parameters, **body_content_kwargs)
91        pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs)
92        response = pipeline_response.http_response
93
94        if response.status_code not in [200, 201]:
95            map_error(status_code=response.status_code, response=response, error_map=error_map)
96            error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, response)
97            raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat)
98
99        if response.status_code == 200:
100            deserialized = self._deserialize('FlowLog', pipeline_response)
101
102        if response.status_code == 201:
103            deserialized = self._deserialize('FlowLog', pipeline_response)
104
105        if cls:
106            return cls(pipeline_response, deserialized, {})
107
108        return deserialized
109    _create_or_update_initial.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/networkWatchers/{networkWatcherName}/flowLogs/{flowLogName}'}  # type: ignore
110
111    def begin_create_or_update(
112        self,
113        resource_group_name,  # type: str
114        network_watcher_name,  # type: str
115        flow_log_name,  # type: str
116        parameters,  # type: "_models.FlowLog"
117        **kwargs  # type: Any
118    ):
119        # type: (...) -> LROPoller["_models.FlowLog"]
120        """Create or update a flow log for the specified network security group.
121
122        :param resource_group_name: The name of the resource group.
123        :type resource_group_name: str
124        :param network_watcher_name: The name of the network watcher.
125        :type network_watcher_name: str
126        :param flow_log_name: The name of the flow log.
127        :type flow_log_name: str
128        :param parameters: Parameters that define the create or update flow log resource.
129        :type parameters: ~azure.mgmt.network.v2019_11_01.models.FlowLog
130        :keyword callable cls: A custom type or function that will be passed the direct response
131        :keyword str continuation_token: A continuation token to restart a poller from a saved state.
132        :keyword polling: Pass in True if you'd like the ARMPolling polling method,
133         False for no polling, or your own initialized polling object for a personal polling strategy.
134        :paramtype polling: bool or ~azure.core.polling.PollingMethod
135        :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present.
136        :return: An instance of LROPoller that returns either FlowLog or the result of cls(response)
137        :rtype: ~azure.core.polling.LROPoller[~azure.mgmt.network.v2019_11_01.models.FlowLog]
138        :raises ~azure.core.exceptions.HttpResponseError:
139        """
140        polling = kwargs.pop('polling', True)  # type: Union[bool, PollingMethod]
141        cls = kwargs.pop('cls', None)  # type: ClsType["_models.FlowLog"]
142        lro_delay = kwargs.pop(
143            'polling_interval',
144            self._config.polling_interval
145        )
146        cont_token = kwargs.pop('continuation_token', None)  # type: Optional[str]
147        if cont_token is None:
148            raw_result = self._create_or_update_initial(
149                resource_group_name=resource_group_name,
150                network_watcher_name=network_watcher_name,
151                flow_log_name=flow_log_name,
152                parameters=parameters,
153                cls=lambda x,y,z: x,
154                **kwargs
155            )
156
157        kwargs.pop('error_map', None)
158        kwargs.pop('content_type', None)
159
160        def get_long_running_output(pipeline_response):
161            deserialized = self._deserialize('FlowLog', pipeline_response)
162
163            if cls:
164                return cls(pipeline_response, deserialized, {})
165            return deserialized
166
167        path_format_arguments = {
168            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
169            'networkWatcherName': self._serialize.url("network_watcher_name", network_watcher_name, 'str'),
170            'flowLogName': self._serialize.url("flow_log_name", flow_log_name, 'str'),
171            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
172        }
173
174        if polling is True: polling_method = ARMPolling(lro_delay, lro_options={'final-state-via': 'azure-async-operation'}, path_format_arguments=path_format_arguments,  **kwargs)
175        elif polling is False: polling_method = NoPolling()
176        else: polling_method = polling
177        if cont_token:
178            return LROPoller.from_continuation_token(
179                polling_method=polling_method,
180                continuation_token=cont_token,
181                client=self._client,
182                deserialization_callback=get_long_running_output
183            )
184        else:
185            return LROPoller(self._client, raw_result, get_long_running_output, polling_method)
186    begin_create_or_update.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/networkWatchers/{networkWatcherName}/flowLogs/{flowLogName}'}  # type: ignore
187
188    def get(
189        self,
190        resource_group_name,  # type: str
191        network_watcher_name,  # type: str
192        flow_log_name,  # type: str
193        **kwargs  # type: Any
194    ):
195        # type: (...) -> "_models.FlowLog"
196        """Gets a flow log resource by name.
197
198        :param resource_group_name: The name of the resource group.
199        :type resource_group_name: str
200        :param network_watcher_name: The name of the network watcher.
201        :type network_watcher_name: str
202        :param flow_log_name: The name of the flow log resource.
203        :type flow_log_name: str
204        :keyword callable cls: A custom type or function that will be passed the direct response
205        :return: FlowLog, or the result of cls(response)
206        :rtype: ~azure.mgmt.network.v2019_11_01.models.FlowLog
207        :raises: ~azure.core.exceptions.HttpResponseError
208        """
209        cls = kwargs.pop('cls', None)  # type: ClsType["_models.FlowLog"]
210        error_map = {
211            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
212        }
213        error_map.update(kwargs.pop('error_map', {}))
214        api_version = "2019-11-01"
215        accept = "application/json"
216
217        # Construct URL
218        url = self.get.metadata['url']  # type: ignore
219        path_format_arguments = {
220            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
221            'networkWatcherName': self._serialize.url("network_watcher_name", network_watcher_name, 'str'),
222            'flowLogName': self._serialize.url("flow_log_name", flow_log_name, 'str'),
223            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
224        }
225        url = self._client.format_url(url, **path_format_arguments)
226
227        # Construct parameters
228        query_parameters = {}  # type: Dict[str, Any]
229        query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')
230
231        # Construct headers
232        header_parameters = {}  # type: Dict[str, Any]
233        header_parameters['Accept'] = self._serialize.header("accept", accept, 'str')
234
235        request = self._client.get(url, query_parameters, header_parameters)
236        pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs)
237        response = pipeline_response.http_response
238
239        if response.status_code not in [200]:
240            map_error(status_code=response.status_code, response=response, error_map=error_map)
241            error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, response)
242            raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat)
243
244        deserialized = self._deserialize('FlowLog', pipeline_response)
245
246        if cls:
247            return cls(pipeline_response, deserialized, {})
248
249        return deserialized
250    get.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/networkWatchers/{networkWatcherName}/flowLogs/{flowLogName}'}  # type: ignore
251
252    def _delete_initial(
253        self,
254        resource_group_name,  # type: str
255        network_watcher_name,  # type: str
256        flow_log_name,  # type: str
257        **kwargs  # type: Any
258    ):
259        # type: (...) -> None
260        cls = kwargs.pop('cls', None)  # type: ClsType[None]
261        error_map = {
262            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
263        }
264        error_map.update(kwargs.pop('error_map', {}))
265        api_version = "2019-11-01"
266        accept = "application/json"
267
268        # Construct URL
269        url = self._delete_initial.metadata['url']  # type: ignore
270        path_format_arguments = {
271            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
272            'networkWatcherName': self._serialize.url("network_watcher_name", network_watcher_name, 'str'),
273            'flowLogName': self._serialize.url("flow_log_name", flow_log_name, 'str'),
274            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
275        }
276        url = self._client.format_url(url, **path_format_arguments)
277
278        # Construct parameters
279        query_parameters = {}  # type: Dict[str, Any]
280        query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')
281
282        # Construct headers
283        header_parameters = {}  # type: Dict[str, Any]
284        header_parameters['Accept'] = self._serialize.header("accept", accept, 'str')
285
286        request = self._client.delete(url, query_parameters, header_parameters)
287        pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs)
288        response = pipeline_response.http_response
289
290        if response.status_code not in [202, 204]:
291            map_error(status_code=response.status_code, response=response, error_map=error_map)
292            error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, response)
293            raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat)
294
295        if cls:
296            return cls(pipeline_response, None, {})
297
298    _delete_initial.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/networkWatchers/{networkWatcherName}/flowLogs/{flowLogName}'}  # type: ignore
299
300    def begin_delete(
301        self,
302        resource_group_name,  # type: str
303        network_watcher_name,  # type: str
304        flow_log_name,  # type: str
305        **kwargs  # type: Any
306    ):
307        # type: (...) -> LROPoller[None]
308        """Deletes the specified flow log resource.
309
310        :param resource_group_name: The name of the resource group.
311        :type resource_group_name: str
312        :param network_watcher_name: The name of the network watcher.
313        :type network_watcher_name: str
314        :param flow_log_name: The name of the flow log resource.
315        :type flow_log_name: str
316        :keyword callable cls: A custom type or function that will be passed the direct response
317        :keyword str continuation_token: A continuation token to restart a poller from a saved state.
318        :keyword polling: Pass in True if you'd like the ARMPolling polling method,
319         False for no polling, or your own initialized polling object for a personal polling strategy.
320        :paramtype polling: bool or ~azure.core.polling.PollingMethod
321        :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present.
322        :return: An instance of LROPoller that returns either None or the result of cls(response)
323        :rtype: ~azure.core.polling.LROPoller[None]
324        :raises ~azure.core.exceptions.HttpResponseError:
325        """
326        polling = kwargs.pop('polling', True)  # type: Union[bool, PollingMethod]
327        cls = kwargs.pop('cls', None)  # type: ClsType[None]
328        lro_delay = kwargs.pop(
329            'polling_interval',
330            self._config.polling_interval
331        )
332        cont_token = kwargs.pop('continuation_token', None)  # type: Optional[str]
333        if cont_token is None:
334            raw_result = self._delete_initial(
335                resource_group_name=resource_group_name,
336                network_watcher_name=network_watcher_name,
337                flow_log_name=flow_log_name,
338                cls=lambda x,y,z: x,
339                **kwargs
340            )
341
342        kwargs.pop('error_map', None)
343        kwargs.pop('content_type', None)
344
345        def get_long_running_output(pipeline_response):
346            if cls:
347                return cls(pipeline_response, None, {})
348
349        path_format_arguments = {
350            'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
351            'networkWatcherName': self._serialize.url("network_watcher_name", network_watcher_name, 'str'),
352            'flowLogName': self._serialize.url("flow_log_name", flow_log_name, 'str'),
353            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
354        }
355
356        if polling is True: polling_method = ARMPolling(lro_delay, lro_options={'final-state-via': 'location'}, path_format_arguments=path_format_arguments,  **kwargs)
357        elif polling is False: polling_method = NoPolling()
358        else: polling_method = polling
359        if cont_token:
360            return LROPoller.from_continuation_token(
361                polling_method=polling_method,
362                continuation_token=cont_token,
363                client=self._client,
364                deserialization_callback=get_long_running_output
365            )
366        else:
367            return LROPoller(self._client, raw_result, get_long_running_output, polling_method)
368    begin_delete.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/networkWatchers/{networkWatcherName}/flowLogs/{flowLogName}'}  # type: ignore
369
370    def list(
371        self,
372        resource_group_name,  # type: str
373        network_watcher_name,  # type: str
374        **kwargs  # type: Any
375    ):
376        # type: (...) -> Iterable["_models.FlowLogListResult"]
377        """Lists all flow log resources for the specified Network Watcher.
378
379        :param resource_group_name: The name of the resource group containing Network Watcher.
380        :type resource_group_name: str
381        :param network_watcher_name: The name of the Network Watcher resource.
382        :type network_watcher_name: str
383        :keyword callable cls: A custom type or function that will be passed the direct response
384        :return: An iterator like instance of either FlowLogListResult or the result of cls(response)
385        :rtype: ~azure.core.paging.ItemPaged[~azure.mgmt.network.v2019_11_01.models.FlowLogListResult]
386        :raises: ~azure.core.exceptions.HttpResponseError
387        """
388        cls = kwargs.pop('cls', None)  # type: ClsType["_models.FlowLogListResult"]
389        error_map = {
390            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
391        }
392        error_map.update(kwargs.pop('error_map', {}))
393        api_version = "2019-11-01"
394        accept = "application/json"
395
396        def prepare_request(next_link=None):
397            # Construct headers
398            header_parameters = {}  # type: Dict[str, Any]
399            header_parameters['Accept'] = self._serialize.header("accept", accept, 'str')
400
401            if not next_link:
402                # Construct URL
403                url = self.list.metadata['url']  # type: ignore
404                path_format_arguments = {
405                    'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
406                    'networkWatcherName': self._serialize.url("network_watcher_name", network_watcher_name, 'str'),
407                    'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
408                }
409                url = self._client.format_url(url, **path_format_arguments)
410                # Construct parameters
411                query_parameters = {}  # type: Dict[str, Any]
412                query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')
413
414                request = self._client.get(url, query_parameters, header_parameters)
415            else:
416                url = next_link
417                query_parameters = {}  # type: Dict[str, Any]
418                request = self._client.get(url, query_parameters, header_parameters)
419            return request
420
421        def extract_data(pipeline_response):
422            deserialized = self._deserialize('FlowLogListResult', pipeline_response)
423            list_of_elem = deserialized.value
424            if cls:
425                list_of_elem = cls(list_of_elem)
426            return deserialized.next_link or None, iter(list_of_elem)
427
428        def get_next(next_link=None):
429            request = prepare_request(next_link)
430
431            pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs)
432            response = pipeline_response.http_response
433
434            if response.status_code not in [200]:
435                error = self._deserialize.failsafe_deserialize(_models.ErrorResponse, response)
436                map_error(status_code=response.status_code, response=response, error_map=error_map)
437                raise HttpResponseError(response=response, model=error, error_format=ARMErrorFormat)
438
439            return pipeline_response
440
441        return ItemPaged(
442            get_next, extract_data
443        )
444    list.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/networkWatchers/{networkWatcherName}/flowLogs'}  # type: ignore
445