1# --------------------------------------------------------------------------------------------
2# Copyright (c) Microsoft Corporation. All rights reserved.
3# Licensed under the MIT License. See License.txt in the project root for license information.
4# --------------------------------------------------------------------------------------------
5
6import difflib
7
8import argparse
9import argcomplete
10
11import azure.cli.core.telemetry as telemetry
12from azure.cli.core.extension import get_extension
13from azure.cli.core.commands import ExtensionCommandSource
14from azure.cli.core.commands import AzCliCommandInvoker
15from azure.cli.core.commands.events import EVENT_INVOKER_ON_TAB_COMPLETION
16from azure.cli.core.command_recommender import CommandRecommender
17from azure.cli.core.azclierror import UnrecognizedArgumentError
18from azure.cli.core.azclierror import RequiredArgumentMissingError
19from azure.cli.core.azclierror import InvalidArgumentValueError
20from azure.cli.core.azclierror import ArgumentUsageError
21from azure.cli.core.azclierror import CommandNotFoundError
22from azure.cli.core.azclierror import ValidationError
23
24from knack.log import get_logger
25from knack.parser import CLICommandParser
26from knack.util import CLIError
27
28logger = get_logger(__name__)
29
30EXTENSION_REFERENCE = ("If the command is from an extension, "
31                       "please make sure the corresponding extension is installed. "
32                       "To learn more about extensions, please visit "
33                       "'https://docs.microsoft.com/cli/azure/azure-cli-extensions-overview'")
34
35OVERVIEW_REFERENCE = ("https://aka.ms/cli_ref")
36
37
38class IncorrectUsageError(CLIError):
39    '''Raised when a command is incorrectly used and the usage should be
40    displayed to the user.
41    '''
42    pass  # pylint: disable=unnecessary-pass
43
44
45class AzCompletionFinder(argcomplete.CompletionFinder):
46
47    def _get_completions(self, comp_words, cword_prefix, cword_prequote, last_wordbreak_pos):
48        external_completions = []
49        self._parser.cli_ctx.raise_event(EVENT_INVOKER_ON_TAB_COMPLETION,
50                                         external_completions=external_completions,
51                                         parser=self._parser,
52                                         comp_words=comp_words,
53                                         cword_prefix=cword_prefix,
54                                         cword_prequote=cword_prequote,
55                                         last_wordbreak_pos=last_wordbreak_pos)
56
57        return external_completions + super(AzCompletionFinder, self)._get_completions(comp_words,
58                                                                                       cword_prefix,
59                                                                                       cword_prequote,
60                                                                                       last_wordbreak_pos)
61
62
63class AzCliCommandParser(CLICommandParser):
64    """ArgumentParser implementation specialized for the Azure CLI utility."""
65
66    def __init__(self, cli_ctx=None, cli_help=None, **kwargs):
67        self.command_source = kwargs.pop('_command_source', None)
68        self._raw_arguments = None
69        self._namespace = None
70        self._suggestion_msg = []
71        self.subparser_map = {}
72        self.specified_arguments = []
73        super(AzCliCommandParser, self).__init__(cli_ctx, cli_help=cli_help, **kwargs)
74
75    def load_command_table(self, command_loader):
76        """Load a command table into our parser."""
77        # If we haven't already added a subparser, we
78        # better do it.
79        cmd_tbl = command_loader.command_table
80        grp_tbl = command_loader.command_group_table
81        if not self.subparsers:
82            sp = self.add_subparsers(dest='_command_package')
83            sp.required = True
84            self.subparsers = {(): sp}
85
86        for command_name, metadata in cmd_tbl.items():
87            subparser = self._get_subparser(command_name.split(), grp_tbl)
88            deprecate_info = metadata.deprecate_info
89            if not subparser or (deprecate_info and deprecate_info.expired()):
90                continue
91
92            command_verb = command_name.split()[-1]
93            # To work around http://bugs.python.org/issue9253, we artificially add any new
94            # parsers we add to the "choices" section of the subparser.
95            subparser.choices[command_verb] = command_verb
96
97            # inject command_module designer's help formatter -- default is HelpFormatter
98            fc = metadata.formatter_class or argparse.HelpFormatter
99
100            command_parser = subparser.add_parser(command_verb,
101                                                  description=metadata.description,
102                                                  parents=self.parents,
103                                                  conflict_handler='error',
104                                                  help_file=metadata.help,
105                                                  formatter_class=fc,
106                                                  cli_help=self.cli_help,
107                                                  _command_source=metadata.command_source)
108            self.subparser_map[command_name] = command_parser
109            command_parser.cli_ctx = self.cli_ctx
110            command_validator = metadata.validator
111            argument_validators = []
112            argument_groups = {}
113            for _, arg in metadata.arguments.items():
114                # don't add deprecated arguments to the parser
115                deprecate_info = arg.type.settings.get('deprecate_info', None)
116                if deprecate_info and deprecate_info.expired():
117                    continue
118
119                if arg.validator:
120                    argument_validators.append(arg.validator)
121                try:
122                    if arg.arg_group:
123                        try:
124                            group = argument_groups[arg.arg_group]
125                        except KeyError:
126                            # group not found so create
127                            group_name = '{} Arguments'.format(arg.arg_group)
128                            group = command_parser.add_argument_group(arg.arg_group, group_name)
129                            argument_groups[arg.arg_group] = group
130                        param = AzCliCommandParser._add_argument(group, arg)
131                    else:
132                        param = AzCliCommandParser._add_argument(command_parser, arg)
133                except argparse.ArgumentError as ex:
134                    raise CLIError("command authoring error for '{}': '{}' {}".format(
135                        command_name, ex.args[0].dest, ex.message))  # pylint: disable=no-member
136                param.completer = arg.completer
137                param.deprecate_info = arg.deprecate_info
138                param.preview_info = arg.preview_info
139                param.experimental_info = arg.experimental_info
140                param.default_value_source = arg.default_value_source
141            command_parser.set_defaults(
142                func=metadata,
143                command=command_name,
144                _cmd=metadata,
145                _command_validator=command_validator,
146                _argument_validators=argument_validators,
147                _parser=command_parser)
148
149    def validation_error(self, message):
150        az_error = ValidationError(message)
151        az_error.print_error()
152        az_error.send_telemetry()
153        self.exit(2)
154
155    def error(self, message):
156        # Get a recommended command from the CommandRecommender
157        command_arguments = self._get_failure_recovery_arguments()
158        cli_ctx = self.cli_ctx or (self.cli_help.cli_ctx if self.cli_help else None)
159        recommender = CommandRecommender(*command_arguments, message, cli_ctx)
160        recommender.set_help_examples(self.get_examples(self.prog))
161        recommendations = recommender.provide_recommendations()
162
163        az_error = ArgumentUsageError(message)
164        if 'unrecognized arguments' in message:
165            az_error = UnrecognizedArgumentError(message)
166        elif 'arguments are required' in message:
167            az_error = RequiredArgumentMissingError(message)
168        elif 'invalid' in message:
169            az_error = InvalidArgumentValueError(message)
170
171        if '--query' in message:
172            from azure.cli.core.util import QUERY_REFERENCE
173            az_error.set_recommendation(QUERY_REFERENCE)
174        elif recommendations:
175            az_error.set_aladdin_recommendation(recommendations)
176        az_error.print_error()
177        az_error.send_telemetry()
178        self.exit(2)
179
180    def format_help(self):
181        extension_version = None
182        extension_name = None
183        try:
184            if isinstance(self.command_source, ExtensionCommandSource):
185                extension_name = self.command_source.extension_name
186                extension_version = get_extension(self.command_source.extension_name).version
187        except Exception:  # pylint: disable=broad-except
188            pass
189
190        telemetry.set_command_details(
191            command=self.prog[3:],
192            extension_name=extension_name,
193            extension_version=extension_version)
194        telemetry.set_success(summary='show help')
195        super(AzCliCommandParser, self).format_help()
196
197    def get_examples(self, command):
198        if not self.cli_help:
199            return []
200        is_group = self.is_group()
201        return self.cli_help.get_examples(command,
202                                          self._actions[-1] if is_group else self,
203                                          is_group)
204
205    def enable_autocomplete(self):
206        argcomplete.autocomplete = AzCompletionFinder()
207        argcomplete.autocomplete(self, validator=lambda c, p: c.lower().startswith(p.lower()),
208                                 default_completer=lambda *args, **kwargs: ())
209
210    def _get_failure_recovery_arguments(self, action=None):
211        # Strip the leading "az " and any extraneous whitespace.
212        command = self.prog[3:].strip()
213        parameters = []
214        parameter_set = set()
215        raw_arguments = None
216        extension = None
217
218        # Extract only parameter names to ensure GPDR compliance
219        def extract_safe_params(parameters):
220            return AzCliCommandInvoker._extract_parameter_names(parameters)  # pylint: disable=protected-access
221
222        # Check for extension name attribute
223        def has_extension_name(command_source):
224            is_extension_command_source = isinstance(command_source, ExtensionCommandSource)
225            has_extension_name = False
226
227            if is_extension_command_source:
228                has_extension_name = hasattr(command_source, 'extension_name')
229
230            return is_extension_command_source and has_extension_name
231
232        # If the arguments have been processed into a namespace...
233        if self._namespace:
234            # Select the parsed command.
235            if hasattr(self._namespace, 'command'):
236                command = self._namespace.command
237        # Parse parameter names from user input.
238        if self._raw_arguments:
239            raw_arguments = self._raw_arguments
240            parameters = extract_safe_params(self._raw_arguments)
241
242        for parameter in parameters:
243            parameter_set.add(parameter)
244
245        # If we can retrieve the extension from the current parser's command source...
246        if has_extension_name(self.command_source):
247            extension = self.command_source.extension_name
248        # Otherwise, the command may have not been in a command group. The command source will not be
249        # set in this case.
250        elif action and action.dest in ('_subcommand', '_command_package'):
251            # Get all parsers in the set of possible actions.
252            parsers = list(action.choices.values())
253            parser = parsers[0] if parsers else None
254            # If the first parser comes from an extension...
255            if parser and has_extension_name(parser.command_source):
256                # We're looking for a subcommand under an extension command group. Set the
257                # extension to reflect this.
258                extension = parser.command_source.extension_name
259            # Extend the command if the first raw argument is not a parameter.
260            if raw_arguments and raw_arguments[0] not in parameter_set:
261                command = '{cmd} {arg}'.format(cmd=command, arg=raw_arguments[0])
262        # Otherwise, only set the extension if every subparser comes from an extension. This occurs
263        # when an unrecognized argument is passed to a command from an extension.
264        elif isinstance(self.subparser_map, dict):
265            for _, subparser in self.subparser_map.items():
266                if isinstance(subparser.command_source, ExtensionCommandSource):
267                    extension = subparser.command_source.extension_name
268                else:
269                    extension = None
270                    break
271
272        return command, self._raw_arguments, extension
273
274    def _get_values(self, action, arg_strings):
275        value = super(AzCliCommandParser, self)._get_values(action, arg_strings)
276        if action.dest and isinstance(action.dest, str) and not action.dest.startswith('_'):
277            self.specified_arguments.append(action.dest)
278        return value
279
280    def parse_known_args(self, args=None, namespace=None):
281        # retrieve the raw argument list in case parsing known arguments fails.
282        self._raw_arguments = args
283        # if parsing known arguments succeeds, get the command namespace and the argument list
284        self._namespace, self._raw_arguments = super().parse_known_args(args=args, namespace=namespace)
285        return self._namespace, self._raw_arguments
286
287    def _check_value(self, action, value):
288        # Override to customize the error message when a argument is not among the available choices
289        # converted value must be one of the choices (if specified)
290        if action.choices is not None and value not in action.choices:  # pylint: disable=too-many-nested-blocks
291            # self.cli_ctx is None when self.prog is beyond 'az', such as 'az iot'.
292            # use cli_ctx from cli_help which is not lost.
293            cli_ctx = self.cli_ctx or (self.cli_help.cli_ctx if self.cli_help else None)
294
295            command_name_inferred = self.prog
296            use_dynamic_install = 'no'
297            if not self.command_source:
298                from azure.cli.core.extension.dynamic_install import try_install_extension
299                candidates = []
300                args = self.prog.split() + self._raw_arguments
301                # Check if the command is from an extension. If yes, try to fix by installing the extension, then exit.
302                # The command will be rerun in another process.
303                use_dynamic_install = try_install_extension(self, args)
304                # parser has no `command_source`, value is part of command itself
305                error_msg = "'{value}' is misspelled or not recognized by the system.".format(value=value)
306                az_error = CommandNotFoundError(error_msg)
307                candidates = difflib.get_close_matches(value, action.choices, cutoff=0.7)
308                if candidates:
309                    # use the most likely candidate to replace the misspelled command
310                    args_inferred = [item if item != value else candidates[0] for item in args]
311                    command_name_inferred = ' '.join(args_inferred).split('-')[0]
312            else:
313                # `command_source` indicates command values have been parsed, value is an argument
314                parameter = action.option_strings[0] if action.option_strings else action.dest
315                error_msg = "{prog}: '{value}' is not a valid value for '{param}'. Allowed values: {choices}.".format(
316                    prog=self.prog, value=value, param=parameter, choices=', '.join([str(x) for x in action.choices]))
317                az_error = InvalidArgumentValueError(error_msg)
318                candidates = difflib.get_close_matches(value, action.choices, cutoff=0.7)
319
320            command_arguments = self._get_failure_recovery_arguments(action)
321            if candidates:
322                az_error.set_recommendation("Did you mean '{}' ?".format(candidates[0]))
323
324            # recommend a command for user
325            recommender = CommandRecommender(*command_arguments, error_msg, cli_ctx)
326            recommender.set_help_examples(self.get_examples(command_name_inferred))
327            recommendations = recommender.provide_recommendations()
328            if recommendations:
329                az_error.set_aladdin_recommendation(recommendations)
330
331            # remind user to check extensions if we can not find a command to recommend
332            if isinstance(az_error, CommandNotFoundError) \
333                    and not az_error.recommendations and self.prog == 'az' \
334                    and use_dynamic_install == 'no':
335                az_error.set_recommendation(EXTENSION_REFERENCE)
336
337            az_error.print_error()
338            az_error.send_telemetry()
339
340            self.exit(2)
341