1# --------------------------------------------------------------------------------------------
2# Copyright (c) Microsoft Corporation. All rights reserved.
3# Licensed under the MIT License. See License.txt in the project root for license information.
4# --------------------------------------------------------------------------------------------
5
6import types
7from collections import OrderedDict, defaultdict
8from importlib import import_module
9
10from .deprecation import Deprecated
11from .preview import PreviewItem
12from .experimental import ExperimentalItem
13from .prompting import prompt_y_n, NoTTYException
14from .util import CLIError, CtxTypeError
15from .arguments import ArgumentRegistry, CLICommandArgument
16from .introspection import extract_args_from_signature, extract_full_summary_from_signature
17from .events import (EVENT_CMDLOADER_LOAD_COMMAND_TABLE, EVENT_CMDLOADER_LOAD_ARGUMENTS,
18                     EVENT_COMMAND_CANCELLED)
19from .log import get_logger
20from .validators import DefaultInt, DefaultStr
21
# Module-level logger, obtained from the package's get_logger helper using this module's name.
logger = get_logger(__name__)
23
24
# Error template used when something is flagged as both preview and experimental.
# Placeholders, in order: the object type (e.g. "command group") and its name.
PREVIEW_EXPERIMENTAL_CONFLICT_ERROR = (
    "Failed to register {} '{}', "
    "is_preview and is_experimental can't be true at the same time"
)
27
28
class CLICommand(object):  # pylint:disable=too-many-instance-attributes
    """ A single entry of the command table: a named handler plus its argument metadata. """

    # pylint: disable=unused-argument
    def __init__(self, cli_ctx, name, handler, description=None, table_transformer=None,
                 arguments_loader=None, description_loader=None,
                 formatter_class=None, deprecate_info=None, validator=None, confirmation=None, preview_info=None,
                 experimental_info=None, **kwargs):
        """ The command object that goes into the command table.

        :param cli_ctx: CLI Context
        :type cli_ctx: knack.cli.CLI
        :param name: The name of the command (e.g. 'mygroup mycommand')
        :type name: str
        :param handler: The function that will handle this command
        :type handler: function
        :param description: The description for the command
        :type description: str
        :param table_transformer: A function that transforms the command output for displaying in a table
        :type table_transformer: function
        :param arguments_loader: The function that defines how the arguments for the command should be loaded
        :type arguments_loader: function
        :param description_loader: The function that defines how the description for the command should be loaded.
                                   When given (and `should_load_description()` is true) it is stored as
                                   `self.description` in place of `description`.
        :type description_loader: function
        :param formatter_class: The formatter for how help should be displayed
        :type formatter_class: class
        :param deprecate_info: Deprecation information for this command
                               (e.g. the object returned by `CommandGroup.deprecate`), or None
        :param preview_info: Preview marker for this command
                             (`CommandGroup.command` passes a `PreviewItem` or None)
        :param experimental_info: Experimental marker for this command
                                  (`CommandGroup.command` passes an `ExperimentalItem` or None)
        :param validator: The command validator
        :param confirmation: User confirmation required for command
        :type confirmation: bool, str, callable
        :param kwargs: Extra kwargs that are currently ignored
        :raises CtxTypeError: If `cli_ctx` is neither None nor a `CLI` instance
        """
        # Imported here rather than at module level (presumably to avoid a circular import -- confirm).
        from .cli import CLI
        if cli_ctx is not None and not isinstance(cli_ctx, CLI):
            raise CtxTypeError(cli_ctx)
        self.cli_ctx = cli_ctx
        self.name = name
        self.handler = handler
        self.help = None
        # The loader itself (not its result) is stored, so the description can be resolved lazily.
        self.description = description_loader if description_loader and self.should_load_description() else description
        self.arguments = {}
        self.arguments_loader = arguments_loader
        self.table_transformer = table_transformer
        self.formatter_class = formatter_class
        self.deprecate_info = deprecate_info
        self.preview_info = preview_info
        self.experimental_info = experimental_info
        self.confirmation = confirmation
        self.validator = validator

    def should_load_description(self):
        """ Return True unless the CLI context reports that a completer is active. """
        return not self.cli_ctx.data['completer_active']

    def _resolve_default_value_from_config_file(self, arg, overrides):
        """ Apply a configured default to `overrides` when one is set in the CLI config.

        Does nothing unless `overrides.settings` names a `configured_default` key. Otherwise the
        key is looked up in the config's defaults section with local config enabled; a truthy
        value becomes the argument's default and the argument stops being required.

        :param arg: The argument being updated (only its name is used, for logging)
        :param overrides: The argument type whose `settings` dict is read and updated in place
        """
        default_key = overrides.settings.get('configured_default', None)
        if not default_key:
            return

        config = self.cli_ctx.config
        defaults_section = config.defaults_section_name
        use_local_config_original = config.use_local_config
        config.set_to_use_local_config(True)
        try:
            config_value = config.get(defaults_section, default_key, None)
        finally:
            # Always restore the caller's setting, even if the lookup raises.
            config.set_to_use_local_config(use_local_config_original)
        if config_value:
            logger.info("Configured default '%s' for arg %s", config_value, arg.name)
            overrides.settings['default'] = DefaultStr(config_value)
            overrides.settings['required'] = False
            overrides.settings['default_value_source'] = 'Config'

    def load_arguments(self):
        """ Populate `self.arguments` from `arguments_loader`, if one was supplied.

        When the command requires confirmation, a `--yes`/`-y` flag is appended to the loaded
        arguments; in that case the loader's result must support `append` (e.g. a list of
        `(name, argument)` pairs).
        """
        if self.arguments_loader:
            cmd_args = self.arguments_loader()
            if self.confirmation:
                cmd_args.append(('yes',
                                 CLICommandArgument(dest='yes', options_list=['--yes', '-y'],
                                                    action='store_true', help='Do not prompt for confirmation.')))
            self.arguments.update(cmd_args)

    def add_argument(self, param_name, *option_strings, **kwargs):
        """ Register an argument under `param_name`.

        :param param_name: Key of the argument in `self.arguments`; also the destination unless `dest` is given
        :param option_strings: Option strings passed to the argument as its `options_list`
        :param kwargs: Extra settings forwarded to `CLICommandArgument` (`dest` is consumed here)
        """
        dest = kwargs.pop('dest', None)
        argument = CLICommandArgument(dest or param_name, options_list=option_strings, **kwargs)
        self.arguments[param_name] = argument

    def update_argument(self, param_name, argtype):
        """ Merge `argtype` into the already registered argument `param_name`.

        :param param_name: Name of an argument present in `self.arguments`
        :param argtype: The overriding argument type; it may be modified with a configured default first
        :raises KeyError: If `param_name` has not been registered
        """
        arg = self.arguments[param_name]
        # resolve a configured default (if any) before merging the overrides
        self._resolve_default_value_from_config_file(arg, argtype)
        arg.type.update(other=argtype)
        arg_default = arg.type.settings.get('default', None)
        # apply DefaultStr and DefaultInt to allow distinguishing between
        # when a default was applied or when the user specified a value
        # that coincides with the default
        # NOTE(review): bool is a subclass of int, so a default of True is wrapped in DefaultInt too -- confirm intended.
        if isinstance(arg_default, str):
            arg_default = DefaultStr(arg_default)
        elif isinstance(arg_default, int):
            arg_default = DefaultInt(arg_default)
        # update the default; falsy defaults (None, 0, '') are left untouched
        if arg_default:
            arg.type.settings['default'] = arg_default

    def execute(self, **kwargs):
        """ Run the command with the keyword arguments as its command arguments.

        `__call__` expects the command arguments as a single dict in first position, so the
        keyword arguments are forwarded as that dict.
        """
        return self(kwargs)

    def __call__(self, *args, **kwargs):
        """ Invoke the handler, asking for confirmation first when the command requires it.

        The first positional argument must be the dict of command arguments. When confirmation
        is configured, `yes` is popped from that dict; the prompt is skipped if `yes` was truthy
        or if the `core.disable_confirm_prompt` config option is set.

        :return: Whatever the handler returns
        :raises CLIError: If the user does not confirm the operation
        """
        cmd_args = args[0]

        confirm = self.confirmation and not cmd_args.pop('yes', None) \
            and not self.cli_ctx.config.getboolean('core', 'disable_confirm_prompt', fallback=False)

        if confirm and not self._user_confirmed(self.confirmation, cmd_args):
            self.cli_ctx.raise_event(EVENT_COMMAND_CANCELLED, command=self.name, command_args=cmd_args)
            raise CLIError('Operation cancelled.')
        return self.handler(*args, **kwargs)

    @staticmethod
    def _user_confirmed(confirmation, command_args):
        """ Decide whether the user confirmed the operation.

        :param confirmation: A callable (called with `command_args`; its result is returned),
                             a str (used as the prompt message), or any other value (default prompt)
        :param command_args: The command arguments, passed to a callable `confirmation`
        :return: The confirmation result; False when no tty is available for prompting
        """
        if callable(confirmation):
            return confirmation(command_args)
        try:
            if isinstance(confirmation, str):
                return prompt_y_n(confirmation)
            return prompt_y_n('Are you sure you want to perform this operation?')
        except NoTTYException:
            logger.warning('Unable to prompt for confirmation as no tty available. Use --yes.')
            return False
159
160
161# pylint: disable=too-many-instance-attributes
162class CLICommandsLoader(object):
163
164    def __init__(self, cli_ctx=None, command_cls=CLICommand, excluded_command_handler_args=None):
165        """ The loader of commands. It contains the command table and argument registries.
166
167        :param cli_ctx: CLI Context
168        :type cli_ctx: knack.cli.CLI
169        :param command_cls: The command type that the command table will be populated with
170        :type command_cls: knack.commands.CLICommand
171        :param excluded_command_handler_args: List of params to ignore and not extract from a commands handler.
172                                              By default we ignore ['self', 'kwargs'].
173        :type excluded_command_handler_args: list of str
174        """
175        from .cli import CLI
176        if cli_ctx is not None and not isinstance(cli_ctx, CLI):
177            raise CtxTypeError(cli_ctx)
178        self.cli_ctx = cli_ctx
179        self.command_cls = command_cls
180        self.skip_applicability = False
181        self.excluded_command_handler_args = excluded_command_handler_args
182        # A command table is a dictionary of name -> CLICommand instances
183        self.command_table = dict()
184        # A command group table is a dictionary of names -> CommandGroup instances
185        self.command_group_table = dict()
186        # An argument registry stores all arguments for commands
187        self.argument_registry = ArgumentRegistry()
188        self.extra_argument_registry = defaultdict(lambda: {})
189
190    def _populate_command_group_table_with_subgroups(self, name):
191        if not name:
192            return
193
194        # ensure all subgroups have some entry in the command group table
195        name_components = name.split()
196        for i, _ in enumerate(name_components):
197            subgroup_name = ' '.join(name_components[:i + 1])
198            if subgroup_name not in self.command_group_table:
199                self.command_group_table[subgroup_name] = {}
200
201    def load_command_table(self, args):  # pylint: disable=unused-argument
202        """ Load commands into the command table
203
204        :param args: List of the arguments from the command line
205        :type args: list
206        :return: The ordered command table
207        :rtype: collections.OrderedDict
208        """
209        self.cli_ctx.raise_event(EVENT_CMDLOADER_LOAD_COMMAND_TABLE, cmd_tbl=self.command_table)
210        return OrderedDict(self.command_table)
211
212    def load_arguments(self, command):
213        """ Load the arguments for the specified command
214
215        :param command: The command to load arguments for
216        :type command: str
217        """
218        from knack.arguments import ArgumentsContext
219
220        self.cli_ctx.raise_event(EVENT_CMDLOADER_LOAD_ARGUMENTS, cmd_tbl=self.command_table, command=command)
221        try:
222            self.command_table[command].load_arguments()
223        except KeyError:
224            return
225
226        # ensure global 'cmd' is ignored
227        with ArgumentsContext(self, '') as c:
228            c.ignore('cmd')
229
230        self._apply_parameter_info(command, self.command_table[command])
231
232    def _apply_parameter_info(self, command_name, command):
233        for argument_name in command.arguments:
234            overrides = self.argument_registry.get_cli_argument(command_name, argument_name)
235            command.update_argument(argument_name, overrides)
236        # Add any arguments explicitly registered for this command
237        for argument_name, argument_definition in self.extra_argument_registry[command_name].items():
238            command.arguments[argument_name] = argument_definition
239            command.update_argument(argument_name, self.argument_registry.get_cli_argument(command_name, argument_name))
240
241    def create_command(self, name, operation, **kwargs):
242        """ Constructs the command object that can then be added to the command table """
243        if not isinstance(operation, str):
244            raise ValueError("Operation must be a string. Got '{}'".format(operation))
245
246        name = ' '.join(name.split())
247
248        client_factory = kwargs.get('client_factory', None)
249
250        def _command_handler(command_args):
251            op = CLICommandsLoader._get_op_handler(operation)
252            client = client_factory(command_args) if client_factory else None
253            result = op(client, **command_args) if client else op(**command_args)
254            return result
255
256        def arguments_loader():
257            return list(extract_args_from_signature(CLICommandsLoader._get_op_handler(operation),
258                                                    excluded_params=self.excluded_command_handler_args))
259
260        def description_loader():
261            return extract_full_summary_from_signature(CLICommandsLoader._get_op_handler(operation))
262
263        kwargs['arguments_loader'] = arguments_loader
264        kwargs['description_loader'] = description_loader
265
266        cmd = self.command_cls(self.cli_ctx, name, _command_handler, **kwargs)
267        return cmd
268
269    @staticmethod
270    def _get_op_handler(operation):
271        """ Import and load the operation handler """
272        try:
273            mod_to_import, attr_path = operation.split('#')
274            op = import_module(mod_to_import)
275            for part in attr_path.split('.'):
276                op = getattr(op, part)
277            if isinstance(op, types.FunctionType):
278                return op
279            # op as types.MethodType
280            return op.__func__
281        except (ValueError, AttributeError) as ex:
282            raise ValueError("The operation '{}' is invalid.".format(operation)) from ex
283
284    def deprecate(self, **kwargs):
285        kwargs['object_type'] = 'command group'
286        return Deprecated(self.cli_ctx, **kwargs)
287
288
class CommandGroup(object):
    """ Context manager used to register several commands that share a group name and common kwargs. """

    def __init__(self, command_loader, group_name, operations_tmpl, **kwargs):
        """ Context manager for registering commands that share common properties.

        Creating the group also registers it (and empty entries for its parent groups) in the
        loader's command group table.

        :param command_loader: The command loader that commands will be registered into
        :type command_loader: knack.commands.CLICommandsLoader
        :param group_name: The name of the group of commands in the command hierarchy
        :type group_name: str
        :param operations_tmpl: The template for handlers for this group of commands (e.g. '__main__#{}')
        :type operations_tmpl: str
        :param kwargs: Kwargs to apply to all commands in this group.
                       Possible values: `client_factory`, `arguments_loader`, `description_loader`, `description`,
                       `formatter_class`, `table_transformer`, `deprecate_info`, `validator`, `confirmation`,
                       `is_preview`, `is_experimental`.
        :raises CLIError: If both `is_preview` and `is_experimental` are true
        """
        self.command_loader = command_loader
        self.group_name = group_name
        self.operations_tmpl = operations_tmpl
        # Same dict object as `kwargs`: the updates below are visible through `self.group_kwargs`.
        self.group_kwargs = kwargs
        # NOTE(review): presumably normalizes kwargs['deprecate_info'] in place -- confirm in Deprecated.
        Deprecated.ensure_new_style_deprecation(self.command_loader.cli_ctx, self.group_kwargs, 'command group')
        # Use .get so this does not depend on the helper having inserted the key.
        deprecate_info = kwargs.get('deprecate_info')
        if deprecate_info:
            deprecate_info.target = group_name

        is_preview = kwargs.get('is_preview', False)
        is_experimental = kwargs.get('is_experimental', False)
        if is_preview and is_experimental:
            raise CLIError(PREVIEW_EXPERIMENTAL_CONFLICT_ERROR.format("command group", group_name))
        if is_preview:
            kwargs['preview_info'] = PreviewItem(
                cli_ctx=self.command_loader.cli_ctx,
                target=group_name,
                object_type='command group'
            )
        if is_experimental:
            kwargs['experimental_info'] = ExperimentalItem(
                cli_ctx=self.command_loader.cli_ctx,
                target=group_name,
                object_type='command group'
            )
        command_loader._populate_command_group_table_with_subgroups(group_name)  # pylint: disable=protected-access
        self.command_loader.command_group_table[group_name] = self

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Nothing to clean up; exceptions raised inside the `with` block propagate.
        pass

    def command(self, name, handler_name, **kwargs):
        """ Register a command into the command table

        The command receives a deep copy of the group kwargs updated with `kwargs`. Deprecation,
        preview and experimental info are taken from `kwargs` only, never from the group.

        :param name: The name of the command
        :type name: str
        :param handler_name: The name of the handler that will be applied to the operations template
        :type handler_name: str
        :param kwargs: Kwargs to apply to the command.
                       Possible values: `client_factory`, `arguments_loader`, `description_loader`, `description`,
                       `formatter_class`, `table_transformer`, `deprecate_info`, `validator`, `confirmation`,
                       `is_preview`, `is_experimental`.
        :raises CLIError: If both `is_preview` and `is_experimental` are true
        """
        import copy

        command_name = '{} {}'.format(self.group_name, name) if self.group_name else name
        command_kwargs = copy.deepcopy(self.group_kwargs)
        command_kwargs.update(kwargs)

        # don't inherit deprecation, preview and experimental info from command group
        # https://github.com/Azure/azure-cli/blob/683b9709b67c4c9e8df92f9fbd53cbf83b6973d3/src/azure-cli-core/azure/cli/core/commands/__init__.py#L1155
        command_kwargs['deprecate_info'] = kwargs.get('deprecate_info', None)

        is_preview = kwargs.get('is_preview', False)
        is_experimental = kwargs.get('is_experimental', False)
        if is_preview and is_experimental:
            # command_name already copes with an empty group name
            raise CLIError(PREVIEW_EXPERIMENTAL_CONFLICT_ERROR.format("command", command_name))

        command_kwargs['preview_info'] = None
        if is_preview:
            command_kwargs['preview_info'] = PreviewItem(self.command_loader.cli_ctx, object_type='command')
        command_kwargs['experimental_info'] = None
        if is_experimental:
            command_kwargs['experimental_info'] = ExperimentalItem(self.command_loader.cli_ctx, object_type='command')

        # Register every parent group of the command (everything but the last word of its name).
        self.command_loader._populate_command_group_table_with_subgroups(' '.join(command_name.split()[:-1]))  # pylint: disable=protected-access
        self.command_loader.command_table[command_name] = self.command_loader.create_command(
            command_name,
            self.operations_tmpl.format(handler_name),
            **command_kwargs)

    def deprecate(self, **kwargs):
        """ Build a `Deprecated` object for a command from the given kwargs. """
        kwargs['object_type'] = 'command'
        return Deprecated(self.command_loader.cli_ctx, **kwargs)
380