# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import types
from collections import OrderedDict, defaultdict
from importlib import import_module

from .deprecation import Deprecated
from .preview import PreviewItem
from .experimental import ExperimentalItem
from .prompting import prompt_y_n, NoTTYException
from .util import CLIError, CtxTypeError
from .arguments import ArgumentRegistry, CLICommandArgument
from .introspection import extract_args_from_signature, extract_full_summary_from_signature
from .events import (EVENT_CMDLOADER_LOAD_COMMAND_TABLE, EVENT_CMDLOADER_LOAD_ARGUMENTS,
                     EVENT_COMMAND_CANCELLED)
from .log import get_logger
from .validators import DefaultInt, DefaultStr

logger = get_logger(__name__)


# Message template; placeholders are the object type ('command' / 'command group') and its name.
PREVIEW_EXPERIMENTAL_CONFLICT_ERROR = "Failed to register {} '{}', " \
                                      "is_preview and is_experimental can't be true at the same time"


class CLICommand(object):  # pylint:disable=too-many-instance-attributes
    """ A single command: its handler plus the metadata and arguments attached to it. """

    # pylint: disable=unused-argument
    def __init__(self, cli_ctx, name, handler, description=None, table_transformer=None,
                 arguments_loader=None, description_loader=None,
                 formatter_class=None, deprecate_info=None, validator=None, confirmation=None, preview_info=None,
                 experimental_info=None, **kwargs):
        """ The command object that goes into the command table.

        :param cli_ctx: CLI Context
        :type cli_ctx: knack.cli.CLI
        :param name: The name of the command (e.g. 'mygroup mycommand')
        :type name: str
        :param handler: The function that will handle this command
        :type handler: function
        :param description: The description for the command
        :type description: str
        :param table_transformer: A function that transforms the command output for displaying in a table
        :type table_transformer: function
        :param arguments_loader: The function that defines how the arguments for the command should be loaded
        :type arguments_loader: function
        :param description_loader: The function that defines how the description for the command should be loaded
        :type description_loader: function
        :param formatter_class: The formatter for how help should be displayed
        :type formatter_class: class
        :param deprecate_info: Deprecation information to display when this command is invoked
        :type deprecate_info: Deprecated or None
        :param preview_info: Marks the command as being in preview (CommandGroup.command passes a
                             PreviewItem, or None)
        :type preview_info: PreviewItem or None
        :param experimental_info: Marks the command as experimental (CommandGroup.command passes an
                                  ExperimentalItem, or None)
        :type experimental_info: ExperimentalItem or None
        :param validator: The command validator
        :param confirmation: User confirmation required for command
        :type confirmation: bool, str, callable
        :param kwargs: Extra kwargs that are currently ignored
        :raises CtxTypeError: If cli_ctx is neither None nor a CLI instance
        """
        # Imported here rather than at module level -- presumably to avoid a circular import
        # with .cli; TODO confirm.
        from .cli import CLI
        if cli_ctx is not None and not isinstance(cli_ctx, CLI):
            raise CtxTypeError(cli_ctx)
        self.cli_ctx = cli_ctx
        self.name = name
        self.handler = handler
        self.help = None
        # The loader callable itself (not its result) is stored as the description when
        # descriptions should be loaded; otherwise the static description is used.
        self.description = description_loader if description_loader and self.should_load_description() else description
        self.arguments = {}
        self.arguments_loader = arguments_loader
        self.table_transformer = table_transformer
        self.formatter_class = formatter_class
        self.deprecate_info = deprecate_info
        self.preview_info = preview_info
        self.experimental_info = experimental_info
        self.confirmation = confirmation
        self.validator = validator

    def should_load_description(self):
        """ Return True unless the CLI context's data marks the completer as active. """
        return not self.cli_ctx.data['completer_active']

    def _resolve_default_value_from_config_file(self, arg, overrides):
        """ Apply a default configured in the config file to the argument overrides.

        Does nothing when the overrides carry no 'configured_default' setting or when the
        config holds no (truthy) value for it.

        :param arg: The argument being updated (only its name is used, for logging)
        :param overrides: The argument type whose ``settings`` dict is updated in place
        """
        default_key = overrides.settings.get('configured_default', None)
        if not default_key:
            return

        config = self.cli_ctx.config
        defaults_section = config.defaults_section_name
        use_local_config_original = config.use_local_config
        config.set_to_use_local_config(True)
        try:
            config_value = config.get(defaults_section, default_key, None)
        finally:
            # Always put the flag back, even if the lookup raises, so that a failure here
            # cannot leave the shared config object in local mode.
            config.set_to_use_local_config(use_local_config_original)
        if config_value:
            logger.info("Configured default '%s' for arg %s", config_value, arg.name)
            overrides.settings['default'] = DefaultStr(config_value)
            overrides.settings['required'] = False
            overrides.settings['default_value_source'] = 'Config'

    def load_arguments(self):
        """ Populate ``self.arguments`` from the arguments loader, if one was supplied.

        When the command requires confirmation, a ``--yes`` / ``-y`` flag is appended so the
        prompt can be skipped.
        """
        if self.arguments_loader:
            cmd_args = self.arguments_loader()
            if self.confirmation:
                cmd_args.append(('yes',
                                 CLICommandArgument(dest='yes', options_list=['--yes', '-y'],
                                                    action='store_true', help='Do not prompt for confirmation.')))
            self.arguments.update(cmd_args)

    def add_argument(self, param_name, *option_strings, **kwargs):
        """ Register (or replace) an argument on this command.

        :param param_name: Key under which the argument is stored; also the dest unless
                           a ``dest`` kwarg is given
        :type param_name: str
        :param option_strings: The option strings for the argument
        :param kwargs: Extra settings forwarded to CLICommandArgument
        """
        dest = kwargs.pop('dest', None)
        argument = CLICommandArgument(dest or param_name, options_list=option_strings, **kwargs)
        self.arguments[param_name] = argument

    def update_argument(self, param_name, argtype):
        """ Merge the given argument type into an already registered argument.

        :param param_name: The name of a previously registered argument
        :type param_name: str
        :param argtype: The overrides to merge into the argument's type
        :raises KeyError: If no argument named param_name is registered
        """
        arg = self.arguments[param_name]
        # resolve defaults from either environment variable or config file
        self._resolve_default_value_from_config_file(arg, argtype)
        arg.type.update(other=argtype)
        arg_default = arg.type.settings.get('default', None)
        # apply DefaultStr and DefaultInt to allow distinguishing between
        # when a default was applied or when the user specified a value
        # that coincides with the default
        if isinstance(arg_default, str):
            arg_default = DefaultStr(arg_default)
        elif isinstance(arg_default, int):
            arg_default = DefaultInt(arg_default)
        # update the default (falsy defaults such as 0 or '' are left as they were)
        if arg_default:
            arg.type.settings['default'] = arg_default

    def execute(self, **kwargs):
        """ Invoke the command with keyword arguments; same as calling the instance with ``**kwargs``. """
        return self(**kwargs)

    def __call__(self, *args, **kwargs):
        """ Run the handler, prompting for confirmation first when the command requires it.

        The command arguments are taken from the first positional argument when there is one,
        otherwise from the keyword arguments. A 'yes' entry is removed from them when
        confirmation is configured.

        :return: Whatever the handler returns
        :raises CLIError: If the user does not confirm the operation
        """
        # execute() forwards keyword arguments only; indexing args[0] unconditionally made
        # that path fail with IndexError before the handler was ever reached.
        cmd_args = args[0] if args else kwargs

        confirm = self.confirmation and not cmd_args.pop('yes', None) \
            and not self.cli_ctx.config.getboolean('core', 'disable_confirm_prompt', fallback=False)

        if confirm and not self._user_confirmed(self.confirmation, cmd_args):
            self.cli_ctx.raise_event(EVENT_COMMAND_CANCELLED, command=self.name, command_args=cmd_args)
            raise CLIError('Operation cancelled.')
        return self.handler(*args, **kwargs)

    @staticmethod
    def _user_confirmed(confirmation, command_args):
        """ Ask for confirmation and report whether it was given.

        :param confirmation: A callable (called with command_args), a str (used as the prompt
                             text), or any other value (a generic prompt is shown)
        :param command_args: The command arguments, passed to a callable confirmation
        :return: The confirmation result; False when no tty is available to prompt on
        """
        if callable(confirmation):
            return confirmation(command_args)
        try:
            if isinstance(confirmation, str):
                return prompt_y_n(confirmation)
            return prompt_y_n('Are you sure you want to perform this operation?')
        except NoTTYException:
            logger.warning('Unable to prompt for confirmation as no tty available. Use --yes.')
            return False


# pylint: disable=too-many-instance-attributes
class CLICommandsLoader(object):
    """ Holds the command table, the command group table and the argument registries. """

    def __init__(self, cli_ctx=None, command_cls=CLICommand, excluded_command_handler_args=None):
        """ The loader of commands. It contains the command table and argument registries.

        :param cli_ctx: CLI Context
        :type cli_ctx: knack.cli.CLI
        :param command_cls: The command type that the command table will be populated with
        :type command_cls: knack.commands.CLICommand
        :param excluded_command_handler_args: List of params to ignore and not extract from a commands handler.
                                              By default we ignore ['self', 'kwargs'].
        :type excluded_command_handler_args: list of str
        :raises CtxTypeError: If cli_ctx is neither None nor a CLI instance
        """
        from .cli import CLI
        if cli_ctx is not None and not isinstance(cli_ctx, CLI):
            raise CtxTypeError(cli_ctx)
        self.cli_ctx = cli_ctx
        self.command_cls = command_cls
        self.skip_applicability = False
        self.excluded_command_handler_args = excluded_command_handler_args
        # A command table is a dictionary of name -> CLICommand instances
        self.command_table = {}
        # A command group table is a dictionary of names -> CommandGroup instances
        # (subgroups that were never declared explicitly hold an empty dict placeholder)
        self.command_group_table = {}
        # An argument registry stores all arguments for commands
        self.argument_registry = ArgumentRegistry()
        # command name -> {argument name -> argument definition} for explicitly added arguments
        self.extra_argument_registry = defaultdict(dict)

    def _populate_command_group_table_with_subgroups(self, name):
        """ Make sure the group ``name`` and each of its parent groups has a command group table entry.

        :param name: Space separated group name (e.g. 'a b c' covers 'a', 'a b' and 'a b c')
        :type name: str
        """
        if not name:
            return

        # ensure all subgroups have some entry in the command group table
        name_components = name.split()
        for depth in range(1, len(name_components) + 1):
            self.command_group_table.setdefault(' '.join(name_components[:depth]), {})

    def load_command_table(self, args):  # pylint: disable=unused-argument
        """ Load commands into the command table

        :param args: List of the arguments from the command line
        :type args: list
        :return: The ordered command table
        :rtype: collections.OrderedDict
        """
        self.cli_ctx.raise_event(EVENT_CMDLOADER_LOAD_COMMAND_TABLE, cmd_tbl=self.command_table)
        return OrderedDict(self.command_table)

    def load_arguments(self, command):
        """ Load the arguments for the specified command

        Returns without doing anything further when the command is not in the command table.

        :param command: The command to load arguments for
        :type command: str
        """
        from knack.arguments import ArgumentsContext

        self.cli_ctx.raise_event(EVENT_CMDLOADER_LOAD_ARGUMENTS, cmd_tbl=self.command_table, command=command)
        # Only the lookup is guarded: a KeyError raised while the command loads its own
        # arguments is a real error and must not be mistaken for "unknown command".
        try:
            cmd = self.command_table[command]
        except KeyError:
            return
        cmd.load_arguments()

        # ensure global 'cmd' is ignored
        with ArgumentsContext(self, '') as c:
            c.ignore('cmd')

        self._apply_parameter_info(command, cmd)

    def _apply_parameter_info(self, command_name, command):
        """ Apply the registered argument overrides to every argument of the command.

        :param command_name: The name the command is registered under
        :type command_name: str
        :param command: The command whose arguments are updated
        :type command: knack.commands.CLICommand
        """
        for argument_name in command.arguments:
            overrides = self.argument_registry.get_cli_argument(command_name, argument_name)
            command.update_argument(argument_name, overrides)
        # Add any arguments explicitly registered for this command
        for argument_name, argument_definition in self.extra_argument_registry[command_name].items():
            command.arguments[argument_name] = argument_definition
            command.update_argument(argument_name, self.argument_registry.get_cli_argument(command_name, argument_name))

    def create_command(self, name, operation, **kwargs):
        """ Constructs the command object that can then be added to the command table

        :param name: The command name; runs of whitespace are collapsed to single spaces
        :type name: str
        :param operation: The handler location in the form '<module>#<attribute path>'
        :type operation: str
        :param kwargs: Forwarded to the command class; 'client_factory' is additionally used to
                       build the client handed to the operation, and 'arguments_loader' /
                       'description_loader' are always overwritten
        :return: An instance of ``self.command_cls``
        :raises ValueError: If operation is not a string
        """
        if not isinstance(operation, str):
            raise ValueError("Operation must be a string. Got '{}'".format(operation))

        name = ' '.join(name.split())

        client_factory = kwargs.get('client_factory', None)

        # The operation is resolved lazily inside each closure, so the handler module is only
        # imported once the command is actually run or inspected.
        def _command_handler(command_args):
            op = CLICommandsLoader._get_op_handler(operation)
            client = client_factory(command_args) if client_factory else None
            result = op(client, **command_args) if client else op(**command_args)
            return result

        def arguments_loader():
            return list(extract_args_from_signature(CLICommandsLoader._get_op_handler(operation),
                                                    excluded_params=self.excluded_command_handler_args))

        def description_loader():
            return extract_full_summary_from_signature(CLICommandsLoader._get_op_handler(operation))

        kwargs['arguments_loader'] = arguments_loader
        kwargs['description_loader'] = description_loader

        cmd = self.command_cls(self.cli_ctx, name, _command_handler, **kwargs)
        return cmd

    @staticmethod
    def _get_op_handler(operation):
        """ Import and load the operation handler

        :param operation: The handler location in the form '<module>#<attribute path>', where the
                          attribute path may be dotted
        :type operation: str
        :return: The function, or the underlying function of a method
        :raises ValueError: If the operation string is malformed or the attribute cannot be resolved
        """
        try:
            mod_to_import, attr_path = operation.split('#')
            op = import_module(mod_to_import)
            for part in attr_path.split('.'):
                op = getattr(op, part)
            if isinstance(op, types.FunctionType):
                return op
            # op as types.MethodType
            return op.__func__
        except (ValueError, AttributeError) as ex:
            raise ValueError("The operation '{}' is invalid.".format(operation)) from ex

    def deprecate(self, **kwargs):
        """ Build a Deprecated object of type 'command group' from the given kwargs. """
        kwargs['object_type'] = 'command group'
        return Deprecated(self.cli_ctx, **kwargs)


class CommandGroup(object):
    """ Registers commands that share a group name, an operations template and common kwargs. """

    def __init__(self, command_loader, group_name, operations_tmpl, **kwargs):
        """ Context manager for registering commands that share common properties.

        :param command_loader: The command loader that commands will be registered into
        :type command_loader: knack.commands.CLICommandsLoader
        :param group_name: The name of the group of commands in the command hierarchy
        :type group_name: str
        :param operations_tmpl: The template for handlers for this group of commands (e.g. '__main__#{}')
        :type operations_tmpl: str
        :param kwargs: Kwargs to apply to all commands in this group.
                       Possible values: `client_factory`, `arguments_loader`, `description_loader`, `description`,
                       `formatter_class`, `table_transformer`, `deprecate_info`, `validator`, `confirmation`,
                       `is_preview`, `is_experimental`.
        :raises CLIError: If both `is_preview` and `is_experimental` are true
        """
        self.command_loader = command_loader
        self.group_name = group_name
        self.operations_tmpl = operations_tmpl
        # NOTE: group_kwargs is the same dict object as kwargs, so the updates below are shared.
        self.group_kwargs = kwargs
        # Presumably normalizes an old-style deprecate_info into a Deprecated object in
        # group_kwargs -- confirm against .deprecation.
        Deprecated.ensure_new_style_deprecation(self.command_loader.cli_ctx, self.group_kwargs, 'command group')
        # .get() so this does not depend on the helper above having inserted the key.
        if kwargs.get('deprecate_info'):
            kwargs['deprecate_info'].target = group_name

        is_preview = kwargs.get('is_preview', False)
        is_experimental = kwargs.get('is_experimental', False)
        if is_preview and is_experimental:
            raise CLIError(PREVIEW_EXPERIMENTAL_CONFLICT_ERROR.format("command group", group_name))
        if is_preview:
            kwargs['preview_info'] = PreviewItem(
                cli_ctx=self.command_loader.cli_ctx,
                target=group_name,
                object_type='command group'
            )
        if is_experimental:
            kwargs['experimental_info'] = ExperimentalItem(
                cli_ctx=self.command_loader.cli_ctx,
                target=group_name,
                object_type='command group'
            )
        command_loader._populate_command_group_table_with_subgroups(group_name)  # pylint: disable=protected-access
        self.command_loader.command_group_table[group_name] = self

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Nothing to clean up; exceptions raised inside the with-block propagate.
        pass

    def command(self, name, handler_name, **kwargs):
        """ Register a command into the command table

        :param name: The name of the command
        :type name: str
        :param handler_name: The name of the handler that will be applied to the operations template
        :type handler_name: str
        :param kwargs: Kwargs to apply to the command.
                       Possible values: `client_factory`, `arguments_loader`, `description_loader`, `description`,
                       `formatter_class`, `table_transformer`, `deprecate_info`, `validator`, `confirmation`,
                       `is_preview`, `is_experimental`.
        :raises CLIError: If both `is_preview` and `is_experimental` are true
        """
        import copy

        command_name = '{} {}'.format(self.group_name, name) if self.group_name else name
        # Start from a copy of the group kwargs, then let the command's own kwargs win.
        command_kwargs = copy.deepcopy(self.group_kwargs)
        command_kwargs.update(kwargs)

        # don't inherit deprecation, preview and experimental info from command group
        # https://github.com/Azure/azure-cli/blob/683b9709b67c4c9e8df92f9fbd53cbf83b6973d3/src/azure-cli-core/azure/cli/core/commands/__init__.py#L1155
        command_kwargs['deprecate_info'] = kwargs.get('deprecate_info', None)

        is_preview = kwargs.get('is_preview', False)
        is_experimental = kwargs.get('is_experimental', False)
        if is_preview and is_experimental:
            # command_name is used (rather than concatenating the group name) so the error is
            # still reported correctly when the group name is empty or None.
            raise CLIError(PREVIEW_EXPERIMENTAL_CONFLICT_ERROR.format("command", command_name))

        command_kwargs['preview_info'] = None
        if is_preview:
            command_kwargs['preview_info'] = PreviewItem(self.command_loader.cli_ctx, object_type='command')
        command_kwargs['experimental_info'] = None
        if is_experimental:
            command_kwargs['experimental_info'] = ExperimentalItem(self.command_loader.cli_ctx, object_type='command')

        # Register every parent group of the command (all words of the name except the last).
        parent_group_name = ' '.join(command_name.split()[:-1])
        self.command_loader._populate_command_group_table_with_subgroups(parent_group_name)  # pylint: disable=protected-access
        self.command_loader.command_table[command_name] = self.command_loader.create_command(
            command_name,
            self.operations_tmpl.format(handler_name),
            **command_kwargs)

    def deprecate(self, **kwargs):
        """ Build a Deprecated object of type 'command' from the given kwargs. """
        kwargs['object_type'] = 'command'
        return Deprecated(self.command_loader.cli_ctx, **kwargs)