1# --------------------------------------------------------------------------------------------
2# Copyright (c) Microsoft Corporation. All rights reserved.
3# Licensed under the MIT License. See License.txt in the project root for license information.
4# --------------------------------------------------------------------------------------------
5# pylint: disable=too-many-lines
6
7import base64
8import binascii
9import getpass
10import json
11import logging
12import os
13import platform
14import re
15import ssl
16import sys
17from urllib.request import urlopen
18
19from knack.log import get_logger
20from knack.util import CLIError, to_snake_case
21
# Module-level logger used by the helpers in this file.
logger = get_logger(__name__)

# Distribution name of the CLI itself and the prefix shared by its component packages
# (see _get_local_versions, which strips the prefix to build component names).
CLI_PACKAGE_NAME = 'azure-cli'
COMPONENT_PREFIX = 'azure-cli-'

# Recommendation attached by handle_exception to errors caused by SSL certificate failures.
SSLERROR_TEMPLATE = ('Certificate verification failed. This typically happens when using Azure CLI behind a proxy '
                     'that intercepts traffic with a self-signed certificate. '
                     # pylint: disable=line-too-long
                     'Please add this certificate to the trusted CA bundle. More info: https://docs.microsoft.com/cli/azure/use-cli-effectively#work-behind-a-proxy.')

# Recommendation attached by handle_exception to errors caused by an invalid `--query` expression.
QUERY_REFERENCE = ("To learn more about --query, please visit: "
                   "'https://docs.microsoft.com/cli/azure/query-azure-cli'")


# Case-insensitive pattern for a resource ID of the form
#   /subscriptions/<subscription>[/resourceGroups/<resource_group>]
#   [/providers/<namespace>/<type>/<name><children>]
# Each component is exposed as a named group; everything after <name> lands in `children`.
_PROXYID_RE = re.compile(
    '(?i)/subscriptions/(?P<subscription>[^/]*)(/resourceGroups/(?P<resource_group>[^/]*))?'
    '(/providers/(?P<namespace>[^/]*)/(?P<type>[^/]*)/(?P<name>[^/]*)(?P<children>.*))?')

# Case-insensitive pattern for one `/<child_type>/<child_name>` pair.
# NOTE(review): presumably applied to the `children` group captured above - confirm at the call site.
_CHILDREN_RE = re.compile('(?i)/(?P<child_type>[^/]*)/(?P<child_name>[^/]*)')

# Keys of the VERSIONS session store holding the timestamp of the last update check
# (show_updates_available) and of the last version-cache refresh (get_cached_latest_versions).
_VERSION_CHECK_TIME = 'check_time'
_VERSION_UPDATE_TIME = 'update_time'

# A list of reserved names that cannot be used as admin username of VM.
# Entries are lowercase; get_default_admin_username lowercases the candidate before the lookup.
DISALLOWED_USER_NAMES = [
    "administrator", "admin", "user", "user1", "test", "user2",
    "test1", "user3", "admin1", "1", "123", "a", "actuser", "adm",
    "admin2", "aspnet", "backup", "console", "guest",
    "owner", "root", "server", "sql", "support", "support_388945a0",
    "sys", "test2", "test3", "user4", "user5"
]
53
54
def handle_exception(ex):  # pylint: disable=too-many-locals, too-many-statements, too-many-branches
    """Translate an arbitrary exception into an AzCLIError, report it and return an exit code.

    The exception is matched against a fixed sequence of known exception types. The order of
    the ``isinstance`` checks is significant: the first matching branch decides which
    ``azclierror`` class is used. The resulting error is printed and sent to telemetry.

    :param ex: the exception to handle.
    :return: 3 when the resulting error is an ``azclierror.ResourceNotFoundError``, otherwise 1.
    :rtype: int
    """
    # For error code, follow guidelines at https://docs.python.org/2/library/sys.html#sys.exit
    from jmespath.exceptions import JMESPathError
    from msrestazure.azure_exceptions import CloudError
    from msrest.exceptions import HttpOperationError, ValidationError, ClientRequestError
    from azure.common import AzureException
    from azure.core.exceptions import AzureError
    from requests.exceptions import SSLError, HTTPError
    import azure.cli.core.azclierror as azclierror
    import traceback

    logger.debug("azure.cli.core.util.handle_exception is called with an exception:")
    # Print the traceback and exception message
    logger.debug(traceback.format_exc())

    # Prefer an explicit `message` attribute; fall back to the string form of the exception.
    error_msg = getattr(ex, 'message', str(ex))
    exit_code = 1

    if isinstance(ex, azclierror.AzCLIError):
        # Already a CLI error: report it unchanged.
        az_error = ex

    elif isinstance(ex, JMESPathError):
        error_msg = "Invalid jmespath query supplied for `--query`: {}".format(error_msg)
        az_error = azclierror.InvalidArgumentValueError(error_msg)
        az_error.set_recommendation(QUERY_REFERENCE)

    elif isinstance(ex, SSLError):
        az_error = azclierror.AzureConnectionError(error_msg)
        az_error.set_recommendation(SSLERROR_TEMPLATE)

    elif isinstance(ex, CloudError):
        # Use the richer message (including details) when one can be extracted.
        if extract_common_error_message(ex):
            error_msg = extract_common_error_message(ex)
        status_code = str(getattr(ex, 'status_code', 'Unknown Code'))
        AzCLIErrorType = get_error_type_by_status_code(status_code)
        az_error = AzCLIErrorType(error_msg)

    elif isinstance(ex, ValidationError):
        az_error = azclierror.ValidationError(error_msg)

    elif isinstance(ex, CLIError):
        # TODO: Fine-grained analysis here
        az_error = azclierror.UnclassifiedUserFault(error_msg)

    elif isinstance(ex, AzureError):
        if extract_common_error_message(ex):
            error_msg = extract_common_error_message(ex)
        AzCLIErrorType = get_error_type_by_azure_error(ex)
        az_error = AzCLIErrorType(error_msg)

    elif isinstance(ex, AzureException):
        if is_azure_connection_error(error_msg):
            az_error = azclierror.AzureConnectionError(error_msg)
        else:
            # TODO: Fine-grained analysis here for Unknown error
            az_error = azclierror.UnknownError(error_msg)

    elif isinstance(ex, ClientRequestError):
        if is_azure_connection_error(error_msg):
            az_error = azclierror.AzureConnectionError(error_msg)
        elif isinstance(ex.inner_exception, SSLError):
            # When msrest encounters SSLError, msrest wraps SSLError in ClientRequestError
            az_error = azclierror.AzureConnectionError(error_msg)
            az_error.set_recommendation(SSLERROR_TEMPLATE)
        else:
            az_error = azclierror.ClientRequestError(error_msg)

    elif isinstance(ex, HttpOperationError):
        # Only the message part of the extracted pair is used; the error type is derived
        # from the HTTP status code of the response.
        message, _ = extract_http_operation_error(ex)
        if message:
            error_msg = message
        status_code = str(getattr(ex.response, 'status_code', 'Unknown Code'))
        AzCLIErrorType = get_error_type_by_status_code(status_code)
        az_error = AzCLIErrorType(error_msg)

    elif isinstance(ex, HTTPError):
        status_code = str(getattr(ex.response, 'status_code', 'Unknown Code'))
        AzCLIErrorType = get_error_type_by_status_code(status_code)
        az_error = AzCLIErrorType(error_msg)

    elif isinstance(ex, KeyboardInterrupt):
        error_msg = 'Keyboard interrupt is captured.'
        az_error = azclierror.ManualInterrupt(error_msg)

    else:
        # Anything unrecognized is reported as an internal error together with its traceback.
        error_msg = "The command failed with an unexpected error. Here is the traceback:"
        az_error = azclierror.CLIInternalError(error_msg)
        az_error.set_exception_trace(ex)
        az_error.set_recommendation("To open an issue, please run: 'az feedback'")

    # "Resource not found" gets a dedicated exit code; every other error exits with 1.
    if isinstance(az_error, azclierror.ResourceNotFoundError):
        exit_code = 3

    az_error.print_error()
    az_error.send_telemetry()

    return exit_code
152
153
def extract_common_error_message(ex):
    """Best-effort extraction of an error message from the first argument of an exception.

    Starts from ``ex.args[0]`` and appends every entry of ``ex.args[0].error.details`` on its
    own line. Any failure while doing so is swallowed and whatever was collected up to that
    point is returned.

    :param ex: the exception to inspect.
    :return: the collected message, or None when ``ex.args`` is empty.
    """
    message = None
    try:
        first_arg = ex.args[0]
        message = first_arg
        for detail in first_arg.error.details:
            message += '\n' + detail
    except Exception:  # pylint: disable=broad-except
        # Deliberately best-effort: keep whatever was gathered before the failure.
        pass
    return message
163
164
def extract_http_operation_error(ex):
    """Extract an error message and error code from the JSON body of an HTTP operation error.

    The body (``ex.response.text``) is expected to be JSON. A JSON object is searched for an
    ``error``/``Error`` entry; when that entry is itself an object, its ``code``/``Code`` and
    ``message``/``Message`` fields are combined into ``"<code>: <message>"``. A JSON string
    body is returned as the message unchanged.

    Bodies that are missing, not valid JSON, or valid JSON of an unexpected shape (list,
    number, null) yield ``(None, 'Unknown Code')`` instead of raising, so that the caller can
    fall back to its default message.

    :param ex: exception exposing a ``response`` object with a ``text`` attribute.
    :return: tuple ``(error_msg, status_code)``.
    """
    error_msg = None
    status_code = 'Unknown Code'
    try:
        response = json.loads(ex.response.text)
    except (ValueError, TypeError, AttributeError):
        # ValueError: not JSON; TypeError: text is None; AttributeError: no response/text.
        return error_msg, status_code

    if isinstance(response, str):
        error = response
    elif isinstance(response, dict):
        error = response.get('error', response.get('Error', None))
    else:
        # Lists, numbers, booleans and null carry no recognizable error payload.
        error = None

    # ARM should use ODATA v4. So should try this first.
    # http://docs.oasis-open.org/odata/odata-json-format/v4.0/os/odata-json-format-v4.0-os.html#_Toc372793091
    if isinstance(error, dict):
        status_code = error.get('code', error.get('Code', 'Unknown Code'))
        message = error.get('message', error.get('Message', ex))
        error_msg = "{}: {}".format(status_code, message)
    else:
        error_msg = error
    return error_msg, status_code
185
186
def get_error_type_by_azure_error(ex):
    """Map an ``azure.core`` exception to the matching ``azclierror`` class.

    Subclasses are tested before their base classes: in ``azure.core.exceptions``,
    ``ResourceNotFoundError`` derives from ``HttpResponseError`` and the ``*TimeoutError``
    classes derive from ``ServiceRequestError`` / ``ServiceResponseError``. Testing the base
    class first would make the more specific branches unreachable.

    :param ex: the exception raised by an azure-core based SDK.
    :return: an ``azclierror`` class (not an instance).
    """
    import azure.core.exceptions as exceptions
    import azure.cli.core.azclierror as azclierror

    # Most specific HTTP error first, then the generic status-code based mapping.
    if isinstance(ex, exceptions.ResourceNotFoundError):
        return azclierror.ResourceNotFoundError
    if isinstance(ex, exceptions.HttpResponseError):
        status_code = str(ex.status_code)
        return get_error_type_by_status_code(status_code)
    # A request timeout is a connectivity problem rather than a malformed request.
    if isinstance(ex, exceptions.ServiceRequestTimeoutError):
        return azclierror.AzureConnectionError
    if isinstance(ex, exceptions.ServiceRequestError):
        return azclierror.ClientRequestError
    if isinstance(ex, (exceptions.ServiceResponseError, exceptions.ServiceResponseTimeoutError)):
        return azclierror.AzureResponseError

    return azclierror.UnknownError
204
205
206# pylint: disable=too-many-return-statements
def get_error_type_by_status_code(status_code):
    """Return the ``azclierror`` class that corresponds to an HTTP status code string.

    Exact matches (400, 401, 403, 404) are looked up first; any other 4xx code is treated as
    a user fault and any 5xx code as a service-side error.

    :param str status_code: the status code as a string, e.g. ``'404'``.
    :return: an ``azclierror`` class (not an instance).
    """
    import azure.cli.core.azclierror as azclierror

    exact_matches = {
        '400': azclierror.BadRequestError,
        '401': azclierror.UnauthorizedError,
        '403': azclierror.ForbiddenError,
        '404': azclierror.ResourceNotFoundError,
    }
    if status_code in exact_matches:
        return exact_matches[status_code]

    # Fall back to the status class when the specific code has no dedicated error type.
    if status_code.startswith('4'):
        return azclierror.UnclassifiedUserFault
    if status_code.startswith('5'):
        return azclierror.AzureInternalError

    return azclierror.UnknownError
224
225
def is_azure_connection_error(error_msg):
    """Tell whether an error message describes a connection problem.

    The check is a case-insensitive substring search for a few known phrases.

    :param str error_msg: the error message to inspect.
    :rtype: bool
    """
    lowered = error_msg.lower()
    markers = ('connection error', 'connection broken', 'connection aborted')
    return any(marker in lowered for marker in markers)
233
234
235# pylint: disable=inconsistent-return-statements
def empty_on_404(ex):
    """Swallow a 404 ``CloudError`` and re-raise anything else.

    :param ex: the exception to inspect.
    :return: None when ``ex`` is a ``CloudError`` with status code 404.
    :raises: ``ex`` itself in every other case.
    """
    from msrestazure.azure_exceptions import CloudError
    is_not_found = isinstance(ex, CloudError) and ex.status_code == 404
    if not is_not_found:
        raise ex
    return None
241
242
def truncate_text(str_to_shorten, width=70, placeholder=' [...]'):
    """Shorten text so that the result is at most ``width`` characters long.

    Text that already fits is returned unchanged. Longer text is cut and ``placeholder`` is
    appended so that the total length equals ``width``. When ``placeholder`` itself is longer
    than ``width``, the text is cut to ``width`` characters without a placeholder (a negative
    slice index would otherwise drop characters from the wrong end).

    :param str str_to_shorten: the text to shorten.
    :param int width: maximum length of the result; must be greater than 0.
    :param str placeholder: marker appended to text that was cut.
    :raises ValueError: if ``width`` is not greater than 0.
    :rtype: str
    """
    if width <= 0:
        raise ValueError('width must be greater than 0.')
    if len(str_to_shorten) <= width:
        return str_to_shorten
    s_len = width - len(placeholder)
    if s_len < 0:
        # The placeholder does not fit at all: fall back to a plain cut.
        return str_to_shorten[:width]
    return str_to_shorten[:s_len] + placeholder
248
249
def get_installed_cli_distributions():
    """Return lightweight descriptors for the installed CLI packages.

    pkg_resources is intentionally not used because importing it is slow (~200ms); the
    versions are read from the ``__version__`` attributes of the packages instead.

    :return: list of objects exposing ``key`` (package name) and ``version``.
    """
    from azure.cli.core import __version__ as azure_cli_core_version
    from azure.cli.telemetry import __version__ as azure_cli_telemetry_version

    class VersionItem:  # pylint: disable=too-few-public-methods
        """A mock of pkg_resources.EggInfoDistribution to maintain backward compatibility."""
        def __init__(self, key, version):
            self.key = key
            self.version = version

    # 'azure-cli' is reported with the version of azure-cli-core.
    package_versions = (
        ('azure-cli', azure_cli_core_version),
        ('azure-cli-core', azure_cli_core_version),
        ('azure-cli-telemetry', azure_cli_telemetry_version),
    )
    return [VersionItem(key, version) for key, version in package_versions]
270
271
def get_latest_from_github(package_path='azure-cli'):
    """Read the latest released version of a package from its setup.py on GitHub.

    Downloads ``src/<package_path>/setup.py`` from the main branch of Azure/azure-cli and
    returns the value of the first line of the form ``VERSION = "<version>"``.

    This is best-effort: every failure (missing ``requests``, network error, non-200 status,
    no matching line) is logged at info level where applicable and reported as None.

    :param str package_path: directory name of the package under ``src/``.
    :return: the version string, or None when it could not be determined.
    """
    # Built outside the try block so that the except handler can always reference it,
    # even when the import of requests itself fails.
    git_url = "https://raw.githubusercontent.com/Azure/azure-cli/main/src/{}/setup.py".format(package_path)
    try:
        import requests
        response = requests.get(git_url, timeout=10)
        if response.status_code != 200:
            logger.info("Failed to fetch the latest version from '%s' with status code '%s' and reason '%s'",
                        git_url, response.status_code, response.reason)
            return None
        for line in response.iter_lines():
            txt = line.decode('utf-8', errors='ignore')
            if txt.startswith('VERSION'):
                match = re.search(r'VERSION = \"(.*)\"$', txt)
                if match:
                    return match.group(1)
    except Exception as ex:  # pylint: disable=broad-except
        logger.info("Failed to get the latest version from '%s'. %s", git_url, str(ex))
    # Reached on failure or when no VERSION line was found.
    return None
290
291
def _update_latest_from_github(versions):
    """Fill in the latest published versions of the CLI packages.

    The latest version of each component is stored under the ``'pypi'`` key of its entry in
    ``versions`` (the dictionary is modified in place). The entry of the CLI package itself
    receives the version found for ``core``.

    :param dict versions: mapping of component name to a dict holding at least ``'local'``.
    :return: tuple ``(versions, success)``; ``success`` is False when there is no
        connectivity or when any version lookup failed.
    """
    if not check_connectivity(max_retries=0):
        return versions, False

    all_found = True
    for package in ('azure-cli-core', 'azure-cli-telemetry'):
        latest = get_latest_from_github(package)
        if latest:
            component = package.replace(COMPONENT_PREFIX, '')
            versions[component]['pypi'] = latest
        else:
            all_found = False

    try:
        versions[CLI_PACKAGE_NAME]['pypi'] = versions['core']['pypi']
    except KeyError:
        # No latest version known for core (or no CLI entry): leave the entry untouched.
        pass
    return versions, all_found
307
308
def get_cached_latest_versions(versions=None):
    """Get the latest versions from a cached file, refreshing the cache when it is stale.

    The cache (the ``VERSIONS`` session) is used when it was updated less than one day ago
    and was recorded for the same local ``azure-cli`` version. Otherwise the latest versions
    are fetched again and written back to the cache together with the current time.

    :param dict versions: local versions as produced by ``_get_local_versions``; computed
        when omitted or empty.
    :return: tuple ``(versions, success)`` where ``versions`` is a shallow copy.
    """
    import datetime
    from azure.cli.core._session import VERSIONS

    time_format = '%Y-%m-%d %H:%M:%S.%f'

    if not versions:
        versions = _get_local_versions()

    cached_time = VERSIONS[_VERSION_UPDATE_TIME]
    if cached_time:
        try:
            version_update_time = datetime.datetime.strptime(cached_time, time_format)
        except ValueError:
            # Unparseable timestamp (e.g. one written without a fractional part): treat the
            # cache as stale instead of failing the command.
            version_update_time = None
        if version_update_time and datetime.datetime.now() < version_update_time + datetime.timedelta(days=1):
            cache_versions = VERSIONS['versions']
            if cache_versions and cache_versions['azure-cli']['local'] == versions['azure-cli']['local']:
                return cache_versions.copy(), True

    versions, success = _update_latest_from_github(versions)
    VERSIONS['versions'] = versions
    # str(datetime) omits the fractional part when microsecond == 0, which the parser above
    # would reject; strftime always emits all six digits.
    VERSIONS[_VERSION_UPDATE_TIME] = datetime.datetime.now().strftime(time_format)
    return versions.copy(), success
328
329
def _get_local_versions():
    """Collect the locally installed versions of the CLI and its components.

    :return: dict mapping ``'azure-cli'`` and each component name (package name without the
        ``azure-cli-`` prefix) to ``{'local': <version>}``.
    """
    local_versions = {}
    for dist in get_installed_cli_distributions():
        key = dist.key
        if key == CLI_PACKAGE_NAME:
            name = CLI_PACKAGE_NAME
        elif key.startswith(COMPONENT_PREFIX):
            name = key.replace(COMPONENT_PREFIX, '')
        else:
            # Not a CLI package: ignore.
            continue
        local_versions[name] = {'local': dist.version}
    return local_versions
340
341
def get_az_version_string(use_cache=False):  # pylint: disable=too-many-statements
    """Build the multi-line version report of the CLI.

    The report lists the CLI and component versions (marked with ``*`` when a newer version
    is known), installed extensions, the Python and extension locations and the Python
    version.

    :param bool use_cache: when True, the latest versions are taken from the cached session
        data if possible; otherwise they are always fetched again.
    :return: tuple ``(version_string, updates_available_components)``. The second item is
        the list of names for which an update is available, or None when the latest
        versions could not be determined.
    """
    from azure.cli.core.extension import get_extensions, EXTENSIONS_DIR, DEV_EXTENSION_SOURCES, EXTENSIONS_SYS_DIR
    import io
    output = io.StringIO()
    versions = _get_local_versions()

    # Get the latest published versions. They are stored under the 'pypi' key of each entry,
    # although _update_latest_from_github reads them from GitHub.
    versions, success = get_cached_latest_versions(versions) if use_cache else _update_latest_from_github(versions)
    updates_available_components = []

    def _print(val=''):
        # Write one line to the in-memory report instead of stdout.
        print(val, file=output)

    def _get_version_string(name, version_dict):
        # Format "<name> <local version>", followed by ' *' when a newer version is known.
        from packaging.version import parse  # pylint: disable=import-error,no-name-in-module
        local = version_dict['local']
        pypi = version_dict.get('pypi', None)
        if pypi and parse(pypi) > parse(local):
            return name.ljust(25) + local.rjust(15) + ' *'
        return name.ljust(25) + local.rjust(15)

    # The CLI package is printed first and removed so that the loop below only sees components.
    ver_string = _get_version_string(CLI_PACKAGE_NAME, versions.pop(CLI_PACKAGE_NAME))
    if '*' in ver_string:
        updates_available_components.append(CLI_PACKAGE_NAME)
    _print(ver_string)
    _print()
    # sorted() materializes the keys, so popping inside the loop is safe.
    for name in sorted(versions.keys()):
        ver_string = _get_version_string(name, versions.pop(name))
        if '*' in ver_string:
            updates_available_components.append(name)
        _print(ver_string)
    _print()
    extensions = get_extensions()
    if extensions:
        _print('Extensions:')
        for ext in extensions:
            if ext.ext_type == 'dev':
                _print(ext.name.ljust(20) + (ext.version or 'Unknown').rjust(20) + ' (dev) ' + ext.path)
            else:
                _print(ext.name.ljust(20) + (ext.version or 'Unknown').rjust(20))
        _print()
    _print("Python location '{}'".format(os.path.abspath(sys.executable)))
    _print("Extensions directory '{}'".format(EXTENSIONS_DIR))
    # The system extensions directory is only mentioned when it exists and is not empty.
    if os.path.isdir(EXTENSIONS_SYS_DIR) and os.listdir(EXTENSIONS_SYS_DIR):
        _print("Extensions system directory '{}'".format(EXTENSIONS_SYS_DIR))
    if DEV_EXTENSION_SOURCES:
        _print("Development extension sources:")
        for source in DEV_EXTENSION_SOURCES:
            _print('    {}'.format(source))
    _print()
    _print('Python ({}) {}'.format(platform.system(), sys.version))
    _print()
    _print('Legal docs and information: aka.ms/AzureCliLegal')
    _print()
    version_string = output.getvalue()

    # If the latest versions could not be retrieved, use a sentinel value (None) to flag
    # that we couldn't check for updates.
    if not success:
        updates_available_components = None
    return version_string, updates_available_components
403
404
def get_az_version_json():
    """Return the installed CLI package and extension versions as a dictionary.

    :return: dict mapping each CLI package name to its version, plus an ``'extensions'``
        entry mapping extension names to their versions (``'Unknown'`` when not available).
    """
    from azure.cli.core.extension import get_extensions
    versions = {'extensions': {}}

    versions.update((dist.key, dist.version) for dist in get_installed_cli_distributions())

    extension_versions = versions['extensions']
    for ext in get_extensions() or ():
        extension_versions[ext.name] = ext.version or 'Unknown'
    return versions
416
417
def show_updates_available(new_line_before=False, new_line_after=False):
    """Warn about available CLI updates, at most once every seven days.

    The time of the last check is kept in the ``VERSIONS`` session. When the last check is
    less than seven days old nothing happens; otherwise the (cached) latest versions are
    compared with the local ones, a warning is shown if updates exist, and the check time is
    refreshed.

    :param bool new_line_before: emit an empty warning line before the update message.
    :param bool new_line_after: emit an empty warning line after the update message.
    """
    from azure.cli.core._session import VERSIONS
    import datetime

    time_format = '%Y-%m-%d %H:%M:%S.%f'

    last_check = VERSIONS[_VERSION_CHECK_TIME]
    if last_check:
        try:
            version_check_time = datetime.datetime.strptime(last_check, time_format)
        except ValueError:
            # Unparseable timestamp (e.g. one written without a fractional part): behave as
            # if no check had been recorded instead of failing the command.
            version_check_time = None
        if version_check_time and datetime.datetime.now() < version_check_time + datetime.timedelta(days=7):
            return

    _, updates_available_components = get_az_version_string(use_cache=True)
    if updates_available_components:
        if new_line_before:
            logger.warning("")
        show_updates(updates_available_components, only_show_when_updates_available=True)
        if new_line_after:
            logger.warning("")
    # str(datetime) omits the fractional part when microsecond == 0, which the parser above
    # would reject; strftime always emits all six digits.
    VERSIONS[_VERSION_CHECK_TIME] = datetime.datetime.now().strftime(time_format)
435
436
def show_updates(updates_available_components, only_show_when_updates_available=False):
    """Report the outcome of an update check to the user.

    :param updates_available_components: list of component names with updates, an empty
        list when everything is current, or None when the check could not be performed.
    :param bool only_show_when_updates_available: suppress the "unable to check" warning and
        the "up-to-date" message, so that output only appears when updates exist.
    """
    if updates_available_components is None:
        # The check itself failed.
        if not only_show_when_updates_available:
            logger.warning('Unable to check if your CLI is up-to-date. Check your internet connection.')
        return

    if not updates_available_components:
        if not only_show_when_updates_available:
            print('Your CLI is up-to-date.')
        return

    if in_cloud_console():
        warning_msg = 'You have %i updates available. They will be updated with the next build of Cloud Shell.'
    else:
        warning_msg = "You have %i updates available."
        if CLI_PACKAGE_NAME in updates_available_components:
            warning_msg = "{} Consider updating your CLI installation with 'az upgrade'".format(warning_msg)
    logger.warning(warning_msg, len(updates_available_components))
451
452
def get_json_object(json_string):
    """Parse a JSON string and convert the keys of every nested dictionary to snake case.

    Dictionaries and lists are processed recursively; all other values are kept as they are.

    :param str json_string: JSON (or Python-literal) text accepted by shell_safe_json_parse.
    :return: the parsed object with snake-cased dictionary keys.
    """

    def _snake_case_keys(node):
        if isinstance(node, dict):
            return {to_snake_case(key): _snake_case_keys(value) for key, value in node.items()}
        if isinstance(node, list):
            return [_snake_case_keys(element) for element in node]
        return node

    parsed = shell_safe_json_parse(json_string)
    return _snake_case_keys(parsed)
467
468
def get_file_json(file_path, throw_on_empty=True, preserve_order=False):
    """Read a file and parse its content as JSON.

    :param str file_path: path of the file to read.
    :param bool throw_on_empty: when False, an empty file yields None instead of a parse error.
    :param bool preserve_order: forwarded to shell_safe_json_parse to keep the key order.
    :return: the parsed object, or None for an empty file when ``throw_on_empty`` is False.
    :raises CLIError: if the content cannot be parsed; the original parse error is kept as
        the cause.
    """
    content = read_file_content(file_path)
    if not content and not throw_on_empty:
        return None
    try:
        return shell_safe_json_parse(content, preserve_order)
    except CLIError as ex:
        # Chain the original error so that its traceback is not lost.
        raise CLIError("Failed to parse {} with exception:\n    {}".format(file_path, ex)) from ex
477
478
def read_file_content(file_path, allow_binary=False):
    """Read a text file, trying several Unicode encodings in turn.

    :param str file_path: path of the file to read.
    :param bool allow_binary: when no encoding works, return the raw bytes encoded as a
        base64 string instead of failing.
    :return: the decoded text, or the base64 text of the raw content.
    :raises CLIError: if the file cannot be decoded (and binary fallback is off or failed).
    """
    from codecs import open as codecs_open
    # Note, always put 'utf-8-sig' first, so that BOM in WinOS won't cause trouble.
    candidate_encodings = ('utf-8-sig', 'utf-8', 'utf-16', 'utf-16le', 'utf-16be')
    for encoding in candidate_encodings:
        try:
            with codecs_open(file_path, encoding=encoding) as text_file:
                logger.debug("attempting to read file %s as %s", file_path, encoding)
                return text_file.read()
        except UnicodeError:
            # UnicodeDecodeError is a subclass of UnicodeError; try the next encoding.
            continue

    if allow_binary:
        try:
            with open(file_path, 'rb') as binary_file:
                logger.debug("attempting to read file %s as binary", file_path)
                raw_bytes = binary_file.read()
                return base64.b64encode(raw_bytes).decode("utf-8")
        except Exception:  # pylint: disable=broad-except
            # Best-effort fallback: report the failure through the CLIError below.
            pass
    raise CLIError('Failed to decode file {} - unknown decoding'.format(file_path))
498
499
def shell_safe_json_parse(json_or_dict_string, preserve_order=False, strict=True):
    """Parse a JSON string, falling back to Python literal syntax.

    Certain JSON strings are mangled by the CMD shell before they reach argv. Accepting the
    Python dictionary notation as an alternative (which is technically not JSON) avoids that
    problem.

    :param str json_or_dict_string: JSON text or a Python literal.
    :param bool preserve_order: decode JSON objects into OrderedDict instances.
    :param bool strict: forwarded to ``json.loads``.
    :return: the parsed object.
    :raises InvalidArgumentValueError: if the text is neither valid JSON nor a Python literal.
    """
    loader_options = {'strict': strict}
    if preserve_order:
        from collections import OrderedDict
        loader_options['object_pairs_hook'] = OrderedDict

    try:
        return json.loads(json_or_dict_string, **loader_options)
    except ValueError as json_ex:
        import ast
        try:
            return ast.literal_eval(json_or_dict_string)
        except Exception as literal_ex:
            # log the exception which could be a python dict parsing error.
            logger.debug(literal_ex)

            from azure.cli.core.azclierror import InvalidArgumentValueError

            # Echo the JSON received by CLI
            msg = "Failed to parse JSON: {}\nError detail: {}".format(json_or_dict_string, json_ex)

            # Recommendation for all shells
            hints = ["The JSON may have been parsed by the shell. See "
                     "https://docs.microsoft.com/cli/azure/use-cli-effectively#quoting-issues"]

            # Recommendation especially for PowerShell
            parent_proc = get_parent_proc_name()
            if parent_proc and parent_proc.lower() in ("powershell.exe", "pwsh.exe"):
                hints.append("\nPowerShell requires additional quoting rules. See "
                             "https://github.com/Azure/azure-cli/blob/dev/doc/quoting-issues-with-powershell.md")
            recommendation = ''.join(hints)

            # Raise from json_ex error which is more likely to be the original error
            raise InvalidArgumentValueError(msg, recommendation=recommendation) from json_ex
532
533
def b64encode(s):
    """
    Encodes a string to base64.

    The input is converted to bytes with latin-1 before encoding, so it may only contain
    characters in the latin-1 range.

    :param str s: string whose characters are all in the latin-1 range
    :return: base64 encoded string
    :rtype: str
    """
    # base64.b64encode always returns bytes; the former `encoded is str` test compared the
    # result with the type object itself and could never be true.
    return base64.b64encode(s.encode("latin-1")).decode('latin-1')
543
544
def b64_to_hex(s):
    """
    Converts a base64 encoded string to its uppercase hexadecimal representation.

    :param str s: base64 encoded string
    :return: uppercase hex string
    :rtype: str
    """
    decoded = base64.b64decode(s)
    # hexlify always returns bytes, so the result is decoded unconditionally.
    return binascii.hexlify(decoded).decode("utf-8").upper()
557
558
def random_string(length=16, force_lower=False, digits_only=False):
    """Generate a random string of digits and, optionally, ASCII letters.

    :param int length: number of characters to generate.
    :param bool force_lower: use only lowercase letters (ignored when ``digits_only``).
    :param bool digits_only: use digits only.
    :rtype: str
    """
    from string import ascii_letters, digits, ascii_lowercase
    from random import choice

    if digits_only:
        alphabet = digits
    elif force_lower:
        alphabet = digits + ascii_lowercase
    else:
        alphabet = digits + ascii_letters
    return ''.join(choice(alphabet) for _ in range(length))
566
567
def hash_string(value, length=16, force_lower=False):
    """Generate a deterministic hashed string.

    The SHA-256 hex digest of ``value`` is repeated as often as needed and cut to ``length``
    characters.

    :param value: bytes-like object, or a string (which is encoded before hashing).
    :param int length: length of the returned string.
    :param bool force_lower: lowercase the digest before it is repeated and cut.
    :rtype: str
    """
    import hashlib
    hasher = hashlib.sha256()
    try:
        hasher.update(value)
    except TypeError:
        # Strings are not accepted directly by update(); hash their encoded form.
        hasher.update(value.encode())

    hex_digest = hasher.hexdigest()
    if force_lower:
        hex_digest = hex_digest.lower()

    # Repeat the digest often enough to cover `length`, but use it at least once.
    repetitions = max(1, -(-length // len(hex_digest)))
    return (hex_digest * repetitions)[:length]
581
582
def in_cloud_console():
    """Return the value of the ``ACC_CLOUD`` environment variable, or None when it is unset.

    Callers use the result as a truth value.
    """
    return os.getenv('ACC_CLOUD')
585
586
def get_arg_list(op):
    """Return the parameters of a callable.

    :param op: the callable to inspect.
    :return: the ordered, read-only mapping of parameter names to ``inspect.Parameter``
        objects produced by ``inspect.signature``; it supports ``in`` tests and iteration
        over the names.
    """
    import inspect

    # The former fallback to inspect.getargspec only served Python 2; that function was
    # removed in Python 3.11, so the fallback could do nothing but fail.
    return inspect.signature(op).parameters
597
598
def is_track2(client_class):
    """Tell whether a client class is an autorest v3 / track 2 client.

    The decision is based solely on whether the ``__init__`` of the class declares a
    parameter named ``credential``. Could be refined later if necessary.

    :param client_class: the SDK client class to inspect.
    :rtype: bool
    """
    import inspect
    init_arg_names = inspect.getfullargspec(client_class.__init__).args
    return "credential" in init_arg_names
606
607
# Name of the environment variable that switches off connection verification.
DISABLE_VERIFY_VARIABLE_NAME = "AZURE_CLI_DISABLE_CONNECTION_VERIFICATION"


def should_disable_connection_verify():
    """Tell whether connection verification has been disabled through the environment.

    :return: True when the variable named by DISABLE_VERIFY_VARIABLE_NAME is set to any
        non-empty value.
    :rtype: bool
    """
    configured_value = os.getenv(DISABLE_VERIFY_VARIABLE_NAME)
    return bool(configured_value)
613
614
def poller_classes():
    """Return the long-running-operation poller classes of the supported SDK generations.

    :return: tuple of classes, suitable as the second argument of ``isinstance``.
    """
    from msrestazure.azure_operation import AzureOperationPoller
    from msrest.polling.poller import LROPoller
    from azure.core.polling import LROPoller as AzureCoreLROPoller

    known_pollers = (AzureOperationPoller, LROPoller, AzureCoreLROPoller)
    return known_pollers
620
621
def augment_no_wait_handler_args(no_wait_enabled, handler, handler_args):
    """Populates handler_args with the appropriate args for no wait.

    ``handler_args`` is modified in place.

    :param bool no_wait_enabled: whether the user asked not to wait for completion.
    :param handler: the callable that will receive ``handler_args``.
    :param dict handler_args: keyword arguments being assembled for ``handler``.
    """
    h_args = get_arg_list(handler)
    if 'no_wait' in h_args:
        handler_args['no_wait'] = no_wait_enabled
    if 'raw' in h_args and no_wait_enabled:
        # support autorest 2
        handler_args['raw'] = True
    if 'polling' in h_args and no_wait_enabled:
        # support autorest 3
        handler_args['polling'] = False

    # Support track2 SDK.
    # In track2 SDK, there is no parameter 'polling' in SDK, but just use '**kwargs'.
    # So we check the name of the operation to see if it's a long running operation.
    # The name of long running operation in SDK is like 'begin_xxx_xxx'.
    # Not every callable has a __name__ (e.g. functools.partial objects); treat those as
    # not being long running operations instead of raising AttributeError.
    op_name = getattr(handler, '__name__', None)
    if op_name and op_name.startswith('begin_') and no_wait_enabled:
        handler_args['polling'] = False
641
642
def sdk_no_wait(no_wait, func, *args, **kwargs):
    """Call ``func``, disabling polling when ``no_wait`` is set.

    :param no_wait: when truthy, ``polling=False`` is added to the keyword arguments
        (overriding any ``polling`` value supplied by the caller).
    :param func: the callable to invoke.
    :return: whatever ``func`` returns.
    """
    if no_wait:
        kwargs['polling'] = False
    return func(*args, **kwargs)
647
648
def open_page_in_browser(url):
    """Open a URL in the default browser of the current platform.

    On WSL the page is opened through ``powershell.exe``; on macOS through the ``open``
    command; everywhere else (and on WSL when PowerShell cannot be started) through the
    ``webbrowser`` module.

    :param str url: the address to open.
    :return: the exit code of PowerShell on WSL, the ``Popen`` object on macOS, otherwise
        the result of ``webbrowser.open``.
    """
    import subprocess
    import webbrowser
    platform_name, _ = _get_platform_info()

    if is_wsl():   # windows 10 linux subsystem
        # https://docs.microsoft.com/en-us/powershell/module/microsoft.powershell.core/about/about_powershell_exe
        # Ampersand (&) should be quoted
        powershell_command = ['powershell.exe', '-NoProfile', '-Command', 'Start-Process "{}"'.format(url)]
        try:
            return subprocess.Popen(powershell_command).wait()
        except OSError:  # WSL might be too old  # FileNotFoundError introduced in Python 3
            # Fall through to the webbrowser module below.
            pass
    elif platform_name == 'darwin':
        # handle 2 things:
        # a. On OSX sierra, 'python -m webbrowser -t <url>' emits out "execution error: <url> doesn't
        #    understand the "open location" message"
        # b. Python 2.x can't sniff out the default browser
        mac_command = ['open', url]
        return subprocess.Popen(mac_command)

    try:
        return webbrowser.open(url, new=2)  # 2 means: open in a new tab, if possible
    except TypeError:  # See https://bugs.python.org/msg322439
        # Same call once more, as a retry for the issue referenced above.
        return webbrowser.open(url, new=2)
672
673
def _get_platform_info():
    """Return the lowercased system name and release reported by ``platform.uname()``.

    :return: tuple ``(platform_name, release)``, e.g. ``('linux', '4.19.128-microsoft-standard')``.
    """
    info = platform.uname()
    # python 2, `platform.uname()` returns: tuple(system, node, release, version, machine, processor)
    # so positional access is kept as a fallback for the named attributes.
    system_name = getattr(info, 'system', None) or info[0]
    kernel_release = getattr(info, 'release', None) or info[2]
    return system_name.lower(), kernel_release.lower()
680
681
def is_wsl():
    """Tell whether the CLI runs inside the Windows Subsystem for Linux.

    "Official" way of detecting WSL: https://github.com/Microsoft/WSL/issues/423#issuecomment-221627364
    Run `uname -a` to get 'release' without python
      - WSL 1: '4.4.0-19041-Microsoft'
      - WSL 2: '4.19.128-microsoft-standard'

    :rtype: bool
    """
    platform_name, release = _get_platform_info()
    if platform_name != 'linux':
        return False
    # The release string is already lowercased by _get_platform_info.
    return 'microsoft' in release
689
690
def is_windows():
    """Tell whether the reported platform name is ``windows``.

    :rtype: bool
    """
    return _get_platform_info()[0] == 'windows'
694
695
def can_launch_browser():
    """Tell whether a graphical web browser can be launched on this machine.

    WSL and every non-Linux platform are assumed to be able to. On Linux a GUI session must
    be detectable through the environment and the default browser must not be the text
    browser ``www-browser``.

    :rtype: bool
    """
    import webbrowser
    platform_name, _ = _get_platform_info()
    if is_wsl() or platform_name != 'linux':
        return True

    # From here on the platform is plain Linux.
    # per https://unix.stackexchange.com/questions/46305/is-there-a-way-to-retrieve-the-name-of-the-desktop-environment
    # and https://unix.stackexchange.com/questions/193827/what-is-display-0
    # we can check a few env vars
    gui_env_vars = ['DESKTOP_SESSION', 'XDG_CURRENT_DESKTOP', 'DISPLAY']
    if not any(os.getenv(v) for v in gui_env_vars):
        return False

    try:
        default_browser = webbrowser.get()
    except webbrowser.Error:
        return False
    # text browser won't work
    return getattr(default_browser, 'name', None) != 'www-browser'
718
719
def get_command_type_kwarg(custom_command=False):
    """Return the keyword argument name that carries the command type.

    :param custom_command: truthy for custom commands.
    :return: ``'custom_command_type'`` for custom commands, otherwise ``'command_type'``.
    :rtype: str
    """
    if custom_command:
        return 'custom_command_type'
    return 'command_type'
722
723
def reload_module(module):
    """Reload an already imported module so that changes to it take effect.

    Nothing happens when the module has not been imported yet.

    :param str module: the name of the module as registered in ``sys.modules``.
    """
    if module not in sys.modules:
        return
    from importlib import reload
    loaded_module = sys.modules[module]
    reload(loaded_module)
729
730
def get_default_admin_username():
    """Return a default admin user name based on the name of the current user.

    The name of the logged-in user is used unless it cannot be determined or it is one of
    the reserved names in DISALLOWED_USER_NAMES; in both cases ``azureuser`` is used and a
    warning is logged.

    :rtype: str
    """
    try:
        username = getpass.getuser()
    except (KeyError, OSError, ImportError):
        # getpass.getuser() raised assorted exceptions (KeyError, ImportError, ...) before
        # Python 3.13 and raises OSError since; all of them mean "user name unknown".
        username = None

    if username is None:
        logger.warning('Unable to determine the current username. Use azureuser instead.')
        return 'azureuser'
    if username.lower() in DISALLOWED_USER_NAMES:
        logger.warning('Default username %s is a reserved username. Use azureuser instead.', username)
        return 'azureuser'
    return username
740
741
def _find_child(parent, *args, **kwargs):
    """Walk down nested collections of ``parent`` and return the item that is reached.

    ``path`` and ``key_path`` are dot-separated and are consumed in lockstep with ``args``:
    at each level the collection named by the path segment is read from the current object
    and searched for the item whose attribute (named by the key segment) equals the
    corresponding argument, ignoring case.

    :param parent: the object to start from.
    :param args: the values to look up, one per level.
    :keyword str path: dot-separated attribute names of the collections.
    :keyword str key_path: dot-separated attribute names used to identify the items.
    :return: the item found at the last level (``parent`` itself when there are no levels).
    :raises CLIError: if a collection is missing or no item matches.
    """
    # tuple structure (path, key, dest)
    path = kwargs.get('path', None)
    key_path = kwargs.get('key_path', None)
    levels = zip(path.split('.'), key_path.split('.'), args)

    current = parent
    for segment, key, wanted in levels:
        collection = getattr(current, segment, None)
        if collection is None:
            raise CLIError("collection '{}' not found".format(segment))
        for candidate in collection:
            if getattr(candidate, key).lower() == wanted.lower():
                current = candidate
                break
        else:
            # The loop finished without a match.
            raise CLIError("item '{}' not found in {}".format(wanted, segment))
    return current
757
758
def find_child_item(parent, *args, **kwargs):
    """Find an item in nested collections of ``parent``.

    :param parent: the object to start from.
    :param args: the values to look up, one per level.
    :keyword str path: dot-separated attribute names of the collections.
    :keyword str key_path: dot-separated attribute names used to identify the items.
    :return: the item found at the last level.
    :raises CLIError: if ``args``, ``path`` and ``key_path`` do not have the same number of
        components, or if the lookup fails.
    """
    path = kwargs.get('path', '')
    key_path = kwargs.get('key_path', '')
    # All three counts must agree. A chained `a != b != c` only tests neighbouring pairs
    # and is False as soon as one pair is equal, which let mismatches through.
    if not len(args) == len(path.split('.')) == len(key_path.split('.')):
        raise CLIError('command authoring error: args, path and key_path must have equal number of components.')
    return _find_child(parent, *args, path=path, key_path=key_path)
765
766
def find_child_collection(parent, *args, **kwargs):
    """Find a collection that belongs to an item in nested collections of ``parent``.

    The owner of the collection is located with ``args``, ``path`` and ``key_path``; the
    last component of ``path`` names the collection that is read from the owner.

    :param parent: the object to start from.
    :param args: the values to look up, one per level.
    :keyword str path: dot-separated attribute names; the last one is the collection of interest.
    :keyword str key_path: dot-separated attribute names used to identify the items.
    :return: the collection.
    :raises CLIError: if the arguments are inconsistent or the collection does not exist.
    """
    path = kwargs.get('path', '')
    key_path = kwargs.get('key_path', '')
    path_parts = path.split('.')
    key_count = len(key_path.split('.'))
    arg_count = len(args)
    # NOTE(review): the error is raised only when BOTH conditions are violated, while the
    # message describes two requirements. Left unchanged because calls without args and
    # with the default key_path ('' splits into one component) depend on it - confirm.
    if arg_count != key_count and len(path_parts) != arg_count + 1:
        raise CLIError('command authoring error: args and key_path must have equal number of components, and '
                       'path must have one extra component (the path to the collection of interest.')
    owner = _find_child(parent, *args, path=path, key_path=key_path)
    collection_path = path_parts[-1]
    collection = getattr(owner, collection_path, None)
    if collection is None:
        raise CLIError("collection '{}' not found".format(collection_path))
    return collection
782
783
def check_connectivity(url='https://azure.microsoft.com', max_retries=5, timeout=1):
    """Check whether `url` can be reached by sending a HEAD request.

    :param url: The URL to probe.
    :param max_retries: Passed as max_retries to the HTTPAdapter mounted for `url`.
    :param timeout: Timeout passed to the HEAD request.
    :return: True if the request completed, False if a connection error or timeout occurred.
    """
    import requests
    import timeit
    started_at = timeit.default_timer()
    connectivity_errors = (requests.exceptions.ConnectionError, requests.exceptions.Timeout)
    try:
        with requests.Session() as session:
            adapter = requests.adapters.HTTPAdapter(max_retries=max_retries)
            session.mount(url, adapter)
            session.head(url, timeout=timeout)
    except connectivity_errors as ex:
        logger.info('Connectivity problem detected.')
        logger.debug(ex)
        reachable = False
    else:
        reachable = True
    elapsed = timeit.default_timer() - started_at
    logger.debug('Connectivity check: %s sec', elapsed)
    return reachable
801
802
def send_raw_request(cli_ctx, method, url, headers=None, uri_parameters=None,  # pylint: disable=too-many-locals,too-many-branches,too-many-statements
                     body=None, skip_authorization_header=False, resource=None, output_file=None,
                     generated_client_request_id_name='x-ms-client-request-id'):
    """Send a raw HTTP request and return the response, acquiring a bearer token when applicable.

    :param cli_ctx: The CLI context. `cli_ctx.data` ('command', 'safe_params', 'subscription_id') and
                    `cli_ctx.cloud.endpoints` are read.
    :param method: HTTP method, passed to `requests.Request`.
    :param url: Request URL. If it contains no '://', it is appended to the cloud's resource manager
                endpoint. The token '{subscriptionId}' is replaced with a subscription ID.
    :param headers: Iterable of strings. Each one is first parsed with `shell_safe_json_parse` and
                    merged into the headers; if that raises CLIError it is split on the first '='
                    as KEY=VALUE.
    :param uri_parameters: Iterable of strings in the same format as `headers`, sent as query
                           parameters.
    :param body: Request body, passed as `data` to `requests.Request`.
    :param skip_authorization_header: If truthy, no token is acquired. It is forced to True when
                                      `headers` already contains 'Authorization'.
    :param resource: Resource to acquire the token for. If falsy, it is derived from `url` by
                     comparing it with the cloud endpoints.
    :param output_file: If truthy, path of a file the response content is written to.
    :param generated_client_request_id_name: Name of the header that receives a newly generated
                                             UUID. If falsy, no such header is added.
    :return: The `requests` response object.
    :raises CLIError: if the response is not OK (`r.ok` is falsy). The message is the response's
                      reason, followed by the response text in parentheses when there is one.
    """
    import uuid
    from requests import Session, Request
    from requests.structures import CaseInsensitiveDict

    # Build the headers. CaseInsensitiveDict makes lookups like 'Authorization' and 'User-Agent'
    # below independent of the casing the caller used.
    result = CaseInsensitiveDict()
    for s in headers or []:
        try:
            temp = shell_safe_json_parse(s)
            result.update(temp)
        except CLIError:
            # Not parsable: treat the item as KEY=VALUE (a string without '=' fails to unpack here)
            key, value = s.split('=', 1)
            result[key] = value
    headers = result

    # If Authorization header is already provided, don't bother with the token
    if 'Authorization' in headers:
        skip_authorization_header = True

    # Handle User-Agent
    agents = [get_az_rest_user_agent()]

    # Borrow AZURE_HTTP_USER_AGENT from msrest
    # https://github.com/Azure/msrest-for-python/blob/4cc8bc84e96036f03b34716466230fb257e27b36/msrest/pipeline/universal.py#L70
    _ENV_ADDITIONAL_USER_AGENT = 'AZURE_HTTP_USER_AGENT'
    if _ENV_ADDITIONAL_USER_AGENT in os.environ:
        agents.append(os.environ[_ENV_ADDITIONAL_USER_AGENT])

    # Custom User-Agent provided as command argument
    if 'User-Agent' in headers:
        agents.append(headers['User-Agent'])
    headers['User-Agent'] = ' '.join(agents)

    if generated_client_request_id_name:
        headers[generated_client_request_id_name] = str(uuid.uuid4())

    # try to figure out the correct content type
    if body:
        try:
            # Only the fact that parsing succeeds matters; the parsed value is discarded
            _ = shell_safe_json_parse(body)
            if 'Content-Type' not in headers:
                headers['Content-Type'] = 'application/json'
        except Exception:  # pylint: disable=broad-except
            # The body could not be parsed: leave Content-Type untouched
            pass

    # add telemetry
    headers['CommandName'] = cli_ctx.data['command']
    if cli_ctx.data.get('safe_params'):
        headers['ParameterSetName'] = ' '.join(cli_ctx.data['safe_params'])

    # Build the query parameters the same way as the headers (plain dict, so keys are case-sensitive)
    result = {}
    for s in uri_parameters or []:
        try:
            temp = shell_safe_json_parse(s)
            result.update(temp)
        except CLIError:
            key, value = s.split('=', 1)
            result[key] = value
    # Pass None instead of an empty dict when there are no query parameters
    uri_parameters = result or None

    endpoints = cli_ctx.cloud.endpoints
    # If url is an ARM resource ID, like /subscriptions/xxx/resourcegroups/xxx?api-version=2019-07-01,
    # default to Azure Resource Manager.
    # https://management.azure.com + /subscriptions/xxx/resourcegroups/xxx?api-version=2019-07-01
    if '://' not in url:
        url = endpoints.resource_manager.rstrip('/') + url

    # Replace common tokens with real values. It is for smooth experience if users copy and paste the url from
    # Azure Rest API doc
    from azure.cli.core._profile import Profile
    profile = Profile(cli_ctx=cli_ctx)
    if '{subscriptionId}' in url:
        # Prefer the subscription ID stored in cli_ctx.data; fall back to the profile's one
        url = url.replace('{subscriptionId}', cli_ctx.data['subscription_id'] or profile.get_subscription_id())

    # Prepare the Bearer token for `Authorization` header
    # A token is only attached to https:// URLs
    if not skip_authorization_header and url.lower().startswith('https://'):
        # Prepare `resource` for `get_raw_token`
        if not resource:
            # If url starts with ARM endpoint, like `https://management.azure.com/`,
            # use `active_directory_resource_id` for resource, like `https://management.core.windows.net/`.
            # This follows the same behavior as `azure.cli.core.commands.client_factory._get_mgmt_service_client`
            if url.lower().startswith(endpoints.resource_manager.rstrip('/')):
                resource = endpoints.active_directory_resource_id
            else:
                # Otherwise use the first public string attribute of `endpoints` that is a prefix of the url
                from azure.cli.core.cloud import CloudEndpointNotSetException
                for p in [x for x in dir(endpoints) if not x.startswith('_')]:
                    try:
                        value = getattr(endpoints, p)
                    except CloudEndpointNotSetException:
                        continue
                    if isinstance(value, str) and url.lower().startswith(value.lower()):
                        resource = value
                        break
        if resource:
            # Prepare `subscription` for `get_raw_token`
            # If this is an ARM request, try to extract subscription ID from the URL.
            # But there are APIs which don't require subscription ID, like /subscriptions, /tenants
            # TODO: In the future when multi-tenant subscription is supported, we won't be able to uniquely identify
            #   the token from subscription anymore.
            token_subscription = None
            if url.lower().startswith(endpoints.resource_manager.rstrip('/')):
                token_subscription = _extract_subscription_id(url)
            if token_subscription:
                logger.debug('Retrieving token for resource %s, subscription %s', resource, token_subscription)
                token_info, _, _ = profile.get_raw_token(resource, subscription=token_subscription)
            else:
                logger.debug('Retrieving token for resource %s', resource)
                token_info, _, _ = profile.get_raw_token(resource)
            token_type, token, _ = token_info
            headers = headers or {}
            headers['Authorization'] = '{} {}'.format(token_type, token)
        else:
            logger.warning("Can't derive appropriate Azure AD resource from --url to acquire an access token. "
                           "If access token is required, use --resource to specify the resource")

    # https://requests.readthedocs.io/en/latest/user/advanced/#prepared-requests
    s = Session()
    req = Request(method=method, url=url, headers=headers, params=uri_parameters, data=body)
    prepped = s.prepare_request(req)

    # Merge environment settings into session
    # The fourth positional argument is the certificate verification flag
    settings = s.merge_environment_settings(prepped.url, {}, None, not should_disable_connection_verify(), None)
    _log_request(prepped)
    r = s.send(prepped, **settings)
    _log_response(r)

    if not r.ok:
        reason = r.reason
        if r.text:
            reason += '({})'.format(r.text)
        raise CLIError(reason)
    if output_file:
        # Write the response content to the file in binary mode, 128 bytes at a time
        with open(output_file, 'wb') as fd:
            for chunk in r.iter_content(chunk_size=128):
                fd.write(chunk)
    return r
941
942
def _extract_subscription_id(url):
    """Return the subscription GUID that follows '/subscriptions/' in `url`, or None if there is none.

    The search is case-insensitive and the ID is returned exactly as it appears in the URL.
    """
    guid_pattern = '/subscriptions/([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})'
    found = re.search(guid_pattern, url, re.IGNORECASE)
    if found is None:
        logger.debug('No subscription ID specified in the URL %s', url)
        return None
    subscription_id = found.group(1)
    logger.debug('Found subscription ID %s in the URL %s', subscription_id, url)
    return subscription_id
953
954
def _log_request(request):
    """Log the URL, method, headers and body of a client request. Adapted from msrest
    https://github.com/Azure/msrest-for-python/blob/3653d29fc44da408898b07c710290a83d196b777/msrest/http_logger.py#L39

    Nothing is logged unless the logger is enabled for DEBUG. Any error raised while logging is
    itself logged instead of being propagated.
    """
    if not logger.isEnabledFor(logging.DEBUG):
        return

    try:
        logger.info("Request URL: %r", request.url)
        logger.info("Request method: %r", request.method)
        logger.info("Request headers:")
        for name, raw_value in request.headers.items():
            shown = raw_value
            if name.lower() == 'authorization':
                # Never log the whole credential: drop at least half of it and keep at most 20 characters
                keep = min(len(raw_value) // 2, 20)
                shown = raw_value[:keep] + '...'
            logger.info("    %r: %r", name, shown)
        logger.info("Request body:")

        # A generator body is a file upload; its binary data should not be logged.
        import types
        is_upload = isinstance(request.body, types.GeneratorType)
        logger.info("File upload" if is_upload else str(request.body))
    except Exception as err:  # pylint: disable=broad-except
        logger.info("Failed to log request: %r", err)
982
983
def _log_response(response, **kwargs):
    """Log the status, headers and content of a server response. Adapted from msrest
    https://github.com/Azure/msrest-for-python/blob/3653d29fc44da408898b07c710290a83d196b777/msrest/http_logger.py#L68

    :param response: The response to log.
    :keyword stream: If truthy, the body is reported as streamable instead of being logged.
    :return: None when the logger is not enabled for DEBUG, otherwise `response` (also when
             logging itself failed).
    """
    if not logger.isEnabledFor(logging.DEBUG):
        return None

    try:
        logger.info("Response status: %r", response.status_code)
        logger.info("Response headers:")
        for name, header_value in response.headers.items():
            logger.info("    %r: %r", name, header_value)

        # Binary payloads (attachments, octet streams, images) are described rather than dumped.
        logger.info("Response content:")
        attachment_re = re.compile(r'attachment; ?filename=["\w.]+', re.IGNORECASE)
        disposition = response.headers.get('content-disposition')

        if disposition and attachment_re.match(disposition):
            logger.info("File attachments: %s", disposition.partition('=')[2])
        else:
            content_type = response.headers.get("content-type", "")
            if content_type.endswith("octet-stream"):
                logger.info("Body contains binary data.")
            elif content_type.startswith("image"):
                logger.info("Body contains image data.")
            elif kwargs.get('stream', False):
                logger.info("Body is streamable")
            else:
                logger.info(response.content.decode("utf-8-sig"))
    except Exception as err:  # pylint: disable=broad-except
        logger.info("Failed to log response: %s", repr(err))
    return response
1018
1019
class ScopedConfig:
    """Context manager that temporarily overrides `use_local_config` on a config object.

    On enter, `cli_config.use_local_config` is set to the requested value (False when None was
    given). On exit, it is set back to the value it had when this object was constructed, or to
    None if the attribute did not exist at that time.
    """

    def __init__(self, cli_config, use_local_config=None):
        self.use_local_config = False if use_local_config is None else use_local_config
        self.cli_config = cli_config
        # "use_local_config" might not be available on the config object, hence getattr with a default
        self.original_use_local_config = getattr(cli_config, 'use_local_config', None)

    def __enter__(self):
        setattr(self.cli_config, 'use_local_config', self.use_local_config)

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.cli_config.use_local_config = self.original_use_local_config


# Alias of ScopedConfig
ConfiguredDefaultSetter = ScopedConfig
1038
1039
def _ssl_context():
    """Return the SSL context to use for URL downloads.

    A bare `ssl.SSLContext` is returned on Python older than 3.4 or when running in Cloud Shell
    on Windows; in every other case the default context from `ssl.create_default_context()`.
    """
    needs_bare_context = sys.version_info < (3, 4) or (in_cloud_console() and platform.system() == 'Windows')
    if not needs_bare_context:
        return ssl.create_default_context()

    try:
        return ssl.SSLContext(ssl.PROTOCOL_TLS)  # added in python 2.7.13 and 3.6
    except AttributeError:
        # PROTOCOL_TLS is not available: fall back to TLSv1
        return ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1048
1049
def urlretrieve(url):
    """Download `url` and return the whole response body.

    :param url: The URL to open; passed to `urllib.request.urlopen`.
    :return: The content read from the response.
    :rtype: bytes
    """
    # Use the response as a context manager so the connection is closed once the body is read,
    # also when reading fails.
    with urlopen(url, context=_ssl_context()) as response:
        return response.read()
1053
1054
def parse_proxy_resource_id(rid):
    """Split a resource ID into its components.

    :param rid: The resource id being parsed
    :type rid: str
    :returns: An empty dictionary if `rid` is empty, None if `rid` doesn't match the expected
        format, otherwise a dictionary with the following key/value pairs (only those found):

        - subscription:            Subscription id
        - resource_group:          Name of resource group
        - namespace:               Namespace for the resource provider (i.e. Microsoft.Compute)
        - type:                    Type of the root resource (i.e. virtualMachines)
        - name:                    Name of the root resource
        - child_type_{level}:      Type of the child resource of that level
        - child_name_{level}:      Name of the child resource of that level
        - last_child_num:          Level of the last child

    :rtype: dict[str,str]
    """
    if not rid:
        return {}
    matched = _PROXYID_RE.match(rid)
    if matched is None:
        return None

    parts = matched.groupdict()
    # The raw children segment is only needed to derive the per-level entries
    children_segment = parts.pop('children', None) or ''
    last_level = None
    for last_level, child in enumerate(_CHILDREN_RE.finditer(children_segment), start=1):
        for key, group in child.groupdict().items():
            parts['{}_{}'.format(key, last_level)] = group
    # Stays None (and is dropped below) when there are no children
    parts['last_child_num'] = last_level
    return {key: value for key, value in parts.items() if value is not None}
1089
1090
def get_az_user_agent():
    """Return the Azure CLI part of the User-Agent: 'AZURECLI/<core version>', followed by the
    installer in parentheses when the installer environment variable is set.
    """
    # Dynamically load the core version
    from azure.cli.core import __version__ as core_version

    user_agent = "AZURECLI/{}".format(core_version)

    from azure.cli.core._environment import _ENV_AZ_INSTALLER
    installer = os.environ.get(_ENV_AZ_INSTALLER)
    if installer is not None:
        user_agent += ' ({})'.format(installer)

    # The additional user agent from the environment is deliberately not appended here, msrest already does it:
    # https://github.com/Azure/msrest-for-python/blob/4cc8bc84e96036f03b34716466230fb257e27b36/msrest/pipeline/universal.py#L70

    return user_agent
1107
1108
def get_az_rest_user_agent():
    """Get User-Agent for az rest calls: 'python/<version> (<platform>) <az user agent>'"""
    return 'python/{} ({}) {}'.format(platform.python_version(), platform.platform(), get_az_user_agent())
1118
1119
def user_confirmation(message, yes=False):
    """Ask the user to confirm an operation with a yes/no prompt.

    :param message: The prompt message.
    :param yes: If truthy, the prompt is skipped.
    :raises CLIError: if the user declines or if no tty is available to prompt.
    """
    if not yes:
        from knack.prompting import prompt_y_n, NoTTYException
        try:
            confirmed = prompt_y_n(message)
        except NoTTYException:
            raise CLIError(
                'Unable to prompt for confirmation as no tty available. Use --yes.')
        if not confirmed:
            raise CLIError('Operation cancelled.')
1130
1131
def get_linux_distro():
    """Read the distribution name and version from /etc/os-release.

    :return: A tuple of the NAME and VERSION_ID values (each None if absent), or (None, None)
             when not running on Linux or when the file cannot be read.
    """
    if platform.system() != 'Linux':
        return None, None

    try:
        with open('/etc/os-release') as os_release:
            entries = [raw.strip() for raw in os_release]
    except Exception:  # pylint: disable=broad-except
        return None, None

    release_info = {}
    for entry in entries:
        # Only KEY=VALUE lines are of interest; keys are lowercased, surrounding quotes are dropped
        key, separator, value = entry.partition('=')
        if separator:
            release_info[key.lower()] = value.strip('"')

    return release_info.get('name'), release_info.get('version_id')
1149
1150
def roughly_parse_command(args):
    """Roughly parse the command part of the arguments: <az vm create> --name vm1

    Similar to knack.invocation.CommandInvoker._rudimentary_get_command, but without bothering
    with positional args: everything before the first empty argument or argument starting with
    '-' is taken.

    :return: The leading arguments joined with spaces, in lower case.
    """
    command_words = []
    for arg in args:
        if not arg or arg.startswith('-'):
            break
        command_words.append(arg)
    return ' '.join(command_words).lower()
1162
1163
def is_guid(guid):
    """Tell whether `guid` is a string that `uuid.UUID` can parse.

    :param guid: The value to check.
    :return: True if it can be parsed as a UUID; False otherwise, including for values that
             are not strings (like None or numbers).
    :rtype: bool
    """
    import uuid
    try:
        uuid.UUID(guid)
    except (ValueError, TypeError, AttributeError):
        # ValueError: badly formed string. TypeError/AttributeError: not a string at all
        # (e.g. None or an int), which is not a GUID either.
        return False
    return True
1171
1172
def handle_version_update():
    """Clean up information in local files that may be invalidated
    because of a version update of Azure CLI

    Any exception raised on the way is logged as a warning instead of being propagated.
    """
    try:
        from azure.cli.core._session import VERSIONS
        from packaging.version import parse  # pylint: disable=import-error,no-name-in-module
        from azure.cli.core import __version__
        if not VERSIONS['versions']:
            # Nothing recorded yet
            get_cached_latest_versions()
            return
        recorded_core_version = VERSIONS['versions']['core']['local']
        if parse(recorded_core_version) == parse(__version__):
            # Still the same version: the recorded information remains valid
            return
        logger.debug("Azure CLI has been updated.")
        logger.debug("Clean up versions and refresh cloud endpoints information in local files.")
        VERSIONS['versions'] = {}
        VERSIONS['update_time'] = ''
        from azure.cli.core.cloud import refresh_known_clouds
        refresh_known_clouds()
    except Exception as ex:  # pylint: disable=broad-except
        logger.warning(ex)
1192
1193
def resource_to_scopes(resource):
    """Turn an ADAL resource ID into MSAL scopes: a one-element list holding the resource with
    the /.default suffix appended.
    For example:
       'https://management.core.windows.net/' -> ['https://management.core.windows.net//.default']
       'https://managedhsm.azure.com' -> ['https://managedhsm.azure.com/.default']

    :param resource: The ADAL resource ID
    :return: A list of scopes
    """
    # https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-permissions-and-consent#trailing-slash-and-default
    # A trailing slash of the resource, like in https://management.azure.com/, must be preserved,
    # so the resulting scope is https://management.azure.com//.default
    return [resource + '/.default']
1208
1209
def scopes_to_resource(scopes):
    """Turn MSAL scopes into an ADAL resource: the first scope with its /.default or
    /user_impersonation suffix removed. Only the first scope is looked at.
    For example:
       ['https://management.core.windows.net//.default'] -> 'https://management.core.windows.net/'
       ['https://managedhsm.azure.com/.default'] -> 'https://managedhsm.azure.com'

    :param scopes: The MSAL scopes. It can be a list or tuple of string
    :return: The ADAL resource
    :rtype: str
    """
    first_scope = scopes[0]
    for suffix in ('/.default', '/user_impersonation'):
        if first_scope.endswith(suffix):
            return first_scope[:len(first_scope) - len(suffix)]
    # No known suffix: the scope is returned unchanged
    return first_scope
1229
1230
def _get_parent_proc_name():
    """Un-cached lookup of the name of the process that launched the CLI.

    :return: The process name, or None if psutil is not importable, there is no parent process
             or access to the process information is denied.
    """
    try:
        import psutil
    except ImportError as ex:
        logger.debug(ex)
        return None

    try:
        launcher = psutil.Process(os.getpid()).parent()

        # On Windows, when CLI is run inside a virtual env, there will be 2 python.exe.
        if launcher and launcher.name().lower() == 'python.exe':
            launcher = launcher.parent()

        if not launcher:
            return None

        # On Windows, powershell.exe launches cmd.exe to launch python.exe.
        ancestor = launcher.parent()
        if ancestor and ancestor.name().lower() in ("powershell.exe", "pwsh.exe"):
            return ancestor.name()
        # if powershell.exe or pwsh.exe is not the grandparent, simply return the parent's name.
        return launcher.name()
    except psutil.AccessDenied as ex:
        # Ignore due to https://github.com/giampaolo/psutil/issues/1980
        logger.debug(ex)
        return None
1259
1260
def get_parent_proc_name():
    """Cached wrapper around _get_parent_proc_name.

    psutil calls are time-consuming, so the first result is stored as an attribute on this
    function and returned by all later calls.

    NOTE: The return value may be None if getting parent proc name fails, so always remember to
    check it first before calling string methods like lower().
    """
    cache_attr = "return_value"
    if hasattr(get_parent_proc_name, cache_attr):
        return getattr(get_parent_proc_name, cache_attr)
    parent_proc_name = _get_parent_proc_name()
    setattr(get_parent_proc_name, cache_attr, parent_proc_name)
    return parent_proc_name
1270
1271
def is_modern_terminal():
    """In addition to knack.util.is_modern_terminal, detect Cloud Shell."""
    import knack.util
    detected_by_knack = knack.util.is_modern_terminal()
    if detected_by_knack:
        return detected_by_knack
    return in_cloud_console()
1276
1277
def rmtree_with_retry(path):
    """Delete a directory tree, retrying up to 3 times with a 1 second pause on OSError.

    A workaround for https://bugs.python.org/issue33240
    Even if deletion still fails after the retries, only a warning is logged so that the command
    execution is not blocked.
    """
    import time
    import shutil
    max_retries = 3
    for attempt in range(max_retries + 1):
        try:
            shutil.rmtree(path)
            return
        except OSError as err:
            if attempt < max_retries:
                logger.warning("Failed to delete '%s': %s. Retrying ...", path, err)
                time.sleep(1)
            else:
                logger.warning("Failed to delete '%s': %s. You may try to delete it manually.", path, err)
1296