1# coding=utf-8 2# -------------------------------------------------------------------------- 3# Copyright (c) Microsoft Corporation. All rights reserved. 4# Licensed under the MIT License. See License.txt in the project root for license information. 5# Code generated by Microsoft (R) AutoRest Code Generator. 6# Changes may cause incorrect behavior and will be lost if the code is regenerated. 7# -------------------------------------------------------------------------- 8from typing import Any, Callable, Dict, Generic, Optional, TypeVar, Union 9import warnings 10 11from azure.core.exceptions import ClientAuthenticationError, HttpResponseError, ResourceExistsError, ResourceNotFoundError, map_error 12from azure.core.pipeline import PipelineResponse 13from azure.core.pipeline.transport import AsyncHttpResponse, HttpRequest 14from azure.core.polling import AsyncLROPoller, AsyncNoPolling, AsyncPollingMethod 15from azure.mgmt.core.exceptions import ARMErrorFormat 16from azure.mgmt.core.polling.async_arm_polling import AsyncARMPolling 17 18from ... import models as _models 19 20T = TypeVar('T') 21ClsType = Optional[Callable[[PipelineResponse[HttpRequest, AsyncHttpResponse], T, Dict[str, Any]], Any]] 22 23class LogAnalyticsOperations: 24 """LogAnalyticsOperations async operations. 25 26 You should not instantiate this class directly. Instead, you should create a Client instance that 27 instantiates it for you and attaches it as an attribute. 28 29 :ivar models: Alias to model classes used in this operation group. 30 :type models: ~azure.mgmt.compute.v2018_10_01.models 31 :param client: Client for service requests. 32 :param config: Configuration of service client. 33 :param serializer: An object model serializer. 34 :param deserializer: An object model deserializer. 35 """ 36 37 models = _models 38 39 def __init__(self, client, config, serializer, deserializer) -> None: 40 self._client = client 41 self._serialize = serializer 42 self._deserialize = deserializer 43 self._config = config 44 45 async def _export_request_rate_by_interval_initial( 46 self, 47 location: str, 48 parameters: "_models.RequestRateByIntervalInput", 49 **kwargs: Any 50 ) -> Optional["_models.LogAnalyticsOperationResult"]: 51 cls = kwargs.pop('cls', None) # type: ClsType[Optional["_models.LogAnalyticsOperationResult"]] 52 error_map = { 53 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError 54 } 55 error_map.update(kwargs.pop('error_map', {})) 56 api_version = "2018-10-01" 57 content_type = kwargs.pop("content_type", "application/json") 58 accept = "application/json" 59 60 # Construct URL 61 url = self._export_request_rate_by_interval_initial.metadata['url'] # type: ignore 62 path_format_arguments = { 63 'location': self._serialize.url("location", location, 'str', pattern=r'^[-\w\._]+$'), 64 'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'), 65 } 66 url = self._client.format_url(url, **path_format_arguments) 67 68 # Construct parameters 69 query_parameters = {} # type: Dict[str, Any] 70 query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str') 71 72 # Construct headers 73 header_parameters = {} # type: Dict[str, Any] 74 header_parameters['Content-Type'] = self._serialize.header("content_type", content_type, 'str') 75 header_parameters['Accept'] = self._serialize.header("accept", accept, 'str') 76 77 body_content_kwargs = {} # type: Dict[str, Any] 78 body_content = self._serialize.body(parameters, 'RequestRateByIntervalInput') 79 body_content_kwargs['content'] = body_content 80 request = self._client.post(url, query_parameters, header_parameters, **body_content_kwargs) 81 pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs) 82 response = pipeline_response.http_response 83 84 if response.status_code not in [200, 202]: 85 map_error(status_code=response.status_code, response=response, error_map=error_map) 86 raise HttpResponseError(response=response, error_format=ARMErrorFormat) 87 88 deserialized = None 89 if response.status_code == 200: 90 deserialized = self._deserialize('LogAnalyticsOperationResult', pipeline_response) 91 92 if cls: 93 return cls(pipeline_response, deserialized, {}) 94 95 return deserialized 96 _export_request_rate_by_interval_initial.metadata = {'url': '/subscriptions/{subscriptionId}/providers/Microsoft.Compute/locations/{location}/logAnalytics/apiAccess/getRequestRateByInterval'} # type: ignore 97 98 async def begin_export_request_rate_by_interval( 99 self, 100 location: str, 101 parameters: "_models.RequestRateByIntervalInput", 102 **kwargs: Any 103 ) -> AsyncLROPoller["_models.LogAnalyticsOperationResult"]: 104 """Export logs that show Api requests made by this subscription in the given time window to show 105 throttling activities. 106 107 :param location: The location upon which virtual-machine-sizes is queried. 108 :type location: str 109 :param parameters: Parameters supplied to the LogAnalytics getRequestRateByInterval Api. 110 :type parameters: ~azure.mgmt.compute.v2018_10_01.models.RequestRateByIntervalInput 111 :keyword callable cls: A custom type or function that will be passed the direct response 112 :keyword str continuation_token: A continuation token to restart a poller from a saved state. 113 :keyword polling: By default, your polling method will be AsyncARMPolling. 114 Pass in False for this operation to not poll, or pass in your own initialized polling object for a personal polling strategy. 115 :paramtype polling: bool or ~azure.core.polling.AsyncPollingMethod 116 :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present. 117 :return: An instance of AsyncLROPoller that returns either LogAnalyticsOperationResult or the result of cls(response) 118 :rtype: ~azure.core.polling.AsyncLROPoller[~azure.mgmt.compute.v2018_10_01.models.LogAnalyticsOperationResult] 119 :raises ~azure.core.exceptions.HttpResponseError: 120 """ 121 polling = kwargs.pop('polling', True) # type: Union[bool, AsyncPollingMethod] 122 cls = kwargs.pop('cls', None) # type: ClsType["_models.LogAnalyticsOperationResult"] 123 lro_delay = kwargs.pop( 124 'polling_interval', 125 self._config.polling_interval 126 ) 127 cont_token = kwargs.pop('continuation_token', None) # type: Optional[str] 128 if cont_token is None: 129 raw_result = await self._export_request_rate_by_interval_initial( 130 location=location, 131 parameters=parameters, 132 cls=lambda x,y,z: x, 133 **kwargs 134 ) 135 136 kwargs.pop('error_map', None) 137 kwargs.pop('content_type', None) 138 139 def get_long_running_output(pipeline_response): 140 deserialized = self._deserialize('LogAnalyticsOperationResult', pipeline_response) 141 142 if cls: 143 return cls(pipeline_response, deserialized, {}) 144 return deserialized 145 146 path_format_arguments = { 147 'location': self._serialize.url("location", location, 'str', pattern=r'^[-\w\._]+$'), 148 'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'), 149 } 150 151 if polling is True: polling_method = AsyncARMPolling(lro_delay, lro_options={'final-state-via': 'azure-async-operation'}, path_format_arguments=path_format_arguments, **kwargs) 152 elif polling is False: polling_method = AsyncNoPolling() 153 else: polling_method = polling 154 if cont_token: 155 return AsyncLROPoller.from_continuation_token( 156 polling_method=polling_method, 157 continuation_token=cont_token, 158 client=self._client, 159 deserialization_callback=get_long_running_output 160 ) 161 else: 162 return AsyncLROPoller(self._client, raw_result, get_long_running_output, polling_method) 163 begin_export_request_rate_by_interval.metadata = {'url': '/subscriptions/{subscriptionId}/providers/Microsoft.Compute/locations/{location}/logAnalytics/apiAccess/getRequestRateByInterval'} # type: ignore 164 165 async def _export_throttled_requests_initial( 166 self, 167 location: str, 168 parameters: "_models.ThrottledRequestsInput", 169 **kwargs: Any 170 ) -> Optional["_models.LogAnalyticsOperationResult"]: 171 cls = kwargs.pop('cls', None) # type: ClsType[Optional["_models.LogAnalyticsOperationResult"]] 172 error_map = { 173 401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError 174 } 175 error_map.update(kwargs.pop('error_map', {})) 176 api_version = "2018-10-01" 177 content_type = kwargs.pop("content_type", "application/json") 178 accept = "application/json" 179 180 # Construct URL 181 url = self._export_throttled_requests_initial.metadata['url'] # type: ignore 182 path_format_arguments = { 183 'location': self._serialize.url("location", location, 'str', pattern=r'^[-\w\._]+$'), 184 'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'), 185 } 186 url = self._client.format_url(url, **path_format_arguments) 187 188 # Construct parameters 189 query_parameters = {} # type: Dict[str, Any] 190 query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str') 191 192 # Construct headers 193 header_parameters = {} # type: Dict[str, Any] 194 header_parameters['Content-Type'] = self._serialize.header("content_type", content_type, 'str') 195 header_parameters['Accept'] = self._serialize.header("accept", accept, 'str') 196 197 body_content_kwargs = {} # type: Dict[str, Any] 198 body_content = self._serialize.body(parameters, 'ThrottledRequestsInput') 199 body_content_kwargs['content'] = body_content 200 request = self._client.post(url, query_parameters, header_parameters, **body_content_kwargs) 201 pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs) 202 response = pipeline_response.http_response 203 204 if response.status_code not in [200, 202]: 205 map_error(status_code=response.status_code, response=response, error_map=error_map) 206 raise HttpResponseError(response=response, error_format=ARMErrorFormat) 207 208 deserialized = None 209 if response.status_code == 200: 210 deserialized = self._deserialize('LogAnalyticsOperationResult', pipeline_response) 211 212 if cls: 213 return cls(pipeline_response, deserialized, {}) 214 215 return deserialized 216 _export_throttled_requests_initial.metadata = {'url': '/subscriptions/{subscriptionId}/providers/Microsoft.Compute/locations/{location}/logAnalytics/apiAccess/getThrottledRequests'} # type: ignore 217 218 async def begin_export_throttled_requests( 219 self, 220 location: str, 221 parameters: "_models.ThrottledRequestsInput", 222 **kwargs: Any 223 ) -> AsyncLROPoller["_models.LogAnalyticsOperationResult"]: 224 """Export logs that show total throttled Api requests for this subscription in the given time 225 window. 226 227 :param location: The location upon which virtual-machine-sizes is queried. 228 :type location: str 229 :param parameters: Parameters supplied to the LogAnalytics getThrottledRequests Api. 230 :type parameters: ~azure.mgmt.compute.v2018_10_01.models.ThrottledRequestsInput 231 :keyword callable cls: A custom type or function that will be passed the direct response 232 :keyword str continuation_token: A continuation token to restart a poller from a saved state. 233 :keyword polling: By default, your polling method will be AsyncARMPolling. 234 Pass in False for this operation to not poll, or pass in your own initialized polling object for a personal polling strategy. 235 :paramtype polling: bool or ~azure.core.polling.AsyncPollingMethod 236 :keyword int polling_interval: Default waiting time between two polls for LRO operations if no Retry-After header is present. 237 :return: An instance of AsyncLROPoller that returns either LogAnalyticsOperationResult or the result of cls(response) 238 :rtype: ~azure.core.polling.AsyncLROPoller[~azure.mgmt.compute.v2018_10_01.models.LogAnalyticsOperationResult] 239 :raises ~azure.core.exceptions.HttpResponseError: 240 """ 241 polling = kwargs.pop('polling', True) # type: Union[bool, AsyncPollingMethod] 242 cls = kwargs.pop('cls', None) # type: ClsType["_models.LogAnalyticsOperationResult"] 243 lro_delay = kwargs.pop( 244 'polling_interval', 245 self._config.polling_interval 246 ) 247 cont_token = kwargs.pop('continuation_token', None) # type: Optional[str] 248 if cont_token is None: 249 raw_result = await self._export_throttled_requests_initial( 250 location=location, 251 parameters=parameters, 252 cls=lambda x,y,z: x, 253 **kwargs 254 ) 255 256 kwargs.pop('error_map', None) 257 kwargs.pop('content_type', None) 258 259 def get_long_running_output(pipeline_response): 260 deserialized = self._deserialize('LogAnalyticsOperationResult', pipeline_response) 261 262 if cls: 263 return cls(pipeline_response, deserialized, {}) 264 return deserialized 265 266 path_format_arguments = { 267 'location': self._serialize.url("location", location, 'str', pattern=r'^[-\w\._]+$'), 268 'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'), 269 } 270 271 if polling is True: polling_method = AsyncARMPolling(lro_delay, lro_options={'final-state-via': 'azure-async-operation'}, path_format_arguments=path_format_arguments, **kwargs) 272 elif polling is False: polling_method = AsyncNoPolling() 273 else: polling_method = polling 274 if cont_token: 275 return AsyncLROPoller.from_continuation_token( 276 polling_method=polling_method, 277 continuation_token=cont_token, 278 client=self._client, 279 deserialization_callback=get_long_running_output 280 ) 281 else: 282 return AsyncLROPoller(self._client, raw_result, get_long_running_output, polling_method) 283 begin_export_throttled_requests.metadata = {'url': '/subscriptions/{subscriptionId}/providers/Microsoft.Compute/locations/{location}/logAnalytics/apiAccess/getThrottledRequests'} # type: ignore 284