1# -------------------------------------------------------------------------- 2# 3# Copyright (c) Microsoft Corporation. All rights reserved. 4# 5# The MIT License (MIT) 6# 7# Permission is hereby granted, free of charge, to any person obtaining a copy 8# of this software and associated documentation files (the ""Software""), to 9# deal in the Software without restriction, including without limitation the 10# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 11# sell copies of the Software, and to permit persons to whom the Software is 12# furnished to do so, subject to the following conditions: 13# 14# The above copyright notice and this permission notice shall be included in 15# all copies or substantial portions of the Software. 16# 17# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 18# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 19# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 20# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 21# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 22# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 23# IN THE SOFTWARE. 24# 25# -------------------------------------------------------------------------- 26 27import re 28import threading 29import time 30import uuid 31try: 32 from urlparse import urlparse 33except ImportError: 34 from urllib.parse import urlparse 35 36from msrest.exceptions import DeserializationError, ClientException 37from msrestazure.azure_exceptions import CloudError 38 39 40FINISHED = frozenset(['succeeded', 'canceled', 'failed']) 41FAILED = frozenset(['canceled', 'failed']) 42SUCCEEDED = frozenset(['succeeded']) 43 44 45def finished(status): 46 if hasattr(status, 'value'): 47 status = status.value 48 return str(status).lower() in FINISHED 49 50 51def failed(status): 52 if hasattr(status, 'value'): 53 status = status.value 54 return str(status).lower() in FAILED 55 56 57def succeeded(status): 58 if hasattr(status, 'value'): 59 status = status.value 60 return str(status).lower() in SUCCEEDED 61 62 63def _validate(url): 64 """Validate a url. 65 66 :param str url: Polling URL extracted from response header. 67 :raises: ValueError if URL has no scheme or host. 68 """ 69 if url is None: 70 return 71 parsed = urlparse(url) 72 if not parsed.scheme or not parsed.netloc: 73 raise ValueError("Invalid URL header") 74 75def _get_header_url(response, header_name): 76 """Get a URL from a header requests. 77 78 :param requests.Response response: REST call response. 79 :param str header_name: Header name. 80 :returns: URL if not None AND valid, None otherwise 81 """ 82 url = response.headers.get(header_name) 83 try: 84 _validate(url) 85 except ValueError: 86 return None 87 else: 88 return url 89 90class BadStatus(Exception): 91 pass 92 93 94class BadResponse(Exception): 95 pass 96 97 98class OperationFailed(Exception): 99 pass 100 101 102class SimpleResource: 103 """An implementation of Python 3 SimpleNamespace. 104 Used to deserialize resource objects from response bodies where 105 no particular object type has been specified. 106 """ 107 108 def __init__(self, **kwargs): 109 self.__dict__.update(kwargs) 110 111 def __repr__(self): 112 keys = sorted(self.__dict__) 113 items = ("{}={!r}".format(k, self.__dict__[k]) for k in keys) 114 return "{}({})".format(type(self).__name__, ", ".join(items)) 115 116 def __eq__(self, other): 117 return self.__dict__ == other.__dict__ 118 119 120class LongRunningOperation(object): 121 """LongRunningOperation 122 Provides default logic for interpreting operation responses 123 and status updates. 124 """ 125 _convert = re.compile('([a-z0-9])([A-Z])') 126 127 def __init__(self, response, outputs): 128 self.method = response.request.method 129 self.status = "" 130 self.resource = None 131 self.get_outputs = outputs 132 self.async_url = None 133 self.location_url = None 134 self.initial_status_code = None 135 136 def _raise_if_bad_http_status_and_method(self, response): 137 """Check response status code is valid for a Put or Patch 138 request. Must be 200, 201, 202, or 204. 139 140 :raises: BadStatus if invalid status. 141 """ 142 code = response.status_code 143 if code in {200, 202} or \ 144 (code == 201 and self.method in {'PUT', 'PATCH'}) or \ 145 (code == 204 and self.method in {'DELETE', 'POST'}): 146 return 147 raise BadStatus( 148 "Invalid return status for {!r} operation".format(self.method)) 149 150 def _is_empty(self, response): 151 """Check if response body contains meaningful content. 152 153 :rtype: bool 154 :raises: DeserializationError if response body contains invalid 155 json data. 156 """ 157 if not response.content: 158 return True 159 try: 160 body = response.json() 161 return not body 162 except ValueError: 163 raise DeserializationError( 164 "Error occurred in deserializing the response body.") 165 166 def _deserialize(self, response): 167 """Attempt to deserialize resource from response. 168 169 :param requests.Response response: latest REST call response. 170 """ 171 # Hacking response with initial status_code 172 previous_status = response.status_code 173 response.status_code = self.initial_status_code 174 resource = self.get_outputs(response) 175 response.status_code = previous_status 176 177 # Hack for Storage or SQL, to workaround the bug in the Python generator 178 if resource is None: 179 previous_status = response.status_code 180 for status_code_to_test in [200, 201]: 181 try: 182 response.status_code = status_code_to_test 183 resource = self.get_outputs(response) 184 except ClientException: 185 pass 186 else: 187 return resource 188 finally: 189 response.status_code = previous_status 190 return resource 191 192 def _get_async_status(self, response): 193 """Attempt to find status info in response body. 194 195 :param requests.Response response: latest REST call response. 196 :rtype: str 197 :returns: Status if found, else 'None'. 198 """ 199 if self._is_empty(response): 200 return None 201 body = response.json() 202 return body.get('status') 203 204 def _get_provisioning_state(self, response): 205 """ 206 Attempt to get provisioning state from resource. 207 :param requests.Response response: latest REST call response. 208 :returns: Status if found, else 'None'. 209 """ 210 if self._is_empty(response): 211 return None 212 body = response.json() 213 return body.get("properties", {}).get("provisioningState") 214 215 def should_do_final_get(self): 216 """Check whether the polling should end doing a final GET. 217 218 :param requests.Response response: latest REST call response. 219 :rtype: bool 220 """ 221 return (self.async_url or not self.resource) and \ 222 self.method in {'PUT', 'PATCH'} 223 224 def set_initial_status(self, response): 225 """Process first response after initiating long running 226 operation and set self.status attribute. 227 228 :param requests.Response response: initial REST call response. 229 """ 230 self._raise_if_bad_http_status_and_method(response) 231 232 if self._is_empty(response): 233 self.resource = None 234 else: 235 try: 236 self.resource = self.get_outputs(response) 237 except DeserializationError: 238 self.resource = None 239 240 self.set_async_url_if_present(response) 241 242 if response.status_code in {200, 201, 202, 204}: 243 self.initial_status_code = response.status_code 244 if self.async_url or self.location_url or response.status_code == 202: 245 self.status = 'InProgress' 246 elif response.status_code == 201: 247 status = self._get_provisioning_state(response) 248 self.status = status or 'InProgress' 249 elif response.status_code == 200: 250 status = self._get_provisioning_state(response) 251 self.status = status or 'Succeeded' 252 elif response.status_code == 204: 253 self.status = 'Succeeded' 254 self.resource = None 255 else: 256 raise OperationFailed("Invalid status found") 257 return 258 raise OperationFailed("Operation failed or cancelled") 259 260 def get_status_from_location(self, response): 261 """Process the latest status update retrieved from a 'location' 262 header. 263 264 :param requests.Response response: latest REST call response. 265 :raises: BadResponse if response has no body and not status 202. 266 """ 267 self._raise_if_bad_http_status_and_method(response) 268 code = response.status_code 269 if code == 202: 270 self.status = "InProgress" 271 else: 272 self.status = 'Succeeded' 273 if self._is_empty(response): 274 self.resource = None 275 else: 276 self.resource = self._deserialize(response) 277 278 def get_status_from_resource(self, response): 279 """Process the latest status update retrieved from the same URL as 280 the previous request. 281 282 :param requests.Response response: latest REST call response. 283 :raises: BadResponse if status not 200 or 204. 284 """ 285 self._raise_if_bad_http_status_and_method(response) 286 if self._is_empty(response): 287 raise BadResponse('The response from long running operation ' 288 'does not contain a body.') 289 290 status = self._get_provisioning_state(response) 291 self.status = status or 'Succeeded' 292 293 self.resource = self._deserialize(response) 294 295 def get_status_from_async(self, response): 296 """Process the latest status update retrieved from a 297 'azure-asyncoperation' header. 298 299 :param requests.Response response: latest REST call response. 300 :raises: BadResponse if response has no body, or body does not 301 contain status. 302 """ 303 self._raise_if_bad_http_status_and_method(response) 304 if self._is_empty(response): 305 raise BadResponse('The response from long running operation ' 306 'does not contain a body.') 307 308 self.status = self._get_async_status(response) 309 if not self.status: 310 raise BadResponse("No status found in body") 311 312 # Status can contains information, see ARM spec: 313 # https://github.com/Azure/azure-resource-manager-rpc/blob/master/v1.0/Addendum.md#operation-resource-format 314 # "properties": { 315 # /\* The resource provider can choose the values here, but it should only be 316 # returned on a successful operation (status being "Succeeded"). \*/ 317 #}, 318 # So try to parse it 319 try: 320 self.resource = self.get_outputs(response) 321 except Exception: 322 self.resource = None 323 324 def set_async_url_if_present(self, response): 325 async_url = _get_header_url(response, 'azure-asyncoperation') 326 if async_url: 327 self.async_url = async_url 328 329 location_url = _get_header_url(response, 'location') 330 if location_url: 331 self.location_url = location_url 332 333 334class AzureOperationPoller(object): 335 """Initiates long running operation and polls status in separate 336 thread. 337 338 :param callable send_cmd: The API request to initiate the operation. 339 :param callable update_cmd: The API reuqest to check the status of 340 the operation. 341 :param callable output_cmd: The function to deserialize the resource 342 of the operation. 343 :param int timeout: Time in seconds to wait between status calls, 344 default is 30. 345 """ 346 347 def __init__(self, send_cmd, output_cmd, update_cmd, timeout=30): 348 self._timeout = timeout 349 self._callbacks = [] 350 351 try: 352 self._response = send_cmd() 353 self._operation = LongRunningOperation(self._response, output_cmd) 354 self._operation.set_initial_status(self._response) 355 except BadStatus: 356 self._operation.status = 'Failed' 357 raise CloudError(self._response) 358 except BadResponse as err: 359 self._operation.status = 'Failed' 360 raise CloudError(self._response, str(err)) 361 except OperationFailed: 362 raise CloudError(self._response) 363 364 self._thread = None 365 self._done = None 366 self._exception = None 367 if not finished(self.status()): 368 self._done = threading.Event() 369 self._thread = threading.Thread( 370 target=self._start, 371 name="AzureOperationPoller({})".format(uuid.uuid4()), 372 args=(update_cmd,)) 373 self._thread.daemon = True 374 self._thread.start() 375 376 def _start(self, update_cmd): 377 """Start the long running operation. 378 On completion, runs any callbacks. 379 380 :param callable update_cmd: The API reuqest to check the status of 381 the operation. 382 """ 383 try: 384 self._poll(update_cmd) 385 386 except BadStatus: 387 self._operation.status = 'Failed' 388 self._exception = CloudError(self._response) 389 390 except BadResponse as err: 391 self._operation.status = 'Failed' 392 self._exception = CloudError(self._response, str(err)) 393 394 except OperationFailed: 395 self._exception = CloudError(self._response) 396 397 except Exception as err: 398 self._exception = err 399 400 finally: 401 self._done.set() 402 403 callbacks, self._callbacks = self._callbacks, [] 404 while callbacks: 405 for call in callbacks: 406 call(self._operation) 407 callbacks, self._callbacks = self._callbacks, [] 408 409 def _delay(self): 410 """Check for a 'retry-after' header to set timeout, 411 otherwise use configured timeout. 412 """ 413 if self._response is None: 414 return 415 if self._response.headers.get('retry-after'): 416 time.sleep(int(self._response.headers['retry-after'])) 417 else: 418 time.sleep(self._timeout) 419 420 def _polling_cookie(self): 421 """Collect retry cookie - we only want to do this for the test server 422 at this point, unless we implement a proper cookie policy. 423 424 :returns: Dictionary containing a cookie header if required, 425 otherwise an empty dictionary. 426 """ 427 parsed_url = urlparse(self._response.request.url) 428 host = parsed_url.hostname.strip('.') 429 if host == 'localhost': 430 return {'cookie': self._response.headers.get('set-cookie', '')} 431 return {} 432 433 def _poll(self, update_cmd): 434 """Poll status of operation so long as operation is incomplete and 435 we have an endpoint to query. 436 437 :param callable update_cmd: The function to call to retrieve the 438 latest status of the long running operation. 439 :raises: OperationFailed if operation status 'Failed' or 'Cancelled'. 440 :raises: BadStatus if response status invalid. 441 :raises: BadResponse if response invalid. 442 """ 443 initial_url = self._response.request.url 444 445 while not finished(self.status()): 446 self._delay() 447 headers = self._polling_cookie() 448 449 if self._operation.async_url: 450 self._response = update_cmd( 451 self._operation.async_url, headers) 452 self._operation.set_async_url_if_present(self._response) 453 self._operation.get_status_from_async( 454 self._response) 455 elif self._operation.location_url: 456 self._response = update_cmd( 457 self._operation.location_url, headers) 458 self._operation.set_async_url_if_present(self._response) 459 self._operation.get_status_from_location( 460 self._response) 461 elif self._operation.method == "PUT": 462 self._response = update_cmd(initial_url, headers) 463 self._operation.set_async_url_if_present(self._response) 464 self._operation.get_status_from_resource( 465 self._response) 466 else: 467 raise BadResponse( 468 'Location header is missing from long running operation.') 469 470 if failed(self._operation.status): 471 raise OperationFailed("Operation failed or cancelled") 472 elif self._operation.should_do_final_get(): 473 self._response = update_cmd(initial_url) 474 self._operation.get_status_from_resource( 475 self._response) 476 477 def status(self): 478 """Returns the current status string. 479 480 :returns: The current status string 481 :rtype: str 482 """ 483 return self._operation.status 484 485 def result(self, timeout=None): 486 """Return the result of the long running operation, or 487 the result available after the specified timeout. 488 489 :returns: The deserialized resource of the long running operation, 490 if one is available. 491 :raises CloudError: Server problem with the query. 492 """ 493 self.wait(timeout) 494 return self._operation.resource 495 496 def wait(self, timeout=None): 497 """Wait on the long running operation for a specified length 498 of time. 499 500 :param int timeout: Perion of time to wait for the long running 501 operation to complete. 502 :raises CloudError: Server problem with the query. 503 """ 504 if self._thread is None: 505 return 506 self._thread.join(timeout=timeout) 507 try: 508 raise self._exception 509 except TypeError: 510 pass 511 512 def done(self): 513 """Check status of the long running operation. 514 515 :returns: 'True' if the process has completed, else 'False'. 516 """ 517 return self._thread is None or not self._thread.is_alive() 518 519 def add_done_callback(self, func): 520 """Add callback function to be run once the long running operation 521 has completed - regardless of the status of the operation. 522 523 :param callable func: Callback function that takes at least one 524 argument, a completed LongRunningOperation. 525 :raises: ValueError if the long running operation has already 526 completed. 527 """ 528 if self._done is None or self._done.is_set(): 529 raise ValueError("Process is complete.") 530 self._callbacks.append(func) 531 532 def remove_done_callback(self, func): 533 """Remove a callback from the long running operation. 534 535 :param callable func: The function to be removed from the callbacks. 536 :raises: ValueError if the long running operation has already 537 completed. 538 """ 539 if self._done is None or self._done.is_set(): 540 raise ValueError("Process is complete.") 541 self._callbacks = [c for c in self._callbacks if c != func] 542