1# --------------------------------------------------------------------------------------------
2# Copyright (c) Microsoft Corporation. All rights reserved.
3# Licensed under the MIT License. See License.txt in the project root for license information.
4# --------------------------------------------------------------------------------------------
5
6import sys
7
8from collections import defaultdict
9
10from .deprecation import ImplicitDeprecated, resolve_deprecate_info
11from .preview import ImplicitPreviewItem, resolve_preview_info
12from .experimental import ImplicitExperimentalItem, resolve_experimental_info
13from .util import CLIError, CtxTypeError, CommandResultItem, todict
14from .parser import CLICommandParser
15from .commands import CLICommandsLoader
16from .events import (EVENT_INVOKER_PRE_CMD_TBL_CREATE, EVENT_INVOKER_POST_CMD_TBL_CREATE,
17                     EVENT_INVOKER_CMD_TBL_LOADED, EVENT_INVOKER_PRE_PARSE_ARGS,
18                     EVENT_INVOKER_POST_PARSE_ARGS, EVENT_INVOKER_TRANSFORM_RESULT,
19                     EVENT_INVOKER_FILTER_RESULT)
20from .help import CLIHelp
21
22
23class CommandInvoker(object):
24
25    def __init__(self,
26                 cli_ctx=None,
27                 parser_cls=CLICommandParser,
28                 commands_loader_cls=CLICommandsLoader,
29                 help_cls=CLIHelp,
30                 initial_data=None):
31        """ Manages a single invocation of the CLI (i.e. running a command)
32
33        :param cli_ctx: CLI Context
34        :type cli_ctx: knack.cli.CLI
35        :param parser_cls: A class to handle command parsing
36        :type parser_cls: knack.parser.CLICommandParser
37        :param commands_loader_cls: A class to handle loading commands
38        :type commands_loader_cls: knack.commands.CLICommandsLoader
39        :param help_cls: A class to handle help
40        :type help_cls: knack.help.CLIHelp
41        :param initial_data: The initial in-memory collection for this command invocation
42        :type initial_data: dict
43        """
44        from .cli import CLI
45        if cli_ctx is not None and not isinstance(cli_ctx, CLI):
46            raise CtxTypeError(cli_ctx)
47        self.cli_ctx = cli_ctx
48        # In memory collection of key-value data for this current invocation This does not persist between invocations.
49        self.data = initial_data or defaultdict(lambda: None)
50        self.data['command'] = 'unknown'
51        self._global_parser = parser_cls.create_global_parser(cli_ctx=self.cli_ctx)
52        self.help = help_cls(cli_ctx=self.cli_ctx)
53        self.parser = parser_cls(cli_ctx=self.cli_ctx, cli_help=self.help,
54                                 prog=self.cli_ctx.name, parents=[self._global_parser])
55        self.commands_loader = commands_loader_cls(cli_ctx=self.cli_ctx)
56
57    def _filter_params(self, args):  # pylint: disable=no-self-use
58        # Consider - we are using any args that start with an underscore (_) as 'private'
59        # arguments and remove them from the arguments that we pass to the actual function.
60        params = {key: value
61                  for key, value in args.__dict__.items()
62                  if not key.startswith('_')}
63        params.pop('func', None)
64        params.pop('command', None)
65        return params
66
67    def _rudimentary_get_command(self, args):
68        """ Rudimentary parsing to get the command """
69        nouns = []
70        command_names = self.commands_loader.command_table.keys()
71        for arg in args:
72            if arg and arg[0] != '-':
73                nouns.append(arg)
74            else:
75                break
76
77        def _find_args(args):
78            search = ' '.join(args).lower()
79            return next((x for x in command_names if x.startswith(search)), False)
80
81        # since the command name may be immediately followed by a positional arg, strip those off
82        while nouns and not _find_args(nouns):
83            del nouns[-1]
84
85        # ensure the command string is case-insensitive
86        for i in range(len(nouns)):
87            args[i] = args[i].lower()
88
89        return ' '.join(nouns)
90
91    def _validate_cmd_level(self, ns, cmd_validator):  # pylint: disable=no-self-use
92        if cmd_validator:
93            cmd_validator(ns)
94        try:
95            delattr(ns, '_command_validator')
96        except AttributeError:
97            pass
98
99    def _validate_arg_level(self, ns, **_):  # pylint: disable=no-self-use
100        for validator in getattr(ns, '_argument_validators', []):
101            validator(ns)
102        try:
103            delattr(ns, '_argument_validators')
104        except AttributeError:
105            pass
106
107    def _validation(self, parsed_ns):
108        try:
109            cmd_validator = getattr(parsed_ns, '_command_validator', None)
110            if cmd_validator:
111                self._validate_cmd_level(parsed_ns, cmd_validator)
112            else:
113                self._validate_arg_level(parsed_ns)
114        except CLIError:
115            raise
116        except Exception:  # pylint: disable=broad-except
117            err = sys.exc_info()[1]
118            getattr(parsed_ns, '_parser', self.parser).validation_error(str(err))
119
120    # pylint: disable=too-many-statements
121    def execute(self, args):
122        """ Executes the command invocation
123
124        :param args: The command arguments for this invocation
125        :type args: list
126        :return: The command result
127        :rtype: knack.util.CommandResultItem
128        """
129
130        self.cli_ctx.raise_event(EVENT_INVOKER_PRE_CMD_TBL_CREATE, args=args)
131        cmd_tbl = self.commands_loader.load_command_table(args)
132        command = self._rudimentary_get_command(args)
133        self.cli_ctx.invocation.data['command_string'] = command
134        self.commands_loader.load_arguments(command)
135
136        self.cli_ctx.raise_event(EVENT_INVOKER_POST_CMD_TBL_CREATE, cmd_tbl=cmd_tbl)
137        self.parser.load_command_table(self.commands_loader)
138        self.cli_ctx.raise_event(EVENT_INVOKER_CMD_TBL_LOADED, parser=self.parser)
139
140        arg_check = [a for a in args if a not in ['--verbose', '--debug', '--only-show-warnings']]
141        if not arg_check:
142            self.cli_ctx.completion.enable_autocomplete(self.parser)
143            subparser = self.parser.subparsers[tuple()]
144            self.help.show_welcome(subparser)
145            return CommandResultItem(None, exit_code=0)
146
147        if args[0].lower() == 'help':
148            args[0] = '--help'
149
150        self.cli_ctx.completion.enable_autocomplete(self.parser)
151
152        self.cli_ctx.raise_event(EVENT_INVOKER_PRE_PARSE_ARGS, args=args)
153        parsed_args = self.parser.parse_args(args)
154        self.cli_ctx.raise_event(EVENT_INVOKER_POST_PARSE_ARGS, command=parsed_args.command, args=parsed_args)
155
156        self._validation(parsed_args)
157
158        # save the command name (leaf in the tree)
159        self.data['command'] = parsed_args.command
160        cmd = parsed_args.func
161        if hasattr(parsed_args, 'cmd'):
162            parsed_args.cmd = cmd
163        deprecations = getattr(parsed_args, '_argument_deprecations', [])
164        if cmd.deprecate_info:
165            deprecations.append(cmd.deprecate_info)
166
167        previews = getattr(parsed_args, '_argument_previews', [])
168        if cmd.preview_info:
169            previews.append(cmd.preview_info)
170
171        experimentals = getattr(parsed_args, '_argument_experimentals', [])
172        if cmd.experimental_info:
173            experimentals.append(cmd.experimental_info)
174
175        params = self._filter_params(parsed_args)
176
177        # search for implicit deprecation
178        path_comps = cmd.name.split()[:-1]
179        implicit_deprecate_info = None
180        while path_comps and not implicit_deprecate_info:
181            implicit_deprecate_info = resolve_deprecate_info(self.cli_ctx, ' '.join(path_comps))
182            del path_comps[-1]
183
184        if implicit_deprecate_info:
185            deprecate_kwargs = implicit_deprecate_info.__dict__.copy()
186            deprecate_kwargs['object_type'] = 'command'
187            del deprecate_kwargs['_get_tag']
188            del deprecate_kwargs['_get_message']
189            deprecations.append(ImplicitDeprecated(cli_ctx=self.cli_ctx, **deprecate_kwargs))
190
191        # search for implicit preview
192        path_comps = cmd.name.split()[:-1]
193        implicit_preview_info = None
194        while path_comps and not implicit_preview_info:
195            implicit_preview_info = resolve_preview_info(self.cli_ctx, ' '.join(path_comps))
196            del path_comps[-1]
197
198        if implicit_preview_info:
199            preview_kwargs = implicit_preview_info.__dict__.copy()
200            preview_kwargs['object_type'] = 'command'
201            previews.append(ImplicitPreviewItem(cli_ctx=self.cli_ctx, **preview_kwargs))
202
203        # search for implicit experimental
204        path_comps = cmd.name.split()[:-1]
205        implicit_experimental_info = None
206        while path_comps and not implicit_experimental_info:
207            implicit_experimental_info = resolve_experimental_info(self.cli_ctx, ' '.join(path_comps))
208            del path_comps[-1]
209
210        if implicit_experimental_info:
211            experimental_kwargs = implicit_experimental_info.__dict__.copy()
212            experimental_kwargs['object_type'] = 'command'
213            experimentals.append(ImplicitExperimentalItem(cli_ctx=self.cli_ctx, **experimental_kwargs))
214
215        if not self.cli_ctx.only_show_errors:
216            for d in deprecations:
217                print(d.message, file=sys.stderr)
218            for p in previews:
219                print(p.message, file=sys.stderr)
220            for p in experimentals:
221                print(p.message, file=sys.stderr)
222
223        cmd_result = parsed_args.func(params)
224        cmd_result = todict(cmd_result)
225
226        event_data = {'result': cmd_result}
227        self.cli_ctx.raise_event(EVENT_INVOKER_TRANSFORM_RESULT, event_data=event_data)
228        self.cli_ctx.raise_event(EVENT_INVOKER_FILTER_RESULT, event_data=event_data)
229
230        return CommandResultItem(event_data['result'],
231                                 exit_code=0,
232                                 table_transformer=cmd_tbl[parsed_args.command].table_transformer,
233                                 is_query_active=self.data['query_active'],
234                                 raw_result=cmd_result)
235