1# -------------------------------------------------------------------------------------------- 2# Copyright (c) Microsoft Corporation. All rights reserved. 3# Licensed under the MIT License. See License.txt in the project root for license information. 4# -------------------------------------------------------------------------------------------- 5 6import difflib 7 8import argparse 9import argcomplete 10 11import azure.cli.core.telemetry as telemetry 12from azure.cli.core.extension import get_extension 13from azure.cli.core.commands import ExtensionCommandSource 14from azure.cli.core.commands import AzCliCommandInvoker 15from azure.cli.core.commands.events import EVENT_INVOKER_ON_TAB_COMPLETION 16from azure.cli.core.command_recommender import CommandRecommender 17from azure.cli.core.azclierror import UnrecognizedArgumentError 18from azure.cli.core.azclierror import RequiredArgumentMissingError 19from azure.cli.core.azclierror import InvalidArgumentValueError 20from azure.cli.core.azclierror import ArgumentUsageError 21from azure.cli.core.azclierror import CommandNotFoundError 22from azure.cli.core.azclierror import ValidationError 23 24from knack.log import get_logger 25from knack.parser import CLICommandParser 26from knack.util import CLIError 27 28logger = get_logger(__name__) 29 30EXTENSION_REFERENCE = ("If the command is from an extension, " 31 "please make sure the corresponding extension is installed. " 32 "To learn more about extensions, please visit " 33 "'https://docs.microsoft.com/cli/azure/azure-cli-extensions-overview'") 34 35OVERVIEW_REFERENCE = ("https://aka.ms/cli_ref") 36 37 38class IncorrectUsageError(CLIError): 39 '''Raised when a command is incorrectly used and the usage should be 40 displayed to the user. 41 ''' 42 pass # pylint: disable=unnecessary-pass 43 44 45class AzCompletionFinder(argcomplete.CompletionFinder): 46 47 def _get_completions(self, comp_words, cword_prefix, cword_prequote, last_wordbreak_pos): 48 external_completions = [] 49 self._parser.cli_ctx.raise_event(EVENT_INVOKER_ON_TAB_COMPLETION, 50 external_completions=external_completions, 51 parser=self._parser, 52 comp_words=comp_words, 53 cword_prefix=cword_prefix, 54 cword_prequote=cword_prequote, 55 last_wordbreak_pos=last_wordbreak_pos) 56 57 return external_completions + super(AzCompletionFinder, self)._get_completions(comp_words, 58 cword_prefix, 59 cword_prequote, 60 last_wordbreak_pos) 61 62 63class AzCliCommandParser(CLICommandParser): 64 """ArgumentParser implementation specialized for the Azure CLI utility.""" 65 66 def __init__(self, cli_ctx=None, cli_help=None, **kwargs): 67 self.command_source = kwargs.pop('_command_source', None) 68 self._raw_arguments = None 69 self._namespace = None 70 self._suggestion_msg = [] 71 self.subparser_map = {} 72 self.specified_arguments = [] 73 super(AzCliCommandParser, self).__init__(cli_ctx, cli_help=cli_help, **kwargs) 74 75 def load_command_table(self, command_loader): 76 """Load a command table into our parser.""" 77 # If we haven't already added a subparser, we 78 # better do it. 79 cmd_tbl = command_loader.command_table 80 grp_tbl = command_loader.command_group_table 81 if not self.subparsers: 82 sp = self.add_subparsers(dest='_command_package') 83 sp.required = True 84 self.subparsers = {(): sp} 85 86 for command_name, metadata in cmd_tbl.items(): 87 subparser = self._get_subparser(command_name.split(), grp_tbl) 88 deprecate_info = metadata.deprecate_info 89 if not subparser or (deprecate_info and deprecate_info.expired()): 90 continue 91 92 command_verb = command_name.split()[-1] 93 # To work around http://bugs.python.org/issue9253, we artificially add any new 94 # parsers we add to the "choices" section of the subparser. 95 subparser.choices[command_verb] = command_verb 96 97 # inject command_module designer's help formatter -- default is HelpFormatter 98 fc = metadata.formatter_class or argparse.HelpFormatter 99 100 command_parser = subparser.add_parser(command_verb, 101 description=metadata.description, 102 parents=self.parents, 103 conflict_handler='error', 104 help_file=metadata.help, 105 formatter_class=fc, 106 cli_help=self.cli_help, 107 _command_source=metadata.command_source) 108 self.subparser_map[command_name] = command_parser 109 command_parser.cli_ctx = self.cli_ctx 110 command_validator = metadata.validator 111 argument_validators = [] 112 argument_groups = {} 113 for _, arg in metadata.arguments.items(): 114 # don't add deprecated arguments to the parser 115 deprecate_info = arg.type.settings.get('deprecate_info', None) 116 if deprecate_info and deprecate_info.expired(): 117 continue 118 119 if arg.validator: 120 argument_validators.append(arg.validator) 121 try: 122 if arg.arg_group: 123 try: 124 group = argument_groups[arg.arg_group] 125 except KeyError: 126 # group not found so create 127 group_name = '{} Arguments'.format(arg.arg_group) 128 group = command_parser.add_argument_group(arg.arg_group, group_name) 129 argument_groups[arg.arg_group] = group 130 param = AzCliCommandParser._add_argument(group, arg) 131 else: 132 param = AzCliCommandParser._add_argument(command_parser, arg) 133 except argparse.ArgumentError as ex: 134 raise CLIError("command authoring error for '{}': '{}' {}".format( 135 command_name, ex.args[0].dest, ex.message)) # pylint: disable=no-member 136 param.completer = arg.completer 137 param.deprecate_info = arg.deprecate_info 138 param.preview_info = arg.preview_info 139 param.experimental_info = arg.experimental_info 140 param.default_value_source = arg.default_value_source 141 command_parser.set_defaults( 142 func=metadata, 143 command=command_name, 144 _cmd=metadata, 145 _command_validator=command_validator, 146 _argument_validators=argument_validators, 147 _parser=command_parser) 148 149 def validation_error(self, message): 150 az_error = ValidationError(message) 151 az_error.print_error() 152 az_error.send_telemetry() 153 self.exit(2) 154 155 def error(self, message): 156 # Get a recommended command from the CommandRecommender 157 command_arguments = self._get_failure_recovery_arguments() 158 cli_ctx = self.cli_ctx or (self.cli_help.cli_ctx if self.cli_help else None) 159 recommender = CommandRecommender(*command_arguments, message, cli_ctx) 160 recommender.set_help_examples(self.get_examples(self.prog)) 161 recommendations = recommender.provide_recommendations() 162 163 az_error = ArgumentUsageError(message) 164 if 'unrecognized arguments' in message: 165 az_error = UnrecognizedArgumentError(message) 166 elif 'arguments are required' in message: 167 az_error = RequiredArgumentMissingError(message) 168 elif 'invalid' in message: 169 az_error = InvalidArgumentValueError(message) 170 171 if '--query' in message: 172 from azure.cli.core.util import QUERY_REFERENCE 173 az_error.set_recommendation(QUERY_REFERENCE) 174 elif recommendations: 175 az_error.set_aladdin_recommendation(recommendations) 176 az_error.print_error() 177 az_error.send_telemetry() 178 self.exit(2) 179 180 def format_help(self): 181 extension_version = None 182 extension_name = None 183 try: 184 if isinstance(self.command_source, ExtensionCommandSource): 185 extension_name = self.command_source.extension_name 186 extension_version = get_extension(self.command_source.extension_name).version 187 except Exception: # pylint: disable=broad-except 188 pass 189 190 telemetry.set_command_details( 191 command=self.prog[3:], 192 extension_name=extension_name, 193 extension_version=extension_version) 194 telemetry.set_success(summary='show help') 195 super(AzCliCommandParser, self).format_help() 196 197 def get_examples(self, command): 198 if not self.cli_help: 199 return [] 200 is_group = self.is_group() 201 return self.cli_help.get_examples(command, 202 self._actions[-1] if is_group else self, 203 is_group) 204 205 def enable_autocomplete(self): 206 argcomplete.autocomplete = AzCompletionFinder() 207 argcomplete.autocomplete(self, validator=lambda c, p: c.lower().startswith(p.lower()), 208 default_completer=lambda *args, **kwargs: ()) 209 210 def _get_failure_recovery_arguments(self, action=None): 211 # Strip the leading "az " and any extraneous whitespace. 212 command = self.prog[3:].strip() 213 parameters = [] 214 parameter_set = set() 215 raw_arguments = None 216 extension = None 217 218 # Extract only parameter names to ensure GPDR compliance 219 def extract_safe_params(parameters): 220 return AzCliCommandInvoker._extract_parameter_names(parameters) # pylint: disable=protected-access 221 222 # Check for extension name attribute 223 def has_extension_name(command_source): 224 is_extension_command_source = isinstance(command_source, ExtensionCommandSource) 225 has_extension_name = False 226 227 if is_extension_command_source: 228 has_extension_name = hasattr(command_source, 'extension_name') 229 230 return is_extension_command_source and has_extension_name 231 232 # If the arguments have been processed into a namespace... 233 if self._namespace: 234 # Select the parsed command. 235 if hasattr(self._namespace, 'command'): 236 command = self._namespace.command 237 # Parse parameter names from user input. 238 if self._raw_arguments: 239 raw_arguments = self._raw_arguments 240 parameters = extract_safe_params(self._raw_arguments) 241 242 for parameter in parameters: 243 parameter_set.add(parameter) 244 245 # If we can retrieve the extension from the current parser's command source... 246 if has_extension_name(self.command_source): 247 extension = self.command_source.extension_name 248 # Otherwise, the command may have not been in a command group. The command source will not be 249 # set in this case. 250 elif action and action.dest in ('_subcommand', '_command_package'): 251 # Get all parsers in the set of possible actions. 252 parsers = list(action.choices.values()) 253 parser = parsers[0] if parsers else None 254 # If the first parser comes from an extension... 255 if parser and has_extension_name(parser.command_source): 256 # We're looking for a subcommand under an extension command group. Set the 257 # extension to reflect this. 258 extension = parser.command_source.extension_name 259 # Extend the command if the first raw argument is not a parameter. 260 if raw_arguments and raw_arguments[0] not in parameter_set: 261 command = '{cmd} {arg}'.format(cmd=command, arg=raw_arguments[0]) 262 # Otherwise, only set the extension if every subparser comes from an extension. This occurs 263 # when an unrecognized argument is passed to a command from an extension. 264 elif isinstance(self.subparser_map, dict): 265 for _, subparser in self.subparser_map.items(): 266 if isinstance(subparser.command_source, ExtensionCommandSource): 267 extension = subparser.command_source.extension_name 268 else: 269 extension = None 270 break 271 272 return command, self._raw_arguments, extension 273 274 def _get_values(self, action, arg_strings): 275 value = super(AzCliCommandParser, self)._get_values(action, arg_strings) 276 if action.dest and isinstance(action.dest, str) and not action.dest.startswith('_'): 277 self.specified_arguments.append(action.dest) 278 return value 279 280 def parse_known_args(self, args=None, namespace=None): 281 # retrieve the raw argument list in case parsing known arguments fails. 282 self._raw_arguments = args 283 # if parsing known arguments succeeds, get the command namespace and the argument list 284 self._namespace, self._raw_arguments = super().parse_known_args(args=args, namespace=namespace) 285 return self._namespace, self._raw_arguments 286 287 def _check_value(self, action, value): 288 # Override to customize the error message when a argument is not among the available choices 289 # converted value must be one of the choices (if specified) 290 if action.choices is not None and value not in action.choices: # pylint: disable=too-many-nested-blocks 291 # self.cli_ctx is None when self.prog is beyond 'az', such as 'az iot'. 292 # use cli_ctx from cli_help which is not lost. 293 cli_ctx = self.cli_ctx or (self.cli_help.cli_ctx if self.cli_help else None) 294 295 command_name_inferred = self.prog 296 use_dynamic_install = 'no' 297 if not self.command_source: 298 from azure.cli.core.extension.dynamic_install import try_install_extension 299 candidates = [] 300 args = self.prog.split() + self._raw_arguments 301 # Check if the command is from an extension. If yes, try to fix by installing the extension, then exit. 302 # The command will be rerun in another process. 303 use_dynamic_install = try_install_extension(self, args) 304 # parser has no `command_source`, value is part of command itself 305 error_msg = "'{value}' is misspelled or not recognized by the system.".format(value=value) 306 az_error = CommandNotFoundError(error_msg) 307 candidates = difflib.get_close_matches(value, action.choices, cutoff=0.7) 308 if candidates: 309 # use the most likely candidate to replace the misspelled command 310 args_inferred = [item if item != value else candidates[0] for item in args] 311 command_name_inferred = ' '.join(args_inferred).split('-')[0] 312 else: 313 # `command_source` indicates command values have been parsed, value is an argument 314 parameter = action.option_strings[0] if action.option_strings else action.dest 315 error_msg = "{prog}: '{value}' is not a valid value for '{param}'. Allowed values: {choices}.".format( 316 prog=self.prog, value=value, param=parameter, choices=', '.join([str(x) for x in action.choices])) 317 az_error = InvalidArgumentValueError(error_msg) 318 candidates = difflib.get_close_matches(value, action.choices, cutoff=0.7) 319 320 command_arguments = self._get_failure_recovery_arguments(action) 321 if candidates: 322 az_error.set_recommendation("Did you mean '{}' ?".format(candidates[0])) 323 324 # recommend a command for user 325 recommender = CommandRecommender(*command_arguments, error_msg, cli_ctx) 326 recommender.set_help_examples(self.get_examples(command_name_inferred)) 327 recommendations = recommender.provide_recommendations() 328 if recommendations: 329 az_error.set_aladdin_recommendation(recommendations) 330 331 # remind user to check extensions if we can not find a command to recommend 332 if isinstance(az_error, CommandNotFoundError) \ 333 and not az_error.recommendations and self.prog == 'az' \ 334 and use_dynamic_install == 'no': 335 az_error.set_recommendation(EXTENSION_REFERENCE) 336 337 az_error.print_error() 338 az_error.send_telemetry() 339 340 self.exit(2) 341