# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import sys

from collections import defaultdict

from .deprecation import ImplicitDeprecated, resolve_deprecate_info
from .preview import ImplicitPreviewItem, resolve_preview_info
from .experimental import ImplicitExperimentalItem, resolve_experimental_info
from .util import CLIError, CtxTypeError, CommandResultItem, todict
from .parser import CLICommandParser
from .commands import CLICommandsLoader
from .events import (EVENT_INVOKER_PRE_CMD_TBL_CREATE, EVENT_INVOKER_POST_CMD_TBL_CREATE,
                     EVENT_INVOKER_CMD_TBL_LOADED, EVENT_INVOKER_PRE_PARSE_ARGS,
                     EVENT_INVOKER_POST_PARSE_ARGS, EVENT_INVOKER_TRANSFORM_RESULT,
                     EVENT_INVOKER_FILTER_RESULT)
from .help import CLIHelp


class CommandInvoker(object):
    """ Drives one CLI invocation: loads the command table, parses and validates the
    arguments, runs the selected command and wraps its result.
    """

    def __init__(self,
                 cli_ctx=None,
                 parser_cls=CLICommandParser,
                 commands_loader_cls=CLICommandsLoader,
                 help_cls=CLIHelp,
                 initial_data=None):
        """ Manages a single invocation of the CLI (i.e. running a command)

        :param cli_ctx: CLI Context
        :type cli_ctx: knack.cli.CLI
        :param parser_cls: A class to handle command parsing
        :type parser_cls: knack.parser.CLICommandParser
        :param commands_loader_cls: A class to handle loading commands
        :type commands_loader_cls: knack.commands.CLICommandsLoader
        :param help_cls: A class to handle help
        :type help_cls: knack.help.CLIHelp
        :param initial_data: The initial in-memory collection for this command invocation
        :type initial_data: dict
        :raises CtxTypeError: If cli_ctx is given but is not a knack.cli.CLI instance
        """
        # Imported here rather than at module level.
        # NOTE(review): presumably to avoid a circular import with .cli - confirm.
        from .cli import CLI
        if cli_ctx is not None and not isinstance(cli_ctx, CLI):
            raise CtxTypeError(cli_ctx)
        self.cli_ctx = cli_ctx
        # In-memory collection of key-value data for this current invocation.
        # This does not persist between invocations.
        # A falsy initial_data (None or an empty mapping) is replaced by a defaultdict
        # whose missing keys read as None.
        self.data = initial_data or defaultdict(lambda: None)
        self.data['command'] = 'unknown'
        self._global_parser = parser_cls.create_global_parser(cli_ctx=self.cli_ctx)
        self.help = help_cls(cli_ctx=self.cli_ctx)
        self.parser = parser_cls(cli_ctx=self.cli_ctx, cli_help=self.help,
                                 prog=self.cli_ctx.name, parents=[self._global_parser])
        self.commands_loader = commands_loader_cls(cli_ctx=self.cli_ctx)

    def _filter_params(self, args):  # pylint: disable=no-self-use
        """ Build the keyword parameters handed to the command handler.

        :param args: The parsed arguments; its ``__dict__`` is read
        :return: A new dict of the attributes whose names do not start with an underscore,
                 with the 'func' and 'command' entries removed
        :rtype: dict
        """
        # Consider - we are using any args that start with an underscore (_) as 'private'
        # arguments and remove them from the arguments that we pass to the actual function.
        params = {key: value
                  for key, value in args.__dict__.items()
                  if not key.startswith('_')}
        params.pop('func', None)
        params.pop('command', None)
        return params

    def _rudimentary_get_command(self, args):
        """ Rudimentary parsing to get the command

        Collects the leading arguments that do not start with '-', then drops trailing
        ones until the remainder is a prefix of a name in the loaded command table.

        :param args: The raw command arguments. The leading entries that make up the
                     command are lower-cased in place.
        :type args: list
        :return: The lower-cased, space-separated command string ('' when nothing matches)
        :rtype: str
        """
        command_names = self.commands_loader.command_table.keys()

        nouns = []
        for arg in args:
            if arg and arg[0] != '-':
                nouns.append(arg)
            else:
                break

        def _find_args(candidate):
            # Matching is by prefix, so a command group alone (e.g. 'foo' for 'foo bar')
            # counts as a hit.
            search = ' '.join(candidate).lower()
            return next((name for name in command_names if name.startswith(search)), False)

        # since the command name may be immediately followed by a positional arg, strip those off
        while nouns and not _find_args(nouns):
            del nouns[-1]

        # Ensure the command string is case-insensitive: normalise both the caller's args
        # and the returned command string, so the two cannot disagree on casing.
        nouns = [noun.lower() for noun in nouns]
        for index, noun in enumerate(nouns):
            args[index] = noun

        return ' '.join(nouns)

    def _validate_cmd_level(self, ns, cmd_validator):  # pylint: disable=no-self-use
        """ Run the command-level validator (if any) against the namespace and then remove
        the '_command_validator' attribute from it.
        """
        if cmd_validator:
            cmd_validator(ns)
        try:
            delattr(ns, '_command_validator')
        except AttributeError:
            # Nothing to clean up when the attribute was never set.
            pass

    def _validate_arg_level(self, ns, **_):  # pylint: disable=no-self-use
        """ Run every validator listed in the namespace's '_argument_validators' and then
        remove that attribute from it. Extra keyword arguments are ignored.
        """
        for validator in getattr(ns, '_argument_validators', []):
            validator(ns)
        try:
            delattr(ns, '_argument_validators')
        except AttributeError:
            # Nothing to clean up when the attribute was never set.
            pass

    def _validation(self, parsed_ns):
        """ Validate the parsed arguments.

        A command-level validator, when present, is used instead of the argument-level
        validators. CLIError propagates unchanged; any other exception is reported through
        the ``validation_error`` method of the namespace's '_parser' (falling back to
        ``self.parser``).
        """
        try:
            cmd_validator = getattr(parsed_ns, '_command_validator', None)
            if cmd_validator:
                self._validate_cmd_level(parsed_ns, cmd_validator)
            else:
                self._validate_arg_level(parsed_ns)
        except CLIError:
            raise
        except Exception as err:  # pylint: disable=broad-except
            getattr(parsed_ns, '_parser', self.parser).validation_error(str(err))

    def _resolve_implicit_info(self, resolver, cmd_name):
        """ Look up status info attached to an enclosing group of a command.

        Walks from the closest parent path of ``cmd_name`` outwards, calling
        ``resolver(cli_ctx, path)`` for each, and stops at the first truthy result.

        :param resolver: One of the resolve_*_info functions
        :param cmd_name: The full, space-separated command name
        :type cmd_name: str
        :return: The first truthy info found, or None
        """
        path_comps = cmd_name.split()[:-1]
        while path_comps:
            info = resolver(self.cli_ctx, ' '.join(path_comps))
            if info:
                return info
            del path_comps[-1]
        return None

    # pylint: disable=too-many-statements
    def execute(self, args):
        """ Executes the command invocation

        :param args: The command arguments for this invocation. May be modified in place
                     (command words are lower-cased, a leading 'help' becomes '--help').
        :type args: list
        :return: The command result
        :rtype: knack.util.CommandResultItem
        """

        self.cli_ctx.raise_event(EVENT_INVOKER_PRE_CMD_TBL_CREATE, args=args)
        cmd_tbl = self.commands_loader.load_command_table(args)
        command = self._rudimentary_get_command(args)
        self.cli_ctx.invocation.data['command_string'] = command
        self.commands_loader.load_arguments(command)

        self.cli_ctx.raise_event(EVENT_INVOKER_POST_CMD_TBL_CREATE, cmd_tbl=cmd_tbl)
        self.parser.load_command_table(self.commands_loader)
        self.cli_ctx.raise_event(EVENT_INVOKER_CMD_TBL_LOADED, parser=self.parser)

        # With nothing but logging flags on the command line, show the welcome text.
        arg_check = [a for a in args if a not in ['--verbose', '--debug', '--only-show-warnings']]
        if not arg_check:
            self.cli_ctx.completion.enable_autocomplete(self.parser)
            subparser = self.parser.subparsers[tuple()]
            self.help.show_welcome(subparser)
            return CommandResultItem(None, exit_code=0)

        # Treat a leading 'help' word as the --help flag.
        if args[0].lower() == 'help':
            args[0] = '--help'

        self.cli_ctx.completion.enable_autocomplete(self.parser)

        self.cli_ctx.raise_event(EVENT_INVOKER_PRE_PARSE_ARGS, args=args)
        parsed_args = self.parser.parse_args(args)
        self.cli_ctx.raise_event(EVENT_INVOKER_POST_PARSE_ARGS, command=parsed_args.command, args=parsed_args)

        self._validation(parsed_args)

        # save the command name (leaf in the tree)
        self.data['command'] = parsed_args.command
        cmd = parsed_args.func
        if hasattr(parsed_args, 'cmd'):
            parsed_args.cmd = cmd

        # Status notices: start from the ones recorded on the parsed arguments and add
        # the command's own.
        deprecations = getattr(parsed_args, '_argument_deprecations', [])
        if cmd.deprecate_info:
            deprecations.append(cmd.deprecate_info)

        previews = getattr(parsed_args, '_argument_previews', [])
        if cmd.preview_info:
            previews.append(cmd.preview_info)

        experimentals = getattr(parsed_args, '_argument_experimentals', [])
        if cmd.experimental_info:
            experimentals.append(cmd.experimental_info)

        params = self._filter_params(parsed_args)

        # search for implicit deprecation
        implicit_deprecate_info = self._resolve_implicit_info(resolve_deprecate_info, cmd.name)
        if implicit_deprecate_info:
            deprecate_kwargs = implicit_deprecate_info.__dict__.copy()
            deprecate_kwargs['object_type'] = 'command'
            # These two entries are not forwarded to ImplicitDeprecated.
            # NOTE(review): presumably it supplies its own tag/message builders - confirm.
            del deprecate_kwargs['_get_tag']
            del deprecate_kwargs['_get_message']
            deprecations.append(ImplicitDeprecated(cli_ctx=self.cli_ctx, **deprecate_kwargs))

        # search for implicit preview
        implicit_preview_info = self._resolve_implicit_info(resolve_preview_info, cmd.name)
        if implicit_preview_info:
            preview_kwargs = implicit_preview_info.__dict__.copy()
            preview_kwargs['object_type'] = 'command'
            previews.append(ImplicitPreviewItem(cli_ctx=self.cli_ctx, **preview_kwargs))

        # search for implicit experimental
        implicit_experimental_info = self._resolve_implicit_info(resolve_experimental_info, cmd.name)
        if implicit_experimental_info:
            experimental_kwargs = implicit_experimental_info.__dict__.copy()
            experimental_kwargs['object_type'] = 'command'
            experimentals.append(ImplicitExperimentalItem(cli_ctx=self.cli_ctx, **experimental_kwargs))

        if not self.cli_ctx.only_show_errors:
            # Deprecations first, then previews, then experimentals - all to stderr.
            for notices in (deprecations, previews, experimentals):
                for notice in notices:
                    print(notice.message, file=sys.stderr)

        cmd_result = cmd(params)
        cmd_result = todict(cmd_result)

        # Event handlers may replace event_data['result']; raw_result keeps the
        # value produced before the events were raised.
        event_data = {'result': cmd_result}
        self.cli_ctx.raise_event(EVENT_INVOKER_TRANSFORM_RESULT, event_data=event_data)
        self.cli_ctx.raise_event(EVENT_INVOKER_FILTER_RESULT, event_data=event_data)

        return CommandResultItem(event_data['result'],
                                 exit_code=0,
                                 table_transformer=cmd_tbl[parsed_args.command].table_transformer,
                                 is_query_active=self.data['query_active'],
                                 raw_result=cmd_result)