1# -------------------------------------------------------------------------------------------- 2# Copyright (c) Microsoft Corporation. All rights reserved. 3# Licensed under the MIT License. See License.txt in the project root for license information. 4# -------------------------------------------------------------------------------------------- 5# pylint: disable=too-many-lines 6 7import base64 8import binascii 9import getpass 10import json 11import logging 12import os 13import platform 14import re 15import ssl 16import sys 17from urllib.request import urlopen 18 19from knack.log import get_logger 20from knack.util import CLIError, to_snake_case 21 22logger = get_logger(__name__) 23 24CLI_PACKAGE_NAME = 'azure-cli' 25COMPONENT_PREFIX = 'azure-cli-' 26 27SSLERROR_TEMPLATE = ('Certificate verification failed. This typically happens when using Azure CLI behind a proxy ' 28 'that intercepts traffic with a self-signed certificate. ' 29 # pylint: disable=line-too-long 30 'Please add this certificate to the trusted CA bundle. More info: https://docs.microsoft.com/cli/azure/use-cli-effectively#work-behind-a-proxy.') 31 32QUERY_REFERENCE = ("To learn more about --query, please visit: " 33 "'https://docs.microsoft.com/cli/azure/query-azure-cli'") 34 35 36_PROXYID_RE = re.compile( 37 '(?i)/subscriptions/(?P<subscription>[^/]*)(/resourceGroups/(?P<resource_group>[^/]*))?' 
38 '(/providers/(?P<namespace>[^/]*)/(?P<type>[^/]*)/(?P<name>[^/]*)(?P<children>.*))?') 39 40_CHILDREN_RE = re.compile('(?i)/(?P<child_type>[^/]*)/(?P<child_name>[^/]*)') 41 42_VERSION_CHECK_TIME = 'check_time' 43_VERSION_UPDATE_TIME = 'update_time' 44 45# A list of reserved names that cannot be used as admin username of VM 46DISALLOWED_USER_NAMES = [ 47 "administrator", "admin", "user", "user1", "test", "user2", 48 "test1", "user3", "admin1", "1", "123", "a", "actuser", "adm", 49 "admin2", "aspnet", "backup", "console", "guest", 50 "owner", "root", "server", "sql", "support", "support_388945a0", 51 "sys", "test2", "test3", "user4", "user5" 52] 53 54 55def handle_exception(ex): # pylint: disable=too-many-locals, too-many-statements, too-many-branches 56 # For error code, follow guidelines at https://docs.python.org/2/library/sys.html#sys.exit, 57 from jmespath.exceptions import JMESPathError 58 from msrestazure.azure_exceptions import CloudError 59 from msrest.exceptions import HttpOperationError, ValidationError, ClientRequestError 60 from azure.common import AzureException 61 from azure.core.exceptions import AzureError 62 from requests.exceptions import SSLError, HTTPError 63 import azure.cli.core.azclierror as azclierror 64 import traceback 65 66 logger.debug("azure.cli.core.util.handle_exception is called with an exception:") 67 # Print the traceback and exception message 68 logger.debug(traceback.format_exc()) 69 70 error_msg = getattr(ex, 'message', str(ex)) 71 exit_code = 1 72 73 if isinstance(ex, azclierror.AzCLIError): 74 az_error = ex 75 76 elif isinstance(ex, JMESPathError): 77 error_msg = "Invalid jmespath query supplied for `--query`: {}".format(error_msg) 78 az_error = azclierror.InvalidArgumentValueError(error_msg) 79 az_error.set_recommendation(QUERY_REFERENCE) 80 81 elif isinstance(ex, SSLError): 82 az_error = azclierror.AzureConnectionError(error_msg) 83 az_error.set_recommendation(SSLERROR_TEMPLATE) 84 85 elif isinstance(ex, CloudError): 86 if 
extract_common_error_message(ex): 87 error_msg = extract_common_error_message(ex) 88 status_code = str(getattr(ex, 'status_code', 'Unknown Code')) 89 AzCLIErrorType = get_error_type_by_status_code(status_code) 90 az_error = AzCLIErrorType(error_msg) 91 92 elif isinstance(ex, ValidationError): 93 az_error = azclierror.ValidationError(error_msg) 94 95 elif isinstance(ex, CLIError): 96 # TODO: Fine-grained analysis here 97 az_error = azclierror.UnclassifiedUserFault(error_msg) 98 99 elif isinstance(ex, AzureError): 100 if extract_common_error_message(ex): 101 error_msg = extract_common_error_message(ex) 102 AzCLIErrorType = get_error_type_by_azure_error(ex) 103 az_error = AzCLIErrorType(error_msg) 104 105 elif isinstance(ex, AzureException): 106 if is_azure_connection_error(error_msg): 107 az_error = azclierror.AzureConnectionError(error_msg) 108 else: 109 # TODO: Fine-grained analysis here for Unknown error 110 az_error = azclierror.UnknownError(error_msg) 111 112 elif isinstance(ex, ClientRequestError): 113 if is_azure_connection_error(error_msg): 114 az_error = azclierror.AzureConnectionError(error_msg) 115 elif isinstance(ex.inner_exception, SSLError): 116 # When msrest encounters SSLError, msrest wraps SSLError in ClientRequestError 117 az_error = azclierror.AzureConnectionError(error_msg) 118 az_error.set_recommendation(SSLERROR_TEMPLATE) 119 else: 120 az_error = azclierror.ClientRequestError(error_msg) 121 122 elif isinstance(ex, HttpOperationError): 123 message, _ = extract_http_operation_error(ex) 124 if message: 125 error_msg = message 126 status_code = str(getattr(ex.response, 'status_code', 'Unknown Code')) 127 AzCLIErrorType = get_error_type_by_status_code(status_code) 128 az_error = AzCLIErrorType(error_msg) 129 130 elif isinstance(ex, HTTPError): 131 status_code = str(getattr(ex.response, 'status_code', 'Unknown Code')) 132 AzCLIErrorType = get_error_type_by_status_code(status_code) 133 az_error = AzCLIErrorType(error_msg) 134 135 elif isinstance(ex, 
def extract_http_operation_error(ex):
    """Extract a readable message and an error code from an HTTP operation error.

    The text of ``ex.response`` is parsed as JSON and searched for an ``error`` (or ``Error``)
    entry. When that entry is a dict, its ``code``/``Code`` and ``message``/``Message`` members
    are combined into ``"<code>: <message>"``; any other entry is returned as the message as-is.

    :param ex: exception exposing the raw response body as ``ex.response.text``
    :return: tuple ``(error_msg, status_code)``. ``error_msg`` is ``None`` when the body is
             missing, is not JSON, or carries no error entry; ``status_code`` defaults to
             ``'Unknown Code'``.
    """
    error_msg = None
    status_code = 'Unknown Code'
    try:
        response = json.loads(ex.response.text)
    except (ValueError, TypeError, AttributeError):
        # No response, no body, or a body that is not JSON. This runs while another error is
        # being reported, so it must never raise on its own.
        return error_msg, status_code

    if isinstance(response, str):
        error = response
    elif isinstance(response, dict):
        error = response.get('error', response.get('Error', None))
    else:
        # JSON arrays, numbers, booleans and null have no 'error' member to look up.
        error = None

    # ARM should use ODATA v4. So should try this first.
    # http://docs.oasis-open.org/odata/odata-json-format/v4.0/os/odata-json-format-v4.0-os.html#_Toc372793091
    if isinstance(error, dict):
        status_code = error.get('code', error.get('Code', 'Unknown Code'))
        message = error.get('message', error.get('Message', ex))
        error_msg = "{}: {}".format(status_code, message)
    else:
        error_msg = error
    return error_msg, status_code


def get_error_type_by_azure_error(ex):
    """Map an ``azure.core`` exception instance to the matching ``azclierror`` class.

    :param ex: exception raised by an azure-core based SDK
    :return: an error class (not an instance) from ``azure.cli.core.azclierror``
    """
    import azure.core.exceptions as exceptions
    import azure.cli.core.azclierror as azclierror

    # Subclasses are tested before their base classes; otherwise the base-class branch would
    # always win and the more specific mapping could never be selected.
    if isinstance(ex, exceptions.ResourceNotFoundError):
        return azclierror.ResourceNotFoundError
    if isinstance(ex, exceptions.HttpResponseError):
        return get_error_type_by_status_code(str(ex.status_code))
    if isinstance(ex, exceptions.ServiceRequestTimeoutError):
        return azclierror.AzureConnectionError
    if isinstance(ex, exceptions.ServiceRequestError):
        return azclierror.ClientRequestError
    if isinstance(ex, (exceptions.ServiceResponseError, exceptions.ServiceResponseTimeoutError)):
        return azclierror.AzureResponseError

    return azclierror.UnknownError


# pylint: disable=too-many-return-statements
def get_error_type_by_status_code(status_code):
    """Map an HTTP status code to the matching ``azclierror`` class.

    :param status_code: HTTP status code; callers in this module pass it as a string such as
                        ``'404'`` (other values are converted with ``str``)
    :return: an error class (not an instance) from ``azure.cli.core.azclierror``;
             ``UnknownError`` when the code is neither 4xx nor 5xx
    """
    import azure.cli.core.azclierror as azclierror

    status_code = str(status_code)
    if status_code == '400':
        return azclierror.BadRequestError
    if status_code == '401':
        return azclierror.UnauthorizedError
    if status_code == '403':
        return azclierror.ForbiddenError
    if status_code == '404':
        return azclierror.ResourceNotFoundError
    if status_code.startswith('4'):
        return azclierror.UnclassifiedUserFault
    if status_code.startswith('5'):
        return azclierror.AzureInternalError

    return azclierror.UnknownError
def truncate_text(str_to_shorten, width=70, placeholder=' [...]'):
    """Shorten a string so that the result is at most ``width`` characters long.

    Text that already fits is returned unchanged. Longer text is cut and ``placeholder`` is
    appended, with the cut chosen so that text plus placeholder is exactly ``width`` long.

    :param str str_to_shorten: text to shorten
    :param int width: maximum length of the result; must be greater than 0
    :param str placeholder: marker appended to show that text was dropped
    :return: the original or the shortened text
    :raises ValueError: if ``width`` is not greater than 0
    """
    if width <= 0:
        raise ValueError('width must be greater than 0.')
    if len(str_to_shorten) <= width:
        # Nothing to drop; appending a placeholder here would only lose information.
        return str_to_shorten
    keep = width - len(placeholder)
    if keep < 0:
        # The placeholder alone is wider than the budget: hard-cut the text instead of slicing
        # with a negative index, which would produce a result longer than ``width``.
        return str_to_shorten[:width]
    return str_to_shorten[:keep] + placeholder
def get_cached_latest_versions(versions=None):
    """Get the latest versions, preferring the values stored in ``VERSIONS``.

    The stored versions are reused when they were written less than one day ago and were
    recorded for the same local ``azure-cli`` version. Otherwise the latest versions are
    fetched again and stored together with the current time.

    :param dict versions: locally installed versions; computed with ``_get_local_versions``
                          when empty or ``None``
    :return: tuple ``(versions, success)``; ``versions`` is a shallow copy and ``success`` is
             ``True`` when the values come from the cache or the refresh succeeded
    """
    import datetime
    from azure.cli.core._session import VERSIONS

    time_format = '%Y-%m-%d %H:%M:%S.%f'

    if not versions:
        versions = _get_local_versions()

    cached_time = VERSIONS[_VERSION_UPDATE_TIME]
    if cached_time:
        try:
            version_update_time = datetime.datetime.strptime(cached_time, time_format)
        except ValueError:
            # A timestamp that cannot be parsed (e.g. one written without the fractional
            # seconds) is treated as a stale cache instead of failing the command.
            version_update_time = None
        if version_update_time and datetime.datetime.now() < version_update_time + datetime.timedelta(days=1):
            cache_versions = VERSIONS['versions']
            if cache_versions and cache_versions['azure-cli']['local'] == versions['azure-cli']['local']:
                return cache_versions.copy(), True

    versions, success = _update_latest_from_github(versions)
    VERSIONS['versions'] = versions
    # str(datetime) omits the '.%f' part when microsecond == 0, which the strptime call above
    # cannot parse; strftime always writes the fractional seconds.
    VERSIONS[_VERSION_UPDATE_TIME] = datetime.datetime.now().strftime(time_format)
    return versions.copy(), success
dist.key.startswith(COMPONENT_PREFIX): 337 comp_name = dist.key.replace(COMPONENT_PREFIX, '') 338 versions[comp_name] = {'local': dist.version} 339 return versions 340 341 342def get_az_version_string(use_cache=False): # pylint: disable=too-many-statements 343 from azure.cli.core.extension import get_extensions, EXTENSIONS_DIR, DEV_EXTENSION_SOURCES, EXTENSIONS_SYS_DIR 344 import io 345 output = io.StringIO() 346 versions = _get_local_versions() 347 348 # get the versions from pypi 349 versions, success = get_cached_latest_versions(versions) if use_cache else _update_latest_from_github(versions) 350 updates_available_components = [] 351 352 def _print(val=''): 353 print(val, file=output) 354 355 def _get_version_string(name, version_dict): 356 from packaging.version import parse # pylint: disable=import-error,no-name-in-module 357 local = version_dict['local'] 358 pypi = version_dict.get('pypi', None) 359 if pypi and parse(pypi) > parse(local): 360 return name.ljust(25) + local.rjust(15) + ' *' 361 return name.ljust(25) + local.rjust(15) 362 363 ver_string = _get_version_string(CLI_PACKAGE_NAME, versions.pop(CLI_PACKAGE_NAME)) 364 if '*' in ver_string: 365 updates_available_components.append(CLI_PACKAGE_NAME) 366 _print(ver_string) 367 _print() 368 for name in sorted(versions.keys()): 369 ver_string = _get_version_string(name, versions.pop(name)) 370 if '*' in ver_string: 371 updates_available_components.append(name) 372 _print(ver_string) 373 _print() 374 extensions = get_extensions() 375 if extensions: 376 _print('Extensions:') 377 for ext in extensions: 378 if ext.ext_type == 'dev': 379 _print(ext.name.ljust(20) + (ext.version or 'Unknown').rjust(20) + ' (dev) ' + ext.path) 380 else: 381 _print(ext.name.ljust(20) + (ext.version or 'Unknown').rjust(20)) 382 _print() 383 _print("Python location '{}'".format(os.path.abspath(sys.executable))) 384 _print("Extensions directory '{}'".format(EXTENSIONS_DIR)) 385 if os.path.isdir(EXTENSIONS_SYS_DIR) and 
def show_updates_available(new_line_before=False, new_line_after=False):
    """Warn about available updates, checking at most once every 7 days.

    The time of the last check is read from and written back to ``VERSIONS``. When that check
    is less than 7 days old the function returns without doing anything.

    :param bool new_line_before: log an empty warning line before the update message
    :param bool new_line_after: log an empty warning line after the update message
    """
    from azure.cli.core._session import VERSIONS
    import datetime

    time_format = '%Y-%m-%d %H:%M:%S.%f'

    last_check = VERSIONS[_VERSION_CHECK_TIME]
    if last_check:
        try:
            version_check_time = datetime.datetime.strptime(last_check, time_format)
        except ValueError:
            # An unparseable timestamp (e.g. one stored without fractional seconds) simply
            # triggers a new check instead of raising.
            version_check_time = None
        if version_check_time and datetime.datetime.now() < version_check_time + datetime.timedelta(days=7):
            return

    _, updates_available_components = get_az_version_string(use_cache=True)
    if updates_available_components:
        if new_line_before:
            logger.warning("")
        show_updates(updates_available_components, only_show_when_updates_available=True)
        if new_line_after:
            logger.warning("")
    # str(datetime) omits the '.%f' part when microsecond == 0, which the strptime call above
    # cannot parse; strftime always writes the fractional seconds.
    VERSIONS[_VERSION_CHECK_TIME] = datetime.datetime.now().strftime(time_format)
def read_file_content(file_path, allow_binary=False):
    """Read a file as text, trying several encodings in turn.

    :param str file_path: path of the file to read
    :param bool allow_binary: when no encoding fits, return the raw bytes as base64 text
                              instead of failing
    :return: decoded file content, or base64 text for the binary fallback
    :raises CLIError: if the content cannot be decoded and no binary fallback is produced
    """
    import codecs

    # Note, always put 'utf-8-sig' first, so that BOM in WinOS won't cause trouble.
    candidate_encodings = ('utf-8-sig', 'utf-8', 'utf-16', 'utf-16le', 'utf-16be')
    for candidate in candidate_encodings:
        try:
            with codecs.open(file_path, encoding=candidate) as text_file:
                logger.debug("attempting to read file %s as %s", file_path, candidate)
                return text_file.read()
        except UnicodeError:  # UnicodeDecodeError is a subclass
            continue

    if allow_binary:
        try:
            with open(file_path, 'rb') as raw_file:
                logger.debug("attempting to read file %s as binary", file_path)
                raw_bytes = raw_file.read()
                return base64.b64encode(raw_bytes).decode("utf-8")
        except Exception:  # pylint: disable=broad-except
            # Best effort only; fall through to the common failure below.
            pass

    raise CLIError('Failed to decode file {} - unknown decoding'.format(file_path))
def b64encode(s):
    """
    Encode a string to base64 text.

    The input is converted to bytes with latin-1 before encoding, so every character must be
    representable in latin-1.

    :param str s: latin_1 encoded string
    :return: base64 encoded string
    :rtype: str
    """
    # base64.b64encode always returns bytes; decode once to hand back text.
    return base64.b64encode(s.encode("latin-1")).decode("latin-1")


def b64_to_hex(s):
    """
    Convert a base64 encoded string to its uppercase hexadecimal representation.

    :param str s: base64 encoded string
    :return: uppercase hex string
    :rtype: str
    """
    decoded = base64.b64decode(s)
    return binascii.hexlify(decoded).decode("utf-8").upper()
def get_arg_list(op):
    """Return the parameters accepted by the callable ``op``.

    :param op: function, method or other callable supported by ``inspect.signature``
    :return: ordered, read-only mapping of parameter name to ``inspect.Parameter``; membership
             tests and iteration work on the parameter names
    """
    import inspect

    # inspect.signature is always available on Python 3. The former fallback called
    # inspect.getargspec, which no longer exists on current Python versions.
    return inspect.signature(op).parameters
def sdk_no_wait(no_wait, func, *args, **kwargs):
    """Call ``func`` with the given arguments, forcing ``polling=False`` when ``no_wait`` is set.

    :param no_wait: when truthy, ``polling=False`` is passed to ``func``, overriding any
                    ``polling`` value supplied by the caller
    :param func: callable to invoke
    :return: whatever ``func`` returns
    """
    if not no_wait:
        return func(*args, **kwargs)
    kwargs['polling'] = False
    return func(*args, **kwargs)
def _get_platform_info():
    """Return ``(system, release)`` from ``platform.uname()``, both lower-cased."""
    uname = platform.uname()
    # Prefer the named fields; fall back to the positional layout
    # tuple(system, node, release, version, machine, processor).
    lookups = (('system', 0), ('release', 2))
    return tuple((getattr(uname, field, None) or uname[index]).lower() for field, index in lookups)


def is_wsl():
    """Tell whether the CLI runs inside Windows Subsystem for Linux."""
    system, kernel_release = _get_platform_info()
    # "Official" way of detecting WSL: https://github.com/Microsoft/WSL/issues/423#issuecomment-221627364
    # Run `uname -a` to get 'release' without python
    #   - WSL 1: '4.4.0-19041-Microsoft'
    #   - WSL 2: '4.19.128-microsoft-standard'
    if system != 'linux':
        return False
    return 'microsoft' in kernel_release


def is_windows():
    """Tell whether the CLI runs on Windows."""
    return _get_platform_info()[0] == 'windows'


def can_launch_browser():
    """Tell whether opening a page in a graphical web browser is expected to work.

    WSL and every non-Linux platform are assumed capable. On Linux a GUI session must be
    indicated by the environment and the default browser must not be a text browser.
    """
    import webbrowser
    system, _ = _get_platform_info()
    if is_wsl() or system != 'linux':
        return True

    # per https://unix.stackexchange.com/questions/46305/is-there-a-way-to-retrieve-the-name-of-the-desktop-environment
    # and https://unix.stackexchange.com/questions/193827/what-is-display-0
    # we can check a few env vars
    gui_env_vars = ['DESKTOP_SESSION', 'XDG_CURRENT_DESKTOP', 'DISPLAY']
    if not any(os.getenv(name) for name in gui_env_vars):
        return False

    try:
        browser = webbrowser.get()
    except webbrowser.Error:
        return False
    # text browser won't work
    return getattr(browser, 'name', None) != 'www-browser'
def _count_components(dotted):
    """Count the dot-separated components of ``dotted``; an empty string has none."""
    return len(dotted.split('.')) if dotted else 0


def _find_child(parent, *args, **kwargs):
    """Walk nested collections of ``parent`` and return the object reached at the end.

    ``path`` and ``key_path`` are dot-separated and consumed in lockstep with ``args``: at each
    step the collection named by the path component is read from the current object and the
    item whose key attribute equals the argument (case-insensitively) becomes the current
    object.

    :raises CLIError: if a collection is missing or no item matches
    """
    # tuple structure (path, key, dest)
    path = kwargs.get('path', None)
    key_path = kwargs.get('key_path', None)
    comps = zip(path.split('.'), key_path.split('.'), args)
    current = parent
    for collection_name, key, val in comps:
        current = getattr(current, collection_name, None)
        if current is None:
            raise CLIError("collection '{}' not found".format(collection_name))
        match = next((x for x in current if getattr(x, key).lower() == val.lower()), None)
        if match is None:
            raise CLIError("item '{}' not found in {}".format(val, collection_name))
        current = match
    return current


def find_child_item(parent, *args, **kwargs):
    """Find a nested item below ``parent``.

    :param parent: object whose attributes hold the collections to search
    :param args: one key value per level
    :keyword str path: dot-separated collection attribute names, one per level
    :keyword str key_path: dot-separated key attribute names, one per level
    :return: the matching item
    :raises CLIError: if the component counts disagree, or a collection or item is not found
    """
    path = kwargs.get('path', '')
    key_path = kwargs.get('key_path', '')
    # All three counts must agree. A chained `a != b != c` only fires when both neighbouring
    # pairs differ, which let mismatched input through and made zip() silently drop levels.
    if not len(args) == _count_components(path) == _count_components(key_path):
        raise CLIError('command authoring error: args, path and key_path must have equal number of components.')
    return _find_child(parent, *args, path=path, key_path=key_path)


def find_child_collection(parent, *args, **kwargs):
    """Find a nested collection below ``parent``.

    :param parent: object whose attributes hold the collections to search
    :param args: one key value per level leading to the owner of the collection
    :keyword str path: dot-separated collection attribute names; the last component names the
                       collection to return
    :keyword str key_path: dot-separated key attribute names, one per entry of ``args``
    :return: the collection named by the last component of ``path``
    :raises CLIError: if the component counts disagree, or a collection or item is not found
    """
    path = kwargs.get('path', '')
    key_path = kwargs.get('key_path', '')
    arg_len = len(args)
    key_len = _count_components(key_path)
    path_len = _count_components(path)
    # Either violation is an authoring error, hence `or`.
    if arg_len != key_len or path_len != arg_len + 1:
        raise CLIError('command authoring error: args and key_path must have equal number of components, and '
                       'path must have one extra component (the path to the collection of interest.')
    parent = _find_child(parent, *args, path=path, key_path=key_path)
    collection_path = path.split('.')[-1]
    collection = getattr(parent, collection_path, None)
    if collection is None:
        raise CLIError("collection '{}' not found".format(collection_path))
    return collection
https://github.com/Azure/msrest-for-python/blob/4cc8bc84e96036f03b34716466230fb257e27b36/msrest/pipeline/universal.py#L70 829 _ENV_ADDITIONAL_USER_AGENT = 'AZURE_HTTP_USER_AGENT' 830 if _ENV_ADDITIONAL_USER_AGENT in os.environ: 831 agents.append(os.environ[_ENV_ADDITIONAL_USER_AGENT]) 832 833 # Custom User-Agent provided as command argument 834 if 'User-Agent' in headers: 835 agents.append(headers['User-Agent']) 836 headers['User-Agent'] = ' '.join(agents) 837 838 if generated_client_request_id_name: 839 headers[generated_client_request_id_name] = str(uuid.uuid4()) 840 841 # try to figure out the correct content type 842 if body: 843 try: 844 _ = shell_safe_json_parse(body) 845 if 'Content-Type' not in headers: 846 headers['Content-Type'] = 'application/json' 847 except Exception: # pylint: disable=broad-except 848 pass 849 850 # add telemetry 851 headers['CommandName'] = cli_ctx.data['command'] 852 if cli_ctx.data.get('safe_params'): 853 headers['ParameterSetName'] = ' '.join(cli_ctx.data['safe_params']) 854 855 result = {} 856 for s in uri_parameters or []: 857 try: 858 temp = shell_safe_json_parse(s) 859 result.update(temp) 860 except CLIError: 861 key, value = s.split('=', 1) 862 result[key] = value 863 uri_parameters = result or None 864 865 endpoints = cli_ctx.cloud.endpoints 866 # If url is an ARM resource ID, like /subscriptions/xxx/resourcegroups/xxx?api-version=2019-07-01, 867 # default to Azure Resource Manager. 868 # https://management.azure.com + /subscriptions/xxx/resourcegroups/xxx?api-version=2019-07-01 869 if '://' not in url: 870 url = endpoints.resource_manager.rstrip('/') + url 871 872 # Replace common tokens with real values. 
It is for smooth experience if users copy and paste the url from 873 # Azure Rest API doc 874 from azure.cli.core._profile import Profile 875 profile = Profile(cli_ctx=cli_ctx) 876 if '{subscriptionId}' in url: 877 url = url.replace('{subscriptionId}', cli_ctx.data['subscription_id'] or profile.get_subscription_id()) 878 879 # Prepare the Bearer token for `Authorization` header 880 if not skip_authorization_header and url.lower().startswith('https://'): 881 # Prepare `resource` for `get_raw_token` 882 if not resource: 883 # If url starts with ARM endpoint, like `https://management.azure.com/`, 884 # use `active_directory_resource_id` for resource, like `https://management.core.windows.net/`. 885 # This follows the same behavior as `azure.cli.core.commands.client_factory._get_mgmt_service_client` 886 if url.lower().startswith(endpoints.resource_manager.rstrip('/')): 887 resource = endpoints.active_directory_resource_id 888 else: 889 from azure.cli.core.cloud import CloudEndpointNotSetException 890 for p in [x for x in dir(endpoints) if not x.startswith('_')]: 891 try: 892 value = getattr(endpoints, p) 893 except CloudEndpointNotSetException: 894 continue 895 if isinstance(value, str) and url.lower().startswith(value.lower()): 896 resource = value 897 break 898 if resource: 899 # Prepare `subscription` for `get_raw_token` 900 # If this is an ARM request, try to extract subscription ID from the URL. 901 # But there are APIs which don't require subscription ID, like /subscriptions, /tenants 902 # TODO: In the future when multi-tenant subscription is supported, we won't be able to uniquely identify 903 # the token from subscription anymore. 
904 token_subscription = None 905 if url.lower().startswith(endpoints.resource_manager.rstrip('/')): 906 token_subscription = _extract_subscription_id(url) 907 if token_subscription: 908 logger.debug('Retrieving token for resource %s, subscription %s', resource, token_subscription) 909 token_info, _, _ = profile.get_raw_token(resource, subscription=token_subscription) 910 else: 911 logger.debug('Retrieving token for resource %s', resource) 912 token_info, _, _ = profile.get_raw_token(resource) 913 token_type, token, _ = token_info 914 headers = headers or {} 915 headers['Authorization'] = '{} {}'.format(token_type, token) 916 else: 917 logger.warning("Can't derive appropriate Azure AD resource from --url to acquire an access token. " 918 "If access token is required, use --resource to specify the resource") 919 920 # https://requests.readthedocs.io/en/latest/user/advanced/#prepared-requests 921 s = Session() 922 req = Request(method=method, url=url, headers=headers, params=uri_parameters, data=body) 923 prepped = s.prepare_request(req) 924 925 # Merge environment settings into session 926 settings = s.merge_environment_settings(prepped.url, {}, None, not should_disable_connection_verify(), None) 927 _log_request(prepped) 928 r = s.send(prepped, **settings) 929 _log_response(r) 930 931 if not r.ok: 932 reason = r.reason 933 if r.text: 934 reason += '({})'.format(r.text) 935 raise CLIError(reason) 936 if output_file: 937 with open(output_file, 'wb') as fd: 938 for chunk in r.iter_content(chunk_size=128): 939 fd.write(chunk) 940 return r 941 942 943def _extract_subscription_id(url): 944 """Extract the subscription ID from an ARM request URL.""" 945 subscription_regex = '/subscriptions/([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})' 946 match = re.search(subscription_regex, url, re.IGNORECASE) 947 if match: 948 subscription_id = match.groups()[0] 949 logger.debug('Found subscription ID %s in the URL %s', subscription_id, url) 950 return subscription_id 
951 logger.debug('No subscription ID specified in the URL %s', url) 952 return None 953 954 955def _log_request(request): 956 """Log a client request. Copied from msrest 957 https://github.com/Azure/msrest-for-python/blob/3653d29fc44da408898b07c710290a83d196b777/msrest/http_logger.py#L39 958 """ 959 if not logger.isEnabledFor(logging.DEBUG): 960 return 961 962 try: 963 logger.info("Request URL: %r", request.url) 964 logger.info("Request method: %r", request.method) 965 logger.info("Request headers:") 966 for header, value in request.headers.items(): 967 if header.lower() == 'authorization': 968 # Trim at least half of the token but keep at most 20 characters 969 preserve_length = min(int(len(value) * 0.5), 20) 970 value = value[:preserve_length] + '...' 971 logger.info(" %r: %r", header, value) 972 logger.info("Request body:") 973 974 # We don't want to log the binary data of a file upload. 975 import types 976 if isinstance(request.body, types.GeneratorType): 977 logger.info("File upload") 978 else: 979 logger.info(str(request.body)) 980 except Exception as err: # pylint: disable=broad-except 981 logger.info("Failed to log request: %r", err) 982 983 984def _log_response(response, **kwargs): 985 """Log a server response. Copied from msrest 986 https://github.com/Azure/msrest-for-python/blob/3653d29fc44da408898b07c710290a83d196b777/msrest/http_logger.py#L68 987 """ 988 if not logger.isEnabledFor(logging.DEBUG): 989 return None 990 991 try: 992 logger.info("Response status: %r", response.status_code) 993 logger.info("Response headers:") 994 for res_header, value in response.headers.items(): 995 logger.info(" %r: %r", res_header, value) 996 997 # We don't want to log binary data if the response is a file. 
998 logger.info("Response content:") 999 pattern = re.compile(r'attachment; ?filename=["\w.]+', re.IGNORECASE) 1000 header = response.headers.get('content-disposition') 1001 1002 if header and pattern.match(header): 1003 filename = header.partition('=')[2] 1004 logger.info("File attachments: %s", filename) 1005 elif response.headers.get("content-type", "").endswith("octet-stream"): 1006 logger.info("Body contains binary data.") 1007 elif response.headers.get("content-type", "").startswith("image"): 1008 logger.info("Body contains image data.") 1009 else: 1010 if kwargs.get('stream', False): 1011 logger.info("Body is streamable") 1012 else: 1013 logger.info(response.content.decode("utf-8-sig")) 1014 return response 1015 except Exception as err: # pylint: disable=broad-except 1016 logger.info("Failed to log response: %s", repr(err)) 1017 return response 1018 1019 1020class ScopedConfig: 1021 1022 def __init__(self, cli_config, use_local_config=None): 1023 self.use_local_config = use_local_config 1024 if self.use_local_config is None: 1025 self.use_local_config = False 1026 self.cli_config = cli_config 1027 # here we use getattr/setattr to prepare the situation that "use_local_config" might not be available 1028 self.original_use_local_config = getattr(cli_config, 'use_local_config', None) 1029 1030 def __enter__(self): 1031 self.cli_config.use_local_config = self.use_local_config 1032 1033 def __exit__(self, exc_type, exc_val, exc_tb): 1034 setattr(self.cli_config, 'use_local_config', self.original_use_local_config) 1035 1036 1037ConfiguredDefaultSetter = ScopedConfig 1038 1039 1040def _ssl_context(): 1041 if sys.version_info < (3, 4) or (in_cloud_console() and platform.system() == 'Windows'): 1042 try: 1043 return ssl.SSLContext(ssl.PROTOCOL_TLS) # added in python 2.7.13 and 3.6 1044 except AttributeError: 1045 return ssl.SSLContext(ssl.PROTOCOL_TLSv1) 1046 1047 return ssl.create_default_context() 1048 1049 1050def urlretrieve(url): 1051 req = urlopen(url, 
context=_ssl_context()) 1052 return req.read() 1053 1054 1055def parse_proxy_resource_id(rid): 1056 """Parses a resource_id into its various parts. 1057 1058 Return an empty dictionary, if invalid resource id. 1059 1060 :param rid: The resource id being parsed 1061 :type rid: str 1062 :returns: A dictionary with with following key/value pairs (if found): 1063 1064 - subscription: Subscription id 1065 - resource_group: Name of resource group 1066 - namespace: Namespace for the resource provider (i.e. Microsoft.Compute) 1067 - type: Type of the root resource (i.e. virtualMachines) 1068 - name: Name of the root resource 1069 - child_type_{level}: Type of the child resource of that level 1070 - child_name_{level}: Name of the child resource of that level 1071 - last_child_num: Level of the last child 1072 1073 :rtype: dict[str,str] 1074 """ 1075 if not rid: 1076 return {} 1077 match = _PROXYID_RE.match(rid) 1078 if match: 1079 result = match.groupdict() 1080 children = _CHILDREN_RE.finditer(result['children'] or '') 1081 count = None 1082 for count, child in enumerate(children): 1083 result.update({ 1084 key + '_%d' % (count + 1): group for key, group in child.groupdict().items()}) 1085 result['last_child_num'] = count + 1 if isinstance(count, int) else None 1086 result.pop('children', None) 1087 return {key: value for key, value in result.items() if value is not None} 1088 return None 1089 1090 1091def get_az_user_agent(): 1092 # Dynamically load the core version 1093 from azure.cli.core import __version__ as core_version 1094 1095 agents = ["AZURECLI/{}".format(core_version)] 1096 1097 from azure.cli.core._environment import _ENV_AZ_INSTALLER 1098 if _ENV_AZ_INSTALLER in os.environ: 1099 agents.append('({})'.format(os.environ[_ENV_AZ_INSTALLER])) 1100 1101 # msrest already has this 1102 # https://github.com/Azure/msrest-for-python/blob/4cc8bc84e96036f03b34716466230fb257e27b36/msrest/pipeline/universal.py#L70 1103 # if ENV_ADDITIONAL_USER_AGENT in os.environ: 1104 # 
agents.append(os.environ[ENV_ADDITIONAL_USER_AGENT]) 1105 1106 return ' '.join(agents) 1107 1108 1109def get_az_rest_user_agent(): 1110 """Get User-Agent for az rest calls""" 1111 1112 agents = ['python/{}'.format(platform.python_version()), 1113 '({})'.format(platform.platform()), 1114 get_az_user_agent() 1115 ] 1116 1117 return ' '.join(agents) 1118 1119 1120def user_confirmation(message, yes=False): 1121 if yes: 1122 return 1123 from knack.prompting import prompt_y_n, NoTTYException 1124 try: 1125 if not prompt_y_n(message): 1126 raise CLIError('Operation cancelled.') 1127 except NoTTYException: 1128 raise CLIError( 1129 'Unable to prompt for confirmation as no tty available. Use --yes.') 1130 1131 1132def get_linux_distro(): 1133 if platform.system() != 'Linux': 1134 return None, None 1135 1136 try: 1137 with open('/etc/os-release') as lines: 1138 tokens = [line.strip() for line in lines] 1139 except Exception: # pylint: disable=broad-except 1140 return None, None 1141 1142 release_info = {} 1143 for token in tokens: 1144 if '=' in token: 1145 k, v = token.split('=', 1) 1146 release_info[k.lower()] = v.strip('"') 1147 1148 return release_info.get('name', None), release_info.get('version_id', None) 1149 1150 1151def roughly_parse_command(args): 1152 # Roughly parse the command part: <az vm create> --name vm1 1153 # Similar to knack.invocation.CommandInvoker._rudimentary_get_command, but we don't need to bother with 1154 # positional args 1155 nouns = [] 1156 for arg in args: 1157 if arg and arg[0] != '-': 1158 nouns.append(arg) 1159 else: 1160 break 1161 return ' '.join(nouns).lower() 1162 1163 1164def is_guid(guid): 1165 import uuid 1166 try: 1167 uuid.UUID(guid) 1168 return True 1169 except ValueError: 1170 return False 1171 1172 1173def handle_version_update(): 1174 """Clean up information in local files that may be invalidated 1175 because of a version update of Azure CLI 1176 """ 1177 try: 1178 from azure.cli.core._session import VERSIONS 1179 from 
packaging.version import parse # pylint: disable=import-error,no-name-in-module 1180 from azure.cli.core import __version__ 1181 if not VERSIONS['versions']: 1182 get_cached_latest_versions() 1183 elif parse(VERSIONS['versions']['core']['local']) != parse(__version__): 1184 logger.debug("Azure CLI has been updated.") 1185 logger.debug("Clean up versions and refresh cloud endpoints information in local files.") 1186 VERSIONS['versions'] = {} 1187 VERSIONS['update_time'] = '' 1188 from azure.cli.core.cloud import refresh_known_clouds 1189 refresh_known_clouds() 1190 except Exception as ex: # pylint: disable=broad-except 1191 logger.warning(ex) 1192 1193 1194def resource_to_scopes(resource): 1195 """Convert the ADAL resource ID to MSAL scopes by appending the /.default suffix and return a list. 1196 For example: 1197 'https://management.core.windows.net/' -> ['https://management.core.windows.net//.default'] 1198 'https://managedhsm.azure.com' -> ['https://managedhsm.azure.com/.default'] 1199 1200 :param resource: The ADAL resource ID 1201 :return: A list of scopes 1202 """ 1203 # https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-permissions-and-consent#trailing-slash-and-default 1204 # We should not trim the trailing slash, like in https://management.azure.com/ 1205 # In other word, the trailing slash should be preserved and scope should be https://management.azure.com//.default 1206 scope = resource + '/.default' 1207 return [scope] 1208 1209 1210def scopes_to_resource(scopes): 1211 """Convert MSAL scopes to ADAL resource by stripping the /.default suffix and return a str. 1212 For example: 1213 ['https://management.core.windows.net//.default'] -> 'https://management.core.windows.net/' 1214 ['https://managedhsm.azure.com/.default'] -> 'https://managedhsm.azure.com' 1215 1216 :param scopes: The MSAL scopes. 
It can be a list or tuple of string 1217 :return: The ADAL resource 1218 :rtype: str 1219 """ 1220 scope = scopes[0] 1221 1222 suffixes = ['/.default', '/user_impersonation'] 1223 1224 for s in suffixes: 1225 if scope.endswith(s): 1226 return scope[:-len(s)] 1227 1228 return scope 1229 1230 1231def _get_parent_proc_name(): 1232 # Un-cached function to get parent process name. 1233 try: 1234 import psutil 1235 except ImportError as ex: 1236 logger.debug(ex) 1237 return None 1238 1239 try: 1240 parent = psutil.Process(os.getpid()).parent() 1241 1242 # On Windows, when CLI is run inside a virtual env, there will be 2 python.exe. 1243 if parent and parent.name().lower() == 'python.exe': 1244 parent = parent.parent() 1245 1246 if parent: 1247 # On Windows, powershell.exe launches cmd.exe to launch python.exe. 1248 grandparent = parent.parent() 1249 if grandparent: 1250 grandparent_name = grandparent.name().lower() 1251 if grandparent_name in ("powershell.exe", "pwsh.exe"): 1252 return grandparent.name() 1253 # if powershell.exe or pwsh.exe is not the grandparent, simply return the parent's name. 1254 return parent.name() 1255 except psutil.AccessDenied as ex: 1256 # Ignore due to https://github.com/giampaolo/psutil/issues/1980 1257 logger.debug(ex) 1258 return None 1259 1260 1261def get_parent_proc_name(): 1262 # This function wraps _get_parent_proc_name, as psutil calls are time-consuming, so use a 1263 # function-level cache to save the result. 1264 # NOTE: The return value may be None if getting parent proc name fails, so always remember to 1265 # check it first before calling string methods like lower(). 
1266 if not hasattr(get_parent_proc_name, "return_value"): 1267 parent_proc_name = _get_parent_proc_name() 1268 setattr(get_parent_proc_name, "return_value", parent_proc_name) 1269 return getattr(get_parent_proc_name, "return_value") 1270 1271 1272def is_modern_terminal(): 1273 """In addition to knack.util.is_modern_terminal, detect Cloud Shell.""" 1274 import knack.util 1275 return knack.util.is_modern_terminal() or in_cloud_console() 1276 1277 1278def rmtree_with_retry(path): 1279 # A workaround for https://bugs.python.org/issue33240 1280 # Retry shutil.rmtree several times, but even if it fails after several retries, don't block the command execution. 1281 retry_num = 3 1282 import time 1283 while True: 1284 try: 1285 import shutil 1286 shutil.rmtree(path) 1287 return 1288 except OSError as err: 1289 if retry_num > 0: 1290 logger.warning("Failed to delete '%s': %s. Retrying ...", path, err) 1291 retry_num -= 1 1292 time.sleep(1) 1293 else: 1294 logger.warning("Failed to delete '%s': %s. You may try to delete it manually.", path, err) 1295 break 1296