1# coding=utf-8
2# --------------------------------------------------------------------------
3# Copyright (c) Microsoft Corporation. All rights reserved.
4# Licensed under the MIT License. See License.txt in the project root for license information.
5# Code generated by Microsoft (R) AutoRest Code Generator.
6# Changes may cause incorrect behavior and will be lost if the code is regenerated.
7# --------------------------------------------------------------------------
8from typing import Any, Callable, Dict, Generic, Optional, TypeVar, Union
9import warnings
10
11from azure.core.exceptions import ClientAuthenticationError, HttpResponseError, ResourceExistsError, ResourceNotFoundError, map_error
12from azure.core.pipeline import PipelineResponse
13from azure.core.pipeline.transport import AsyncHttpResponse, HttpRequest
14from azure.core.polling import AsyncLROPoller, AsyncNoPolling, AsyncPollingMethod
15from azure.mgmt.core.exceptions import ARMErrorFormat
16from azure.mgmt.core.polling.async_arm_polling import AsyncARMPolling
17
18from ... import models as _models
19
20T = TypeVar('T')
21ClsType = Optional[Callable[[PipelineResponse[HttpRequest, AsyncHttpResponse], T, Dict[str, Any]], Any]]
22
class LogAnalyticsOperations:
    """LogAnalyticsOperations async operations.

    You should not instantiate this class directly. Instead, you should create a Client instance that
    instantiates it for you and attaches it as an attribute.

    :ivar models: Alias to model classes used in this operation group.
    :type models: ~azure.mgmt.compute.v2018_10_01.models
    :param client: Client for service requests.
    :param config: Configuration of service client.
    :param serializer: An object model serializer.
    :param deserializer: An object model deserializer.
    """

    models = _models

    def __init__(self, client, config, serializer, deserializer) -> None:
        self._client = client
        self._serialize = serializer
        self._deserialize = deserializer
        self._config = config

    def _path_format_arguments(self, location: str) -> Dict[str, Any]:
        """Serialize the URL path arguments shared by every operation in this group.

        :param location: Value substituted for ``{location}``; it is serialized with the
         pattern ``^[-\\w\\._]+$``.
        :type location: str
        :return: Mapping with the serialized ``location`` and ``subscriptionId`` values.
        :rtype: dict[str, any]
        """
        return {
            'location': self._serialize.url("location", location, 'str', pattern=r'^[-\w\._]+$'),
            'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
        }

    async def _send_export_request(
        self,
        url_template: str,
        location: str,
        parameters: Any,
        body_type: str,
        kwargs: Dict[str, Any]
    ) -> Optional["_models.LogAnalyticsOperationResult"]:
        """POST an export request and deserialize the immediate response.

        Shared implementation of the two ``_export_*_initial`` methods, which differ only in
        their URL template and in the model name used to serialize the request body.

        :param url_template: URL template containing ``{subscriptionId}`` and ``{location}``.
        :type url_template: str
        :param location: Value for the ``{location}`` path segment.
        :type location: str
        :param parameters: Request body model instance.
        :param body_type: Model name handed to the serializer for ``parameters``.
        :type body_type: str
        :param kwargs: The caller's keyword arguments. ``cls``, ``error_map`` and ``content_type``
         are popped from this dict (it is mutated in place); whatever remains is forwarded to the
         pipeline run.
        :type kwargs: dict[str, any]
        :return: ``cls(pipeline_response, deserialized, {})`` when ``cls`` is supplied; otherwise
         the deserialized ``LogAnalyticsOperationResult`` for a 200 response, or None for a 202.
        :raises ~azure.core.exceptions.HttpResponseError: for any status other than 200 or 202,
         after ``map_error`` has been consulted with the error map.
        """
        cls = kwargs.pop('cls', None)  # type: ClsType[Optional["_models.LogAnalyticsOperationResult"]]
        error_map = {
            401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
        }
        # ``or {}`` keeps an explicit ``error_map=None`` from raising TypeError in ``dict.update``.
        error_map.update(kwargs.pop('error_map', {}) or {})
        api_version = "2018-10-01"
        content_type = kwargs.pop("content_type", "application/json")
        accept = "application/json"

        # Construct URL
        url = self._client.format_url(url_template, **self._path_format_arguments(location))

        # Construct parameters
        query_parameters = {}  # type: Dict[str, Any]
        query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')

        # Construct headers
        header_parameters = {}  # type: Dict[str, Any]
        header_parameters['Content-Type'] = self._serialize.header("content_type", content_type, 'str')
        header_parameters['Accept'] = self._serialize.header("accept", accept, 'str')

        body_content_kwargs = {}  # type: Dict[str, Any]
        body_content = self._serialize.body(parameters, body_type)
        body_content_kwargs['content'] = body_content
        request = self._client.post(url, query_parameters, header_parameters, **body_content_kwargs)
        pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs)
        response = pipeline_response.http_response

        if response.status_code not in [200, 202]:
            map_error(status_code=response.status_code, response=response, error_map=error_map)
            raise HttpResponseError(response=response, error_format=ARMErrorFormat)

        # Only a 200 response is deserialized; a 202 yields None.
        deserialized = None
        if response.status_code == 200:
            deserialized = self._deserialize('LogAnalyticsOperationResult', pipeline_response)

        if cls:
            return cls(pipeline_response, deserialized, {})

        return deserialized

    async def _begin_export(
        self,
        initial: Callable[..., Any],
        location: str,
        parameters: Any,
        kwargs: Dict[str, Any]
    ) -> AsyncLROPoller["_models.LogAnalyticsOperationResult"]:
        """Start (or resume) a long-running export and wrap it in an ``AsyncLROPoller``.

        Shared implementation of the two ``begin_export_*`` methods.

        :param initial: Bound ``_export_*_initial`` coroutine function that sends the first request.
        :param location: Value for the ``{location}`` path segment.
        :type location: str
        :param parameters: Request body model instance, forwarded to ``initial``.
        :param kwargs: The caller's keyword arguments. ``polling``, ``cls``, ``polling_interval``,
         ``continuation_token``, ``error_map`` and ``content_type`` are popped from this dict (it is
         mutated in place); whatever remains is forwarded to ``initial`` and to ``AsyncARMPolling``.
        :type kwargs: dict[str, any]
        :return: A poller built from the continuation token when one is supplied, otherwise a
         poller built from the response of the initial request.
        :rtype: ~azure.core.polling.AsyncLROPoller[~azure.mgmt.compute.v2018_10_01.models.LogAnalyticsOperationResult]
        """
        polling = kwargs.pop('polling', True)  # type: Union[bool, AsyncPollingMethod]
        cls = kwargs.pop('cls', None)  # type: ClsType["_models.LogAnalyticsOperationResult"]
        lro_delay = kwargs.pop(
            'polling_interval',
            self._config.polling_interval
        )
        cont_token = kwargs.pop('continuation_token', None)  # type: Optional[str]

        # The initial request is sent only when no continuation token was supplied. ``raw_result``
        # is always bound so that no token value can lead to an UnboundLocalError further down.
        raw_result = None
        if cont_token is None:
            # The identity ``cls`` makes the initial call hand back the raw pipeline response.
            raw_result = await initial(
                location=location,
                parameters=parameters,
                cls=lambda x, y, z: x,
                **kwargs
            )

        # These two are only meaningful to the initial request; keep them out of the polling kwargs.
        kwargs.pop('error_map', None)
        kwargs.pop('content_type', None)

        def get_long_running_output(pipeline_response):
            deserialized = self._deserialize('LogAnalyticsOperationResult', pipeline_response)

            if cls:
                return cls(pipeline_response, deserialized, {})
            return deserialized

        path_format_arguments = self._path_format_arguments(location)

        if polling is True:
            polling_method = AsyncARMPolling(
                lro_delay,
                lro_options={'final-state-via': 'azure-async-operation'},
                path_format_arguments=path_format_arguments,
                **kwargs
            )
        elif polling is False:
            polling_method = AsyncNoPolling()
        else:
            polling_method = polling

        # Same ``is None`` test as above, so the "resume" and "start" paths are exact complements.
        if cont_token is not None:
            return AsyncLROPoller.from_continuation_token(
                polling_method=polling_method,
                continuation_token=cont_token,
                client=self._client,
                deserialization_callback=get_long_running_output
            )
        return AsyncLROPoller(self._client, raw_result, get_long_running_output, polling_method)

    async def _export_request_rate_by_interval_initial(
        self,
        location: str,
        parameters: "_models.RequestRateByIntervalInput",
        **kwargs: Any
    ) -> Optional["_models.LogAnalyticsOperationResult"]:
        """Send the initial getRequestRateByInterval POST request.

        :param location: Value for the ``{location}`` path segment.
        :type location: str
        :param parameters: Request body, serialized as ``RequestRateByIntervalInput``.
        :type parameters: ~azure.mgmt.compute.v2018_10_01.models.RequestRateByIntervalInput
        :keyword callable cls: Called with the pipeline response, the deserialized result and an
         empty dict; its return value is returned instead of the deserialized result.
        :keyword dict error_map: Extra status-code-to-exception entries merged into the defaults.
        :keyword str content_type: Request Content-Type; defaults to ``application/json``.
        :return: The deserialized result for a 200 response, None for a 202, or the result of cls.
        :raises ~azure.core.exceptions.HttpResponseError:
        """
        return await self._send_export_request(
            self._export_request_rate_by_interval_initial.metadata['url'],  # type: ignore
            location,
            parameters,
            'RequestRateByIntervalInput',
            kwargs
        )
    _export_request_rate_by_interval_initial.metadata = {'url': '/subscriptions/{subscriptionId}/providers/Microsoft.Compute/locations/{location}/logAnalytics/apiAccess/getRequestRateByInterval'}  # type: ignore

    async def begin_export_request_rate_by_interval(
        self,
        location: str,
        parameters: "_models.RequestRateByIntervalInput",
        **kwargs: Any
    ) -> AsyncLROPoller["_models.LogAnalyticsOperationResult"]:
        """Export logs that show Api requests made by this subscription in the given time window to show
        throttling activities.

        :param location: The location upon which virtual-machine-sizes is queried.
        :type location: str
        :param parameters: Parameters supplied to the LogAnalytics getRequestRateByInterval Api.
        :type parameters: ~azure.mgmt.compute.v2018_10_01.models.RequestRateByIntervalInput
        :keyword callable cls: A custom type or function that will be passed the direct response
        :keyword str continuation_token: A continuation token to restart a poller from a saved state.
        :keyword polling: By default, your polling method will be AsyncARMPolling.
         Pass in False for this operation to not poll, or pass in your own initialized polling object for a personal polling strategy.
        :paramtype polling: bool or ~azure.core.polling.AsyncPollingMethod
        :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present.
        :return: An instance of AsyncLROPoller that returns either LogAnalyticsOperationResult or the result of cls(response)
        :rtype: ~azure.core.polling.AsyncLROPoller[~azure.mgmt.compute.v2018_10_01.models.LogAnalyticsOperationResult]
        :raises ~azure.core.exceptions.HttpResponseError:
        """
        return await self._begin_export(
            self._export_request_rate_by_interval_initial,
            location,
            parameters,
            kwargs
        )
    begin_export_request_rate_by_interval.metadata = {'url': '/subscriptions/{subscriptionId}/providers/Microsoft.Compute/locations/{location}/logAnalytics/apiAccess/getRequestRateByInterval'}  # type: ignore

    async def _export_throttled_requests_initial(
        self,
        location: str,
        parameters: "_models.ThrottledRequestsInput",
        **kwargs: Any
    ) -> Optional["_models.LogAnalyticsOperationResult"]:
        """Send the initial getThrottledRequests POST request.

        :param location: Value for the ``{location}`` path segment.
        :type location: str
        :param parameters: Request body, serialized as ``ThrottledRequestsInput``.
        :type parameters: ~azure.mgmt.compute.v2018_10_01.models.ThrottledRequestsInput
        :keyword callable cls: Called with the pipeline response, the deserialized result and an
         empty dict; its return value is returned instead of the deserialized result.
        :keyword dict error_map: Extra status-code-to-exception entries merged into the defaults.
        :keyword str content_type: Request Content-Type; defaults to ``application/json``.
        :return: The deserialized result for a 200 response, None for a 202, or the result of cls.
        :raises ~azure.core.exceptions.HttpResponseError:
        """
        return await self._send_export_request(
            self._export_throttled_requests_initial.metadata['url'],  # type: ignore
            location,
            parameters,
            'ThrottledRequestsInput',
            kwargs
        )
    _export_throttled_requests_initial.metadata = {'url': '/subscriptions/{subscriptionId}/providers/Microsoft.Compute/locations/{location}/logAnalytics/apiAccess/getThrottledRequests'}  # type: ignore

    async def begin_export_throttled_requests(
        self,
        location: str,
        parameters: "_models.ThrottledRequestsInput",
        **kwargs: Any
    ) -> AsyncLROPoller["_models.LogAnalyticsOperationResult"]:
        """Export logs that show total throttled Api requests for this subscription in the given time
        window.

        :param location: The location upon which virtual-machine-sizes is queried.
        :type location: str
        :param parameters: Parameters supplied to the LogAnalytics getThrottledRequests Api.
        :type parameters: ~azure.mgmt.compute.v2018_10_01.models.ThrottledRequestsInput
        :keyword callable cls: A custom type or function that will be passed the direct response
        :keyword str continuation_token: A continuation token to restart a poller from a saved state.
        :keyword polling: By default, your polling method will be AsyncARMPolling.
         Pass in False for this operation to not poll, or pass in your own initialized polling object for a personal polling strategy.
        :paramtype polling: bool or ~azure.core.polling.AsyncPollingMethod
        :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present.
        :return: An instance of AsyncLROPoller that returns either LogAnalyticsOperationResult or the result of cls(response)
        :rtype: ~azure.core.polling.AsyncLROPoller[~azure.mgmt.compute.v2018_10_01.models.LogAnalyticsOperationResult]
        :raises ~azure.core.exceptions.HttpResponseError:
        """
        return await self._begin_export(
            self._export_throttled_requests_initial,
            location,
            parameters,
            kwargs
        )
    begin_export_throttled_requests.metadata = {'url': '/subscriptions/{subscriptionId}/providers/Microsoft.Compute/locations/{location}/logAnalytics/apiAccess/getThrottledRequests'}  # type: ignore
284