1# coding=utf-8
2"""Variant on standard library's cmd with extra features.
3
4To use, simply import cmd2.Cmd instead of cmd.Cmd; use precisely as though you
5were using the standard library's cmd, while enjoying the extra features.
6
7Searchable command history (commands: "history")
8Run commands from file, save to file, edit commands in file
9Multi-line commands
10Special-character shortcut commands (beyond cmd's "?" and "!")
11Settable environment parameters
12Parsing commands with `argparse` argument parsers (flags)
13Redirection to file or paste buffer (clipboard) with > or >>
14Easy transcript-based testing of applications (see examples/example.py)
15Bash-style ``select`` available
16
17Note that redirection with > and | will only work if `self.poutput()`
18is used in place of `print`.
19
20- Catherine Devlin, Jan 03 2008 - catherinedevlin.blogspot.com
21
22Git repository on GitHub at https://github.com/python-cmd2/cmd2
23"""
24# This module has many imports, quite a few of which are only
25# infrequently utilized. To reduce the initial overhead of
# importing this module, many of these imports are lazy-loaded
27# i.e. we only import the module when we use it
28# For example, we don't import the 'traceback' module
29# until the pexcept() function is called and the debug
30# setting is True
31import argparse
32import cmd
33import functools
34import glob
35import inspect
36import os
37import pydoc
38import re
39import sys
40import threading
41from code import (
42    InteractiveConsole,
43)
44from collections import (
45    OrderedDict,
46    namedtuple,
47)
48from contextlib import (
49    redirect_stdout,
50)
51from types import (
52    FrameType,
53    ModuleType,
54)
55from typing import (
56    Any,
57    Callable,
58    Dict,
59    Iterable,
60    List,
61    Mapping,
62    Optional,
63    Set,
64    TextIO,
65    Tuple,
66    Type,
67    TypeVar,
68    Union,
69    cast,
70)
71
72from . import (
73    ansi,
74    argparse_completer,
75    argparse_custom,
76    constants,
77    plugin,
78    utils,
79)
80from .argparse_custom import (
81    ChoicesProviderFunc,
82    CompleterFunc,
83    CompletionItem,
84)
85from .clipboard import (
86    can_clip,
87    get_paste_buffer,
88    write_to_paste_buffer,
89)
90from .command_definition import (
91    CommandFunc,
92    CommandSet,
93)
94from .constants import (
95    CLASS_ATTR_DEFAULT_HELP_CATEGORY,
96    COMMAND_FUNC_PREFIX,
97    COMPLETER_FUNC_PREFIX,
98    HELP_FUNC_PREFIX,
99)
100from .decorators import (
101    as_subcommand_to,
102    with_argparser,
103)
104from .exceptions import (
105    Cmd2ShlexError,
106    CommandSetRegistrationError,
107    CompletionError,
108    EmbeddedConsoleExit,
109    EmptyStatement,
110    PassThroughException,
111    RedirectionError,
112    SkipPostcommandHooks,
113)
114from .history import (
115    History,
116    HistoryItem,
117)
118from .parsing import (
119    Macro,
120    MacroArg,
121    Statement,
122    StatementParser,
123    shlex_split,
124)
125from .rl_utils import (
126    RlType,
127    rl_escape_prompt,
128    rl_get_point,
129    rl_get_prompt,
130    rl_set_prompt,
131    rl_type,
132    rl_warning,
133    vt100_support,
134)
135from .table_creator import (
136    Column,
137    SimpleTable,
138)
139from .utils import (
140    Settable,
141    get_defining_class,
142)
143
# Set up readline
# When no readline implementation is available (RlType.NONE), a styled warning is written to stderr and
# none of the names below are defined. Otherwise the original readline state is captured at import time.
if rl_type == RlType.NONE:  # pragma: no cover
    sys.stderr.write(ansi.style_warning(rl_warning))
else:
    from .rl_utils import (  # type: ignore[attr-defined]
        readline,
        rl_force_redisplay,
    )

    # Completer delimiters in effect at import time.
    # Used by rlcompleter in Python console loaded by py command
    orig_rl_delims = readline.get_completer_delims()

    if rl_type == RlType.PYREADLINE:

        # Save the original pyreadline display completion function since we need to override it and restore it
        # noinspection PyProtectedMember,PyUnresolvedReferences
        orig_pyreadline_display = readline.rl.mode._display_completions

    elif rl_type == RlType.GNU:

        # Get the readline lib so we can make changes to it
        import ctypes

        from .rl_utils import (
            readline_lib,
        )

        # Bind to the "rl_basic_quote_characters" variable exported by the readline library and remember its
        # current value, reinterpreted through c_void_p. typing.cast() only informs the type checker; it does
        # not convert the value at runtime.
        rl_basic_quote_characters = ctypes.c_char_p.in_dll(readline_lib, "rl_basic_quote_characters")
        orig_rl_basic_quotes = cast(bytes, ctypes.cast(rl_basic_quote_characters, ctypes.c_void_p).value)
173
174
175class _SavedReadlineSettings:
176    """readline settings that are backed up when switching between readline environments"""
177
178    def __init__(self) -> None:
179        self.completer = None
180        self.delims = ''
181        self.basic_quotes: Optional[bytes] = None
182
183
class _SavedCmd2Env:
    """Backup of the cmd2 environment taken when an interactive Python shell is entered"""

    def __init__(self) -> None:
        # readline state: the individual settings plus the readline module itself
        self.readline_settings = _SavedReadlineSettings()
        self.readline_module: Optional[ModuleType] = None

        # History entries
        self.history: List[str] = []

        # Standard stream objects
        self.sys_stdout: Optional[TextIO] = None
        self.sys_stdin: Optional[TextIO] = None
193
194
# Record of the original functions of a disabled command, kept so that they can be restored
# when the command is enabled again
DisabledCommand = namedtuple(
    'DisabledCommand',
    ('command_function', 'help_function', 'completer_function'),
)
197
198
199class Cmd(cmd.Cmd):
200    """An easy but powerful framework for writing line-oriented command interpreters.
201
202    Extends the Python Standard Library’s cmd package by adding a lot of useful features
203    to the out of the box configuration.
204
205    Line-oriented command interpreters are often useful for test harnesses, internal tools, and rapid prototypes.
206    """
207
    # Result of utils.find_editor(), evaluated once when the class body executes.
    # __init__ uses it as the initial value of the instance's ``editor`` attribute.
    DEFAULT_EDITOR = utils.find_editor()

    # Text describing a command as internal-only (not meant to be typed at the command line)
    INTERNAL_COMMAND_EPILOG = (
        "Notes:\n" "  This command is for internal use and is not intended to be called from the\n" "  command line."
    )

    # Sorting keys for strings
    # ALPHABETICAL_SORT_KEY is what __init__ assigns to default_sort_key; NATURAL_SORT_KEY is the alternative
    ALPHABETICAL_SORT_KEY = utils.norm_fold
    NATURAL_SORT_KEY = utils.natural_keys
217
    def __init__(
        self,
        completekey: str = 'tab',
        stdin: Optional[TextIO] = None,
        stdout: Optional[TextIO] = None,
        *,
        persistent_history_file: str = '',
        persistent_history_length: int = 1000,
        startup_script: str = '',
        silence_startup_script: bool = False,
        include_py: bool = False,
        include_ipy: bool = False,
        allow_cli_args: bool = True,
        transcript_files: Optional[List[str]] = None,
        allow_redirection: bool = True,
        multiline_commands: Optional[List[str]] = None,
        terminators: Optional[List[str]] = None,
        shortcuts: Optional[Dict[str, str]] = None,
        command_sets: Optional[Iterable[CommandSet]] = None,
        auto_load_commands: bool = True,
    ) -> None:
        """An easy but powerful framework for writing line-oriented command
        interpreters. Extends Python's cmd package.

        :param completekey: readline name of a completion key, default to Tab
        :param stdin: alternate input file object, if not specified, sys.stdin is used
        :param stdout: alternate output file object, if not specified, sys.stdout is used
        :param persistent_history_file: file path to load a persistent cmd2 command history from
        :param persistent_history_length: max number of history items to write
                                          to the persistent history file
        :param startup_script: file path to a script to execute at startup. The path is user-expanded
                               and made absolute; it is silently ignored if it does not exist.
        :param silence_startup_script: if ``True``, then the startup script's output will be
                                       suppressed. Anything written to stderr will still display.
        :param include_py: should the "py" command be included for an embedded Python shell
        :param include_ipy: should the "ipy" command be included for an embedded IPython shell
        :param allow_cli_args: if ``True``, then :meth:`cmd2.Cmd.__init__` will process command
                               line arguments as either commands to be run or, if ``-t`` or
                               ``--test`` are given, transcript files to run. This should be
                               set to ``False`` if your application parses its own command line
                               arguments.
        :param transcript_files: pass a list of transcript files to be run on initialization.
                                 This allows running transcript tests when ``allow_cli_args``
                                 is ``False``. If ``allow_cli_args`` is ``True`` this parameter
                                 is ignored.
        :param allow_redirection: If ``False``, prevent output redirection and piping to shell
                                  commands. This parameter prevents redirection and piping, but
                                  does not alter parsing behavior. A user can still type
                                  redirection and piping tokens, and they will be parsed as such
                                  but they won't do anything.
        :param multiline_commands: list of commands allowed to accept multi-line input
        :param terminators: list of characters that terminate a command. These are mainly
                            intended for terminating multiline commands, but will also
                            terminate single-line commands. If not supplied, the default
                            is a semicolon. If your app only contains single-line commands
                            and you want terminators to be treated as literals by the parser,
                            then set this to an empty list.
        :param shortcuts: dictionary containing shortcuts for commands. If not supplied,
                          then defaults to constants.DEFAULT_SHORTCUTS. If you do not want
                          any shortcuts, pass an empty dictionary.
        :param command_sets: Provide CommandSet instances to load during cmd2 initialization.
                             This allows CommandSets with custom constructor parameters to be
                             loaded. This also allows a set of CommandSets to be provided
                             when `auto_load_commands` is set to False
        :param auto_load_commands: If True, cmd2 will check for all subclasses of `CommandSet`
                                   that are currently loaded by Python and automatically
                                   instantiate and register all commands. If False, CommandSets
                                   must be manually installed with `register_command_set`.
        :raises ValueError: if any command name is rejected by the statement parser's
                            ``is_valid_command()`` check (e.g. it starts with a shortcut)
        """
        # Check if py or ipy need to be disabled in this instance
        # NOTE(review): assigning None on the instance presumably hides the class-level do_py/do_ipy
        # methods (defined outside this section) - confirm
        if not include_py:
            setattr(self, 'do_py', None)
        if not include_ipy:
            setattr(self, 'do_ipy', None)

        # initialize plugin system
        # needs to be done before we call the super class __init__()
        self._initialize_plugin_system()

        # Call super class constructor
        super().__init__(completekey=completekey, stdin=stdin, stdout=stdout)

        # Attributes which should NOT be dynamically settable via the set command at runtime
        self.default_to_shell = False  # Attempt to run unrecognized commands as shell commands
        self.allow_redirection = allow_redirection  # Security setting to prevent redirection of stdout

        # Attributes which ARE dynamically settable via the set command at runtime
        self.always_show_hint = False
        self.debug = False
        self.echo = False
        self.editor = Cmd.DEFAULT_EDITOR
        self.feedback_to_output = False  # Do not include nonessentials in >, | output by default (things like timing)
        self.quiet = False  # Do not suppress nonessential output
        self.timing = False  # Prints elapsed time for each command

        # The maximum number of CompletionItems to display during tab completion. If the number of completion
        # suggestions exceeds this number, they will be displayed in the typical columnized format and will
        # not include the description value of the CompletionItems.
        self.max_completion_items = 50

        # A dictionary mapping settable names to their Settable instance
        self._settables: Dict[str, Settable] = dict()
        self._always_prefix_settables: bool = False

        # CommandSet containers
        self._installed_command_sets: Set[CommandSet] = set()
        self._cmd_to_command_sets: Dict[str, CommandSet] = {}

        # Called only after the settable and CommandSet containers above have been created
        self.build_settables()

        # Use as prompt for multiline commands on the 2nd+ line of input
        self.continuation_prompt = '> '

        # Allow access to your application in embedded Python shells and pyscripts via self
        self.self_in_py = False

        # Commands to exclude from the help menu and tab completion
        self.hidden_commands = ['eof', '_relative_run_script']

        # Initialize history
        self._persistent_history_length = persistent_history_length
        self._initialize_history(persistent_history_file)

        # Commands to exclude from the history command
        self.exclude_from_history = ['eof', 'history']

        # Dictionary of macro names and their values
        self.macros: Dict[str, Macro] = dict()

        # Keeps track of typed command history in the Python shell
        self._py_history: List[str] = []

        # The name by which Python environments refer to the PyBridge to call app commands
        self.py_bridge_name = 'app'

        # Defines app-specific variables/functions available in Python shells and pyscripts
        self.py_locals: Dict[str, Any] = dict()

        # True if running inside a Python shell or pyscript, False otherwise
        self._in_py = False

        # Parser configured with the caller-supplied terminators, multiline commands and shortcuts
        self.statement_parser = StatementParser(
            terminators=terminators, multiline_commands=multiline_commands, shortcuts=shortcuts
        )

        # Stores results from the last command run to enable usage of results in Python shells and pyscripts
        self.last_result: Any = None

        # Used by run_script command to store current script dir as a LIFO queue to support _relative_run_script command
        self._script_dir: List[str] = []

        # Context manager used to protect critical sections in the main thread from stopping due to a KeyboardInterrupt
        self.sigint_protection = utils.ContextFlag()

        # If the current command created a process to pipe to, then this will be a ProcReader object.
        # Otherwise it will be None. It's used to know when a pipe process can be killed and/or waited upon.
        self._cur_pipe_proc_reader: Optional[utils.ProcReader] = None

        # Used to keep track of whether we are redirecting or piping output
        self._redirecting = False

        # Used to keep track of whether a continuation prompt is being displayed
        self._at_continuation_prompt = False

        # The multiline command currently being typed which is used to tab complete multiline commands.
        self._multiline_in_progress = ''

        # Set the header used for the help function's listing of documented functions
        self.doc_header = "Documented commands (use 'help -v' for verbose/'help <topic>' for details):"

        # The error that prints when no help information can be found
        self.help_error = "No help on {}"

        # The error that prints when a non-existent command is run
        self.default_error = "{} is not a recognized command, alias, or macro"

        # If non-empty, this string will be displayed if a broken pipe error occurs
        self.broken_pipe_warning = ''

        # Commands that will run at the beginning of the command loop
        self._startup_commands: List[str] = []

        # If a startup script is provided and exists, then execute it in the startup commands
        if startup_script:
            startup_script = os.path.abspath(os.path.expanduser(startup_script))
            if os.path.exists(startup_script):
                script_cmd = f"run_script {utils.quote_string(startup_script)}"
                if silence_startup_script:
                    # Silence the script by redirecting the run_script command's output to the null device
                    script_cmd += f" {constants.REDIRECTION_OUTPUT} {os.devnull}"
                self._startup_commands.append(script_cmd)

        # Transcript files to run instead of interactive command loop
        self._transcript_files: Optional[List[str]] = None

        # Check for command line args
        if allow_cli_args:
            parser = argparse_custom.DEFAULT_ARGUMENT_PARSER()
            parser.add_argument('-t', '--test', action="store_true", help='Test against transcript(s) in FILE (wildcards OK)')
            # No argument list is passed here; a standard argparse parser then reads the process's
            # command line arguments. Unrecognized arguments are collected in callargs.
            callopts, callargs = parser.parse_known_args()

            # If transcript testing was called for, use other arguments as transcript files
            if callopts.test:
                self._transcript_files = callargs
            # If commands were supplied at invocation, then add them to the command queue
            elif callargs:
                self._startup_commands.extend(callargs)
        elif transcript_files:
            # Only consulted when command line argument processing is disabled
            self._transcript_files = transcript_files

        # Set the pager(s) for use with the ppaged() method for displaying output using a pager
        if sys.platform.startswith('win'):
            self.pager = self.pager_chop = 'more'
        else:
            # Here is the meaning of the various flags we are using with the less command:
            # -S causes lines longer than the screen width to be chopped (truncated) rather than wrapped
            # -R causes ANSI "style" escape sequences to be output in raw form (i.e. colors are displayed)
            # -X disables sending the termcap initialization and deinitialization strings to the terminal
            # -F causes less to automatically exit if the entire file can be displayed on the first screen
            self.pager = 'less -RXF'
            self.pager_chop = 'less -SRXF'

        # This boolean flag determines whether or not the cmd2 application can interact with the clipboard
        self._can_clip = can_clip

        # This determines the value returned by cmdloop() when exiting the application
        self.exit_code = 0

        # This lock should be acquired before doing any asynchronous changes to the terminal to
        # ensure the updates to the terminal don't interfere with the input being typed or output
        # being printed by a command.
        self.terminal_lock = threading.RLock()

        # Commands that have been disabled from use. This is to support commands that are only available
        # during specific states of the application. This dictionary's keys are the command names and its
        # values are DisabledCommand objects.
        self.disabled_commands: Dict[str, DisabledCommand] = dict()

        # If any command has been categorized, then all other commands that haven't been categorized
        # will display under this section in the help output.
        self.default_category = 'Uncategorized'

        # The default key for sorting string results. Its default value performs a case-insensitive alphabetical sort.
        # If natural sorting is preferred, then set this to NATURAL_SORT_KEY.
        # cmd2 uses this key for sorting:
        #     command and category names
        #     alias, macro, settable, and shortcut names
        #     tab completion results when self.matches_sorted is False
        self.default_sort_key = Cmd.ALPHABETICAL_SORT_KEY

        ############################################################################################################
        # The following variables are used by tab completion functions. They are reset each time complete() is run
        # in _reset_completion_defaults() and it is up to completer functions to set them before returning results.
        ############################################################################################################

        # If True and a single match is returned to complete(), then a space will be appended
        # if the match appears at the end of the line
        self.allow_appended_space = True

        # If True and a single match is returned to complete(), then a closing quote
        # will be added if there is an unmatched opening quote
        self.allow_closing_quote = True

        # An optional hint which prints above tab completion suggestions
        self.completion_hint = ''

        # Normally cmd2 uses readline's formatter to columnize the list of completion suggestions.
        # If a custom format is preferred, write the formatted completions to this string. cmd2 will
        # then print it instead of the readline format. ANSI style sequences and newlines are supported
        # when using this value. Even when using formatted_completions, the full matches must still be returned
        # from your completer function. ArgparseCompleter writes its tab completion tables to this string.
        self.formatted_completions = ''

        # Used by complete() for readline tab completion
        self.completion_matches: List[str] = []

        # Use this list if you need to display tab completion suggestions that are different than the actual text
        # of the matches. For instance, if you are completing strings that contain a common delimiter and you only
        # want to display the final portion of the matches as the tab completion suggestions. The full matches
        # still must be returned from your completer function. For an example, look at path_complete() which
        # uses this to show only the basename of paths as the suggestions. delimiter_complete() also populates
        # this list. These are ignored if self.formatted_completions is populated.
        self.display_matches: List[str] = []

        # Used by functions like path_complete() and delimiter_complete() to properly
        # quote matches that are completed in a delimited fashion
        self.matches_delimited = False

        # Set to True before returning matches to complete() in cases where matches have already been sorted.
        # If False, then complete() will sort the matches using self.default_sort_key before they are displayed.
        # This does not affect self.formatted_completions.
        self.matches_sorted = False

        ############################################################################################################
        # The following code block loads CommandSets, verifies command names, and registers subcommands.
        # This block should appear after all attributes have been created since the registration code
        # depends on them and it's possible a module's on_register() method may need to access some.
        ############################################################################################################
        # Load modular commands
        # Explicitly provided CommandSets are registered first; auto-loading then skips their types
        if command_sets:
            for command_set in command_sets:
                self.register_command_set(command_set)

        if auto_load_commands:
            self._autoload_commands()

        # Verify commands don't have invalid names (like starting with a shortcut)
        for cur_cmd in self.get_all_commands():
            valid, errmsg = self.statement_parser.is_valid_command(cur_cmd)
            if not valid:
                raise ValueError(f"Invalid command name '{cur_cmd}': {errmsg}")

        # Add functions decorated to be subcommands
        self._register_subcommands(self)
530
531    def find_commandsets(self, commandset_type: Type[CommandSet], *, subclass_match: bool = False) -> List[CommandSet]:
532        """
533        Find all CommandSets that match the provided CommandSet type.
534        By default, locates a CommandSet that is an exact type match but may optionally return all CommandSets that
535        are sub-classes of the provided type
536        :param commandset_type: CommandSet sub-class type to search for
537        :param subclass_match: If True, return all sub-classes of provided type, otherwise only search for exact match
538        :return: Matching CommandSets
539        """
540        return [
541            cmdset
542            for cmdset in self._installed_command_sets
543            if type(cmdset) == commandset_type or (subclass_match and isinstance(cmdset, commandset_type))
544        ]
545
546    def find_commandset_for_command(self, command_name: str) -> Optional[CommandSet]:
547        """
548        Finds the CommandSet that registered the command name
549        :param command_name: command name to search
550        :return: CommandSet that provided the command
551        """
552        return self._cmd_to_command_sets.get(command_name)
553
554    def _autoload_commands(self) -> None:
555        """Load modular command definitions."""
556        # Search for all subclasses of CommandSet, instantiate them if they weren't already provided in the constructor
557        all_commandset_defs = CommandSet.__subclasses__()
558        existing_commandset_types = [type(command_set) for command_set in self._installed_command_sets]
559
560        def load_commandset_by_type(commandset_types: List[Type[CommandSet]]) -> None:
561            for cmdset_type in commandset_types:
562                # check if the type has sub-classes. We will only auto-load leaf class types.
563                subclasses = cmdset_type.__subclasses__()
564                if subclasses:
565                    load_commandset_by_type(subclasses)
566                else:
567                    init_sig = inspect.signature(cmdset_type.__init__)
568                    if not (
569                        cmdset_type in existing_commandset_types
570                        or len(init_sig.parameters) != 1
571                        or 'self' not in init_sig.parameters
572                    ):
573                        cmdset = cmdset_type()
574                        self.register_command_set(cmdset)
575
576        load_commandset_by_type(all_commandset_defs)
577
    def register_command_set(self, cmdset: CommandSet) -> None:
        """
        Installs a CommandSet, loading all commands defined in the CommandSet

        For every command function found on the CommandSet, the command function and, when present, its
        matching completer and help functions are installed as attributes of this instance. If anything
        fails during installation, everything installed so far is rolled back and the exception is re-raised.

        :param cmdset: CommandSet to load
        :raises CommandSetRegistrationError: if a CommandSet of the same type is already installed, if settables
                                             must be prefixed but the CommandSet's settable prefix is blank, or
                                             if one of its settables duplicates an existing settable
        """
        # Only one instance of a given CommandSet type may be installed at a time
        existing_commandset_types = [type(command_set) for command_set in self._installed_command_sets]
        if type(cmdset) in existing_commandset_types:
            raise CommandSetRegistrationError('CommandSet ' + type(cmdset).__name__ + ' is already installed')

        # Reject settable name collisions before anything is installed
        all_settables = self.settables
        if self.always_prefix_settables:
            if not cmdset.settable_prefix.strip():
                raise CommandSetRegistrationError('CommandSet settable prefix must not be empty')
            for key in cmdset.settables.keys():
                # With prefixing enabled, the name checked for collisions is '<prefix>.<key>'
                prefixed_name = f'{cmdset.settable_prefix}.{key}'
                if prefixed_name in all_settables:
                    raise CommandSetRegistrationError(f'Duplicate settable: {key}')

        else:
            for key in cmdset.settables.keys():
                if key in all_settables:
                    raise CommandSetRegistrationError(f'Duplicate settable {key} is already registered')

        # Notify the CommandSet before any of its commands are installed
        cmdset.on_register(self)

        # Collect the callable members whose __name__ starts with the command function prefix
        methods = inspect.getmembers(
            cmdset,
            predicate=lambda meth: isinstance(meth, Callable)  # type: ignore[arg-type]
            and hasattr(meth, '__name__')
            and meth.__name__.startswith(COMMAND_FUNC_PREFIX),
        )

        default_category = getattr(cmdset, CLASS_ATTR_DEFAULT_HELP_CATEGORY, None)

        # Names of the attributes added to self so far; the except block below deletes them on failure
        installed_attributes = []
        try:
            for method_name, method in methods:
                # Strip the command function prefix to get the command name
                command = method_name[len(COMMAND_FUNC_PREFIX) :]

                self._install_command_function(command, method, type(cmdset).__name__)
                installed_attributes.append(method_name)

                # Install the matching completer function if the CommandSet defines one
                completer_func_name = COMPLETER_FUNC_PREFIX + command
                cmd_completer = getattr(cmdset, completer_func_name, None)
                if cmd_completer is not None:
                    self._install_completer_function(command, cmd_completer)
                    installed_attributes.append(completer_func_name)

                # Install the matching help function if the CommandSet defines one
                help_func_name = HELP_FUNC_PREFIX + command
                cmd_help = getattr(cmdset, help_func_name, None)
                if cmd_help is not None:
                    self._install_help_function(command, cmd_help)
                    installed_attributes.append(help_func_name)

                self._cmd_to_command_sets[command] = cmdset

                # Apply the CommandSet's default help category only to commands without their own category
                if default_category and not hasattr(method, constants.CMD_ATTR_HELP_CATEGORY):
                    utils.categorize(method, default_category)

            self._installed_command_sets.add(cmdset)

            self._register_subcommands(cmdset)
            cmdset.on_registered()
        except Exception:
            # Roll back: notify the CommandSet, then remove the attributes and bookkeeping entries added above
            cmdset.on_unregister()
            for attrib in installed_attributes:
                delattr(self, attrib)
            if cmdset in self._installed_command_sets:
                self._installed_command_sets.remove(cmdset)
            if cmdset in self._cmd_to_command_sets.values():
                # Rebuild the mapping without any entry that points at this CommandSet
                self._cmd_to_command_sets = {key: val for key, val in self._cmd_to_command_sets.items() if val is not cmdset}
            cmdset.on_unregistered()
            raise
651
652    def _install_command_function(self, command: str, command_wrapper: Callable[..., Any], context: str = '') -> None:
653        cmd_func_name = COMMAND_FUNC_PREFIX + command
654
655        # Make sure command function doesn't share name with existing attribute
656        if hasattr(self, cmd_func_name):
657            raise CommandSetRegistrationError(f'Attribute already exists: {cmd_func_name} ({context})')
658
659        # Check if command has an invalid name
660        valid, errmsg = self.statement_parser.is_valid_command(command)
661        if not valid:
662            raise CommandSetRegistrationError(f"Invalid command name '{command}': {errmsg}")
663
664        # Check if command shares a name with an alias
665        if command in self.aliases:
666            self.pwarning(f"Deleting alias '{command}' because it shares its name with a new command")
667            del self.aliases[command]
668
669        # Check if command shares a name with a macro
670        if command in self.macros:
671            self.pwarning(f"Deleting macro '{command}' because it shares its name with a new command")
672            del self.macros[command]
673
674        setattr(self, cmd_func_name, command_wrapper)
675
676    def _install_completer_function(self, cmd_name: str, cmd_completer: CompleterFunc) -> None:
677        completer_func_name = COMPLETER_FUNC_PREFIX + cmd_name
678
679        if hasattr(self, completer_func_name):
680            raise CommandSetRegistrationError(f'Attribute already exists: {completer_func_name}')
681        setattr(self, completer_func_name, cmd_completer)
682
683    def _install_help_function(self, cmd_name: str, cmd_help: Callable[..., None]) -> None:
684        help_func_name = HELP_FUNC_PREFIX + cmd_name
685
686        if hasattr(self, help_func_name):
687            raise CommandSetRegistrationError(f'Attribute already exists: {help_func_name}')
688        setattr(self, help_func_name, cmd_help)
689
690    def unregister_command_set(self, cmdset: CommandSet) -> None:
691        """
692        Uninstalls a CommandSet and unloads all associated commands
693
694        :param cmdset: CommandSet to uninstall
695        """
696        if cmdset in self._installed_command_sets:
697            self._check_uninstallable(cmdset)
698            cmdset.on_unregister()
699            self._unregister_subcommands(cmdset)
700
701            methods = inspect.getmembers(
702                cmdset,
703                predicate=lambda meth: isinstance(meth, Callable)  # type: ignore[arg-type]
704                and hasattr(meth, '__name__')
705                and meth.__name__.startswith(COMMAND_FUNC_PREFIX),
706            )
707
708            for method in methods:
709                cmd_name = method[0][len(COMMAND_FUNC_PREFIX) :]
710
711                # Enable the command before uninstalling it to make sure we remove both
712                # the real functions and the ones used by the DisabledCommand object.
713                if cmd_name in self.disabled_commands:
714                    self.enable_command(cmd_name)
715
716                if cmd_name in self._cmd_to_command_sets:
717                    del self._cmd_to_command_sets[cmd_name]
718
719                delattr(self, COMMAND_FUNC_PREFIX + cmd_name)
720
721                if hasattr(self, COMPLETER_FUNC_PREFIX + cmd_name):
722                    delattr(self, COMPLETER_FUNC_PREFIX + cmd_name)
723                if hasattr(self, HELP_FUNC_PREFIX + cmd_name):
724                    delattr(self, HELP_FUNC_PREFIX + cmd_name)
725
726            cmdset.on_unregistered()
727            self._installed_command_sets.remove(cmdset)
728
729    def _check_uninstallable(self, cmdset: CommandSet) -> None:
730        methods = inspect.getmembers(
731            cmdset,
732            predicate=lambda meth: isinstance(meth, Callable)  # type: ignore[arg-type]
733            and hasattr(meth, '__name__')
734            and meth.__name__.startswith(COMMAND_FUNC_PREFIX),
735        )
736
737        for method in methods:
738            command_name = method[0][len(COMMAND_FUNC_PREFIX) :]
739
740            # Search for the base command function and verify it has an argparser defined
741            if command_name in self.disabled_commands:
742                command_func = self.disabled_commands[command_name].command_function
743            else:
744                command_func = self.cmd_func(command_name)
745
746            command_parser = cast(argparse.ArgumentParser, getattr(command_func, constants.CMD_ATTR_ARGPARSER, None))
747
748            def check_parser_uninstallable(parser: argparse.ArgumentParser) -> None:
749                for action in parser._actions:
750                    if isinstance(action, argparse._SubParsersAction):
751                        for subparser in action.choices.values():
752                            attached_cmdset = getattr(subparser, constants.PARSER_ATTR_COMMANDSET, None)
753                            if attached_cmdset is not None and attached_cmdset is not cmdset:
754                                raise CommandSetRegistrationError(
755                                    'Cannot uninstall CommandSet when another CommandSet depends on it'
756                                )
757                            check_parser_uninstallable(subparser)
758                        break
759
760            if command_parser is not None:
761                check_parser_uninstallable(command_parser)
762
763    def _register_subcommands(self, cmdset: Union[CommandSet, 'Cmd']) -> None:
764        """
765        Register subcommands with their base command
766
767        :param cmdset: CommandSet or cmd2.Cmd subclass containing subcommands
768        """
769        if not (cmdset is self or cmdset in self._installed_command_sets):
770            raise CommandSetRegistrationError('Cannot register subcommands with an unregistered CommandSet')
771
772        # find methods that have the required attributes necessary to be recognized as a sub-command
773        methods = inspect.getmembers(
774            cmdset,
775            predicate=lambda meth: isinstance(meth, Callable)  # type: ignore[arg-type]
776            and hasattr(meth, constants.SUBCMD_ATTR_NAME)
777            and hasattr(meth, constants.SUBCMD_ATTR_COMMAND)
778            and hasattr(meth, constants.CMD_ATTR_ARGPARSER),
779        )
780
781        # iterate through all matching methods
782        for method_name, method in methods:
783            subcommand_name: str = getattr(method, constants.SUBCMD_ATTR_NAME)
784            full_command_name: str = getattr(method, constants.SUBCMD_ATTR_COMMAND)
785            subcmd_parser = getattr(method, constants.CMD_ATTR_ARGPARSER)
786
787            subcommand_valid, errmsg = self.statement_parser.is_valid_command(subcommand_name, is_subcommand=True)
788            if not subcommand_valid:
789                raise CommandSetRegistrationError(f'Subcommand {str(subcommand_name)} is not valid: {errmsg}')
790
791            command_tokens = full_command_name.split()
792            command_name = command_tokens[0]
793            subcommand_names = command_tokens[1:]
794
795            # Search for the base command function and verify it has an argparser defined
796            if command_name in self.disabled_commands:
797                command_func = self.disabled_commands[command_name].command_function
798            else:
799                command_func = self.cmd_func(command_name)
800
801            if command_func is None:
802                raise CommandSetRegistrationError(
803                    f"Could not find command '{command_name}' needed by subcommand: {str(method)}"
804                )
805            command_parser = getattr(command_func, constants.CMD_ATTR_ARGPARSER, None)
806            if command_parser is None:
807                raise CommandSetRegistrationError(
808                    f"Could not find argparser for command '{command_name}' needed by subcommand: {str(method)}"
809                )
810
811            def find_subcommand(action: argparse.ArgumentParser, subcmd_names: List[str]) -> argparse.ArgumentParser:
812                if not subcmd_names:
813                    return action
814                cur_subcmd = subcmd_names.pop(0)
815                for sub_action in action._actions:
816                    if isinstance(sub_action, argparse._SubParsersAction):
817                        for choice_name, choice in sub_action.choices.items():
818                            if choice_name == cur_subcmd:
819                                return find_subcommand(choice, subcmd_names)
820                        break
821                raise CommandSetRegistrationError(f"Could not find subcommand '{full_command_name}'")
822
823            target_parser = find_subcommand(command_parser, subcommand_names)
824
825            for action in target_parser._actions:
826                if isinstance(action, argparse._SubParsersAction):
827                    # Temporary workaround for avoiding subcommand help text repeatedly getting added to
828                    # action._choices_actions. Until we have instance-specific parser objects, we will remove
829                    # any existing subcommand which has the same name before replacing it. This problem is
830                    # exercised when more than one cmd2.Cmd-based object is created and the same subcommands
831                    # get added each time. Argparse overwrites the previous subcommand but keeps growing the help
832                    # text which is shown by running something like 'alias -h'.
833                    action.remove_parser(subcommand_name)  # type: ignore[arg-type,attr-defined]
834
835                    # Get the kwargs for add_parser()
836                    add_parser_kwargs = getattr(method, constants.SUBCMD_ATTR_ADD_PARSER_KWARGS, {})
837
838                    # Set subcmd_parser as the parent to the parser we're creating to get its arguments
839                    add_parser_kwargs['parents'] = [subcmd_parser]
840
841                    # argparse only copies actions from a parent and not the following settings.
842                    # To retain these settings, we will copy them from subcmd_parser and pass them
843                    # as ArgumentParser constructor arguments to add_parser().
844                    add_parser_kwargs['prog'] = subcmd_parser.prog
845                    add_parser_kwargs['usage'] = subcmd_parser.usage
846                    add_parser_kwargs['description'] = subcmd_parser.description
847                    add_parser_kwargs['epilog'] = subcmd_parser.epilog
848                    add_parser_kwargs['formatter_class'] = subcmd_parser.formatter_class
849                    add_parser_kwargs['prefix_chars'] = subcmd_parser.prefix_chars
850                    add_parser_kwargs['fromfile_prefix_chars'] = subcmd_parser.fromfile_prefix_chars
851                    add_parser_kwargs['argument_default'] = subcmd_parser.argument_default
852                    add_parser_kwargs['conflict_handler'] = subcmd_parser.conflict_handler
853                    add_parser_kwargs['allow_abbrev'] = subcmd_parser.allow_abbrev
854
855                    # Set add_help to False and use whatever help option subcmd_parser already has
856                    add_parser_kwargs['add_help'] = False
857
858                    attached_parser = action.add_parser(subcommand_name, **add_parser_kwargs)
859
860                    # Set the subcommand handler
861                    defaults = {constants.NS_ATTR_SUBCMD_HANDLER: method}
862                    attached_parser.set_defaults(**defaults)
863
864                    # Copy value for custom ArgparseCompleter type, which will be None if not present on subcmd_parser
865                    attached_parser.set_ap_completer_type(subcmd_parser.get_ap_completer_type())  # type: ignore[attr-defined]
866
867                    # Set what instance the handler is bound to
868                    setattr(attached_parser, constants.PARSER_ATTR_COMMANDSET, cmdset)
869                    break
870
871    def _unregister_subcommands(self, cmdset: Union[CommandSet, 'Cmd']) -> None:
872        """
873        Unregister subcommands from their base command
874
875        :param cmdset: CommandSet containing subcommands
876        """
877        if not (cmdset is self or cmdset in self._installed_command_sets):
878            raise CommandSetRegistrationError('Cannot unregister subcommands with an unregistered CommandSet')
879
880        # find methods that have the required attributes necessary to be recognized as a sub-command
881        methods = inspect.getmembers(
882            cmdset,
883            predicate=lambda meth: isinstance(meth, Callable)  # type: ignore[arg-type]
884            and hasattr(meth, constants.SUBCMD_ATTR_NAME)
885            and hasattr(meth, constants.SUBCMD_ATTR_COMMAND)
886            and hasattr(meth, constants.CMD_ATTR_ARGPARSER),
887        )
888
889        # iterate through all matching methods
890        for method_name, method in methods:
891            subcommand_name = getattr(method, constants.SUBCMD_ATTR_NAME)
892            command_name = getattr(method, constants.SUBCMD_ATTR_COMMAND)
893
894            # Search for the base command function and verify it has an argparser defined
895            if command_name in self.disabled_commands:
896                command_func = self.disabled_commands[command_name].command_function
897            else:
898                command_func = self.cmd_func(command_name)
899
900            if command_func is None:  # pragma: no cover
901                # This really shouldn't be possible since _register_subcommands would prevent this from happening
902                # but keeping in case it does for some strange reason
903                raise CommandSetRegistrationError(
904                    f"Could not find command '{command_name}' needed by subcommand: {str(method)}"
905                )
906            command_parser = getattr(command_func, constants.CMD_ATTR_ARGPARSER, None)
907            if command_parser is None:  # pragma: no cover
908                # This really shouldn't be possible since _register_subcommands would prevent this from happening
909                # but keeping in case it does for some strange reason
910                raise CommandSetRegistrationError(
911                    f"Could not find argparser for command '{command_name}' needed by subcommand: {str(method)}"
912                )
913
914            for action in command_parser._actions:
915                if isinstance(action, argparse._SubParsersAction):
916                    action.remove_parser(subcommand_name)  # type: ignore[arg-type,attr-defined]
917                    break
918
919    @property
920    def always_prefix_settables(self) -> bool:
921        """
922        Flags whether CommandSet settable values should always be prefixed
923
924        :return: True if CommandSet settable values will always be prefixed. False if not.
925        """
926        return self._always_prefix_settables
927
928    @always_prefix_settables.setter
929    def always_prefix_settables(self, new_value: bool) -> None:
930        """
931        Set whether CommandSet settable values should always be prefixed.
932
933        :param new_value: True if CommandSet settable values should always be prefixed. False if not.
934        :raises ValueError: If a registered CommandSet does not have a defined prefix
935        """
936        if not self._always_prefix_settables and new_value:
937            for cmd_set in self._installed_command_sets:
938                if not cmd_set.settable_prefix:
939                    raise ValueError(
940                        f'Cannot force settable prefixes. CommandSet {cmd_set.__class__.__name__} does '
941                        f'not have a settable prefix defined.'
942                    )
943        self._always_prefix_settables = new_value
944
945    @property
946    def settables(self) -> Mapping[str, Settable]:
947        """
948        Get all available user-settable attributes. This includes settables defined in installed CommandSets
949
950        :return: Mapping from attribute-name to Settable of all user-settable attributes from
951        """
952        all_settables = dict(self._settables)
953        for cmd_set in self._installed_command_sets:
954            cmdset_settables = cmd_set.settables
955            for settable_name, settable in cmdset_settables.items():
956                if self.always_prefix_settables:
957                    all_settables[f'{cmd_set.settable_prefix}.{settable_name}'] = settable
958                else:
959                    all_settables[settable_name] = settable
960        return all_settables
961
962    def add_settable(self, settable: Settable) -> None:
963        """
964        Add a settable parameter to ``self.settables``
965
966        :param settable: Settable object being added
967        """
968        if not self.always_prefix_settables:
969            if settable.name in self.settables.keys() and settable.name not in self._settables.keys():
970                raise KeyError(f'Duplicate settable: {settable.name}')
971        self._settables[settable.name] = settable
972
973    def remove_settable(self, name: str) -> None:
974        """
975        Convenience method for removing a settable parameter from ``self.settables``
976
977        :param name: name of the settable being removed
978        :raises: KeyError if the Settable matches this name
979        """
980        try:
981            del self._settables[name]
982        except KeyError:
983            raise KeyError(name + " is not a settable parameter")
984
985    def build_settables(self) -> None:
986        """Create the dictionary of user-settable parameters"""
987
988        def get_allow_style_choices(cli_self: Cmd) -> List[str]:
989            """Used to tab complete allow_style values"""
990            return [val.name.lower() for val in ansi.AllowStyle]
991
992        def allow_style_type(value: str) -> ansi.AllowStyle:
993            """Converts a string value into an ansi.AllowStyle"""
994            try:
995                return ansi.AllowStyle[value.upper()]
996            except KeyError:
997                raise ValueError(
998                    f"must be {ansi.AllowStyle.ALWAYS}, {ansi.AllowStyle.NEVER}, or "
999                    f"{ansi.AllowStyle.TERMINAL} (case-insensitive)"
1000                )
1001
1002        self.add_settable(
1003            Settable(
1004                'allow_style',
1005                allow_style_type,
1006                'Allow ANSI text style sequences in output (valid values: '
1007                f'{ansi.AllowStyle.ALWAYS}, {ansi.AllowStyle.NEVER}, {ansi.AllowStyle.TERMINAL})',
1008                self,
1009                choices_provider=cast(ChoicesProviderFunc, get_allow_style_choices),
1010            )
1011        )
1012
1013        self.add_settable(
1014            Settable(
1015                'always_show_hint',
1016                bool,
1017                'Display tab completion hint even when completion suggestions print',
1018                self,
1019            )
1020        )
1021        self.add_settable(Settable('debug', bool, "Show full traceback on exception", self))
1022        self.add_settable(Settable('echo', bool, "Echo command issued into output", self))
1023        self.add_settable(Settable('editor', str, "Program used by 'edit'", self))
1024        self.add_settable(Settable('feedback_to_output', bool, "Include nonessentials in '|', '>' results", self))
1025        self.add_settable(
1026            Settable('max_completion_items', int, "Maximum number of CompletionItems to display during tab completion", self)
1027        )
1028        self.add_settable(Settable('quiet', bool, "Don't print nonessential feedback", self))
1029        self.add_settable(Settable('timing', bool, "Report execution times", self))
1030
1031    # -----  Methods related to presenting output to the user -----
1032
1033    @property
1034    def allow_style(self) -> ansi.AllowStyle:
1035        """Read-only property needed to support do_set when it reads allow_style"""
1036        return ansi.allow_style
1037
1038    @allow_style.setter
1039    def allow_style(self, new_val: ansi.AllowStyle) -> None:
1040        """Setter property needed to support do_set when it updates allow_style"""
1041        ansi.allow_style = new_val
1042
1043    def _completion_supported(self) -> bool:
1044        """Return whether tab completion is supported"""
1045        return self.use_rawinput and bool(self.completekey) and rl_type != RlType.NONE
1046
1047    @property
1048    def visible_prompt(self) -> str:
1049        """Read-only property to get the visible prompt with any ANSI style escape codes stripped.
1050
1051        Used by transcript testing to make it easier and more reliable when users are doing things like coloring the
1052        prompt using ANSI color codes.
1053
1054        :return: prompt stripped of any ANSI escape codes
1055        """
1056        return ansi.strip_style(self.prompt)
1057
1058    def poutput(self, msg: Any = '', *, end: str = '\n') -> None:
1059        """Print message to self.stdout and appends a newline by default
1060
1061        Also handles BrokenPipeError exceptions for when a command's output has
1062        been piped to another process and that process terminates before the
1063        cmd2 command is finished executing.
1064
1065        :param msg: object to print
1066        :param end: string appended after the end of the message, default a newline
1067        """
1068        try:
1069            ansi.style_aware_write(self.stdout, f"{msg}{end}")
1070        except BrokenPipeError:
1071            # This occurs if a command's output is being piped to another
1072            # process and that process closes before the command is
1073            # finished. If you would like your application to print a
1074            # warning message, then set the broken_pipe_warning attribute
1075            # to the message you want printed.
1076            if self.broken_pipe_warning:
1077                sys.stderr.write(self.broken_pipe_warning)
1078
1079    # noinspection PyMethodMayBeStatic
1080    def perror(self, msg: Any = '', *, end: str = '\n', apply_style: bool = True) -> None:
1081        """Print message to sys.stderr
1082
1083        :param msg: object to print
1084        :param end: string appended after the end of the message, default a newline
1085        :param apply_style: If True, then ansi.style_error will be applied to the message text. Set to False in cases
1086                            where the message text already has the desired style. Defaults to True.
1087        """
1088        if apply_style:
1089            final_msg = ansi.style_error(msg)
1090        else:
1091            final_msg = str(msg)
1092        ansi.style_aware_write(sys.stderr, final_msg + end)
1093
1094    def pwarning(self, msg: Any = '', *, end: str = '\n', apply_style: bool = True) -> None:
1095        """Wraps perror, but applies ansi.style_warning by default
1096
1097        :param msg: object to print
1098        :param end: string appended after the end of the message, default a newline
1099        :param apply_style: If True, then ansi.style_warning will be applied to the message text. Set to False in cases
1100                            where the message text already has the desired style. Defaults to True.
1101        """
1102        if apply_style:
1103            msg = ansi.style_warning(msg)
1104        self.perror(msg, end=end, apply_style=False)
1105
1106    def pexcept(self, msg: Any, *, end: str = '\n', apply_style: bool = True) -> None:
1107        """Print Exception message to sys.stderr. If debug is true, print exception traceback if one exists.
1108
1109        :param msg: message or Exception to print
1110        :param end: string appended after the end of the message, default a newline
1111        :param apply_style: If True, then ansi.style_error will be applied to the message text. Set to False in cases
1112                            where the message text already has the desired style. Defaults to True.
1113        """
1114        if self.debug and sys.exc_info() != (None, None, None):
1115            import traceback
1116
1117            traceback.print_exc()
1118
1119        if isinstance(msg, Exception):
1120            final_msg = f"EXCEPTION of type '{type(msg).__name__}' occurred with message: {msg}"
1121        else:
1122            final_msg = str(msg)
1123
1124        if apply_style:
1125            final_msg = ansi.style_error(final_msg)
1126
1127        if not self.debug and 'debug' in self.settables:
1128            warning = "\nTo enable full traceback, run the following command: 'set debug true'"
1129            final_msg += ansi.style_warning(warning)
1130
1131        self.perror(final_msg, end=end, apply_style=False)
1132
1133    def pfeedback(self, msg: Any, *, end: str = '\n') -> None:
1134        """For printing nonessential feedback.  Can be silenced with `quiet`.
1135        Inclusion in redirected output is controlled by `feedback_to_output`.
1136
1137        :param msg: object to print
1138        :param end: string appended after the end of the message, default a newline
1139        """
1140        if not self.quiet:
1141            if self.feedback_to_output:
1142                self.poutput(msg, end=end)
1143            else:
1144                self.perror(msg, end=end, apply_style=False)
1145
1146    def ppaged(self, msg: Any, *, end: str = '\n', chop: bool = False) -> None:
1147        """Print output using a pager if it would go off screen and stdout isn't currently being redirected.
1148
1149        Never uses a pager inside of a script (Python or text) or when output is being redirected or piped or when
1150        stdout or stdin are not a fully functional terminal.
1151
1152        :param msg: object to print
1153        :param end: string appended after the end of the message, default a newline
1154        :param chop: True -> causes lines longer than the screen width to be chopped (truncated) rather than wrapped
1155                              - truncated text is still accessible by scrolling with the right & left arrow keys
1156                              - chopping is ideal for displaying wide tabular data as is done in utilities like pgcli
1157                     False -> causes lines longer than the screen width to wrap to the next line
1158                              - wrapping is ideal when you want to keep users from having to use horizontal scrolling
1159
1160        WARNING: On Windows, the text always wraps regardless of what the chop argument is set to
1161        """
1162        # msg can be any type, so convert to string before checking if it's blank
1163        msg_str = str(msg)
1164
1165        # Consider None to be no data to print
1166        if msg is None or msg_str == '':
1167            return
1168
1169        try:
1170            import subprocess
1171
1172            # Attempt to detect if we are not running within a fully functional terminal.
1173            # Don't try to use the pager when being run by a continuous integration system like Jenkins + pexpect.
1174            functional_terminal = False
1175
1176            if self.stdin.isatty() and self.stdout.isatty():
1177                if sys.platform.startswith('win') or os.environ.get('TERM') is not None:
1178                    functional_terminal = True
1179
1180            # Don't attempt to use a pager that can block if redirecting or running a script (either text or Python)
1181            # Also only attempt to use a pager if actually running in a real fully functional terminal
1182            if functional_terminal and not self._redirecting and not self.in_pyscript() and not self.in_script():
1183                if ansi.allow_style == ansi.AllowStyle.NEVER:
1184                    msg_str = ansi.strip_style(msg_str)
1185                msg_str += end
1186
1187                pager = self.pager
1188                if chop:
1189                    pager = self.pager_chop
1190
1191                # Prevent KeyboardInterrupts while in the pager. The pager application will
1192                # still receive the SIGINT since it is in the same process group as us.
1193                with self.sigint_protection:
1194                    pipe_proc = subprocess.Popen(pager, shell=True, stdin=subprocess.PIPE)
1195                    pipe_proc.communicate(msg_str.encode('utf-8', 'replace'))
1196            else:
1197                self.poutput(msg_str, end=end)
1198        except BrokenPipeError:
1199            # This occurs if a command's output is being piped to another process and that process closes before the
1200            # command is finished. If you would like your application to print a warning message, then set the
1201            # broken_pipe_warning attribute to the message you want printed.`
1202            if self.broken_pipe_warning:
1203                sys.stderr.write(self.broken_pipe_warning)
1204
1205    # -----  Methods related to tab completion -----
1206
1207    def _reset_completion_defaults(self) -> None:
1208        """
1209        Resets tab completion settings
1210        Needs to be called each time readline runs tab completion
1211        """
1212        self.allow_appended_space = True
1213        self.allow_closing_quote = True
1214        self.completion_hint = ''
1215        self.formatted_completions = ''
1216        self.completion_matches = []
1217        self.display_matches = []
1218        self.matches_delimited = False
1219        self.matches_sorted = False
1220
1221        if rl_type == RlType.GNU:
1222            readline.set_completion_display_matches_hook(self._display_matches_gnu_readline)
1223        elif rl_type == RlType.PYREADLINE:
1224            # noinspection PyUnresolvedReferences
1225            readline.rl.mode._display_completions = self._display_matches_pyreadline
1226
1227    def tokens_for_completion(self, line: str, begidx: int, endidx: int) -> Tuple[List[str], List[str]]:
1228        """Used by tab completion functions to get all tokens through the one being completed.
1229
1230        :param line: the current input line with leading whitespace removed
1231        :param begidx: the beginning index of the prefix text
1232        :param endidx: the ending index of the prefix text
1233        :return: A 2 item tuple where the items are
1234                 **On Success**
1235                 - tokens: list of unquoted tokens - this is generally the list needed for tab completion functions
1236                 - raw_tokens: list of tokens with any quotes preserved = this can be used to know if a token was quoted
1237                 or is missing a closing quote
1238                 Both lists are guaranteed to have at least 1 item. The last item in both lists is the token being tab
1239                 completed
1240                 **On Failure**
1241                 - Two empty lists
1242        """
1243        import copy
1244
1245        unclosed_quote = ''
1246        quotes_to_try = copy.copy(constants.QUOTES)
1247
1248        tmp_line = line[:endidx]
1249        tmp_endidx = endidx
1250
1251        # Parse the line into tokens
1252        while True:
1253            try:
1254                initial_tokens = shlex_split(tmp_line[:tmp_endidx])
1255
1256                # If the cursor is at an empty token outside of a quoted string,
1257                # then that is the token being completed. Add it to the list.
1258                if not unclosed_quote and begidx == tmp_endidx:
1259                    initial_tokens.append('')
1260                break
1261            except ValueError as ex:
1262                # Make sure the exception was due to an unclosed quote and
1263                # we haven't exhausted the closing quotes to try
1264                if str(ex) == "No closing quotation" and quotes_to_try:
1265                    # Add a closing quote and try to parse again
1266                    unclosed_quote = quotes_to_try[0]
1267                    quotes_to_try = quotes_to_try[1:]
1268
1269                    tmp_line = line[:endidx]
1270                    tmp_line += unclosed_quote
1271                    tmp_endidx = endidx + 1
1272                else:  # pragma: no cover
1273                    # The parsing error is not caused by unclosed quotes.
1274                    # Return empty lists since this means the line is malformed.
1275                    return [], []
1276
1277        # Further split tokens on punctuation characters
1278        raw_tokens = self.statement_parser.split_on_punctuation(initial_tokens)
1279
1280        # Save the unquoted tokens
1281        tokens = [utils.strip_quotes(cur_token) for cur_token in raw_tokens]
1282
1283        # If the token being completed had an unclosed quote, we need
1284        # to remove the closing quote that was added in order for it
1285        # to match what was on the command line.
1286        if unclosed_quote:
1287            raw_tokens[-1] = raw_tokens[-1][:-1]
1288
1289        return tokens, raw_tokens
1290
1291    # noinspection PyMethodMayBeStatic, PyUnusedLocal
1292    def basic_complete(
1293        self,
1294        text: str,
1295        line: str,
1296        begidx: int,
1297        endidx: int,
1298        match_against: Iterable[str],
1299    ) -> List[str]:
1300        """
1301        Basic tab completion function that matches against a list of strings without considering line contents
1302        or cursor position. The args required by this function are defined in the header of Python's cmd.py.
1303
1304        :param text: the string prefix we are attempting to match (all matches must begin with it)
1305        :param line: the current input line with leading whitespace removed
1306        :param begidx: the beginning index of the prefix text
1307        :param endidx: the ending index of the prefix text
1308        :param match_against: the strings being matched against
1309        :return: a list of possible tab completions
1310        """
1311        return [cur_match for cur_match in match_against if cur_match.startswith(text)]
1312
1313    def delimiter_complete(
1314        self,
1315        text: str,
1316        line: str,
1317        begidx: int,
1318        endidx: int,
1319        match_against: Iterable[str],
1320        delimiter: str,
1321    ) -> List[str]:
1322        """
1323        Performs tab completion against a list but each match is split on a delimiter and only
1324        the portion of the match being tab completed is shown as the completion suggestions.
1325        This is useful if you match against strings that are hierarchical in nature and have a
1326        common delimiter.
1327
1328        An easy way to illustrate this concept is path completion since paths are just directories/files
1329        delimited by a slash. If you are tab completing items in /home/user you don't get the following
1330        as suggestions:
1331
1332        /home/user/file.txt     /home/user/program.c
1333        /home/user/maps/        /home/user/cmd2.py
1334
1335        Instead you are shown:
1336
1337        file.txt                program.c
1338        maps/                   cmd2.py
1339
1340        For a large set of data, this can be visually more pleasing and easier to search.
1341
1342        Another example would be strings formatted with the following syntax: company::department::name
1343        In this case the delimiter would be :: and the user could easily narrow down what they are looking
1344        for if they were only shown suggestions in the category they are at in the string.
1345
1346        :param text: the string prefix we are attempting to match (all matches must begin with it)
1347        :param line: the current input line with leading whitespace removed
1348        :param begidx: the beginning index of the prefix text
1349        :param endidx: the ending index of the prefix text
1350        :param match_against: the list being matched against
1351        :param delimiter: what delimits each portion of the matches (ex: paths are delimited by a slash)
1352        :return: a list of possible tab completions
1353        """
1354        matches = self.basic_complete(text, line, begidx, endidx, match_against)
1355
1356        # Display only the portion of the match that's being completed based on delimiter
1357        if matches:
1358            # Set this to True for proper quoting of matches with spaces
1359            self.matches_delimited = True
1360
1361            # Get the common beginning for the matches
1362            common_prefix = os.path.commonprefix(matches)
1363            prefix_tokens = common_prefix.split(delimiter)
1364
1365            # Calculate what portion of the match we are completing
1366            display_token_index = 0
1367            if prefix_tokens:
1368                display_token_index = len(prefix_tokens) - 1
1369
1370            # Get this portion for each match and store them in self.display_matches
1371            for cur_match in matches:
1372                match_tokens = cur_match.split(delimiter)
1373                display_token = match_tokens[display_token_index]
1374
1375                if not display_token:
1376                    display_token = delimiter
1377                self.display_matches.append(display_token)
1378
1379        return matches
1380
1381    def flag_based_complete(
1382        self,
1383        text: str,
1384        line: str,
1385        begidx: int,
1386        endidx: int,
1387        flag_dict: Dict[str, Union[Iterable[str], CompleterFunc]],
1388        *,
1389        all_else: Union[None, Iterable[str], CompleterFunc] = None,
1390    ) -> List[str]:
1391        """Tab completes based on a particular flag preceding the token being completed.
1392
1393        :param text: the string prefix we are attempting to match (all matches must begin with it)
1394        :param line: the current input line with leading whitespace removed
1395        :param begidx: the beginning index of the prefix text
1396        :param endidx: the ending index of the prefix text
1397        :param flag_dict: dictionary whose structure is the following:
1398                          `keys` - flags (ex: -c, --create) that result in tab completion for the next argument in the
1399                          command line
1400                          `values` - there are two types of values:
1401                          1. iterable list of strings to match against (dictionaries, lists, etc.)
1402                          2. function that performs tab completion (ex: path_complete)
1403        :param all_else: an optional parameter for tab completing any token that isn't preceded by a flag in flag_dict
1404        :return: a list of possible tab completions
1405        """
1406        # Get all tokens through the one being completed
1407        tokens, _ = self.tokens_for_completion(line, begidx, endidx)
1408        if not tokens:  # pragma: no cover
1409            return []
1410
1411        completions_matches = []
1412        match_against = all_else
1413
1414        # Must have at least 2 args for a flag to precede the token being completed
1415        if len(tokens) > 1:
1416            flag = tokens[-2]
1417            if flag in flag_dict:
1418                match_against = flag_dict[flag]
1419
1420        # Perform tab completion using an Iterable
1421        if isinstance(match_against, Iterable):
1422            completions_matches = self.basic_complete(text, line, begidx, endidx, match_against)
1423
1424        # Perform tab completion using a function
1425        elif callable(match_against):
1426            completions_matches = match_against(text, line, begidx, endidx)
1427
1428        return completions_matches
1429
1430    def index_based_complete(
1431        self,
1432        text: str,
1433        line: str,
1434        begidx: int,
1435        endidx: int,
1436        index_dict: Mapping[int, Union[Iterable[str], CompleterFunc]],
1437        *,
1438        all_else: Optional[Union[Iterable[str], CompleterFunc]] = None,
1439    ) -> List[str]:
1440        """Tab completes based on a fixed position in the input string.
1441
1442        :param text: the string prefix we are attempting to match (all matches must begin with it)
1443        :param line: the current input line with leading whitespace removed
1444        :param begidx: the beginning index of the prefix text
1445        :param endidx: the ending index of the prefix text
1446        :param index_dict: dictionary whose structure is the following:
1447                           `keys` - 0-based token indexes into command line that determine which tokens perform tab
1448                           completion
1449                           `values` - there are two types of values:
1450                           1. iterable list of strings to match against (dictionaries, lists, etc.)
1451                           2. function that performs tab completion (ex: path_complete)
1452        :param all_else: an optional parameter for tab completing any token that isn't at an index in index_dict
1453        :return: a list of possible tab completions
1454        """
1455        # Get all tokens through the one being completed
1456        tokens, _ = self.tokens_for_completion(line, begidx, endidx)
1457        if not tokens:  # pragma: no cover
1458            return []
1459
1460        matches = []
1461
1462        # Get the index of the token being completed
1463        index = len(tokens) - 1
1464
1465        # Check if token is at an index in the dictionary
1466        match_against: Optional[Union[Iterable[str], CompleterFunc]]
1467        if index in index_dict:
1468            match_against = index_dict[index]
1469        else:
1470            match_against = all_else
1471
1472        # Perform tab completion using a Iterable
1473        if isinstance(match_against, Iterable):
1474            matches = self.basic_complete(text, line, begidx, endidx, match_against)
1475
1476        # Perform tab completion using a function
1477        elif callable(match_against):
1478            matches = match_against(text, line, begidx, endidx)
1479
1480        return matches
1481
1482    # noinspection PyUnusedLocal
1483    def path_complete(
1484        self, text: str, line: str, begidx: int, endidx: int, *, path_filter: Optional[Callable[[str], bool]] = None
1485    ) -> List[str]:
1486        """Performs completion of local file system paths
1487
1488        :param text: the string prefix we are attempting to match (all matches must begin with it)
1489        :param line: the current input line with leading whitespace removed
1490        :param begidx: the beginning index of the prefix text
1491        :param endidx: the ending index of the prefix text
1492        :param path_filter: optional filter function that determines if a path belongs in the results
1493                            this function takes a path as its argument and returns True if the path should
1494                            be kept in the results
1495        :return: a list of possible tab completions
1496        """
1497
1498        # Used to complete ~ and ~user strings
1499        def complete_users() -> List[str]:
1500
1501            # We are returning ~user strings that resolve to directories,
1502            # so don't append a space or quote in the case of a single result.
1503            self.allow_appended_space = False
1504            self.allow_closing_quote = False
1505
1506            users = []
1507
1508            # Windows lacks the pwd module so we can't get a list of users.
1509            # Instead we will return a result once the user enters text that
1510            # resolves to an existing home directory.
1511            if sys.platform.startswith('win'):
1512                expanded_path = os.path.expanduser(text)
1513                if os.path.isdir(expanded_path):
1514                    user = text
1515                    if add_trailing_sep_if_dir:
1516                        user += os.path.sep
1517                    users.append(user)
1518            else:
1519                import pwd
1520
1521                # Iterate through a list of users from the password database
1522                for cur_pw in pwd.getpwall():
1523
1524                    # Check if the user has an existing home dir
1525                    if os.path.isdir(cur_pw.pw_dir):
1526
1527                        # Add a ~ to the user to match against text
1528                        cur_user = '~' + cur_pw.pw_name
1529                        if cur_user.startswith(text):
1530                            if add_trailing_sep_if_dir:
1531                                cur_user += os.path.sep
1532                            users.append(cur_user)
1533
1534            return users
1535
1536        # Determine if a trailing separator should be appended to directory completions
1537        add_trailing_sep_if_dir = False
1538        if endidx == len(line) or (endidx < len(line) and line[endidx] != os.path.sep):
1539            add_trailing_sep_if_dir = True
1540
1541        # Used to replace cwd in the final results
1542        cwd = os.getcwd()
1543        cwd_added = False
1544
1545        # Used to replace expanded user path in final result
1546        orig_tilde_path = ''
1547        expanded_tilde_path = ''
1548
1549        # If the search text is blank, then search in the CWD for *
1550        if not text:
1551            search_str = os.path.join(os.getcwd(), '*')
1552            cwd_added = True
1553        else:
1554            # Purposely don't match any path containing wildcards
1555            wildcards = ['*', '?']
1556            for wildcard in wildcards:
1557                if wildcard in text:
1558                    return []
1559
1560            # Start the search string
1561            search_str = text + '*'
1562
1563            # Handle tilde expansion and completion
1564            if text.startswith('~'):
1565                sep_index = text.find(os.path.sep, 1)
1566
1567                # If there is no slash, then the user is still completing the user after the tilde
1568                if sep_index == -1:
1569                    return complete_users()
1570
1571                # Otherwise expand the user dir
1572                else:
1573                    search_str = os.path.expanduser(search_str)
1574
1575                    # Get what we need to restore the original tilde path later
1576                    orig_tilde_path = text[:sep_index]
1577                    expanded_tilde_path = os.path.expanduser(orig_tilde_path)
1578
1579            # If the search text does not have a directory, then use the cwd
1580            elif not os.path.dirname(text):
1581                search_str = os.path.join(os.getcwd(), search_str)
1582                cwd_added = True
1583
1584        # Set this to True for proper quoting of paths with spaces
1585        self.matches_delimited = True
1586
1587        # Find all matching path completions
1588        matches = glob.glob(search_str)
1589
1590        # Filter out results that don't belong
1591        if path_filter is not None:
1592            matches = [c for c in matches if path_filter(c)]
1593
1594        # Don't append a space or closing quote to directory
1595        if len(matches) == 1 and os.path.isdir(matches[0]):
1596            self.allow_appended_space = False
1597            self.allow_closing_quote = False
1598
1599        # Sort the matches before any trailing slashes are added
1600        matches.sort(key=self.default_sort_key)
1601        self.matches_sorted = True
1602
1603        # Build display_matches and add a slash to directories
1604        for index, cur_match in enumerate(matches):
1605
1606            # Display only the basename of this path in the tab completion suggestions
1607            self.display_matches.append(os.path.basename(cur_match))
1608
1609            # Add a separator after directories if the next character isn't already a separator
1610            if os.path.isdir(cur_match) and add_trailing_sep_if_dir:
1611                matches[index] += os.path.sep
1612                self.display_matches[index] += os.path.sep
1613
1614        # Remove cwd if it was added to match the text readline expects
1615        if cwd_added:
1616            if cwd == os.path.sep:
1617                to_replace = cwd
1618            else:
1619                to_replace = cwd + os.path.sep
1620            matches = [cur_path.replace(to_replace, '', 1) for cur_path in matches]
1621
1622        # Restore the tilde string if we expanded one to match the text readline expects
1623        if expanded_tilde_path:
1624            matches = [cur_path.replace(expanded_tilde_path, orig_tilde_path, 1) for cur_path in matches]
1625
1626        return matches
1627
1628    def shell_cmd_complete(self, text: str, line: str, begidx: int, endidx: int, *, complete_blank: bool = False) -> List[str]:
1629        """Performs completion of executables either in a user's path or a given path
1630
1631        :param text: the string prefix we are attempting to match (all matches must begin with it)
1632        :param line: the current input line with leading whitespace removed
1633        :param begidx: the beginning index of the prefix text
1634        :param endidx: the ending index of the prefix text
1635        :param complete_blank: If True, then a blank will complete all shell commands in a user's path. If False, then
1636                               no completion is performed. Defaults to False to match Bash shell behavior.
1637        :return: a list of possible tab completions
1638        """
1639        # Don't tab complete anything if no shell command has been started
1640        if not complete_blank and not text:
1641            return []
1642
1643        # If there are no path characters in the search text, then do shell command completion in the user's path
1644        if not text.startswith('~') and os.path.sep not in text:
1645            return utils.get_exes_in_path(text)
1646
1647        # Otherwise look for executables in the given path
1648        else:
1649            return self.path_complete(
1650                text, line, begidx, endidx, path_filter=lambda path: os.path.isdir(path) or os.access(path, os.X_OK)
1651            )
1652
1653    def _redirect_complete(self, text: str, line: str, begidx: int, endidx: int, compfunc: CompleterFunc) -> List[str]:
1654        """Called by complete() as the first tab completion function for all commands
1655        It determines if it should tab complete for redirection (|, >, >>) or use the
1656        completer function for the current command
1657
1658        :param text: the string prefix we are attempting to match (all matches must begin with it)
1659        :param line: the current input line with leading whitespace removed
1660        :param begidx: the beginning index of the prefix text
1661        :param endidx: the ending index of the prefix text
1662        :param compfunc: the completer function for the current command
1663                         this will be called if we aren't completing for redirection
1664        :return: a list of possible tab completions
1665        """
1666        # Get all tokens through the one being completed. We want the raw tokens
1667        # so we can tell if redirection strings are quoted and ignore them.
1668        _, raw_tokens = self.tokens_for_completion(line, begidx, endidx)
1669        if not raw_tokens:  # pragma: no cover
1670            return []
1671
1672        # Must at least have the command
1673        if len(raw_tokens) > 1:
1674
1675            # True when command line contains any redirection tokens
1676            has_redirection = False
1677
1678            # Keep track of state while examining tokens
1679            in_pipe = False
1680            in_file_redir = False
1681            do_shell_completion = False
1682            do_path_completion = False
1683            prior_token = None
1684
1685            for cur_token in raw_tokens:
1686                # Process redirection tokens
1687                if cur_token in constants.REDIRECTION_TOKENS:
1688                    has_redirection = True
1689
1690                    # Check if we are at a pipe
1691                    if cur_token == constants.REDIRECTION_PIPE:
1692                        # Do not complete bad syntax (e.g cmd | |)
1693                        if prior_token == constants.REDIRECTION_PIPE:
1694                            return []
1695
1696                        in_pipe = True
1697                        in_file_redir = False
1698
1699                    # Otherwise this is a file redirection token
1700                    else:
1701                        if prior_token in constants.REDIRECTION_TOKENS or in_file_redir:
1702                            # Do not complete bad syntax (e.g cmd | >) (e.g cmd > blah >)
1703                            return []
1704
1705                        in_pipe = False
1706                        in_file_redir = True
1707
1708                # Only tab complete after redirection tokens if redirection is allowed
1709                elif self.allow_redirection:
1710                    do_shell_completion = False
1711                    do_path_completion = False
1712
1713                    if prior_token == constants.REDIRECTION_PIPE:
1714                        do_shell_completion = True
1715                    elif in_pipe or prior_token in (constants.REDIRECTION_OUTPUT, constants.REDIRECTION_APPEND):
1716                        do_path_completion = True
1717
1718                prior_token = cur_token
1719
1720            if do_shell_completion:
1721                return self.shell_cmd_complete(text, line, begidx, endidx)
1722
1723            elif do_path_completion:
1724                return self.path_complete(text, line, begidx, endidx)
1725
1726            # If there were redirection strings anywhere on the command line, then we
1727            # are no longer tab completing for the current command
1728            elif has_redirection:
1729                return []
1730
1731        # Call the command's completer function
1732        return compfunc(text, line, begidx, endidx)
1733
1734    @staticmethod
1735    def _pad_matches_to_display(matches_to_display: List[str]) -> Tuple[List[str], int]:  # pragma: no cover
1736        """Adds padding to the matches being displayed as tab completion suggestions.
1737        The default padding of readline/pyreadine is small and not visually appealing
1738        especially if matches have spaces. It appears very squished together.
1739
1740        :param matches_to_display: the matches being padded
1741        :return: the padded matches and length of padding that was added
1742        """
1743        if rl_type == RlType.GNU:
1744            # Add 2 to the padding of 2 that readline uses for a total of 4.
1745            padding = 2 * ' '
1746
1747        elif rl_type == RlType.PYREADLINE:
1748            # Add 3 to the padding of 1 that pyreadline uses for a total of 4.
1749            padding = 3 * ' '
1750
1751        else:
1752            return matches_to_display, 0
1753
1754        return [cur_match + padding for cur_match in matches_to_display], len(padding)
1755
1756    def _display_matches_gnu_readline(
1757        self, substitution: str, matches: List[str], longest_match_length: int
1758    ) -> None:  # pragma: no cover
1759        """Prints a match list using GNU readline's rl_display_match_list()
1760
1761        :param substitution: the substitution written to the command line
1762        :param matches: the tab completion matches to display
1763        :param longest_match_length: longest printed length of the matches
1764        """
1765        if rl_type == RlType.GNU:
1766
1767            # Print hint if one exists and we are supposed to display it
1768            hint_printed = False
1769            if self.always_show_hint and self.completion_hint:
1770                hint_printed = True
1771                sys.stdout.write('\n' + self.completion_hint)
1772
1773            # Check if we already have formatted results to print
1774            if self.formatted_completions:
1775                if not hint_printed:
1776                    sys.stdout.write('\n')
1777                sys.stdout.write('\n' + self.formatted_completions + '\n\n')
1778
1779            # Otherwise use readline's formatter
1780            else:
1781                # Check if we should show display_matches
1782                if self.display_matches:
1783                    matches_to_display = self.display_matches
1784
1785                    # Recalculate longest_match_length for display_matches
1786                    longest_match_length = 0
1787
1788                    for cur_match in matches_to_display:
1789                        cur_length = ansi.style_aware_wcswidth(cur_match)
1790                        if cur_length > longest_match_length:
1791                            longest_match_length = cur_length
1792                else:
1793                    matches_to_display = matches
1794
1795                # Add padding for visual appeal
1796                matches_to_display, padding_length = self._pad_matches_to_display(matches_to_display)
1797                longest_match_length += padding_length
1798
1799                # We will use readline's display function (rl_display_match_list()), so we
1800                # need to encode our string as bytes to place in a C array.
1801                encoded_substitution = bytes(substitution, encoding='utf-8')
1802                encoded_matches = [bytes(cur_match, encoding='utf-8') for cur_match in matches_to_display]
1803
1804                # rl_display_match_list() expects matches to be in argv format where
1805                # substitution is the first element, followed by the matches, and then a NULL.
1806                # noinspection PyCallingNonCallable,PyTypeChecker
1807                strings_array = cast(List[Optional[bytes]], (ctypes.c_char_p * (1 + len(encoded_matches) + 1))())
1808
1809                # Copy in the encoded strings and add a NULL to the end
1810                strings_array[0] = encoded_substitution
1811                strings_array[1:-1] = encoded_matches
1812                strings_array[-1] = None
1813
1814                # rl_display_match_list(strings_array, number of completion matches, longest match length)
1815                readline_lib.rl_display_match_list(strings_array, len(encoded_matches), longest_match_length)
1816
1817            # Redraw prompt and input line
1818            rl_force_redisplay()
1819
1820    def _display_matches_pyreadline(self, matches: List[str]) -> None:  # pragma: no cover
1821        """Prints a match list using pyreadline's _display_completions()
1822
1823        :param matches: the tab completion matches to display
1824        """
1825        if rl_type == RlType.PYREADLINE:
1826
1827            # Print hint if one exists and we are supposed to display it
1828            hint_printed = False
1829            if self.always_show_hint and self.completion_hint:
1830                hint_printed = True
1831                readline.rl.mode.console.write('\n' + self.completion_hint)
1832
1833            # Check if we already have formatted results to print
1834            if self.formatted_completions:
1835                if not hint_printed:
1836                    readline.rl.mode.console.write('\n')
1837                readline.rl.mode.console.write('\n' + self.formatted_completions + '\n\n')
1838
1839                # Redraw the prompt and input lines
1840                rl_force_redisplay()
1841
1842            # Otherwise use pyreadline's formatter
1843            else:
1844                # Check if we should show display_matches
1845                if self.display_matches:
1846                    matches_to_display = self.display_matches
1847                else:
1848                    matches_to_display = matches
1849
1850                # Add padding for visual appeal
1851                matches_to_display, _ = self._pad_matches_to_display(matches_to_display)
1852
1853                # Display matches using actual display function. This also redraws the prompt and input lines.
1854                orig_pyreadline_display(matches_to_display)
1855
1856    @staticmethod
1857    def _determine_ap_completer_type(parser: argparse.ArgumentParser) -> Type[argparse_completer.ArgparseCompleter]:
1858        """
1859        Determine what type of ArgparseCompleter to use on a given parser. If the parser does not have one
1860        set, then use argparse_completer.DEFAULT_AP_COMPLETER.
1861
1862        :param parser: the parser to examine
1863        :return: type of ArgparseCompleter
1864        """
1865        completer_type: Optional[
1866            Type[argparse_completer.ArgparseCompleter]
1867        ] = parser.get_ap_completer_type()  # type: ignore[attr-defined]
1868
1869        if completer_type is None:
1870            completer_type = argparse_completer.DEFAULT_AP_COMPLETER
1871        return completer_type
1872
1873    def _perform_completion(
1874        self, text: str, line: str, begidx: int, endidx: int, custom_settings: Optional[utils.CustomCompletionSettings] = None
1875    ) -> None:
1876        """
1877        Helper function for complete() that performs the actual completion
1878
1879        :param text: the string prefix we are attempting to match (all matches must begin with it)
1880        :param line: the current input line with leading whitespace removed
1881        :param begidx: the beginning index of the prefix text
1882        :param endidx: the ending index of the prefix text
1883        :param custom_settings: optional prepopulated completion settings
1884        """
1885        # If custom_settings is None, then we are completing a command's argument.
1886        # Parse the command line to get the command token.
1887        command = ''
1888        if custom_settings is None:
1889            statement = self.statement_parser.parse_command_only(line)
1890            command = statement.command
1891
1892            # Malformed command line (e.g. quoted command token)
1893            if not command:
1894                return
1895
1896            expanded_line = statement.command_and_args
1897
1898            # We overwrote line with a properly formatted but fully stripped version
1899            # Restore the end spaces since line is only supposed to be lstripped when
1900            # passed to completer functions according to Python docs
1901            rstripped_len = len(line) - len(line.rstrip())
1902            expanded_line += ' ' * rstripped_len
1903
1904            # Fix the index values if expanded_line has a different size than line
1905            if len(expanded_line) != len(line):
1906                diff = len(expanded_line) - len(line)
1907                begidx += diff
1908                endidx += diff
1909
1910            # Overwrite line to pass into completers
1911            line = expanded_line
1912
1913        # Get all tokens through the one being completed
1914        tokens, raw_tokens = self.tokens_for_completion(line, begidx, endidx)
1915        if not tokens:  # pragma: no cover
1916            return
1917
1918        # Determine the completer function to use for the command's argument
1919        if custom_settings is None:
1920            # Check if a macro was entered
1921            if command in self.macros:
1922                completer_func = self.path_complete
1923
1924            # Check if a command was entered
1925            elif command in self.get_all_commands():
1926                # Get the completer function for this command
1927                func_attr = getattr(self, constants.COMPLETER_FUNC_PREFIX + command, None)
1928
1929                if func_attr is not None:
1930                    completer_func = func_attr
1931                else:
1932                    # There's no completer function, next see if the command uses argparse
1933                    func = self.cmd_func(command)
1934                    argparser: Optional[argparse.ArgumentParser] = getattr(func, constants.CMD_ATTR_ARGPARSER, None)
1935
1936                    if func is not None and argparser is not None:
1937                        # Get arguments for complete()
1938                        preserve_quotes = getattr(func, constants.CMD_ATTR_PRESERVE_QUOTES)
1939                        cmd_set = self._cmd_to_command_sets[command] if command in self._cmd_to_command_sets else None
1940
1941                        # Create the argparse completer
1942                        completer_type = self._determine_ap_completer_type(argparser)
1943                        completer = completer_type(argparser, self)
1944
1945                        completer_func = functools.partial(
1946                            completer.complete, tokens=raw_tokens[1:] if preserve_quotes else tokens[1:], cmd_set=cmd_set
1947                        )
1948                    else:
1949                        completer_func = self.completedefault  # type: ignore[assignment]
1950
1951            # Not a recognized macro or command
1952            else:
1953                # Check if this command should be run as a shell command
1954                if self.default_to_shell and command in utils.get_exes_in_path(command):
1955                    completer_func = self.path_complete
1956                else:
1957                    completer_func = self.completedefault  # type: ignore[assignment]
1958
1959        # Otherwise we are completing the command token or performing custom completion
1960        else:
1961            # Create the argparse completer
1962            completer_type = self._determine_ap_completer_type(custom_settings.parser)
1963            completer = completer_type(custom_settings.parser, self)
1964
1965            completer_func = functools.partial(
1966                completer.complete, tokens=raw_tokens if custom_settings.preserve_quotes else tokens, cmd_set=None
1967            )
1968
1969        # Text we need to remove from completions later
1970        text_to_remove = ''
1971
1972        # Get the token being completed with any opening quote preserved
1973        raw_completion_token = raw_tokens[-1]
1974
1975        # Used for adding quotes to the completion token
1976        completion_token_quote = ''
1977
1978        # Check if the token being completed has an opening quote
1979        if raw_completion_token and raw_completion_token[0] in constants.QUOTES:
1980
1981            # Since the token is still being completed, we know the opening quote is unclosed.
1982            # Save the quote so we can add a matching closing quote later.
1983            completion_token_quote = raw_completion_token[0]
1984
1985            # readline still performs word breaks after a quote. Therefore something like quoted search
1986            # text with a space would have resulted in begidx pointing to the middle of the token we
1987            # we want to complete. Figure out where that token actually begins and save the beginning
1988            # portion of it that was not part of the text readline gave us. We will remove it from the
1989            # completions later since readline expects them to start with the original text.
1990            actual_begidx = line[:endidx].rfind(tokens[-1])
1991
1992            if actual_begidx != begidx:
1993                text_to_remove = line[actual_begidx:begidx]
1994
1995                # Adjust text and where it begins so the completer routines
1996                # get unbroken search text to complete on.
1997                text = text_to_remove + text
1998                begidx = actual_begidx
1999
2000        # Attempt tab completion for redirection first, and if that isn't occurring,
2001        # call the completer function for the current command
2002        self.completion_matches = self._redirect_complete(text, line, begidx, endidx, completer_func)
2003
2004        if self.completion_matches:
2005
2006            # Eliminate duplicates
2007            self.completion_matches = utils.remove_duplicates(self.completion_matches)
2008            self.display_matches = utils.remove_duplicates(self.display_matches)
2009
2010            if not self.display_matches:
2011                # Since self.display_matches is empty, set it to self.completion_matches
2012                # before we alter them. That way the suggestions will reflect how we parsed
2013                # the token being completed and not how readline did.
2014                import copy
2015
2016                self.display_matches = copy.copy(self.completion_matches)
2017
2018            # Check if we need to add an opening quote
2019            if not completion_token_quote:
2020
2021                add_quote = False
2022
2023                # This is the tab completion text that will appear on the command line.
2024                common_prefix = os.path.commonprefix(self.completion_matches)
2025
2026                if self.matches_delimited:
2027                    # Check if any portion of the display matches appears in the tab completion
2028                    display_prefix = os.path.commonprefix(self.display_matches)
2029
2030                    # For delimited matches, we check for a space in what appears before the display
2031                    # matches (common_prefix) as well as in the display matches themselves.
2032                    if ' ' in common_prefix or (display_prefix and any(' ' in match for match in self.display_matches)):
2033                        add_quote = True
2034
2035                # If there is a tab completion and any match has a space, then add an opening quote
2036                elif common_prefix and any(' ' in match for match in self.completion_matches):
2037                    add_quote = True
2038
2039                if add_quote:
2040                    # Figure out what kind of quote to add and save it as the unclosed_quote
2041                    if any('"' in match for match in self.completion_matches):
2042                        completion_token_quote = "'"
2043                    else:
2044                        completion_token_quote = '"'
2045
2046                    self.completion_matches = [completion_token_quote + match for match in self.completion_matches]
2047
2048            # Check if we need to remove text from the beginning of tab completions
2049            elif text_to_remove:
2050                self.completion_matches = [match.replace(text_to_remove, '', 1) for match in self.completion_matches]
2051
2052            # If we have one result, then add a closing quote if needed and allowed
2053            if len(self.completion_matches) == 1 and self.allow_closing_quote and completion_token_quote:
2054                self.completion_matches[0] += completion_token_quote
2055
    def complete(  # type: ignore[override]
        self, text: str, state: int, custom_settings: Optional[utils.CustomCompletionSettings] = None
    ) -> Optional[str]:
        """Override of cmd's complete method which returns the next possible completion for 'text'

        This completer function is called by readline as complete(text, state), for state in 0, 1, 2, …,
        until it returns a non-string value. It should return the next possible completion starting with text.

        All of the real work happens when state is 0: the completion state is reset, the line and the
        begin/end indexes are read from readline and normalized, and _perform_completion() is called.
        The results are kept in self.completion_matches, and every call (state 0 included) simply
        returns self.completion_matches[state], or None once state runs past the end of that list.

        Since readline suppresses any exception raised in completer functions, they can be difficult to debug.
        Therefore this function wraps the actual tab completion logic and reports any exception that
        occurs before returning control to readline. A CompletionError with a non-empty message is written
        to sys.stdout; any other Exception is reported through self.pexcept(). In both cases None is returned.

        :param text: the current word that user is typing
        :param state: non-negative integer
        :param custom_settings: used when not tab completing the main command line
        :return: the next possible completion for text or None
        """
        # noinspection PyBroadException
        try:
            # Matches are only computed on the first call of a completion cycle
            if state == 0:
                self._reset_completion_defaults()

                # Check if we are completing a multiline command
                if self._at_continuation_prompt:
                    # lstrip and prepend the previously typed portion of this multiline command
                    lstripped_previous = self._multiline_in_progress.lstrip().replace(constants.LINE_FEED, ' ')
                    line = lstripped_previous + readline.get_line_buffer()

                    # Increment the indexes to account for the prepended text
                    begidx = len(lstripped_previous) + readline.get_begidx()
                    endidx = len(lstripped_previous) + readline.get_endidx()
                else:
                    # lstrip the original line
                    orig_line = readline.get_line_buffer()
                    line = orig_line.lstrip()
                    num_stripped = len(orig_line) - len(line)

                    # Calculate new indexes for the stripped line. If the cursor is at a position before the end of a
                    # line of spaces, then the following math could result in negative indexes. Enforce a max of 0.
                    begidx = max(readline.get_begidx() - num_stripped, 0)
                    endidx = max(readline.get_endidx() - num_stripped, 0)

                # Shortcuts are not word break characters when tab completing. Therefore shortcuts become part
                # of the text variable if there isn't a word break, like a space, after it. We need to remove it
                # from text and update the indexes. This only applies if we are at the beginning of the command line.
                shortcut_to_restore = ''
                if begidx == 0 and custom_settings is None:
                    for (shortcut, _) in self.statement_parser.shortcuts:
                        if text.startswith(shortcut):
                            # Save the shortcut to restore later
                            shortcut_to_restore = shortcut

                            # Adjust text and where it begins
                            text = text[len(shortcut_to_restore) :]
                            begidx += len(shortcut_to_restore)
                            break
                    else:
                        # This for/else branch only runs when the loop finished without a break,
                        # i.e. no shortcut was found. Complete the command token by building a
                        # throwaway parser whose single positional's choices are the known
                        # command, alias, and macro names.
                        parser = argparse_custom.DEFAULT_ARGUMENT_PARSER(add_help=False)
                        parser.add_argument(
                            'command',
                            metavar="COMMAND",
                            help="command, alias, or macro name",
                            choices=self._get_commands_aliases_and_macros_for_completion(),
                        )
                        custom_settings = utils.CustomCompletionSettings(parser)

                self._perform_completion(text, line, begidx, endidx, custom_settings)

                # Check if we need to restore a shortcut in the tab completions
                # so it doesn't get erased from the command line
                if shortcut_to_restore:
                    self.completion_matches = [shortcut_to_restore + match for match in self.completion_matches]

                # If we have one result and we are at the end of the line, then add a space if allowed
                if len(self.completion_matches) == 1 and endidx == len(line) and self.allow_appended_space:
                    self.completion_matches[0] += ' '

                # Sort matches if they haven't already been sorted
                if not self.matches_sorted:
                    self.completion_matches.sort(key=self.default_sort_key)
                    self.display_matches.sort(key=self.default_sort_key)
                    self.matches_sorted = True

            # Hand back the match for this state; running off the end of the list
            # yields None, which is the non-string value that ends readline's iteration.
            try:
                return self.completion_matches[state]
            except IndexError:
                return None

        except CompletionError as ex:
            # Don't print error and redraw the prompt unless the error has length
            err_str = str(ex)
            if err_str:
                # Only style the message as an error when the exception asks for it
                if ex.apply_style:
                    err_str = ansi.style_error(err_str)
                ansi.style_aware_write(sys.stdout, '\n' + err_str + '\n')
                rl_force_redisplay()
            return None
        except Exception as ex:
            # Insert a newline so the exception doesn't print in the middle of the command line being tab completed
            self.perror()
            self.pexcept(ex)
            rl_force_redisplay()
            return None
2160
2161    def in_script(self) -> bool:
2162        """Return whether a text script is running"""
2163        return self._current_script_dir is not None
2164
2165    def in_pyscript(self) -> bool:
2166        """Return whether running inside a Python shell or pyscript"""
2167        return self._in_py
2168
2169    @property
2170    def aliases(self) -> Dict[str, str]:
2171        """Read-only property to access the aliases stored in the StatementParser"""
2172        return self.statement_parser.aliases
2173
2174    def get_names(self) -> List[str]:
2175        """Return an alphabetized list of names comprising the attributes of the cmd2 class instance."""
2176        return dir(self)
2177
2178    def get_all_commands(self) -> List[str]:
2179        """Return a list of all commands"""
2180        return [
2181            name[len(constants.COMMAND_FUNC_PREFIX) :]
2182            for name in self.get_names()
2183            if name.startswith(constants.COMMAND_FUNC_PREFIX) and callable(getattr(self, name))
2184        ]
2185
2186    def get_visible_commands(self) -> List[str]:
2187        """Return a list of commands that have not been hidden or disabled"""
2188        return [
2189            command
2190            for command in self.get_all_commands()
2191            if command not in self.hidden_commands and command not in self.disabled_commands
2192        ]
2193
    # Table displayed when tab completing aliases. It is a class-level SimpleTable with a
    # single 'Value' column (width=80) and no divider character; _get_alias_completion_items()
    # uses it to render each alias's value as one data row.
    _alias_completion_table = SimpleTable([Column('Value', width=80)], divider_char=None)
2196
2197    def _get_alias_completion_items(self) -> List[CompletionItem]:
2198        """Return list of alias names and values as CompletionItems"""
2199        results: List[CompletionItem] = []
2200
2201        for cur_key in self.aliases:
2202            row_data = [self.aliases[cur_key]]
2203            results.append(CompletionItem(cur_key, self._alias_completion_table.generate_data_row(row_data)))
2204
2205        return results
2206
    # Table displayed when tab completing macros. It is a class-level SimpleTable with a
    # single 'Value' column (width=80) and no divider character; _get_macro_completion_items()
    # uses it to render each macro's value as one data row.
    _macro_completion_table = SimpleTable([Column('Value', width=80)], divider_char=None)
2209
2210    def _get_macro_completion_items(self) -> List[CompletionItem]:
2211        """Return list of macro names and values as CompletionItems"""
2212        results: List[CompletionItem] = []
2213
2214        for cur_key in self.macros:
2215            row_data = [self.macros[cur_key].value]
2216            results.append(CompletionItem(cur_key, self._macro_completion_table.generate_data_row(row_data)))
2217
2218        return results
2219
    # Table displayed when tab completing Settables. It is a class-level SimpleTable with a
    # 'Value' column (width=30) and a 'Description' column (width=60) and no divider character;
    # _get_settable_completion_items() uses it to render one data row per Settable.
    _settable_completion_table = SimpleTable([Column('Value', width=30), Column('Description', width=60)], divider_char=None)
2222
2223    def _get_settable_completion_items(self) -> List[CompletionItem]:
2224        """Return list of Settable names, values, and descriptions as CompletionItems"""
2225        results: List[CompletionItem] = []
2226
2227        for cur_key in self.settables:
2228            row_data = [self.settables[cur_key].get_value(), self.settables[cur_key].description]
2229            results.append(CompletionItem(cur_key, self._settable_completion_table.generate_data_row(row_data)))
2230
2231        return results
2232
2233    def _get_commands_aliases_and_macros_for_completion(self) -> List[str]:
2234        """Return a list of visible commands, aliases, and macros for tab completion"""
2235        visible_commands = set(self.get_visible_commands())
2236        alias_names = set(self.aliases)
2237        macro_names = set(self.macros)
2238        return list(visible_commands | alias_names | macro_names)
2239
2240    def get_help_topics(self) -> List[str]:
2241        """Return a list of help topics"""
2242        all_topics = [
2243            name[len(constants.HELP_FUNC_PREFIX) :]
2244            for name in self.get_names()
2245            if name.startswith(constants.HELP_FUNC_PREFIX) and callable(getattr(self, name))
2246        ]
2247
2248        # Filter out hidden and disabled commands
2249        return [topic for topic in all_topics if topic not in self.hidden_commands and topic not in self.disabled_commands]
2250
2251    # noinspection PyUnusedLocal
2252    def sigint_handler(self, signum: int, _: FrameType) -> None:
2253        """Signal handler for SIGINTs which typically come from Ctrl-C events.
2254
2255        If you need custom SIGINT behavior, then override this function.
2256
2257        :param signum: signal number
2258        :param _: required param for signal handlers
2259        """
2260        if self._cur_pipe_proc_reader is not None:
2261            # Pass the SIGINT to the current pipe process
2262            self._cur_pipe_proc_reader.send_sigint()
2263
2264        # Check if we are allowed to re-raise the KeyboardInterrupt
2265        if not self.sigint_protection:
2266            self._raise_keyboard_interrupt()
2267
2268    def _raise_keyboard_interrupt(self) -> None:
2269        """Helper function to raise a KeyboardInterrupt"""
2270        raise KeyboardInterrupt("Got a keyboard interrupt")
2271
2272    def precmd(self, statement: Union[Statement, str]) -> Statement:
2273        """Hook method executed just before the command is executed by
2274        :meth:`~cmd2.Cmd.onecmd` and after adding it to history.
2275
2276        :param statement: subclass of str which also contains the parsed input
2277        :return: a potentially modified version of the input Statement object
2278
2279        See :meth:`~cmd2.Cmd.register_postparsing_hook` and
2280        :meth:`~cmd2.Cmd.register_precmd_hook` for more robust ways
2281        to run hooks before the command is executed. See
2282        :ref:`features/hooks:Postparsing Hooks` and
2283        :ref:`features/hooks:Precommand Hooks` for more information.
2284        """
2285        return Statement(statement) if not isinstance(statement, Statement) else statement
2286
2287    def postcmd(self, stop: bool, statement: Union[Statement, str]) -> bool:
2288        """Hook method executed just after a command is executed by
2289        :meth:`~cmd2.Cmd.onecmd`.
2290
2291        :param stop: return `True` to request the command loop terminate
2292        :param statement: subclass of str which also contains the parsed input
2293
2294        See :meth:`~cmd2.Cmd.register_postcmd_hook` and :meth:`~cmd2.Cmd.register_cmdfinalization_hook` for more robust ways
2295        to run hooks after the command is executed. See
2296        :ref:`features/hooks:Postcommand Hooks` and
2297        :ref:`features/hooks:Command Finalization Hooks` for more information.
2298        """
2299        return stop
2300
2301    def preloop(self) -> None:
2302        """Hook method executed once when the :meth:`~.cmd2.Cmd.cmdloop()`
2303        method is called.
2304
2305        See :meth:`~cmd2.Cmd.register_preloop_hook` for a more robust way
2306        to run hooks before the command loop begins. See
2307        :ref:`features/hooks:Application Lifecycle Hooks` for more information.
2308        """
2309        pass
2310
2311    def postloop(self) -> None:
2312        """Hook method executed once when the :meth:`~.cmd2.Cmd.cmdloop()`
2313        method is about to return.
2314
2315        See :meth:`~cmd2.Cmd.register_postloop_hook` for a more robust way
2316        to run hooks after the command loop completes. See
2317        :ref:`features/hooks:Application Lifecycle Hooks` for more information.
2318        """
2319        pass
2320
2321    def parseline(self, line: str) -> Tuple[str, str, str]:
2322        """Parse the line into a command name and a string containing the arguments.
2323
2324        NOTE: This is an override of a parent class method.  It is only used by other parent class methods.
2325
2326        Different from the parent class method, this ignores self.identchars.
2327
2328        :param line: line read by readline
2329        :return: tuple containing (command, args, line)
2330        """
2331        statement = self.statement_parser.parse_command_only(line)
2332        return statement.command, statement.args, statement.command_and_args
2333
    def onecmd_plus_hooks(
        self, line: str, *, add_to_history: bool = True, raise_keyboard_interrupt: bool = False, py_bridge_call: bool = False
    ) -> bool:
        """Top-level function called by cmdloop() to handle parsing a line and running the command and all of its hooks.

        The order of operations is: convert the line to a Statement, run the postparsing hooks, set up
        output redirection, run the precommand hooks and precmd(), run the command with onecmd(), run the
        postcommand hooks and postcmd(), restore output, and finally run the command finalization hooks.
        The finalization hooks run even when an earlier step raised.

        :param line: command line to run
        :param add_to_history: If True, then add this command to history. Defaults to True.
        :param raise_keyboard_interrupt: if True, then KeyboardInterrupt exceptions will be raised if stop isn't already
                                         True. This is used when running commands in a loop to be able to stop the whole
                                         loop and not just the current command. Defaults to False.
        :param py_bridge_call: This should only ever be set to True by PyBridge to signify the beginning
                               of an app() call from Python. It is used to enable/disable the storage of the
                               command's stdout.
        :return: True if running of commands should stop
        :raises: KeyboardInterrupt if one occurs while raise_keyboard_interrupt is True and stop is not True
        :raises: the exception wrapped by a PassThroughException, if one is raised
        """
        import datetime

        stop = False
        # Stays None if parsing fails; the finalization hooks are still given this value
        statement = None

        try:
            # Convert the line into a Statement
            statement = self._input_line_to_statement(line)

            # call the postparsing hooks
            postparsing_data = plugin.PostparsingData(False, statement)
            for postparsing_func in self._postparsing_hooks:
                postparsing_data = postparsing_func(postparsing_data)
                # A hook requesting a stop prevents the remaining postparsing hooks from running
                if postparsing_data.stop:
                    break

            # unpack the postparsing_data object
            statement = postparsing_data.statement
            stop = postparsing_data.stop
            if stop:
                # we should not run the command, but
                # we need to run the finalization hooks
                raise EmptyStatement

            # Remains None if _redirect_output() raises, in which case there is nothing to restore
            redir_saved_state: Optional[utils.RedirectionSavedState] = None

            try:
                # Get sigint protection while we set up redirection
                with self.sigint_protection:
                    if py_bridge_call:
                        # Start saving command's stdout at this point
                        self.stdout.pause_storage = False  # type: ignore[attr-defined]

                    redir_saved_state = self._redirect_output(statement)

                # Used to report the elapsed time below when self.timing is set
                timestart = datetime.datetime.now()

                # precommand hooks
                precmd_data = plugin.PrecommandData(statement)
                for precmd_func in self._precmd_hooks:
                    precmd_data = precmd_func(precmd_data)
                statement = precmd_data.statement

                # call precmd() for compatibility with cmd.Cmd
                statement = self.precmd(statement)

                # go run the command function
                stop = self.onecmd(statement, add_to_history=add_to_history)

                # postcommand hooks
                postcmd_data = plugin.PostcommandData(stop, statement)
                for postcmd_func in self._postcmd_hooks:
                    postcmd_data = postcmd_func(postcmd_data)

                # retrieve the final value of stop, ignoring any statement modification from the hooks
                stop = postcmd_data.stop

                # call postcmd() for compatibility with cmd.Cmd
                stop = self.postcmd(stop, statement)

                if self.timing:
                    self.pfeedback(f'Elapsed: {datetime.datetime.now() - timestart}')
            finally:
                # Get sigint protection while we restore stuff
                with self.sigint_protection:
                    if redir_saved_state is not None:
                        self._restore_output(statement, redir_saved_state)

                    if py_bridge_call:
                        # Stop saving command's stdout before command finalization hooks run
                        self.stdout.pause_storage = True  # type: ignore[attr-defined]
        except (SkipPostcommandHooks, EmptyStatement):
            # Don't do anything, but do allow command finalization hooks to run
            pass
        except Cmd2ShlexError as ex:
            self.perror(f"Invalid syntax: {ex}")
        except RedirectionError as ex:
            self.perror(ex)
        except KeyboardInterrupt as ex:
            # Swallow the interrupt unless the caller asked for it and we aren't already stopping
            if raise_keyboard_interrupt and not stop:
                raise ex
        except SystemExit as ex:
            # A command calling sys.exit() stops the command loop instead of killing the process here
            self.exit_code = ex.code
            stop = True
        except PassThroughException as ex:
            # Re-raise the exception that was wrapped so it propagates to the caller
            raise ex.wrapped_ex
        except Exception as ex:
            self.pexcept(ex)
        finally:
            # The finalization hooks always run, and their exceptions are handled
            # the same way as exceptions raised by the command itself.
            try:
                stop = self._run_cmdfinalization_hooks(stop, statement)
            except KeyboardInterrupt as ex:
                if raise_keyboard_interrupt and not stop:
                    raise ex
            except SystemExit as ex:
                self.exit_code = ex.code
                stop = True
            except PassThroughException as ex:
                raise ex.wrapped_ex
            except Exception as ex:
                self.pexcept(ex)

        return stop
2452
2453    def _run_cmdfinalization_hooks(self, stop: bool, statement: Optional[Statement]) -> bool:
2454        """Run the command finalization hooks"""
2455        with self.sigint_protection:
2456            if not sys.platform.startswith('win') and self.stdin.isatty():
2457                # Before the next command runs, fix any terminal problems like those
2458                # caused by certain binary characters having been printed to it.
2459                import subprocess
2460
2461                proc = subprocess.Popen(['stty', 'sane'])
2462                proc.communicate()
2463
2464        data = plugin.CommandFinalizationData(stop, statement)
2465        for func in self._cmdfinalization_hooks:
2466            data = func(data)
2467        # retrieve the final value of stop, ignoring any
2468        # modifications to the statement
2469        return data.stop
2470
2471    def runcmds_plus_hooks(
2472        self,
2473        cmds: Union[List[HistoryItem], List[str]],
2474        *,
2475        add_to_history: bool = True,
2476        stop_on_keyboard_interrupt: bool = False,
2477    ) -> bool:
2478        """
2479        Used when commands are being run in an automated fashion like text scripts or history replays.
2480        The prompt and command line for each command will be printed if echo is True.
2481
2482        :param cmds: commands to run
2483        :param add_to_history: If True, then add these commands to history. Defaults to True.
2484        :param stop_on_keyboard_interrupt: if True, then stop running contents of cmds if Ctrl-C is pressed instead of moving
2485                                           to the next command in the list. This is used when the commands are part of a
2486                                           group, like a text script, which should stop upon Ctrl-C. Defaults to False.
2487        :return: True if running of commands should stop
2488        """
2489        for line in cmds:
2490            if isinstance(line, HistoryItem):
2491                line = line.raw
2492
2493            if self.echo:
2494                self.poutput(f'{self.prompt}{line}')
2495
2496            try:
2497                if self.onecmd_plus_hooks(
2498                    line, add_to_history=add_to_history, raise_keyboard_interrupt=stop_on_keyboard_interrupt
2499                ):
2500                    return True
2501            except KeyboardInterrupt as ex:
2502                if stop_on_keyboard_interrupt:
2503                    self.perror(ex)
2504                    break
2505
2506        return False
2507
    def _complete_statement(self, line: str) -> Statement:
        """Keep accepting lines of input until the command is complete.

        A command is complete when it parses successfully and is either not a multiline
        command or is a multiline command with a terminator. Otherwise more input is read
        with the continuation prompt, appended to what has been entered so far, and parsed again.

        There is some pretty hacky code here to handle some quirks of
        self._read_command_line(). It returns a literal 'eof' if the input
        pipe runs out. We can't refactor it because we need to retain
        backwards compatibility with the standard library version of cmd.

        :param line: the line being parsed
        :return: the completed Statement
        :raises: Cmd2ShlexError if a shlex error occurs (e.g. No closing quotation)
        :raises: EmptyStatement when the resulting Statement is blank
        """
        while True:
            try:
                statement = self.statement_parser.parse(line)
                if statement.multiline_command and statement.terminator:
                    # we have a completed multiline command, we are done
                    break
                if not statement.multiline_command:
                    # it's not a multiline command, but we parsed it ok
                    # so we are done
                    break
            except Cmd2ShlexError:
                # we have unclosed quotation marks, lets parse only the command
                # and see if it's a multiline
                statement = self.statement_parser.parse_command_only(line)
                if not statement.multiline_command:
                    # not a multiline command, so raise the exception
                    raise

            # if we get here we must have:
            #   - a multiline command with no terminator
            #   - a multiline command with unclosed quotation marks
            try:
                # Flag read by complete() so tab completion knows to prepend the earlier lines
                self._at_continuation_prompt = True

                # Save the command line up to this point for tab completion
                self._multiline_in_progress = line + '\n'

                nextline = self._read_command_line(self.continuation_prompt)
                if nextline == 'eof':
                    # they entered either a blank line, or we hit an EOF
                    # for some other reason. Turn the literal 'eof'
                    # into a blank line, which serves as a command
                    # terminator
                    nextline = '\n'
                    self.poutput(nextline)
                # Append the new input to everything entered so far and loop to parse it again
                line = f'{self._multiline_in_progress}{nextline}'
            except KeyboardInterrupt:
                # Ctrl-C abandons the command in progress: an empty line is parsed instead,
                # which presumably yields a blank command and so triggers EmptyStatement below
                self.poutput('^C')
                statement = self.statement_parser.parse('')
                break
            finally:
                # Always clear the flag, including when leaving via break or an exception
                self._at_continuation_prompt = False

        if not statement.command:
            raise EmptyStatement
        return statement
2567
2568    def _input_line_to_statement(self, line: str) -> Statement:
2569        """
2570        Parse the user's input line and convert it to a Statement, ensuring that all macros are also resolved
2571
2572        :param line: the line being parsed
2573        :return: parsed command line as a Statement
2574        :raises: Cmd2ShlexError if a shlex error occurs (e.g. No closing quotation)
2575        :raises: EmptyStatement when the resulting Statement is blank
2576        """
2577        used_macros = []
2578        orig_line = None
2579
2580        # Continue until all macros are resolved
2581        while True:
2582            # Make sure all input has been read and convert it to a Statement
2583            statement = self._complete_statement(line)
2584
2585            # Save the fully entered line if this is the first loop iteration
2586            if orig_line is None:
2587                orig_line = statement.raw
2588
2589            # Check if this command matches a macro and wasn't already processed to avoid an infinite loop
2590            if statement.command in self.macros.keys() and statement.command not in used_macros:
2591                used_macros.append(statement.command)
2592                resolve_result = self._resolve_macro(statement)
2593                if resolve_result is None:
2594                    raise EmptyStatement
2595                line = resolve_result
2596            else:
2597                break
2598
2599        # This will be true when a macro was used
2600        if orig_line != statement.raw:
2601            # Build a Statement that contains the resolved macro line
2602            # but the originally typed line for its raw member.
2603            statement = Statement(
2604                statement.args,
2605                raw=orig_line,
2606                command=statement.command,
2607                arg_list=statement.arg_list,
2608                multiline_command=statement.multiline_command,
2609                terminator=statement.terminator,
2610                suffix=statement.suffix,
2611                pipe_to=statement.pipe_to,
2612                output=statement.output,
2613                output_to=statement.output_to,
2614            )
2615        return statement
2616
2617    def _resolve_macro(self, statement: Statement) -> Optional[str]:
2618        """
2619        Resolve a macro and return the resulting string
2620
2621        :param statement: the parsed statement from the command line
2622        :return: the resolved macro or None on error
2623        """
2624        if statement.command not in self.macros.keys():
2625            raise KeyError(f"{statement.command} is not a macro")
2626
2627        macro = self.macros[statement.command]
2628
2629        # Make sure enough arguments were passed in
2630        if len(statement.arg_list) < macro.minimum_arg_count:
2631            plural = '' if macro.minimum_arg_count == 1 else 's'
2632            self.perror(f"The macro '{statement.command}' expects at least {macro.minimum_arg_count} argument{plural}")
2633            return None
2634
2635        # Resolve the arguments in reverse and read their values from statement.argv since those
2636        # are unquoted. Macro args should have been quoted when the macro was created.
2637        resolved = macro.value
2638        reverse_arg_list = sorted(macro.arg_list, key=lambda ma: ma.start_index, reverse=True)
2639
2640        for macro_arg in reverse_arg_list:
2641            if macro_arg.is_escaped:
2642                to_replace = '{{' + macro_arg.number_str + '}}'
2643                replacement = '{' + macro_arg.number_str + '}'
2644            else:
2645                to_replace = '{' + macro_arg.number_str + '}'
2646                replacement = statement.argv[int(macro_arg.number_str)]
2647
2648            parts = resolved.rsplit(to_replace, maxsplit=1)
2649            resolved = parts[0] + replacement + parts[1]
2650
2651        # Append extra arguments and use statement.arg_list since these arguments need their quotes preserved
2652        for stmt_arg in statement.arg_list[macro.minimum_arg_count :]:
2653            resolved += ' ' + stmt_arg
2654
2655        # Restore any terminator, suffix, redirection, etc.
2656        return resolved + statement.post_command
2657
2658    def _redirect_output(self, statement: Statement) -> utils.RedirectionSavedState:
2659        """Set up a command's output redirection for >, >>, and |.
2660
2661        :param statement: a parsed statement from the user
2662        :return: A bool telling if an error occurred and a utils.RedirectionSavedState object
2663        :raises: RedirectionError if an error occurs trying to pipe or redirect
2664        """
2665        import io
2666        import subprocess
2667
2668        # Initialize the redirection saved state
2669        redir_saved_state = utils.RedirectionSavedState(
2670            cast(TextIO, self.stdout), sys.stdout, self._cur_pipe_proc_reader, self._redirecting
2671        )
2672
2673        # The ProcReader for this command
2674        cmd_pipe_proc_reader: Optional[utils.ProcReader] = None
2675
2676        if not self.allow_redirection:
2677            # Don't return since we set some state variables at the end of the function
2678            pass
2679
2680        elif statement.pipe_to:
2681            # Create a pipe with read and write sides
2682            read_fd, write_fd = os.pipe()
2683
2684            # Open each side of the pipe
2685            subproc_stdin = io.open(read_fd, 'r')
2686            new_stdout: TextIO = cast(TextIO, io.open(write_fd, 'w'))
2687
2688            # Create pipe process in a separate group to isolate our signals from it. If a Ctrl-C event occurs,
2689            # our sigint handler will forward it only to the most recent pipe process. This makes sure pipe
2690            # processes close in the right order (most recent first).
2691            kwargs: Dict[str, Any] = dict()
2692            if sys.platform == 'win32':
2693                kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP
2694            else:
2695                kwargs['start_new_session'] = True
2696
2697                # Attempt to run the pipe process in the user's preferred shell instead of the default behavior of using sh.
2698                shell = os.environ.get("SHELL")
2699                if shell:
2700                    kwargs['executable'] = shell
2701
2702            # For any stream that is a StdSim, we will use a pipe so we can capture its output
2703            proc = subprocess.Popen(  # type: ignore[call-overload]
2704                statement.pipe_to,
2705                stdin=subproc_stdin,
2706                stdout=subprocess.PIPE if isinstance(self.stdout, utils.StdSim) else self.stdout,  # type: ignore[unreachable]
2707                stderr=subprocess.PIPE if isinstance(sys.stderr, utils.StdSim) else sys.stderr,  # type: ignore[unreachable]
2708                shell=True,
2709                **kwargs,
2710            )
2711
2712            # Popen was called with shell=True so the user can chain pipe commands and redirect their output
2713            # like: !ls -l | grep user | wc -l > out.txt. But this makes it difficult to know if the pipe process
2714            # started OK, since the shell itself always starts. Therefore, we will wait a short time and check
2715            # if the pipe process is still running.
2716            try:
2717                proc.wait(0.2)
2718            except subprocess.TimeoutExpired:
2719                pass
2720
2721            # Check if the pipe process already exited
2722            if proc.returncode is not None:
2723                subproc_stdin.close()
2724                new_stdout.close()
2725                raise RedirectionError(f'Pipe process exited with code {proc.returncode} before command could run')
2726            else:
2727                redir_saved_state.redirecting = True  # type: ignore[unreachable]
2728                cmd_pipe_proc_reader = utils.ProcReader(proc, cast(TextIO, self.stdout), sys.stderr)
2729                sys.stdout = self.stdout = new_stdout
2730
2731        elif statement.output:
2732            import tempfile
2733
2734            if (not statement.output_to) and (not self._can_clip):
2735                raise RedirectionError("Cannot redirect to paste buffer; missing 'pyperclip' and/or pyperclip dependencies")
2736
2737            # Redirecting to a file
2738            elif statement.output_to:
2739                # statement.output can only contain REDIRECTION_APPEND or REDIRECTION_OUTPUT
2740                mode = 'a' if statement.output == constants.REDIRECTION_APPEND else 'w'
2741                try:
2742                    # Use line buffering
2743                    new_stdout = cast(TextIO, open(utils.strip_quotes(statement.output_to), mode=mode, buffering=1))
2744                except OSError as ex:
2745                    raise RedirectionError(f'Failed to redirect because: {ex}')
2746
2747                redir_saved_state.redirecting = True
2748                sys.stdout = self.stdout = new_stdout
2749
2750            # Redirecting to a paste buffer
2751            else:
2752                new_stdout = cast(TextIO, tempfile.TemporaryFile(mode="w+"))
2753                redir_saved_state.redirecting = True
2754                sys.stdout = self.stdout = new_stdout
2755
2756                if statement.output == constants.REDIRECTION_APPEND:
2757                    self.stdout.write(get_paste_buffer())
2758                    self.stdout.flush()
2759
2760        # These are updated regardless of whether the command redirected
2761        self._cur_pipe_proc_reader = cmd_pipe_proc_reader
2762        self._redirecting = redir_saved_state.redirecting
2763
2764        return redir_saved_state
2765
2766    def _restore_output(self, statement: Statement, saved_redir_state: utils.RedirectionSavedState) -> None:
2767        """Handles restoring state after output redirection
2768
2769        :param statement: Statement object which contains the parsed input from the user
2770        :param saved_redir_state: contains information needed to restore state data
2771        """
2772        if saved_redir_state.redirecting:
2773            # If we redirected output to the clipboard
2774            if statement.output and not statement.output_to:
2775                self.stdout.seek(0)
2776                write_to_paste_buffer(self.stdout.read())
2777
2778            try:
2779                # Close the file or pipe that stdout was redirected to
2780                self.stdout.close()
2781            except BrokenPipeError:
2782                pass
2783
2784            # Restore the stdout values
2785            self.stdout = cast(TextIO, saved_redir_state.saved_self_stdout)
2786            sys.stdout = cast(TextIO, saved_redir_state.saved_sys_stdout)
2787
2788            # Check if we need to wait for the process being piped to
2789            if self._cur_pipe_proc_reader is not None:
2790                self._cur_pipe_proc_reader.wait()
2791
2792        # These are restored regardless of whether the command redirected
2793        self._cur_pipe_proc_reader = saved_redir_state.saved_pipe_proc_reader
2794        self._redirecting = saved_redir_state.saved_redirecting
2795
2796    def cmd_func(self, command: str) -> Optional[CommandFunc]:
2797        """
2798        Get the function for a command
2799
2800        :param command: the name of the command
2801
2802        :Example:
2803
2804        >>> helpfunc = self.cmd_func('help')
2805
2806        helpfunc now contains a reference to the ``do_help`` method
2807        """
2808        func_name = self._cmd_func_name(command)
2809        if func_name:
2810            return cast(Optional[CommandFunc], getattr(self, func_name))
2811        return None
2812
2813    def _cmd_func_name(self, command: str) -> str:
2814        """Get the method name associated with a given command.
2815
2816        :param command: command to look up method name which implements it
2817        :return: method name which implements the given command
2818        """
2819        target = constants.COMMAND_FUNC_PREFIX + command
2820        return target if callable(getattr(self, target, None)) else ''
2821
2822    # noinspection PyMethodOverriding
2823    def onecmd(self, statement: Union[Statement, str], *, add_to_history: bool = True) -> bool:
2824        """This executes the actual do_* method for a command.
2825
2826        If the command provided doesn't exist, then it executes default() instead.
2827
2828        :param statement: intended to be a Statement instance parsed command from the input stream, alternative
2829                          acceptance of a str is present only for backward compatibility with cmd
2830        :param add_to_history: If True, then add this command to history. Defaults to True.
2831        :return: a flag indicating whether the interpretation of commands should stop
2832        """
2833        # For backwards compatibility with cmd, allow a str to be passed in
2834        if not isinstance(statement, Statement):
2835            statement = self._input_line_to_statement(statement)
2836
2837        func = self.cmd_func(statement.command)
2838        if func:
2839            # Check to see if this command should be stored in history
2840            if (
2841                statement.command not in self.exclude_from_history
2842                and statement.command not in self.disabled_commands
2843                and add_to_history
2844            ):
2845                self.history.append(statement)
2846
2847            stop = func(statement)
2848
2849        else:
2850            stop = self.default(statement)
2851
2852        return stop if stop is not None else False
2853
2854    def default(self, statement: Statement) -> Optional[bool]:  # type: ignore[override]
2855        """Executed when the command given isn't a recognized command implemented by a do_* method.
2856
2857        :param statement: Statement object with parsed input
2858        """
2859        if self.default_to_shell:
2860            if 'shell' not in self.exclude_from_history:
2861                self.history.append(statement)
2862
2863            # noinspection PyTypeChecker
2864            return self.do_shell(statement.command_and_args)
2865        else:
2866            err_msg = self.default_error.format(statement.command)
2867
2868            # Set apply_style to False so default_error's style is not overridden
2869            self.perror(err_msg, apply_style=False)
2870            return None
2871
    def read_input(
        self,
        prompt: str,
        *,
        history: Optional[List[str]] = None,
        completion_mode: utils.CompletionMode = utils.CompletionMode.NONE,
        preserve_quotes: bool = False,
        choices: Optional[Iterable[Any]] = None,
        choices_provider: Optional[ChoicesProviderFunc] = None,
        completer: Optional[CompleterFunc] = None,
        parser: Optional[argparse.ArgumentParser] = None,
    ) -> str:
        """
        Read input from appropriate stdin value. Also supports tab completion and up-arrow history while
        input is being entered.

        When self.use_rawinput is True the line is read with input(); readline tab completion and history are
        only configured when sys.stdin is a terminal, and are restored before returning. When self.use_rawinput
        is False the line is read with self.stdin.readline(), and an empty read (end of input) is reported as the
        literal string 'eof' instead of raising EOFError.

        :param prompt: prompt to display to user
        :param history: optional list of strings to use for up-arrow history. If completion_mode is
                        CompletionMode.COMMANDS and this is None, then cmd2's command list history will
                        be used. The passed in history will not be edited. It is the caller's responsibility
                        to add the returned input to history if desired. Defaults to None.
        :param completion_mode: tells what type of tab completion to support. Tab completion only works when
                                self.use_rawinput is True and sys.stdin is a terminal. Defaults to
                                CompletionMode.NONE.

        The following optional settings apply when completion_mode is CompletionMode.CUSTOM:

        :param preserve_quotes: if True, then quoted tokens will keep their quotes when processed by
                                ArgparseCompleter. This is helpful in cases when you're tab completing
                                flag-like tokens (e.g. -o, --option) and you don't want them to be
                                treated as argparse flags when quoted. Set this to True if you plan
                                on passing the string to argparse with the tokens still quoted.

        A maximum of one of these should be provided:

        :param choices: iterable of accepted values for single argument
        :param choices_provider: function that provides choices for single argument
        :param completer: tab completion function that provides choices for single argument
        :param parser: an argument parser which supports the tab completion of multiple arguments

        :return: the line read from stdin with all trailing new lines removed
        :raises: any exceptions raised by input() and stdin.readline()
        """
        # State shared with the two nested helpers below. readline_configured records whether
        # configure_readline() has run, so that restore_readline() only undoes changes that were made.
        readline_configured = False
        saved_completer: Optional[CompleterFunc] = None
        saved_history: Optional[List[str]] = None

        def configure_readline() -> None:
            """Configure readline tab completion and history, saving the previous values for restore_readline()"""
            nonlocal readline_configured
            nonlocal saved_completer
            nonlocal saved_history
            # parser is rebound below when the caller did not supply one
            nonlocal parser

            if readline_configured:  # pragma: no cover
                return

            # Configure tab completion
            if self._completion_supported():
                saved_completer = readline.get_completer()

                # Disable completion
                if completion_mode == utils.CompletionMode.NONE:
                    # noinspection PyUnusedLocal
                    def complete_none(text: str, state: int) -> Optional[str]:  # pragma: no cover
                        return None

                    complete_func = complete_none

                # Complete commands
                elif completion_mode == utils.CompletionMode.COMMANDS:
                    complete_func = self.complete

                # Set custom completion settings
                else:
                    if parser is None:
                        # No parser was provided, so build one with a single positional argument which
                        # carries whichever of choices / choices_provider / completer the caller passed in
                        parser = argparse_custom.DEFAULT_ARGUMENT_PARSER(add_help=False)
                        parser.add_argument(
                            'arg',
                            suppress_tab_hint=True,
                            choices=choices,  # type: ignore[arg-type]
                            choices_provider=choices_provider,
                            completer=completer,
                        )

                    custom_settings = utils.CustomCompletionSettings(parser, preserve_quotes=preserve_quotes)
                    complete_func = functools.partial(self.complete, custom_settings=custom_settings)

                readline.set_completer(complete_func)

            # Overwrite history if not completing commands or new history was provided
            if completion_mode != utils.CompletionMode.COMMANDS or history is not None:
                # Copy out the current readline history so it can be put back later.
                # The loop runs from 1 through the current length inclusive since readline history items are 1-based.
                saved_history = []
                for i in range(1, readline.get_current_history_length() + 1):
                    # noinspection PyArgumentList
                    saved_history.append(readline.get_history_item(i))

                # Start from an empty history; it stays empty unless the caller provided one
                readline.clear_history()
                if history is not None:
                    for item in history:
                        readline.add_history(item)

            readline_configured = True

        def restore_readline() -> None:
            """Restore the readline tab completion and history saved by configure_readline()"""
            nonlocal readline_configured
            if not readline_configured:  # pragma: no cover
                return

            if self._completion_supported():
                readline.set_completer(saved_completer)

            # saved_history is only a list when configure_readline() replaced the history
            if saved_history is not None:
                readline.clear_history()
                for item in saved_history:
                    readline.add_history(item)

            readline_configured = False

        # Check we are reading from sys.stdin
        if self.use_rawinput:
            if sys.stdin.isatty():
                try:
                    # Deal with the vagaries of readline and ANSI escape codes
                    escaped_prompt = rl_escape_prompt(prompt)

                    # Only the readline configuration is wrapped in sigint_protection;
                    # the input() call itself runs outside of it
                    with self.sigint_protection:
                        configure_readline()
                    line = input(escaped_prompt)
                finally:
                    # Always undo the readline changes, even when input() raised
                    with self.sigint_protection:
                        restore_readline()
            else:
                # stdin is not a terminal, so input() is called without a prompt
                # and the prompt plus the line are only written out when echo is enabled
                line = input()
                if self.echo:
                    sys.stdout.write(f'{prompt}{line}\n')

        # Otherwise read from self.stdin
        else:
            if self.stdin.isatty():
                # on a tty, print the prompt first, then read the line
                self.poutput(prompt, end='')
                self.stdout.flush()
                line = self.stdin.readline()
                if len(line) == 0:
                    # readline() returned an empty string, which is reported as the literal 'eof'
                    line = 'eof'
            else:
                # we are reading from a pipe, read the line to see if there is
                # anything there, if so, then decide whether to print the
                # prompt or not
                line = self.stdin.readline()
                if len(line):
                    # we read something, output the prompt and the something
                    if self.echo:
                        self.poutput(f'{prompt}{line}')
                else:
                    # nothing left to read, which is reported as the literal 'eof'
                    line = 'eof'

        # Drop the trailing carriage return / newline characters
        return line.rstrip('\r\n')
3032
3033    def _read_command_line(self, prompt: str) -> str:
3034        """
3035        Read command line from appropriate stdin
3036
3037        :param prompt: prompt to display to user
3038        :return: command line text of 'eof' if an EOFError was caught
3039        :raises: whatever exceptions are raised by input() except for EOFError
3040        """
3041        try:
3042            # Wrap in try since terminal_lock may not be locked
3043            try:
3044                # Command line is about to be drawn. Allow asynchronous changes to the terminal.
3045                self.terminal_lock.release()
3046            except RuntimeError:
3047                pass
3048            return self.read_input(prompt, completion_mode=utils.CompletionMode.COMMANDS)
3049        except EOFError:
3050            return 'eof'
3051        finally:
3052            # Command line is gone. Do not allow asynchronous changes to the terminal.
3053            self.terminal_lock.acquire()
3054
3055    def _set_up_cmd2_readline(self) -> _SavedReadlineSettings:
3056        """
3057        Called at beginning of command loop to set up readline with cmd2-specific settings
3058
3059        :return: Class containing saved readline settings
3060        """
3061        readline_settings = _SavedReadlineSettings()
3062
3063        if self._completion_supported():
3064
3065            # Set up readline for our tab completion needs
3066            if rl_type == RlType.GNU:
3067                # Set GNU readline's rl_basic_quote_characters to NULL so it won't automatically add a closing quote
3068                # We don't need to worry about setting rl_completion_suppress_quote since we never declared
3069                # rl_completer_quote_characters.
3070                readline_settings.basic_quotes = cast(bytes, ctypes.cast(rl_basic_quote_characters, ctypes.c_void_p).value)
3071                rl_basic_quote_characters.value = None
3072
3073            readline_settings.completer = readline.get_completer()
3074            readline.set_completer(self.complete)
3075
3076            # Set the readline word delimiters for completion
3077            completer_delims = " \t\n"
3078            completer_delims += ''.join(constants.QUOTES)
3079            completer_delims += ''.join(constants.REDIRECTION_CHARS)
3080            completer_delims += ''.join(self.statement_parser.terminators)
3081
3082            readline_settings.delims = readline.get_completer_delims()
3083            readline.set_completer_delims(completer_delims)
3084
3085            # Enable tab completion
3086            readline.parse_and_bind(self.completekey + ": complete")
3087
3088        return readline_settings
3089
3090    def _restore_readline(self, readline_settings: _SavedReadlineSettings) -> None:
3091        """
3092        Called at end of command loop to restore saved readline settings
3093
3094        :param readline_settings: the readline settings to restore
3095        """
3096        if self._completion_supported():
3097
3098            # Restore what we changed in readline
3099            readline.set_completer(readline_settings.completer)
3100            readline.set_completer_delims(readline_settings.delims)
3101
3102            if rl_type == RlType.GNU:
3103                readline.set_completion_display_matches_hook(None)
3104                rl_basic_quote_characters.value = readline_settings.basic_quotes
3105            elif rl_type == RlType.PYREADLINE:
3106                # noinspection PyUnresolvedReferences
3107                readline.rl.mode._display_completions = orig_pyreadline_display
3108
3109    def _cmdloop(self) -> None:
3110        """Repeatedly issue a prompt, accept input, parse an initial prefix
3111        off the received input, and dispatch to action methods, passing them
3112        the remainder of the line as argument.
3113
3114        This serves the same role as cmd.cmdloop().
3115        """
3116        saved_readline_settings = None
3117
3118        try:
3119            # Get sigint protection while we set up readline for cmd2
3120            with self.sigint_protection:
3121                saved_readline_settings = self._set_up_cmd2_readline()
3122
3123            # Run startup commands
3124            stop = self.runcmds_plus_hooks(self._startup_commands)
3125            self._startup_commands.clear()
3126
3127            while not stop:
3128                # Get commands from user
3129                try:
3130                    line = self._read_command_line(self.prompt)
3131                except KeyboardInterrupt:
3132                    self.poutput('^C')
3133                    line = ''
3134
3135                # Run the command along with all associated pre and post hooks
3136                stop = self.onecmd_plus_hooks(line)
3137        finally:
3138            # Get sigint protection while we restore readline settings
3139            with self.sigint_protection:
3140                if saved_readline_settings is not None:
3141                    self._restore_readline(saved_readline_settings)
3142
3143    #############################################################
3144    # Parsers and functions for alias command and subcommands
3145    #############################################################
3146
3147    # Top-level parser for alias
3148    alias_description = "Manage aliases\n" "\n" "An alias is a command that enables replacement of a word by another string."
3149    alias_epilog = "See also:\n" "  macro"
3150    alias_parser = argparse_custom.DEFAULT_ARGUMENT_PARSER(description=alias_description, epilog=alias_epilog)
3151    alias_subparsers = alias_parser.add_subparsers(dest='subcommand', metavar='SUBCOMMAND')
3152    alias_subparsers.required = True
3153
3154    # Preserve quotes since we are passing strings to other commands
3155    @with_argparser(alias_parser, preserve_quotes=True)
3156    def do_alias(self, args: argparse.Namespace) -> None:
3157        """Manage aliases"""
3158        # Call handler for whatever subcommand was selected
3159        handler = args.cmd2_handler.get()
3160        handler(args)
3161
3162    # alias -> create
3163    alias_create_description = "Create or overwrite an alias"
3164
3165    alias_create_epilog = (
3166        "Notes:\n"
3167        "  If you want to use redirection, pipes, or terminators in the value of the\n"
3168        "  alias, then quote them.\n"
3169        "\n"
3170        "  Since aliases are resolved during parsing, tab completion will function as\n"
3171        "  it would for the actual command the alias resolves to.\n"
3172        "\n"
3173        "Examples:\n"
3174        "  alias create ls !ls -lF\n"
3175        "  alias create show_log !cat \"log file.txt\"\n"
3176        "  alias create save_results print_results \">\" out.txt\n"
3177    )
3178
3179    alias_create_parser = argparse_custom.DEFAULT_ARGUMENT_PARSER(
3180        description=alias_create_description, epilog=alias_create_epilog
3181    )
3182    alias_create_parser.add_argument('name', help='name of this alias')
3183    alias_create_parser.add_argument(
3184        'command', help='what the alias resolves to', choices_provider=_get_commands_aliases_and_macros_for_completion
3185    )
3186    alias_create_parser.add_argument(
3187        'command_args', nargs=argparse.REMAINDER, help='arguments to pass to command', completer=path_complete
3188    )
3189
3190    @as_subcommand_to('alias', 'create', alias_create_parser, help=alias_create_description.lower())
3191    def _alias_create(self, args: argparse.Namespace) -> None:
3192        """Create or overwrite an alias"""
3193        self.last_result = False
3194
3195        # Validate the alias name
3196        valid, errmsg = self.statement_parser.is_valid_command(args.name)
3197        if not valid:
3198            self.perror(f"Invalid alias name: {errmsg}")
3199            return
3200
3201        if args.name in self.get_all_commands():
3202            self.perror("Alias cannot have the same name as a command")
3203            return
3204
3205        if args.name in self.macros:
3206            self.perror("Alias cannot have the same name as a macro")
3207            return
3208
3209        # Unquote redirection and terminator tokens
3210        tokens_to_unquote = constants.REDIRECTION_TOKENS
3211        tokens_to_unquote.extend(self.statement_parser.terminators)
3212        utils.unquote_specific_tokens(args.command_args, tokens_to_unquote)
3213
3214        # Build the alias value string
3215        value = args.command
3216        if args.command_args:
3217            value += ' ' + ' '.join(args.command_args)
3218
3219        # Set the alias
3220        result = "overwritten" if args.name in self.aliases else "created"
3221        self.poutput(f"Alias '{args.name}' {result}")
3222
3223        self.aliases[args.name] = value
3224        self.last_result = True
3225
3226    # alias -> delete
3227    alias_delete_help = "delete aliases"
3228    alias_delete_description = "Delete specified aliases or all aliases if --all is used"
3229
3230    alias_delete_parser = argparse_custom.DEFAULT_ARGUMENT_PARSER(description=alias_delete_description)
3231    alias_delete_parser.add_argument('-a', '--all', action='store_true', help="delete all aliases")
3232    alias_delete_parser.add_argument(
3233        'names',
3234        nargs=argparse.ZERO_OR_MORE,
3235        help='alias(es) to delete',
3236        choices_provider=_get_alias_completion_items,
3237        descriptive_header=_alias_completion_table.generate_header(),
3238    )
3239
3240    @as_subcommand_to('alias', 'delete', alias_delete_parser, help=alias_delete_help)
3241    def _alias_delete(self, args: argparse.Namespace) -> None:
3242        """Delete aliases"""
3243        self.last_result = True
3244
3245        if args.all:
3246            self.aliases.clear()
3247            self.poutput("All aliases deleted")
3248        elif not args.names:
3249            self.perror("Either --all or alias name(s) must be specified")
3250            self.last_result = False
3251        else:
3252            for cur_name in utils.remove_duplicates(args.names):
3253                if cur_name in self.aliases:
3254                    del self.aliases[cur_name]
3255                    self.poutput(f"Alias '{cur_name}' deleted")
3256                else:
3257                    self.perror(f"Alias '{cur_name}' does not exist")
3258
    # alias -> list
    # Help text and description used when registering the 'alias list' subcommand
    alias_list_help = "list aliases"
    alias_list_description = (
        "List specified aliases in a reusable form that can be saved to a startup\n"
        "script to preserve aliases across sessions\n"
        "\n"
        "Without arguments, all aliases will be listed."
    )

    # Parser for 'alias list': zero or more alias names (no names means list everything)
    alias_list_parser = argparse_custom.DEFAULT_ARGUMENT_PARSER(description=alias_list_description)
    alias_list_parser.add_argument(
        'names',
        nargs=argparse.ZERO_OR_MORE,
        help='alias(es) to list',
        choices_provider=_get_alias_completion_items,
        descriptive_header=_alias_completion_table.generate_header(),
    )
3276
3277    @as_subcommand_to('alias', 'list', alias_list_parser, help=alias_list_help)
3278    def _alias_list(self, args: argparse.Namespace) -> None:
3279        """List some or all aliases as 'alias create' commands"""
3280        self.last_result = {}  # Dict[alias_name, alias_value]
3281
3282        tokens_to_quote = constants.REDIRECTION_TOKENS
3283        tokens_to_quote.extend(self.statement_parser.terminators)
3284
3285        if args.names:
3286            to_list = utils.remove_duplicates(args.names)
3287        else:
3288            to_list = sorted(self.aliases, key=self.default_sort_key)
3289
3290        not_found: List[str] = []
3291        for name in to_list:
3292            if name not in self.aliases:
3293                not_found.append(name)
3294                continue
3295
3296            # Quote redirection and terminator tokens for the 'alias create' command
3297            tokens = shlex_split(self.aliases[name])
3298            command = tokens[0]
3299            command_args = tokens[1:]
3300            utils.quote_specific_tokens(command_args, tokens_to_quote)
3301
3302            val = command
3303            if command_args:
3304                val += ' ' + ' '.join(command_args)
3305
3306            self.poutput(f"alias create {name} {val}")
3307            self.last_result[name] = val
3308
3309        for name in not_found:
3310            self.perror(f"Alias '{name}' not found")
3311
3312    #############################################################
3313    # Parsers and functions for macro command and subcommands
3314    #############################################################
3315
3316    # Top-level parser for macro
3317    macro_description = "Manage macros\n" "\n" "A macro is similar to an alias, but it can contain argument placeholders."
3318    macro_epilog = "See also:\n" "  alias"
3319    macro_parser = argparse_custom.DEFAULT_ARGUMENT_PARSER(description=macro_description, epilog=macro_epilog)
3320    macro_subparsers = macro_parser.add_subparsers(dest='subcommand', metavar='SUBCOMMAND')
3321    macro_subparsers.required = True
3322
3323    # Preserve quotes since we are passing strings to other commands
3324    @with_argparser(macro_parser, preserve_quotes=True)
3325    def do_macro(self, args: argparse.Namespace) -> None:
3326        """Manage macros"""
3327        # Call handler for whatever subcommand was selected
3328        handler = args.cmd2_handler.get()
3329        handler(args)
3330
    # macro -> create
    # Help text and description used when registering the 'macro create' subcommand
    macro_create_help = "create or overwrite a macro"
    macro_create_description = "Create or overwrite a macro"

    # Usage notes appended to the 'macro create' help output
    macro_create_epilog = (
        "A macro is similar to an alias, but it can contain argument placeholders.\n"
        "Arguments are expressed when creating a macro using {#} notation where {1}\n"
        "means the first argument.\n"
        "\n"
        "The following creates a macro called my_macro that expects two arguments:\n"
        "\n"
        "  macro create my_macro make_dinner --meat {1} --veggie {2}\n"
        "\n"
        "When the macro is called, the provided arguments are resolved and the\n"
        "assembled command is run. For example:\n"
        "\n"
        "  my_macro beef broccoli ---> make_dinner --meat beef --veggie broccoli\n"
        "\n"
        "Notes:\n"
        "  To use the literal string {1} in your command, escape it this way: {{1}}.\n"
        "\n"
        "  Extra arguments passed to a macro are appended to resolved command.\n"
        "\n"
        "  An argument number can be repeated in a macro. In the following example the\n"
        "  first argument will populate both {1} instances.\n"
        "\n"
        "    macro create ft file_taxes -p {1} -q {2} -r {1}\n"
        "\n"
        "  To quote an argument in the resolved command, quote it during creation.\n"
        "\n"
        "    macro create backup !cp \"{1}\" \"{1}.orig\"\n"
        "\n"
        "  If you want to use redirection, pipes, or terminators in the value of the\n"
        "  macro, then quote them.\n"
        "\n"
        "    macro create show_results print_results -type {1} \"|\" less\n"
        "\n"
        "  Because macros do not resolve until after hitting Enter, tab completion\n"
        "  will only complete paths while typing a macro."
    )

    # Parser for 'macro create': the macro name, the command it resolves to,
    # and everything after the command captured verbatim as command_args
    macro_create_parser = argparse_custom.DEFAULT_ARGUMENT_PARSER(
        description=macro_create_description, epilog=macro_create_epilog
    )
    macro_create_parser.add_argument('name', help='name of this macro')
    macro_create_parser.add_argument(
        'command', help='what the macro resolves to', choices_provider=_get_commands_aliases_and_macros_for_completion
    )
    macro_create_parser.add_argument(
        'command_args', nargs=argparse.REMAINDER, help='arguments to pass to command', completer=path_complete
    )
3382
3383    @as_subcommand_to('macro', 'create', macro_create_parser, help=macro_create_help)
3384    def _macro_create(self, args: argparse.Namespace) -> None:
3385        """Create or overwrite a macro"""
3386        self.last_result = False
3387
3388        # Validate the macro name
3389        valid, errmsg = self.statement_parser.is_valid_command(args.name)
3390        if not valid:
3391            self.perror(f"Invalid macro name: {errmsg}")
3392            return
3393
3394        if args.name in self.get_all_commands():
3395            self.perror("Macro cannot have the same name as a command")
3396            return
3397
3398        if args.name in self.aliases:
3399            self.perror("Macro cannot have the same name as an alias")
3400            return
3401
3402        # Unquote redirection and terminator tokens
3403        tokens_to_unquote = constants.REDIRECTION_TOKENS
3404        tokens_to_unquote.extend(self.statement_parser.terminators)
3405        utils.unquote_specific_tokens(args.command_args, tokens_to_unquote)
3406
3407        # Build the macro value string
3408        value = args.command
3409        if args.command_args:
3410            value += ' ' + ' '.join(args.command_args)
3411
3412        # Find all normal arguments
3413        arg_list = []
3414        normal_matches = re.finditer(MacroArg.macro_normal_arg_pattern, value)
3415        max_arg_num = 0
3416        arg_nums = set()
3417
3418        while True:
3419            try:
3420                cur_match = normal_matches.__next__()
3421
3422                # Get the number string between the braces
3423                cur_num_str = re.findall(MacroArg.digit_pattern, cur_match.group())[0]
3424                cur_num = int(cur_num_str)
3425                if cur_num < 1:
3426                    self.perror("Argument numbers must be greater than 0")
3427                    return
3428
3429                arg_nums.add(cur_num)
3430                if cur_num > max_arg_num:
3431                    max_arg_num = cur_num
3432
3433                arg_list.append(MacroArg(start_index=cur_match.start(), number_str=cur_num_str, is_escaped=False))
3434
3435            except StopIteration:
3436                break
3437
3438        # Make sure the argument numbers are continuous
3439        if len(arg_nums) != max_arg_num:
3440            self.perror(f"Not all numbers between 1 and {max_arg_num} are present in the argument placeholders")
3441            return
3442
3443        # Find all escaped arguments
3444        escaped_matches = re.finditer(MacroArg.macro_escaped_arg_pattern, value)
3445
3446        while True:
3447            try:
3448                cur_match = escaped_matches.__next__()
3449
3450                # Get the number string between the braces
3451                cur_num_str = re.findall(MacroArg.digit_pattern, cur_match.group())[0]
3452
3453                arg_list.append(MacroArg(start_index=cur_match.start(), number_str=cur_num_str, is_escaped=True))
3454            except StopIteration:
3455                break
3456
3457        # Set the macro
3458        result = "overwritten" if args.name in self.macros else "created"
3459        self.poutput(f"Macro '{args.name}' {result}")
3460
3461        self.macros[args.name] = Macro(name=args.name, value=value, minimum_arg_count=max_arg_num, arg_list=arg_list)
3462        self.last_result = True
3463
    # macro -> delete
    # Help text and description used when registering the 'macro delete' subcommand
    macro_delete_help = "delete macros"
    macro_delete_description = "Delete specified macros or all macros if --all is used"
    # Parser for 'macro delete': an --all flag plus zero or more macro names.
    # 'names' may be empty on the command line; _macro_delete() reports an error
    # when neither --all nor any name was given.
    macro_delete_parser = argparse_custom.DEFAULT_ARGUMENT_PARSER(description=macro_delete_description)
    macro_delete_parser.add_argument('-a', '--all', action='store_true', help="delete all macros")
    macro_delete_parser.add_argument(
        'names',
        nargs=argparse.ZERO_OR_MORE,
        help='macro(s) to delete',
        choices_provider=_get_macro_completion_items,
        descriptive_header=_macro_completion_table.generate_header(),
    )
3476
3477    @as_subcommand_to('macro', 'delete', macro_delete_parser, help=macro_delete_help)
3478    def _macro_delete(self, args: argparse.Namespace) -> None:
3479        """Delete macros"""
3480        self.last_result = True
3481
3482        if args.all:
3483            self.macros.clear()
3484            self.poutput("All macros deleted")
3485        elif not args.names:
3486            self.perror("Either --all or macro name(s) must be specified")
3487            self.last_result = False
3488        else:
3489            for cur_name in utils.remove_duplicates(args.names):
3490                if cur_name in self.macros:
3491                    del self.macros[cur_name]
3492                    self.poutput(f"Macro '{cur_name}' deleted")
3493                else:
3494                    self.perror(f"Macro '{cur_name}' does not exist")
3495
    # macro -> list
    # Help text and description used when registering the 'macro list' subcommand
    macro_list_help = "list macros"
    macro_list_description = (
        "List specified macros in a reusable form that can be saved to a startup script\n"
        "to preserve macros across sessions\n"
        "\n"
        "Without arguments, all macros will be listed."
    )

    # Parser for 'macro list': zero or more macro names (no names means list everything)
    macro_list_parser = argparse_custom.DEFAULT_ARGUMENT_PARSER(description=macro_list_description)
    macro_list_parser.add_argument(
        'names',
        nargs=argparse.ZERO_OR_MORE,
        help='macro(s) to list',
        choices_provider=_get_macro_completion_items,
        descriptive_header=_macro_completion_table.generate_header(),
    )
3513
3514    @as_subcommand_to('macro', 'list', macro_list_parser, help=macro_list_help)
3515    def _macro_list(self, args: argparse.Namespace) -> None:
3516        """List some or all macros as 'macro create' commands"""
3517        self.last_result = {}  # Dict[macro_name, macro_value]
3518
3519        tokens_to_quote = constants.REDIRECTION_TOKENS
3520        tokens_to_quote.extend(self.statement_parser.terminators)
3521
3522        if args.names:
3523            to_list = utils.remove_duplicates(args.names)
3524        else:
3525            to_list = sorted(self.macros, key=self.default_sort_key)
3526
3527        not_found: List[str] = []
3528        for name in to_list:
3529            if name not in self.macros:
3530                not_found.append(name)
3531                continue
3532
3533            # Quote redirection and terminator tokens for the 'macro create' command
3534            tokens = shlex_split(self.macros[name].value)
3535            command = tokens[0]
3536            command_args = tokens[1:]
3537            utils.quote_specific_tokens(command_args, tokens_to_quote)
3538
3539            val = command
3540            if command_args:
3541                val += ' ' + ' '.join(command_args)
3542
3543            self.poutput(f"macro create {name} {val}")
3544            self.last_result[name] = val
3545
3546        for name in not_found:
3547            self.perror(f"Macro '{name}' not found")
3548
3549    def complete_help_command(self, text: str, line: str, begidx: int, endidx: int) -> List[str]:
3550        """Completes the command argument of help"""
3551
3552        # Complete token against topics and visible commands
3553        topics = set(self.get_help_topics())
3554        visible_commands = set(self.get_visible_commands())
3555        strs_to_match = list(topics | visible_commands)
3556        return self.basic_complete(text, line, begidx, endidx, strs_to_match)
3557
3558    def complete_help_subcommands(
3559        self, text: str, line: str, begidx: int, endidx: int, arg_tokens: Dict[str, List[str]]
3560    ) -> List[str]:
3561        """Completes the subcommands argument of help"""
3562
3563        # Make sure we have a command whose subcommands we will complete
3564        command = arg_tokens['command'][0]
3565        if not command:
3566            return []
3567
3568        # Check if this command uses argparse
3569        func = self.cmd_func(command)
3570        argparser = getattr(func, constants.CMD_ATTR_ARGPARSER, None)
3571        if func is None or argparser is None:
3572            return []
3573
3574        completer = argparse_completer.DEFAULT_AP_COMPLETER(argparser, self)
3575        return completer.complete_subcommand_help(text, line, begidx, endidx, arg_tokens['subcommands'])
3576
    # Parser for the help command: a -v/--verbose flag, an optional command name,
    # and any remaining tokens captured as the subcommand path
    help_parser = argparse_custom.DEFAULT_ARGUMENT_PARSER(
        description="List available commands or provide " "detailed help for a specific command"
    )
    help_parser.add_argument(
        '-v', '--verbose', action='store_true', help="print a list of all commands with descriptions of each"
    )
    help_parser.add_argument(
        'command', nargs=argparse.OPTIONAL, help="command to retrieve help for", completer=complete_help_command
    )
    help_parser.add_argument(
        'subcommands', nargs=argparse.REMAINDER, help="subcommand(s) to retrieve help for", completer=complete_help_subcommands
    )

    # Get rid of cmd's complete_help() functions so ArgparseCompleter will complete the help command
    # NOTE: this statement sits in the class body, so it executes when the class is defined
    # and removes the attribute from cmd.Cmd itself
    if getattr(cmd.Cmd, 'complete_help', None) is not None:
        delattr(cmd.Cmd, 'complete_help')
3593
3594    @with_argparser(help_parser)
3595    def do_help(self, args: argparse.Namespace) -> None:
3596        """List available commands or provide detailed help for a specific command"""
3597        self.last_result = True
3598
3599        if not args.command or args.verbose:
3600            self._help_menu(args.verbose)
3601
3602        else:
3603            # Getting help for a specific command
3604            func = self.cmd_func(args.command)
3605            help_func = getattr(self, constants.HELP_FUNC_PREFIX + args.command, None)
3606            argparser = getattr(func, constants.CMD_ATTR_ARGPARSER, None)
3607
3608            # If the command function uses argparse, then use argparse's help
3609            if func is not None and argparser is not None:
3610                completer = argparse_completer.DEFAULT_AP_COMPLETER(argparser, self)
3611
3612                # Set end to blank so the help output matches how it looks when "command -h" is used
3613                self.poutput(completer.format_help(args.subcommands), end='')
3614
3615            # If there is a help func delegate to do_help
3616            elif help_func is not None:
3617                super().do_help(args.command)
3618
3619            # If there's no help_func __doc__ then format and output it
3620            elif func is not None and func.__doc__ is not None:
3621                self.poutput(pydoc.getdoc(func))
3622
3623            # If there is no help information then print an error
3624            else:
3625                err_msg = self.help_error.format(args.command)
3626
3627                # Set apply_style to False so help_error's style is not overridden
3628                self.perror(err_msg, apply_style=False)
3629                self.last_result = False
3630
3631    def print_topics(self, header: str, cmds: Optional[List[str]], cmdlen: int, maxcol: int) -> None:
3632        """
3633        Print groups of commands and topics in columns and an optional header
3634        Override of cmd's print_topics() to handle headers with newlines, ANSI style sequences, and wide characters
3635
3636        :param header: string to print above commands being printed
3637        :param cmds: list of topics to print
3638        :param cmdlen: unused, even by cmd's version
3639        :param maxcol: max number of display columns to fit into
3640        """
3641        if cmds:
3642            self.poutput(header)
3643            if self.ruler:
3644                divider = utils.align_left('', fill_char=self.ruler, width=ansi.widest_line(header))
3645                self.poutput(divider)
3646            self.columnize(cmds, maxcol - 1)
3647            self.poutput()
3648
3649    def columnize(self, str_list: Optional[List[str]], display_width: int = 80) -> None:
3650        """Display a list of single-line strings as a compact set of columns.
3651        Override of cmd's print_topics() to handle strings with ANSI style sequences and wide characters
3652
3653        Each column is only as wide as necessary.
3654        Columns are separated by two spaces (one was not legible enough).
3655        """
3656        if not str_list:
3657            self.poutput("<empty>")
3658            return
3659
3660        nonstrings = [i for i in range(len(str_list)) if not isinstance(str_list[i], str)]
3661        if nonstrings:
3662            raise TypeError(f"str_list[i] not a string for i in {nonstrings}")
3663        size = len(str_list)
3664        if size == 1:
3665            self.poutput(str_list[0])
3666            return
3667        # Try every row count from 1 upwards
3668        for nrows in range(1, len(str_list)):
3669            ncols = (size + nrows - 1) // nrows
3670            colwidths = []
3671            totwidth = -2
3672            for col in range(ncols):
3673                colwidth = 0
3674                for row in range(nrows):
3675                    i = row + nrows * col
3676                    if i >= size:
3677                        break
3678                    x = str_list[i]
3679                    colwidth = max(colwidth, ansi.style_aware_wcswidth(x))
3680                colwidths.append(colwidth)
3681                totwidth += colwidth + 2
3682                if totwidth > display_width:
3683                    break
3684            if totwidth <= display_width:
3685                break
3686        else:
3687            nrows = len(str_list)
3688            ncols = 1
3689            colwidths = [0]
3690        for row in range(nrows):
3691            texts = []
3692            for col in range(ncols):
3693                i = row + nrows * col
3694                if i >= size:
3695                    x = ""
3696                else:
3697                    x = str_list[i]
3698                texts.append(x)
3699            while texts and not texts[-1]:
3700                del texts[-1]
3701            for col in range(len(texts)):
3702                texts[col] = utils.align_left(texts[col], width=colwidths[col])
3703            self.poutput("  ".join(texts))
3704
3705    def _help_menu(self, verbose: bool = False) -> None:
3706        """Show a list of commands which help can be displayed for"""
3707        cmds_cats, cmds_doc, cmds_undoc, help_topics = self._build_command_info()
3708
3709        if not cmds_cats:
3710            # No categories found, fall back to standard behavior
3711            self.poutput(self.doc_leader)
3712            self._print_topics(self.doc_header, cmds_doc, verbose)
3713        else:
3714            # Categories found, Organize all commands by category
3715            self.poutput(self.doc_leader)
3716            self.poutput(self.doc_header, end="\n\n")
3717            for category in sorted(cmds_cats.keys(), key=self.default_sort_key):
3718                self._print_topics(category, cmds_cats[category], verbose)
3719            self._print_topics(self.default_category, cmds_doc, verbose)
3720
3721        self.print_topics(self.misc_header, help_topics, 15, 80)
3722        self.print_topics(self.undoc_header, cmds_undoc, 15, 80)
3723
3724    def _build_command_info(self) -> Tuple[Dict[str, List[str]], List[str], List[str], List[str]]:
3725        # Get a sorted list of help topics
3726        help_topics = sorted(self.get_help_topics(), key=self.default_sort_key)
3727        # Get a sorted list of visible command names
3728        visible_commands = sorted(self.get_visible_commands(), key=self.default_sort_key)
3729        cmds_doc: List[str] = []
3730        cmds_undoc: List[str] = []
3731        cmds_cats: Dict[str, List[str]] = {}
3732        for command in visible_commands:
3733            func = self.cmd_func(command)
3734            has_help_func = False
3735
3736            if command in help_topics:
3737                # Prevent the command from showing as both a command and help topic in the output
3738                help_topics.remove(command)
3739
3740                # Non-argparse commands can have help_functions for their documentation
3741                if not hasattr(func, constants.CMD_ATTR_ARGPARSER):
3742                    has_help_func = True
3743
3744            if hasattr(func, constants.CMD_ATTR_HELP_CATEGORY):
3745                category: str = getattr(func, constants.CMD_ATTR_HELP_CATEGORY)
3746                cmds_cats.setdefault(category, [])
3747                cmds_cats[category].append(command)
3748            elif func.__doc__ or has_help_func:
3749                cmds_doc.append(command)
3750            else:
3751                cmds_undoc.append(command)
3752        return cmds_cats, cmds_doc, cmds_undoc, help_topics
3753
3754    def _print_topics(self, header: str, cmds: List[str], verbose: bool) -> None:
3755        """Customized version of print_topics that can switch between verbose or traditional output"""
3756        import io
3757
3758        if cmds:
3759            if not verbose:
3760                self.print_topics(header, cmds, 15, 80)
3761            else:
3762                # Find the widest command
3763                widest = max([ansi.style_aware_wcswidth(command) for command in cmds])
3764
3765                # Define the table structure
3766                name_column = Column('', width=max(widest, 20))
3767                desc_column = Column('', width=80)
3768
3769                topic_table = SimpleTable([name_column, desc_column], divider_char=self.ruler)
3770
3771                # Build the topic table
3772                table_str_buf = io.StringIO()
3773                if header:
3774                    table_str_buf.write(header + "\n")
3775
3776                divider = topic_table.generate_divider()
3777                if divider:
3778                    table_str_buf.write(divider + "\n")
3779
3780                # Try to get the documentation string for each command
3781                topics = self.get_help_topics()
3782                for command in cmds:
3783                    cmd_func = self.cmd_func(command)
3784                    doc: Optional[str]
3785
3786                    # Non-argparse commands can have help_functions for their documentation
3787                    if not hasattr(cmd_func, constants.CMD_ATTR_ARGPARSER) and command in topics:
3788                        help_func = getattr(self, constants.HELP_FUNC_PREFIX + command)
3789                        result = io.StringIO()
3790
3791                        # try to redirect system stdout
3792                        with redirect_stdout(result):
3793                            # save our internal stdout
3794                            stdout_orig = self.stdout
3795                            try:
3796                                # redirect our internal stdout
3797                                self.stdout = cast(TextIO, result)
3798                                help_func()
3799                            finally:
3800                                # restore internal stdout
3801                                self.stdout = stdout_orig
3802                        doc = result.getvalue()
3803
3804                    else:
3805                        doc = cmd_func.__doc__
3806
3807                    # Attempt to locate the first documentation block
3808                    cmd_desc = ''
3809                    if doc:
3810                        found_first = False
3811                        for doc_line in doc.splitlines():
3812                            stripped_line = doc_line.strip()
3813
3814                            # Don't include :param type lines
3815                            if stripped_line.startswith(':'):
3816                                if found_first:
3817                                    break
3818                            elif stripped_line:
3819                                if found_first:
3820                                    cmd_desc += "\n"
3821                                cmd_desc += stripped_line
3822                                found_first = True
3823                            elif found_first:
3824                                break
3825
3826                    # Add this command to the table
3827                    table_row = topic_table.generate_data_row([command, cmd_desc])
3828                    table_str_buf.write(table_row + '\n')
3829
3830                self.poutput(table_str_buf.getvalue())
3831
    # Parser for the shortcuts command; no arguments are added to it
    shortcuts_parser = argparse_custom.DEFAULT_ARGUMENT_PARSER(description="List available shortcuts")
3833
3834    @with_argparser(shortcuts_parser)
3835    def do_shortcuts(self, _: argparse.Namespace) -> None:
3836        """List available shortcuts"""
3837        # Sort the shortcut tuples by name
3838        sorted_shortcuts = sorted(self.statement_parser.shortcuts, key=lambda x: self.default_sort_key(x[0]))
3839        result = "\n".join('{}: {}'.format(sc[0], sc[1]) for sc in sorted_shortcuts)
3840        self.poutput(f"Shortcuts for other commands:\n{result}")
3841        self.last_result = True
3842
    # Parser for the eof command; no arguments are added to it
    eof_parser = argparse_custom.DEFAULT_ARGUMENT_PARSER(
        description="Called when Ctrl-D is pressed", epilog=INTERNAL_COMMAND_EPILOG
    )
3846
3847    @with_argparser(eof_parser)
3848    def do_eof(self, _: argparse.Namespace) -> Optional[bool]:
3849        """
3850        Called when Ctrl-D is pressed and calls quit with no arguments.
3851        This can be overridden if quit should be called differently.
3852        """
3853        self.poutput()
3854
3855        # self.last_result will be set by do_quit()
3856        # noinspection PyTypeChecker
3857        return self.do_quit('')
3858
    # Parser for the quit command; no arguments are added to it
    quit_parser = argparse_custom.DEFAULT_ARGUMENT_PARSER(description="Exit this application")
3860
3861    @with_argparser(quit_parser)
3862    def do_quit(self, _: argparse.Namespace) -> Optional[bool]:
3863        """Exit this application"""
3864        # Return True to stop the command loop
3865        self.last_result = True
3866        return True
3867
3868    def select(self, opts: Union[str, List[str], List[Tuple[Any, Optional[str]]]], prompt: str = 'Your choice? ') -> Any:
3869        """Presents a numbered menu to the user.  Modeled after
3870        the bash shell's SELECT.  Returns the item chosen.
3871
3872        Argument ``opts`` can be:
3873
3874          | a single string -> will be split into one-word options
3875          | a list of strings -> will be offered as options
3876          | a list of tuples -> interpreted as (value, text), so
3877                                that the return value can differ from
3878                                the text advertised to the user"""
3879        local_opts: Union[List[str], List[Tuple[Any, Optional[str]]]]
3880        if isinstance(opts, str):
3881            local_opts = cast(List[Tuple[Any, Optional[str]]], list(zip(opts.split(), opts.split())))
3882        else:
3883            local_opts = opts
3884        fulloptions: List[Tuple[Any, Optional[str]]] = []
3885        for opt in local_opts:
3886            if isinstance(opt, str):
3887                fulloptions.append((opt, opt))
3888            else:
3889                try:
3890                    fulloptions.append((opt[0], opt[1]))
3891                except IndexError:
3892                    fulloptions.append((opt[0], opt[0]))
3893        for (idx, (_, text)) in enumerate(fulloptions):
3894            self.poutput('  %2d. %s' % (idx + 1, text))
3895
3896        while True:
3897            try:
3898                response = self.read_input(prompt)
3899            except EOFError:
3900                response = ''
3901                self.poutput()
3902            except KeyboardInterrupt as ex:
3903                self.poutput('^C')
3904                raise ex
3905
3906            if not response:
3907                continue
3908
3909            try:
3910                choice = int(response)
3911                if choice < 1:
3912                    raise IndexError
3913                return fulloptions[choice - 1][0]
3914            except (ValueError, IndexError):
3915                self.poutput(f"'{response}' isn't a valid choice. Pick a number between 1 and {len(fulloptions)}:")
3916
3917    def complete_set_value(
3918        self, text: str, line: str, begidx: int, endidx: int, arg_tokens: Dict[str, List[str]]
3919    ) -> List[str]:
3920        """Completes the value argument of set"""
3921        param = arg_tokens['param'][0]
3922        try:
3923            settable = self.settables[param]
3924        except KeyError:
3925            raise CompletionError(param + " is not a settable parameter")
3926
3927        # Create a parser with a value field based on this settable
3928        settable_parser = argparse_custom.DEFAULT_ARGUMENT_PARSER(parents=[Cmd.set_parser_parent])
3929
3930        # Settables with choices list the values of those choices instead of the arg name
3931        # in help text and this shows in tab completion hints. Set metavar to avoid this.
3932        arg_name = 'value'
3933        settable_parser.add_argument(
3934            arg_name,
3935            metavar=arg_name,
3936            help=settable.description,
3937            choices=settable.choices,  # type: ignore[arg-type]
3938            choices_provider=settable.choices_provider,
3939            completer=settable.completer,
3940        )
3941
3942        completer = argparse_completer.DEFAULT_AP_COMPLETER(settable_parser, self)
3943
3944        # Use raw_tokens since quotes have been preserved
3945        _, raw_tokens = self.tokens_for_completion(line, begidx, endidx)
3946        return completer.complete(text, line, begidx, endidx, raw_tokens[1:])
3947
3948    # When tab completing value, we recreate the set command parser with a value argument specific to
3949    # the settable being edited. To make this easier, define a parent parser with all the common elements.
3950    set_description = (
3951        "Set a settable parameter or show current settings of parameters\n"
3952        "Call without arguments for a list of all settable parameters with their values.\n"
3953        "Call with just param to view that parameter's value."
3954    )
3955    set_parser_parent = argparse_custom.DEFAULT_ARGUMENT_PARSER(description=set_description, add_help=False)
3956    set_parser_parent.add_argument(
3957        'param',
3958        nargs=argparse.OPTIONAL,
3959        help='parameter to set or view',
3960        choices_provider=_get_settable_completion_items,
3961        descriptive_header=_settable_completion_table.generate_header(),
3962    )
3963
3964    # Create the parser for the set command
3965    set_parser = argparse_custom.DEFAULT_ARGUMENT_PARSER(parents=[set_parser_parent])
3966    set_parser.add_argument(
3967        'value', nargs=argparse.OPTIONAL, help='new value for settable', completer=complete_set_value, suppress_tab_hint=True
3968    )
3969
3970    # Preserve quotes so users can pass in quoted empty strings and flags (e.g. -h) as the value
3971    @with_argparser(set_parser, preserve_quotes=True)
3972    def do_set(self, args: argparse.Namespace) -> None:
3973        """Set a settable parameter or show current settings of parameters"""
3974        self.last_result = False
3975
3976        if not self.settables:
3977            self.pwarning("There are no settable parameters")
3978            return
3979
3980        if args.param:
3981            try:
3982                settable = self.settables[args.param]
3983            except KeyError:
3984                self.perror(f"Parameter '{args.param}' not supported (type 'set' for list of parameters).")
3985                return
3986
3987            if args.value:
3988                # Try to update the settable's value
3989                try:
3990                    orig_value = settable.get_value()
3991                    new_value = settable.set_value(utils.strip_quotes(args.value))
3992                # noinspection PyBroadException
3993                except Exception as ex:
3994                    self.perror(f"Error setting {args.param}: {ex}")
3995                else:
3996                    self.poutput(f"{args.param} - was: {orig_value!r}\nnow: {new_value!r}")
3997                    self.last_result = True
3998                return
3999
4000            # Show one settable
4001            to_show = [args.param]
4002        else:
4003            # Show all settables
4004            to_show = list(self.settables.keys())
4005
4006        # Define the table structure
4007        name_label = 'Name'
4008        max_name_width = max([ansi.style_aware_wcswidth(param) for param in to_show])
4009        max_name_width = max(max_name_width, ansi.style_aware_wcswidth(name_label))
4010
4011        cols: List[Column] = [
4012            Column(name_label, width=max_name_width),
4013            Column('Value', width=30),
4014            Column('Description', width=60),
4015        ]
4016
4017        table = SimpleTable(cols, divider_char=self.ruler)
4018        self.poutput(table.generate_header())
4019
4020        # Build the table and populate self.last_result
4021        self.last_result = {}  # Dict[settable_name, settable_value]
4022
4023        for param in sorted(to_show, key=self.default_sort_key):
4024            settable = self.settables[param]
4025            row_data = [param, settable.get_value(), settable.description]
4026            self.poutput(table.generate_data_row(row_data))
4027            self.last_result[param] = settable.get_value()
4028
    # Parser for the shell command. 'command' is completed with shell_cmd_complete, and every token after
    # it is collected untouched into 'command_args' (argparse.REMAINDER) and completed with path_complete.
    shell_parser = argparse_custom.DEFAULT_ARGUMENT_PARSER(description="Execute a command as if at the OS prompt")
    shell_parser.add_argument('command', help='the command to run', completer=shell_cmd_complete)
    shell_parser.add_argument(
        'command_args', nargs=argparse.REMAINDER, help='arguments to pass to command', completer=path_complete
    )
4034
4035    # Preserve quotes since we are passing these strings to the shell
4036    @with_argparser(shell_parser, preserve_quotes=True)
4037    def do_shell(self, args: argparse.Namespace) -> None:
4038        """Execute a command as if at the OS prompt"""
4039        import signal
4040        import subprocess
4041
4042        kwargs: Dict[str, Any] = dict()
4043
4044        # Set OS-specific parameters
4045        if sys.platform.startswith('win'):
4046            # Windows returns STATUS_CONTROL_C_EXIT when application stopped by Ctrl-C
4047            ctrl_c_ret_code = 0xC000013A
4048        else:
4049            # On POSIX, Popen() returns -SIGINT when application stopped by Ctrl-C
4050            ctrl_c_ret_code = signal.SIGINT.value * -1
4051
4052            # On POSIX with shell=True, Popen() defaults to /bin/sh as the shell.
4053            # sh reports an incorrect return code for some applications when Ctrl-C is pressed within that
4054            # application (e.g. less). Since sh received the SIGINT, it sets the return code to reflect being
4055            # closed by SIGINT even though less did not exit upon a Ctrl-C press. In the same situation, other
4056            # shells like bash and zsh report the actual return code of less. Therefore we will try to run the
4057            # user's preferred shell which most likely will be something other than sh. This also allows the user
4058            # to run builtin commands of their preferred shell.
4059            shell = os.environ.get("SHELL")
4060            if shell:
4061                kwargs['executable'] = shell
4062
4063        # Create a list of arguments to shell
4064        tokens = [args.command] + args.command_args
4065
4066        # Expand ~ where needed
4067        utils.expand_user_in_tokens(tokens)
4068        expanded_command = ' '.join(tokens)
4069
4070        # Prevent KeyboardInterrupts while in the shell process. The shell process will
4071        # still receive the SIGINT since it is in the same process group as us.
4072        with self.sigint_protection:
4073            # For any stream that is a StdSim, we will use a pipe so we can capture its output
4074            proc = subprocess.Popen(  # type: ignore[call-overload]
4075                expanded_command,
4076                stdout=subprocess.PIPE if isinstance(self.stdout, utils.StdSim) else self.stdout,  # type: ignore[unreachable]
4077                stderr=subprocess.PIPE if isinstance(sys.stderr, utils.StdSim) else sys.stderr,  # type: ignore[unreachable]
4078                shell=True,
4079                **kwargs,
4080            )
4081
4082            proc_reader = utils.ProcReader(proc, cast(TextIO, self.stdout), sys.stderr)  # type: ignore[arg-type]
4083            proc_reader.wait()
4084
4085            # Save the return code of the application for use in a pyscript
4086            self.last_result = proc.returncode
4087
4088            # If the process was stopped by Ctrl-C, then inform the caller by raising a KeyboardInterrupt.
4089            # This is to support things like stop_on_keyboard_interrupt in run_cmds_plus_hooks().
4090            if proc.returncode == ctrl_c_ret_code:
4091                self._raise_keyboard_interrupt()
4092
4093    @staticmethod
4094    def _reset_py_display() -> None:
4095        """
4096        Resets the dynamic objects in the sys module that the py and ipy consoles fight over.
4097        When a Python console starts it adopts certain display settings if they've already been set.
4098        If an ipy console has previously been run, then py uses its settings and ends up looking
4099        like an ipy console in terms of prompt and exception text. This method forces the Python
4100        console to create its own display settings since they won't exist.
4101
4102        IPython does not have this problem since it always overwrites the display settings when it
4103        is run. Therefore this method only needs to be called before creating a Python console.
4104        """
4105        # Delete any prompts that have been set
4106        attributes = ['ps1', 'ps2', 'ps3']
4107        for cur_attr in attributes:
4108            try:
4109                del sys.__dict__[cur_attr]
4110            except KeyError:
4111                pass
4112
4113        # Reset functions
4114        sys.displayhook = sys.__displayhook__
4115        sys.excepthook = sys.__excepthook__
4116
    def _set_up_py_shell_env(self, interp: InteractiveConsole) -> _SavedCmd2Env:
        """
        Set up interactive Python shell environment

        When readline is available, the current readline history is saved and replaced by the Python
        shell's own history (self._py_history). If completion is supported, cmd2's readline settings
        are saved and replaced by the original ones, and rlcompleter is installed as the completer.
        Finally the sys display settings are reset and sys.stdout/sys.stdin are pointed at
        self.stdout/self.stdin.

        :param interp: the console which will be run; it is used to install the rlcompleter completer
                       from inside its own namespace
        :return: Class containing saved up cmd2 environment
        """
        cmd2_env = _SavedCmd2Env()

        # Set up readline for Python shell
        if rl_type != RlType.NONE:
            # Save cmd2 history (readline history items are numbered starting at 1)
            for i in range(1, readline.get_current_history_length() + 1):
                # noinspection PyArgumentList
                cmd2_env.history.append(readline.get_history_item(i))

            readline.clear_history()

            # Restore py's history
            for item in self._py_history:
                readline.add_history(item)

            if self._completion_supported():
                # Set up tab completion for the Python console
                # rlcompleter relies on the default settings of the Python readline module
                if rl_type == RlType.GNU:
                    # Remember the current basic quote characters, then put the original ones back
                    cmd2_env.readline_settings.basic_quotes = cast(
                        bytes, ctypes.cast(rl_basic_quote_characters, ctypes.c_void_p).value
                    )
                    rl_basic_quote_characters.value = orig_rl_basic_quotes

                    if 'gnureadline' in sys.modules:
                        # rlcompleter imports readline by name, so it won't use gnureadline
                        # Force rlcompleter to use gnureadline instead so it has our settings and history
                        if 'readline' in sys.modules:
                            # Remember the real readline module so it can be put back afterwards
                            cmd2_env.readline_module = sys.modules['readline']

                        sys.modules['readline'] = sys.modules['gnureadline']

                # Remember the current completer delimiters, then put the original ones back
                cmd2_env.readline_settings.delims = readline.get_completer_delims()
                readline.set_completer_delims(orig_rl_delims)

                # rlcompleter will not need cmd2's custom display function
                # This will be restored by cmd2 the next time complete() is called
                if rl_type == RlType.GNU:
                    readline.set_completion_display_matches_hook(None)
                elif rl_type == RlType.PYREADLINE:
                    # noinspection PyUnresolvedReferences
                    readline.rl.mode._display_completions = orig_pyreadline_display

                # Save off the current completer and set a new one in the Python console
                # Make sure it tab completes from its locals() dictionary
                cmd2_env.readline_settings.completer = readline.get_completer()
                interp.runcode("from rlcompleter import Completer")  # type: ignore[arg-type]
                interp.runcode("import readline")  # type: ignore[arg-type]
                interp.runcode("readline.set_completer(Completer(locals()).complete)")  # type: ignore[arg-type]

        # Set up sys module for the Python console
        self._reset_py_display()

        # Point the standard streams at the ones this app uses, remembering the originals
        cmd2_env.sys_stdout = sys.stdout
        sys.stdout = self.stdout  # type: ignore[assignment]

        cmd2_env.sys_stdin = sys.stdin
        sys.stdin = self.stdin  # type: ignore[assignment]

        return cmd2_env
4182
4183    def _restore_cmd2_env(self, cmd2_env: _SavedCmd2Env) -> None:
4184        """
4185        Restore cmd2 environment after exiting an interactive Python shell
4186
4187        :param cmd2_env: the environment settings to restore
4188        """
4189        sys.stdout = cmd2_env.sys_stdout  # type: ignore[assignment]
4190        sys.stdin = cmd2_env.sys_stdin  # type: ignore[assignment]
4191
4192        # Set up readline for cmd2
4193        if rl_type != RlType.NONE:
4194            # Save py's history
4195            self._py_history.clear()
4196            for i in range(1, readline.get_current_history_length() + 1):
4197                # noinspection PyArgumentList
4198                self._py_history.append(readline.get_history_item(i))
4199
4200            readline.clear_history()
4201
4202            # Restore cmd2's history
4203            for item in cmd2_env.history:
4204                readline.add_history(item)
4205
4206            if self._completion_supported():
4207                # Restore cmd2's tab completion settings
4208                readline.set_completer(cmd2_env.readline_settings.completer)
4209                readline.set_completer_delims(cmd2_env.readline_settings.delims)
4210
4211                if rl_type == RlType.GNU:
4212                    rl_basic_quote_characters.value = cmd2_env.readline_settings.basic_quotes
4213
4214                    if 'gnureadline' in sys.modules:
4215                        # Restore what the readline module pointed to
4216                        if cmd2_env.readline_module is None:
4217                            del sys.modules['readline']
4218                        else:
4219                            sys.modules['readline'] = cmd2_env.readline_module
4220
4221    def _run_python(self, *, pyscript: Optional[str] = None) -> Optional[bool]:
4222        """
4223        Called by do_py() and do_run_pyscript().
4224        If pyscript is None, then this function runs an interactive Python shell.
4225        Otherwise, it runs the pyscript file.
4226
4227        :param pyscript: optional path to a pyscript file to run. This is intended only to be used by do_run_pyscript()
4228                         after it sets up sys.argv for the script. (Defaults to None)
4229        :return: True if running of commands should stop
4230        """
4231        self.last_result = False
4232
4233        def py_quit() -> None:
4234            """Function callable from the interactive Python console to exit that environment"""
4235            raise EmbeddedConsoleExit
4236
4237        from .py_bridge import (
4238            PyBridge,
4239        )
4240
4241        py_bridge = PyBridge(self)
4242        saved_sys_path = None
4243
4244        if self.in_pyscript():
4245            self.perror("Recursively entering interactive Python shells is not allowed")
4246            return None
4247
4248        try:
4249            self._in_py = True
4250            py_code_to_run = ''
4251
4252            # Make a copy of self.py_locals for the locals dictionary in the Python environment we are creating.
4253            # This is to prevent pyscripts from editing it. (e.g. locals().clear()). It also ensures a pyscript's
4254            # environment won't be filled with data from a previously run pyscript. Only make a shallow copy since
4255            # it's OK for py_locals to contain objects which are editable in a pyscript.
4256            local_vars = self.py_locals.copy()
4257            local_vars[self.py_bridge_name] = py_bridge
4258            local_vars['quit'] = py_quit
4259            local_vars['exit'] = py_quit
4260
4261            if self.self_in_py:
4262                local_vars['self'] = self
4263
4264            # Handle case where we were called by do_run_pyscript()
4265            if pyscript is not None:
4266                # Read the script file
4267                expanded_filename = os.path.expanduser(pyscript)
4268
4269                try:
4270                    with open(expanded_filename) as f:
4271                        py_code_to_run = f.read()
4272                except OSError as ex:
4273                    self.perror(f"Error reading script file '{expanded_filename}': {ex}")
4274                    return None
4275
4276                local_vars['__name__'] = '__main__'
4277                local_vars['__file__'] = expanded_filename
4278
4279                # Place the script's directory at sys.path[0] just as Python does when executing a script
4280                saved_sys_path = list(sys.path)
4281                sys.path.insert(0, os.path.dirname(os.path.abspath(expanded_filename)))
4282
4283            else:
4284                # This is the default name chosen by InteractiveConsole when no locals are passed in
4285                local_vars['__name__'] = '__console__'
4286
4287            # Create the Python interpreter
4288            self.last_result = True
4289            interp = InteractiveConsole(locals=local_vars)
4290
4291            # Check if we are running Python code
4292            if py_code_to_run:
4293                # noinspection PyBroadException
4294                try:
4295                    interp.runcode(py_code_to_run)  # type: ignore[arg-type]
4296                except BaseException:
4297                    # We don't care about any exception that happened in the Python code
4298                    pass
4299
4300            # Otherwise we will open an interactive Python shell
4301            else:
4302                cprt = 'Type "help", "copyright", "credits" or "license" for more information.'
4303                instructions = (
4304                    'Use `Ctrl-D` (Unix) / `Ctrl-Z` (Windows), `quit()`, `exit()` to exit.\n'
4305                    f'Run CLI commands with: {self.py_bridge_name}("command ...")'
4306                )
4307                banner = f"Python {sys.version} on {sys.platform}\n{cprt}\n\n{instructions}\n"
4308
4309                saved_cmd2_env = None
4310
4311                # noinspection PyBroadException
4312                try:
4313                    # Get sigint protection while we set up the Python shell environment
4314                    with self.sigint_protection:
4315                        saved_cmd2_env = self._set_up_py_shell_env(interp)
4316
4317                    # Since quit() or exit() raise an EmbeddedConsoleExit, interact() exits before printing
4318                    # the exitmsg. Therefore we will not provide it one and print it manually later.
4319                    interp.interact(banner=banner, exitmsg='')
4320                except BaseException:
4321                    # We don't care about any exception that happened in the interactive console
4322                    pass
4323                finally:
4324                    # Get sigint protection while we restore cmd2 environment settings
4325                    with self.sigint_protection:
4326                        if saved_cmd2_env is not None:
4327                            self._restore_cmd2_env(saved_cmd2_env)
4328                    self.poutput("Now exiting Python shell...")
4329
4330        finally:
4331            with self.sigint_protection:
4332                if saved_sys_path is not None:
4333                    sys.path = saved_sys_path
4334                self._in_py = False
4335
4336        return py_bridge.stop
4337
    # Parser for the py command; no arguments are added to it
    py_parser = argparse_custom.DEFAULT_ARGUMENT_PARSER(description="Run an interactive Python shell")
4339
4340    @with_argparser(py_parser)
    def do_py(self, _: argparse.Namespace) -> Optional[bool]:
        """
        Run an interactive Python shell

        :param _: parsed arguments (unused)
        :return: True if running of commands should stop
        """
        # self.last_result will be set by _run_python()
        return self._run_python()
4348
    # Parser for the run_pyscript command. Every token after 'script_path' is collected untouched into
    # 'script_arguments' (argparse.REMAINDER). Both arguments are completed with path_complete.
    run_pyscript_parser = argparse_custom.DEFAULT_ARGUMENT_PARSER(description="Run a Python script file inside the console")
    run_pyscript_parser.add_argument('script_path', help='path to the script file', completer=path_complete)
    run_pyscript_parser.add_argument(
        'script_arguments', nargs=argparse.REMAINDER, help='arguments to pass to script', completer=path_complete
    )
4354
4355    @with_argparser(run_pyscript_parser)
4356    def do_run_pyscript(self, args: argparse.Namespace) -> Optional[bool]:
4357        """
4358        Run a Python script file inside the console
4359
4360        :return: True if running of commands should stop
4361        """
4362        self.last_result = False
4363
4364        # Expand ~ before placing this path in sys.argv just as a shell would
4365        args.script_path = os.path.expanduser(args.script_path)
4366
4367        # Add some protection against accidentally running a non-Python file. The happens when users
4368        # mix up run_script and run_pyscript.
4369        if not args.script_path.endswith('.py'):
4370            self.pwarning(f"'{args.script_path}' does not have a .py extension")
4371            selection = self.select('Yes No', 'Continue to try to run it as a Python script? ')
4372            if selection != 'Yes':
4373                return None
4374
4375        # Save current command line arguments
4376        orig_args = sys.argv
4377
4378        try:
4379            # Overwrite sys.argv to allow the script to take command line arguments
4380            sys.argv = [args.script_path] + args.script_arguments
4381
4382            # self.last_resort will be set by _run_python()
4383            py_return = self._run_python(pyscript=args.script_path)
4384        finally:
4385            # Restore command line arguments to original state
4386            sys.argv = orig_args
4387
4388        return py_return
4389
    # Parser for the ipy command; no arguments are added to it
    ipython_parser = argparse_custom.DEFAULT_ARGUMENT_PARSER(description="Run an interactive IPython shell")
4391
4392    # noinspection PyPackageRequirements
4393    @with_argparser(ipython_parser)
4394    def do_ipy(self, _: argparse.Namespace) -> Optional[bool]:  # pragma: no cover
4395        """
4396        Enter an interactive IPython shell
4397
4398        :return: True if running of commands should stop
4399        """
4400        self.last_result = False
4401
4402        # Detect whether IPython is installed
4403        try:
4404            import traitlets.config.loader as TraitletsLoader  # type: ignore[import]
4405            from IPython import (  # type: ignore[import]
4406                start_ipython,
4407            )
4408            from IPython.terminal.interactiveshell import (  # type: ignore[import]
4409                TerminalInteractiveShell,
4410            )
4411            from IPython.terminal.ipapp import (  # type: ignore[import]
4412                TerminalIPythonApp,
4413            )
4414        except ImportError:
4415            self.perror("IPython package is not installed")
4416            return None
4417
4418        from .py_bridge import (
4419            PyBridge,
4420        )
4421
4422        if self.in_pyscript():
4423            self.perror("Recursively entering interactive Python shells is not allowed")
4424            return None
4425
4426        self.last_result = True
4427
4428        try:
4429            self._in_py = True
4430            py_bridge = PyBridge(self)
4431
4432            # Make a copy of self.py_locals for the locals dictionary in the IPython environment we are creating.
4433            # This is to prevent ipy from editing it. (e.g. locals().clear()). Only make a shallow copy since
4434            # it's OK for py_locals to contain objects which are editable in ipy.
4435            local_vars = self.py_locals.copy()
4436            local_vars[self.py_bridge_name] = py_bridge
4437            if self.self_in_py:
4438                local_vars['self'] = self
4439
4440            # Configure IPython
4441            config = TraitletsLoader.Config()
4442            config.InteractiveShell.banner2 = (
4443                'Entering an IPython shell. Type exit, quit, or Ctrl-D to exit.\n'
4444                f'Run CLI commands with: {self.py_bridge_name}("command ...")\n'
4445            )
4446
4447            # Start IPython
4448            start_ipython(config=config, argv=[], user_ns=local_vars)
4449            self.poutput("Now exiting IPython shell...")
4450
4451            # The IPython application is a singleton and won't be recreated next time
4452            # this function runs. That's a problem since the contents of local_vars
4453            # may need to be changed. Therefore we must destroy all instances of the
4454            # relevant classes.
4455            TerminalIPythonApp.clear_instance()
4456            TerminalInteractiveShell.clear_instance()
4457
4458            return py_bridge.stop
4459        finally:
4460            self._in_py = False
4461
4462    history_description = "View, run, edit, save, or clear previously entered commands"
4463
4464    history_parser = argparse_custom.DEFAULT_ARGUMENT_PARSER(description=history_description)
4465    history_action_group = history_parser.add_mutually_exclusive_group()
4466    history_action_group.add_argument('-r', '--run', action='store_true', help='run selected history items')
4467    history_action_group.add_argument('-e', '--edit', action='store_true', help='edit and then run selected history items')
4468    history_action_group.add_argument(
4469        '-o', '--output_file', metavar='FILE', help='output commands to a script file, implies -s', completer=path_complete
4470    )
4471    history_action_group.add_argument(
4472        '-t',
4473        '--transcript',
4474        metavar='TRANSCRIPT_FILE',
4475        help='output commands and results to a transcript file,\nimplies -s',
4476        completer=path_complete,
4477    )
4478    history_action_group.add_argument('-c', '--clear', action='store_true', help='clear all history')
4479
4480    history_format_group = history_parser.add_argument_group(title='formatting')
4481    history_format_group.add_argument(
4482        '-s', '--script', action='store_true', help='output commands in script format, i.e. without command\n' 'numbers'
4483    )
4484    history_format_group.add_argument(
4485        '-x',
4486        '--expanded',
4487        action='store_true',
4488        help='output fully parsed commands with any aliases and\n' 'macros expanded, instead of typed commands',
4489    )
4490    history_format_group.add_argument(
4491        '-v',
4492        '--verbose',
4493        action='store_true',
4494        help='display history and include expanded commands if they\n' 'differ from the typed command',
4495    )
4496    history_format_group.add_argument(
4497        '-a', '--all', action='store_true', help='display all commands, including ones persisted from\n' 'previous sessions'
4498    )
4499
4500    history_arg_help = (
4501        "empty               all history items\n"
4502        "a                   one history item by number\n"
4503        "a..b, a:b, a:, ..b  items by indices (inclusive)\n"
4504        "string              items containing string\n"
4505        "/regex/             items matching regular expression"
4506    )
4507    history_parser.add_argument('arg', nargs=argparse.OPTIONAL, help=history_arg_help)
4508
4509    @with_argparser(history_parser)
4510    def do_history(self, args: argparse.Namespace) -> Optional[bool]:
4511        """
4512        View, run, edit, save, or clear previously entered commands
4513
4514        :return: True if running of commands should stop
4515        """
4516        self.last_result = False
4517
4518        # -v must be used alone with no other options
4519        if args.verbose:
4520            if args.clear or args.edit or args.output_file or args.run or args.transcript or args.expanded or args.script:
4521                self.poutput("-v cannot be used with any other options")
4522                self.poutput(self.history_parser.format_usage())
4523                return None
4524
4525        # -s and -x can only be used if none of these options are present: [-c -r -e -o -t]
4526        if (args.script or args.expanded) and (args.clear or args.edit or args.output_file or args.run or args.transcript):
4527            self.poutput("-s and -x cannot be used with -c, -r, -e, -o, or -t")
4528            self.poutput(self.history_parser.format_usage())
4529            return None
4530
4531        if args.clear:
4532            self.last_result = True
4533
4534            # Clear command and readline history
4535            self.history.clear()
4536
4537            if self.persistent_history_file:
4538                try:
4539                    os.remove(self.persistent_history_file)
4540                except FileNotFoundError:
4541                    pass
4542                except OSError as ex:
4543                    self.perror(f"Error removing history file '{self.persistent_history_file}': {ex}")
4544                    self.last_result = False
4545                    return None
4546
4547            if rl_type != RlType.NONE:
4548                readline.clear_history()
4549            return None
4550
4551        # If an argument was supplied, then retrieve partial contents of the history, otherwise retrieve it all
4552        history = self._get_history(args)
4553
4554        if args.run:
4555            if not args.arg:
4556                self.perror("Cowardly refusing to run all previously entered commands.")
4557                self.perror("If this is what you want to do, specify '1:' as the range of history.")
4558            else:
4559                stop = self.runcmds_plus_hooks(list(history.values()))
4560                self.last_result = True
4561                return stop
4562        elif args.edit:
4563            import tempfile
4564
4565            fd, fname = tempfile.mkstemp(suffix='.txt', text=True)
4566            fobj: TextIO
4567            with os.fdopen(fd, 'w') as fobj:
4568                for command in history.values():
4569                    if command.statement.multiline_command:
4570                        fobj.write(f'{command.expanded}\n')
4571                    else:
4572                        fobj.write(f'{command.raw}\n')
4573            try:
4574                self.run_editor(fname)
4575
4576                # self.last_resort will be set by do_run_script()
4577                # noinspection PyTypeChecker
4578                return self.do_run_script(utils.quote_string(fname))
4579            finally:
4580                os.remove(fname)
4581        elif args.output_file:
4582            full_path = os.path.abspath(os.path.expanduser(args.output_file))
4583            try:
4584                with open(full_path, 'w') as fobj:
4585                    for item in history.values():
4586                        if item.statement.multiline_command:
4587                            fobj.write(f"{item.expanded}\n")
4588                        else:
4589                            fobj.write(f"{item.raw}\n")
4590                plural = '' if len(history) == 1 else 's'
4591            except OSError as ex:
4592                self.perror(f"Error saving history file '{full_path}': {ex}")
4593            else:
4594                self.pfeedback(f"{len(history)} command{plural} saved to {full_path}")
4595                self.last_result = True
4596        elif args.transcript:
4597            # self.last_resort will be set by _generate_transcript()
4598            self._generate_transcript(list(history.values()), args.transcript)
4599        else:
4600            # Display the history items retrieved
4601            for idx, hi in history.items():
4602                self.poutput(hi.pr(idx, script=args.script, expanded=args.expanded, verbose=args.verbose))
4603            self.last_result = history
4604        return None
4605
4606    def _get_history(self, args: argparse.Namespace) -> 'OrderedDict[int, HistoryItem]':
4607        """If an argument was supplied, then retrieve partial contents of the history; otherwise retrieve entire history.
4608
4609        This function returns a dictionary with history items keyed by their 1-based index in ascending order.
4610        """
4611        if args.arg:
4612            try:
4613                int_arg = int(args.arg)
4614                return OrderedDict({int_arg: self.history.get(int_arg)})
4615            except ValueError:
4616                pass
4617
4618            if '..' in args.arg or ':' in args.arg:
4619                # Get a slice of history
4620                history = self.history.span(args.arg, args.all)
4621            elif args.arg.startswith(r'/') and args.arg.endswith(r'/'):
4622                history = self.history.regex_search(args.arg, args.all)
4623            else:
4624                history = self.history.str_search(args.arg, args.all)
4625        else:
4626            # Get a copy of the history so it doesn't get mutated while we are using it
4627            history = self.history.span(':', args.all)
4628        return history
4629
4630    def _initialize_history(self, hist_file: str) -> None:
4631        """Initialize history using history related attributes
4632
4633        :param hist_file: optional path to persistent history file. If specified, then history from
4634                          previous sessions will be included. Additionally, all history will be written
4635                          to this file when the application exits.
4636        """
4637        import json
4638        import lzma
4639
4640        self.history = History()
4641        # with no persistent history, nothing else in this method is relevant
4642        if not hist_file:
4643            self.persistent_history_file = hist_file
4644            return
4645
4646        hist_file = os.path.abspath(os.path.expanduser(hist_file))
4647
4648        # On Windows, trying to open a directory throws a permission
4649        # error, not a `IsADirectoryError`. So we'll check it ourselves.
4650        if os.path.isdir(hist_file):
4651            self.perror(f"Persistent history file '{hist_file}' is a directory")
4652            return
4653
4654        # Create the directory for the history file if it doesn't already exist
4655        hist_file_dir = os.path.dirname(hist_file)
4656        try:
4657            os.makedirs(hist_file_dir, exist_ok=True)
4658        except OSError as ex:
4659            self.perror(f"Error creating persistent history file directory '{hist_file_dir}': {ex}")
4660            return
4661
4662        # Read and process history file
4663        try:
4664            with open(hist_file, 'rb') as fobj:
4665                compressed_bytes = fobj.read()
4666            history_json = lzma.decompress(compressed_bytes).decode(encoding='utf-8')
4667            self.history = History.from_json(history_json)
4668        except FileNotFoundError:
4669            # Just use an empty history
4670            pass
4671        except OSError as ex:
4672            self.perror(f"Cannot read persistent history file '{hist_file}': {ex}")
4673            return
4674        except (json.JSONDecodeError, lzma.LZMAError, KeyError, UnicodeDecodeError, ValueError) as ex:
4675            self.perror(
4676                f"Error processing persistent history file '{hist_file}': {ex}\n"
4677                f"The history file will be recreated when this application exits."
4678            )
4679
4680        self.history.start_session()
4681        self.persistent_history_file = hist_file
4682
4683        # populate readline history
4684        if rl_type != RlType.NONE:
4685            last = None
4686            for item in self.history:
4687                # Break the command into its individual lines
4688                for line in item.raw.splitlines():
4689                    # readline only adds a single entry for multiple sequential identical lines
4690                    # so we emulate that behavior here
4691                    if line != last:
4692                        readline.add_history(line)
4693                        last = line
4694
4695        # register a function to write history at save
4696        # if the history file is in plain text format from 0.9.12 or lower
4697        # this will fail, and the history in the plain text file will be lost
4698        import atexit
4699
4700        atexit.register(self._persist_history)
4701
4702    def _persist_history(self) -> None:
4703        """Write history out to the persistent history file as compressed JSON"""
4704        import lzma
4705
4706        if not self.persistent_history_file:
4707            return
4708
4709        self.history.truncate(self._persistent_history_length)
4710        try:
4711            history_json = self.history.to_json()
4712            compressed_bytes = lzma.compress(history_json.encode(encoding='utf-8'))
4713
4714            with open(self.persistent_history_file, 'wb') as fobj:
4715                fobj.write(compressed_bytes)
4716        except OSError as ex:
4717            self.perror(f"Cannot write persistent history file '{self.persistent_history_file}': {ex}")
4718
4719    def _generate_transcript(self, history: Union[List[HistoryItem], List[str]], transcript_file: str) -> None:
4720        """Generate a transcript file from a given history of commands"""
4721        self.last_result = False
4722
4723        # Validate the transcript file path to make sure directory exists and write access is available
4724        transcript_path = os.path.abspath(os.path.expanduser(transcript_file))
4725        transcript_dir = os.path.dirname(transcript_path)
4726        if not os.path.isdir(transcript_dir) or not os.access(transcript_dir, os.W_OK):
4727            self.perror(f"'{transcript_dir}' is not a directory or you don't have write access")
4728            return
4729
4730        commands_run = 0
4731        try:
4732            with self.sigint_protection:
4733                # Disable echo while we manually redirect stdout to a StringIO buffer
4734                saved_echo = self.echo
4735                saved_stdout = self.stdout
4736                self.echo = False
4737
4738            # The problem with supporting regular expressions in transcripts
4739            # is that they shouldn't be processed in the command, just the output.
4740            # In addition, when we generate a transcript, any slashes in the output
4741            # are not really intended to indicate regular expressions, so they should
4742            # be escaped.
4743            #
4744            # We have to jump through some hoops here in order to catch the commands
4745            # separately from the output and escape the slashes in the output.
4746            transcript = ''
4747            for history_item in history:
4748                # build the command, complete with prompts. When we replay
4749                # the transcript, we look for the prompts to separate
4750                # the command from the output
4751                first = True
4752                command = ''
4753                if isinstance(history_item, HistoryItem):
4754                    history_item = history_item.raw
4755                for line in history_item.splitlines():
4756                    if first:
4757                        command += f"{self.prompt}{line}\n"
4758                        first = False
4759                    else:
4760                        command += f"{self.continuation_prompt}{line}\n"
4761                transcript += command
4762
4763                # Use a StdSim object to capture output
4764                stdsim = utils.StdSim(cast(TextIO, self.stdout))
4765                self.stdout = cast(TextIO, stdsim)
4766
4767                # then run the command and let the output go into our buffer
4768                try:
4769                    stop = self.onecmd_plus_hooks(history_item, raise_keyboard_interrupt=True)
4770                except KeyboardInterrupt as ex:
4771                    self.perror(ex)
4772                    stop = True
4773
4774                commands_run += 1
4775
4776                # add the regex-escaped output to the transcript
4777                transcript += stdsim.getvalue().replace('/', r'\/')
4778
4779                # check if we are supposed to stop
4780                if stop:
4781                    break
4782        finally:
4783            with self.sigint_protection:
4784                # Restore altered attributes to their original state
4785                self.echo = saved_echo
4786                self.stdout = cast(TextIO, saved_stdout)
4787
4788        # Check if all commands ran
4789        if commands_run < len(history):
4790            self.pwarning(f"Command {commands_run} triggered a stop and ended transcript generation early")
4791
4792        # finally, we can write the transcript out to the file
4793        try:
4794            with open(transcript_path, 'w') as fout:
4795                fout.write(transcript)
4796        except OSError as ex:
4797            self.perror(f"Error saving transcript file '{transcript_path}': {ex}")
4798        else:
4799            # and let the user know what we did
4800            if commands_run == 1:
4801                plural = 'command and its output'
4802            else:
4803                plural = 'commands and their outputs'
4804            self.pfeedback(f"{commands_run} {plural} saved to transcript file '{transcript_path}'")
4805            self.last_result = True
4806
    # Text passed as the description of the "edit" command's argument parser
    edit_description = (
        "Run a text editor and optionally open a file with it\n"
        "\n"
        "The editor used is determined by a settable parameter. To set it:\n"
        "\n"
        "  set editor (program-name)"
    )

    # Argument parser for do_edit(): a single optional positional, file_path, completed as a path
    edit_parser = argparse_custom.DEFAULT_ARGUMENT_PARSER(description=edit_description)
    edit_parser.add_argument(
        'file_path', nargs=argparse.OPTIONAL, help="optional path to a file to open in editor", completer=path_complete
    )
4819
    @with_argparser(edit_parser)
    def do_edit(self, args: argparse.Namespace) -> None:
        """Run a text editor and optionally open a file with it"""

        # All of the work is delegated to run_editor(); args.file_path is the optional
        # positional argument defined on edit_parser.
        # self.last_result will be set by do_shell() which is called by run_editor()
        self.run_editor(args.file_path)
4826
4827    def run_editor(self, file_path: Optional[str] = None) -> None:
4828        """
4829        Run a text editor and optionally open a file with it
4830
4831        :param file_path: optional path of the file to edit. Defaults to None.
4832        :raises: EnvironmentError if self.editor is not set
4833        """
4834        if not self.editor:
4835            raise EnvironmentError("Please use 'set editor' to specify your text editing program of choice.")
4836
4837        command = utils.quote_string(os.path.expanduser(self.editor))
4838        if file_path:
4839            command += " " + utils.quote_string(os.path.expanduser(file_path))
4840
4841        # noinspection PyTypeChecker
4842        self.do_shell(command)
4843
4844    @property
4845    def _current_script_dir(self) -> Optional[str]:
4846        """Accessor to get the current script directory from the _script_dir LIFO queue."""
4847        if self._script_dir:
4848            return self._script_dir[-1]
4849        else:
4850            return None
4851
    # Text passed as the description of the "run_script" command's argument parser.
    # It is also the base text that relative_run_script_description extends.
    run_script_description = (
        "Run commands in script file that is encoded as either ASCII or UTF-8 text\n"
        "\n"
        "Script should contain one command per line, just like the command would be\n"
        "typed in the console.\n"
        "\n"
        "If the -t/--transcript flag is used, this command instead records\n"
        "the output of the script commands to a transcript for testing purposes.\n"
    )

    # Argument parser for do_run_script(): an optional -t/--transcript output file
    # and the required script_path positional, both completed as paths
    run_script_parser = argparse_custom.DEFAULT_ARGUMENT_PARSER(description=run_script_description)
    run_script_parser.add_argument(
        '-t',
        '--transcript',
        metavar='TRANSCRIPT_FILE',
        help='record the output of the script as a transcript file',
        completer=path_complete,
    )
    run_script_parser.add_argument('script_path', help="path to the script file", completer=path_complete)
4871
4872    @with_argparser(run_script_parser)
4873    def do_run_script(self, args: argparse.Namespace) -> Optional[bool]:
4874        """Run commands in script file that is encoded as either ASCII or UTF-8 text.
4875
4876        :return: True if running of commands should stop
4877        """
4878        self.last_result = False
4879        expanded_path = os.path.abspath(os.path.expanduser(args.script_path))
4880
4881        # Add some protection against accidentally running a Python file. The happens when users
4882        # mix up run_script and run_pyscript.
4883        if expanded_path.endswith('.py'):
4884            self.pwarning(f"'{expanded_path}' appears to be a Python file")
4885            selection = self.select('Yes No', 'Continue to try to run it as a text script? ')
4886            if selection != 'Yes':
4887                return None
4888
4889        try:
4890            # An empty file is not an error, so just return
4891            if os.path.getsize(expanded_path) == 0:
4892                self.last_result = True
4893                return None
4894
4895            # Make sure the file is ASCII or UTF-8 encoded text
4896            if not utils.is_text_file(expanded_path):
4897                self.perror(f"'{expanded_path}' is not an ASCII or UTF-8 encoded text file")
4898                return None
4899
4900            # Read all lines of the script
4901            with open(expanded_path, encoding='utf-8') as target:
4902                script_commands = target.read().splitlines()
4903        except OSError as ex:
4904            self.perror(f"Problem accessing script from '{expanded_path}': {ex}")
4905            return None
4906
4907        orig_script_dir_count = len(self._script_dir)
4908
4909        try:
4910            self._script_dir.append(os.path.dirname(expanded_path))
4911
4912            if args.transcript:
4913                # self.last_resort will be set by _generate_transcript()
4914                self._generate_transcript(script_commands, os.path.expanduser(args.transcript))
4915            else:
4916                stop = self.runcmds_plus_hooks(script_commands, stop_on_keyboard_interrupt=True)
4917                self.last_result = True
4918                return stop
4919
4920        finally:
4921            with self.sigint_protection:
4922                # Check if a script dir was added before an exception occurred
4923                if orig_script_dir_count != len(self._script_dir):
4924                    self._script_dir.pop()
4925        return None
4926
    # The "_relative_run_script" description is the run_script description plus a note
    # on how the file path is resolved when called from within a running script
    relative_run_script_description = run_script_description
    relative_run_script_description += (
        "\n\n"
        "If this is called from within an already-running script, the filename will be\n"
        "interpreted relative to the already-running script's directory."
    )

    # Text passed as the epilog of the "_relative_run_script" argument parser
    relative_run_script_epilog = "Notes:\n" "  This command is intended to only be used within text file scripts."

    # Argument parser for do__relative_run_script(): a single required positional, file_path
    relative_run_script_parser = argparse_custom.DEFAULT_ARGUMENT_PARSER(
        description=relative_run_script_description, epilog=relative_run_script_epilog
    )
    relative_run_script_parser.add_argument('file_path', help='a file path pointing to a script')
4940
4941    @with_argparser(relative_run_script_parser)
4942    def do__relative_run_script(self, args: argparse.Namespace) -> Optional[bool]:
4943        """
4944        Run commands in script file that is encoded as either ASCII or UTF-8 text
4945
4946        :return: True if running of commands should stop
4947        """
4948        file_path = args.file_path
4949        # NOTE: Relative path is an absolute path, it is just relative to the current script directory
4950        relative_path = os.path.join(self._current_script_dir or '', file_path)
4951
4952        # self.last_result will be set by do_run_script()
4953        # noinspection PyTypeChecker
4954        return self.do_run_script(utils.quote_string(relative_path))
4955
4956    def _run_transcript_tests(self, transcript_paths: List[str]) -> None:
4957        """Runs transcript tests for provided file(s).
4958
4959        This is called when either -t is provided on the command line or the transcript_files argument is provided
4960        during construction of the cmd2.Cmd instance.
4961
4962        :param transcript_paths: list of transcript test file paths
4963        """
4964        import time
4965        import unittest
4966
4967        import cmd2
4968
4969        from .transcript import (
4970            Cmd2TestCase,
4971        )
4972
4973        class TestMyAppCase(Cmd2TestCase):
4974            cmdapp = self
4975
4976        # Validate that there is at least one transcript file
4977        transcripts_expanded = utils.files_from_glob_patterns(transcript_paths, access=os.R_OK)
4978        if not transcripts_expanded:
4979            self.perror('No test files found - nothing to test')
4980            self.exit_code = 1
4981            return
4982
4983        verinfo = ".".join(map(str, sys.version_info[:3]))
4984        num_transcripts = len(transcripts_expanded)
4985        plural = '' if len(transcripts_expanded) == 1 else 's'
4986        self.poutput(ansi.style(utils.align_center(' cmd2 transcript test ', fill_char='='), bold=True))
4987        self.poutput(f'platform {sys.platform} -- Python {verinfo}, cmd2-{cmd2.__version__}, readline-{rl_type}')
4988        self.poutput(f'cwd: {os.getcwd()}')
4989        self.poutput(f'cmd2 app: {sys.argv[0]}')
4990        self.poutput(ansi.style(f'collected {num_transcripts} transcript{plural}', bold=True))
4991
4992        setattr(self.__class__, 'testfiles', transcripts_expanded)
4993        sys.argv = [sys.argv[0]]  # the --test argument upsets unittest.main()
4994        testcase = TestMyAppCase()
4995        stream = cast(TextIO, utils.StdSim(sys.stderr))
4996        # noinspection PyTypeChecker
4997        runner = unittest.TextTestRunner(stream=stream)
4998        start_time = time.time()
4999        test_results = runner.run(testcase)
5000        execution_time = time.time() - start_time
5001        if test_results.wasSuccessful():
5002            ansi.style_aware_write(sys.stderr, stream.read())
5003            finish_msg = f' {num_transcripts} transcript{plural} passed in {execution_time:.3f} seconds '
5004            finish_msg = ansi.style_success(utils.align_center(finish_msg, fill_char='='))
5005            self.poutput(finish_msg)
5006        else:
5007            # Strip off the initial traceback which isn't particularly useful for end users
5008            error_str = stream.read()
5009            end_of_trace = error_str.find('AssertionError:')
5010            file_offset = error_str[end_of_trace:].find('File ')
5011            start = end_of_trace + file_offset
5012
5013            # But print the transcript file name and line number followed by what was expected and what was observed
5014            self.perror(error_str[start:])
5015
5016            # Return a failure error code to support automated transcript-based testing
5017            self.exit_code = 1
5018
5019    def async_alert(self, alert_msg: str, new_prompt: Optional[str] = None) -> None:  # pragma: no cover
5020        """
5021        Display an important message to the user while they are at a command line prompt.
5022        To the user it appears as if an alert message is printed above the prompt and their current input
5023        text and cursor location is left alone.
5024
5025        IMPORTANT: This function will not print an alert unless it can acquire self.terminal_lock to ensure
5026                   a prompt is onscreen. Therefore it is best to acquire the lock before calling this function
5027                   to guarantee the alert prints and to avoid raising a RuntimeError.
5028
5029        :param alert_msg: the message to display to the user
5030        :param new_prompt: If you also want to change the prompt that is displayed, then include it here.
5031                           See async_update_prompt() docstring for guidance on updating a prompt.
5032        :raises RuntimeError: if called while another thread holds `terminal_lock`
5033        """
5034        if not (vt100_support and self.use_rawinput):
5035            return
5036
5037        # Sanity check that can't fail if self.terminal_lock was acquired before calling this function
5038        if self.terminal_lock.acquire(blocking=False):
5039
5040            # Windows terminals tend to flicker when we redraw the prompt and input lines.
5041            # To reduce how often this occurs, only update terminal if there are changes.
5042            update_terminal = False
5043
5044            if alert_msg:
5045                alert_msg += '\n'
5046                update_terminal = True
5047
5048            if new_prompt is not None:
5049                self.prompt = new_prompt
5050
5051            # Check if the prompt to display has changed from what's currently displayed
5052            cur_onscreen_prompt = rl_get_prompt()
5053            new_onscreen_prompt = self.continuation_prompt if self._at_continuation_prompt else self.prompt
5054
5055            if new_onscreen_prompt != cur_onscreen_prompt:
5056                update_terminal = True
5057
5058            if update_terminal:
5059                import shutil
5060
5061                # Generate the string which will replace the current prompt and input lines with the alert
5062                terminal_str = ansi.async_alert_str(
5063                    terminal_columns=shutil.get_terminal_size().columns,
5064                    prompt=cur_onscreen_prompt,
5065                    line=readline.get_line_buffer(),
5066                    cursor_offset=rl_get_point(),
5067                    alert_msg=alert_msg,
5068                )
5069                if rl_type == RlType.GNU:
5070                    sys.stderr.write(terminal_str)
5071                    sys.stderr.flush()
5072                elif rl_type == RlType.PYREADLINE:
5073                    # noinspection PyUnresolvedReferences
5074                    readline.rl.mode.console.write(terminal_str)
5075
5076                # Update Readline's prompt before we redraw it
5077                rl_set_prompt(new_onscreen_prompt)
5078
5079                # Redraw the prompt and input lines below the alert
5080                rl_force_redisplay()
5081
5082            self.terminal_lock.release()
5083
5084        else:
5085            raise RuntimeError("another thread holds terminal_lock")
5086
5087    def async_update_prompt(self, new_prompt: str) -> None:  # pragma: no cover
5088        """
5089        Update the command line prompt while the user is still typing at it. This is good for alerting the user to
5090        system changes dynamically in between commands. For instance you could alter the color of the prompt to
5091        indicate a system status or increase a counter to report an event. If you do alter the actual text of the
5092        prompt, it is best to keep the prompt the same width as what's on screen. Otherwise the user's input text will
5093        be shifted and the update will not be seamless.
5094
5095        IMPORTANT: This function will not update the prompt unless it can acquire self.terminal_lock to ensure
5096                   a prompt is onscreen. Therefore it is best to acquire the lock before calling this function
5097                   to guarantee the prompt changes and to avoid raising a RuntimeError.
5098
5099                   If user is at a continuation prompt while entering a multiline command, the onscreen prompt will
5100                   not change. However self.prompt will still be updated and display immediately after the multiline
5101                   line command completes.
5102
5103        :param new_prompt: what to change the prompt to
5104        :raises RuntimeError: if called while another thread holds `terminal_lock`
5105        """
5106        self.async_alert('', new_prompt)
5107
5108    @staticmethod
5109    def set_window_title(title: str) -> None:  # pragma: no cover
5110        """
5111        Set the terminal window title.
5112
5113        NOTE: This function writes to stderr. Therefore if you call this during a command run by a pyscript,
5114              the string which updates the title will appear in that command's CommandResult.stderr data.
5115
5116        :param title: the new window title
5117        """
5118        if not vt100_support:
5119            return
5120
5121        try:
5122            sys.stderr.write(ansi.set_title(title))
5123            sys.stderr.flush()
5124        except AttributeError:
5125            # Debugging in Pycharm has issues with setting terminal title
5126            pass
5127
5128    def enable_command(self, command: str) -> None:
5129        """
5130        Enable a command by restoring its functions
5131
5132        :param command: the command being enabled
5133        """
5134        # If the commands is already enabled, then return
5135        if command not in self.disabled_commands:
5136            return
5137
5138        help_func_name = constants.HELP_FUNC_PREFIX + command
5139        completer_func_name = constants.COMPLETER_FUNC_PREFIX + command
5140
5141        # Restore the command function to its original value
5142        dc = self.disabled_commands[command]
5143        setattr(self, self._cmd_func_name(command), dc.command_function)
5144
5145        # Restore the help function to its original value
5146        if dc.help_function is None:
5147            delattr(self, help_func_name)
5148        else:
5149            setattr(self, help_func_name, dc.help_function)
5150
5151        # Restore the completer function to its original value
5152        if dc.completer_function is None:
5153            delattr(self, completer_func_name)
5154        else:
5155            setattr(self, completer_func_name, dc.completer_function)
5156
5157        # Remove the disabled command entry
5158        del self.disabled_commands[command]
5159
5160    def enable_category(self, category: str) -> None:
5161        """
5162        Enable an entire category of commands
5163
5164        :param category: the category to enable
5165        """
5166        for cmd_name in list(self.disabled_commands):
5167            func = self.disabled_commands[cmd_name].command_function
5168            if getattr(func, constants.CMD_ATTR_HELP_CATEGORY, None) == category:
5169                self.enable_command(cmd_name)
5170
5171    def disable_command(self, command: str, message_to_print: str) -> None:
5172        """
5173        Disable a command and overwrite its functions
5174
5175        :param command: the command being disabled
5176        :param message_to_print: what to print when this command is run or help is called on it while disabled
5177
5178                                 The variable cmd2.COMMAND_NAME can be used as a placeholder for the name of the
5179                                 command being disabled.
5180                                 ex: message_to_print = f"{cmd2.COMMAND_NAME} is currently disabled"
5181        """
5182        # If the commands is already disabled, then return
5183        if command in self.disabled_commands:
5184            return
5185
5186        # Make sure this is an actual command
5187        command_function = self.cmd_func(command)
5188        if command_function is None:
5189            raise AttributeError(f"'{command}' does not refer to a command")
5190
5191        help_func_name = constants.HELP_FUNC_PREFIX + command
5192        completer_func_name = constants.COMPLETER_FUNC_PREFIX + command
5193
5194        # Add the disabled command record
5195        self.disabled_commands[command] = DisabledCommand(
5196            command_function=command_function,
5197            help_function=getattr(self, help_func_name, None),
5198            completer_function=getattr(self, completer_func_name, None),
5199        )
5200
5201        # Overwrite the command and help functions to print the message
5202        new_func = functools.partial(
5203            self._report_disabled_command_usage, message_to_print=message_to_print.replace(constants.COMMAND_NAME, command)
5204        )
5205        setattr(self, self._cmd_func_name(command), new_func)
5206        setattr(self, help_func_name, new_func)
5207
5208        # Set the completer to a function that returns a blank list
5209        setattr(self, completer_func_name, lambda *args, **kwargs: [])
5210
5211    def disable_category(self, category: str, message_to_print: str) -> None:
5212        """Disable an entire category of commands.
5213
5214        :param category: the category to disable
5215        :param message_to_print: what to print when anything in this category is run or help is called on it
5216                                 while disabled. The variable cmd2.COMMAND_NAME can be used as a placeholder for the name
5217                                 of the command being disabled.
5218                                 ex: message_to_print = f"{cmd2.COMMAND_NAME} is currently disabled"
5219        """
5220        all_commands = self.get_all_commands()
5221
5222        for cmd_name in all_commands:
5223            func = self.cmd_func(cmd_name)
5224            if getattr(func, constants.CMD_ATTR_HELP_CATEGORY, None) == category:
5225                self.disable_command(cmd_name, message_to_print)
5226
5227    def _report_disabled_command_usage(self, *_args: Any, message_to_print: str, **_kwargs: Any) -> None:
5228        """
5229        Report when a disabled command has been run or had help called on it
5230
5231        :param args: not used
5232        :param message_to_print: the message reporting that the command is disabled
5233        :param kwargs: not used
5234        """
5235        # Set apply_style to False so message_to_print's style is not overridden
5236        self.perror(message_to_print, apply_style=False)
5237
5238    def cmdloop(self, intro: Optional[str] = None) -> int:  # type: ignore[override]
5239        """This is an outer wrapper around _cmdloop() which deals with extra features provided by cmd2.
5240
5241        _cmdloop() provides the main loop equivalent to cmd.cmdloop().  This is a wrapper around that which deals with
5242        the following extra features provided by cmd2:
5243        - transcript testing
5244        - intro banner
5245        - exit code
5246
5247        :param intro: if provided this overrides self.intro and serves as the intro banner printed once at start
5248        """
5249        # cmdloop() expects to be run in the main thread to support extensive use of KeyboardInterrupts throughout the
5250        # other built-in functions. You are free to override cmdloop, but much of cmd2's features will be limited.
5251        if not threading.current_thread() is threading.main_thread():
5252            raise RuntimeError("cmdloop must be run in the main thread")
5253
5254        # Register a SIGINT signal handler for Ctrl+C
5255        import signal
5256
5257        original_sigint_handler = signal.getsignal(signal.SIGINT)
5258        signal.signal(signal.SIGINT, self.sigint_handler)
5259
5260        # Grab terminal lock before the command line prompt has been drawn by readline
5261        self.terminal_lock.acquire()
5262
5263        # Always run the preloop first
5264        for func in self._preloop_hooks:
5265            func()
5266        self.preloop()
5267
5268        # If transcript-based regression testing was requested, then do that instead of the main loop
5269        if self._transcript_files is not None:
5270            self._run_transcript_tests([os.path.expanduser(tf) for tf in self._transcript_files])
5271        else:
5272            # If an intro was supplied in the method call, allow it to override the default
5273            if intro is not None:
5274                self.intro = intro
5275
5276            # Print the intro, if there is one, right after the preloop
5277            if self.intro is not None:
5278                self.poutput(self.intro)
5279
5280            # And then call _cmdloop() to enter the main loop
5281            self._cmdloop()
5282
5283        # Run the postloop() no matter what
5284        for func in self._postloop_hooks:
5285            func()
5286        self.postloop()
5287
5288        # Release terminal lock now that postloop code should have stopped any terminal updater threads
5289        # This will also zero the lock count in case cmdloop() is called again
5290        self.terminal_lock.release()
5291
5292        # Restore the original signal handler
5293        signal.signal(signal.SIGINT, original_sigint_handler)
5294
5295        return self.exit_code
5296
5297    ###
5298    #
5299    # plugin related functions
5300    #
5301    ###
5302    def _initialize_plugin_system(self) -> None:
5303        """Initialize the plugin system"""
5304        self._preloop_hooks: List[Callable[[], None]] = []
5305        self._postloop_hooks: List[Callable[[], None]] = []
5306        self._postparsing_hooks: List[Callable[[plugin.PostparsingData], plugin.PostparsingData]] = []
5307        self._precmd_hooks: List[Callable[[plugin.PrecommandData], plugin.PrecommandData]] = []
5308        self._postcmd_hooks: List[Callable[[plugin.PostcommandData], plugin.PostcommandData]] = []
5309        self._cmdfinalization_hooks: List[Callable[[plugin.CommandFinalizationData], plugin.CommandFinalizationData]] = []
5310
5311    @classmethod
5312    def _validate_callable_param_count(cls, func: Callable[..., Any], count: int) -> None:
5313        """Ensure a function has the given number of parameters."""
5314        signature = inspect.signature(func)
5315        # validate that the callable has the right number of parameters
5316        nparam = len(signature.parameters)
5317        if nparam != count:
5318            plural = '' if nparam == 1 else 's'
5319            raise TypeError(f'{func.__name__} has {nparam} positional argument{plural}, expected {count}')
5320
5321    @classmethod
5322    def _validate_prepostloop_callable(cls, func: Callable[[], None]) -> None:
5323        """Check parameter and return types for preloop and postloop hooks."""
5324        cls._validate_callable_param_count(func, 0)
5325        # make sure there is no return notation
5326        signature = inspect.signature(func)
5327        if signature.return_annotation is not None:
5328            raise TypeError(f"{func.__name__} must declare return a return type of 'None'")
5329
5330    def register_preloop_hook(self, func: Callable[[], None]) -> None:
5331        """Register a function to be called at the beginning of the command loop."""
5332        self._validate_prepostloop_callable(func)
5333        self._preloop_hooks.append(func)
5334
5335    def register_postloop_hook(self, func: Callable[[], None]) -> None:
5336        """Register a function to be called at the end of the command loop."""
5337        self._validate_prepostloop_callable(func)
5338        self._postloop_hooks.append(func)
5339
5340    @classmethod
5341    def _validate_postparsing_callable(cls, func: Callable[[plugin.PostparsingData], plugin.PostparsingData]) -> None:
5342        """Check parameter and return types for postparsing hooks"""
5343        cls._validate_callable_param_count(cast(Callable[..., Any], func), 1)
5344        signature = inspect.signature(func)
5345        _, param = list(signature.parameters.items())[0]
5346        if param.annotation != plugin.PostparsingData:
5347            raise TypeError(f"{func.__name__} must have one parameter declared with type 'cmd2.plugin.PostparsingData'")
5348        if signature.return_annotation != plugin.PostparsingData:
5349            raise TypeError(f"{func.__name__} must declare return a return type of 'cmd2.plugin.PostparsingData'")
5350
5351    def register_postparsing_hook(self, func: Callable[[plugin.PostparsingData], plugin.PostparsingData]) -> None:
5352        """Register a function to be called after parsing user input but before running the command"""
5353        self._validate_postparsing_callable(func)
5354        self._postparsing_hooks.append(func)
5355
    # Type variable used by _validate_prepostcmd_hook() to tie a hook's parameter type, its return type and the
    # expected data type together
    CommandDataType = TypeVar('CommandDataType')
5357
5358    @classmethod
5359    def _validate_prepostcmd_hook(
5360        cls, func: Callable[[CommandDataType], CommandDataType], data_type: Type[CommandDataType]
5361    ) -> None:
5362        """Check parameter and return types for pre and post command hooks."""
5363        signature = inspect.signature(func)
5364        # validate that the callable has the right number of parameters
5365        cls._validate_callable_param_count(cast(Callable[..., Any], func), 1)
5366        # validate the parameter has the right annotation
5367        paramname = list(signature.parameters.keys())[0]
5368        param = signature.parameters[paramname]
5369        if param.annotation != data_type:
5370            raise TypeError(f'argument 1 of {func.__name__} has incompatible type {param.annotation}, expected {data_type}')
5371        # validate the return value has the right annotation
5372        if signature.return_annotation == signature.empty:
5373            raise TypeError(f'{func.__name__} does not have a declared return type, expected {data_type}')
5374        if signature.return_annotation != data_type:
5375            raise TypeError(
5376                f'{func.__name__} has incompatible return type {signature.return_annotation}, expected ' f'{data_type}'
5377            )
5378
5379    def register_precmd_hook(self, func: Callable[[plugin.PrecommandData], plugin.PrecommandData]) -> None:
5380        """Register a hook to be called before the command function."""
5381        self._validate_prepostcmd_hook(func, plugin.PrecommandData)
5382        self._precmd_hooks.append(func)
5383
5384    def register_postcmd_hook(self, func: Callable[[plugin.PostcommandData], plugin.PostcommandData]) -> None:
5385        """Register a hook to be called after the command function."""
5386        self._validate_prepostcmd_hook(func, plugin.PostcommandData)
5387        self._postcmd_hooks.append(func)
5388
5389    @classmethod
5390    def _validate_cmdfinalization_callable(
5391        cls, func: Callable[[plugin.CommandFinalizationData], plugin.CommandFinalizationData]
5392    ) -> None:
5393        """Check parameter and return types for command finalization hooks."""
5394        cls._validate_callable_param_count(func, 1)
5395        signature = inspect.signature(func)
5396        _, param = list(signature.parameters.items())[0]
5397        if param.annotation != plugin.CommandFinalizationData:
5398            raise TypeError(f"{func.__name__} must have one parameter declared with type {plugin.CommandFinalizationData}")
5399        if signature.return_annotation != plugin.CommandFinalizationData:
5400            raise TypeError("{func.__name__} must declare return a return type of {plugin.CommandFinalizationData}")
5401
5402    def register_cmdfinalization_hook(
5403        self, func: Callable[[plugin.CommandFinalizationData], plugin.CommandFinalizationData]
5404    ) -> None:
5405        """Register a hook to be called after a command is completed, whether it completes successfully or not."""
5406        self._validate_cmdfinalization_callable(func)
5407        self._cmdfinalization_hooks.append(func)
5408
5409    def _resolve_func_self(
5410        self,
5411        cmd_support_func: Callable[..., Any],
5412        cmd_self: Union[CommandSet, 'Cmd', None],
5413    ) -> Optional[object]:
5414        """
5415        Attempt to resolve a candidate instance to pass as 'self' for an unbound class method that was
5416        used when defining command's argparse object. Since we restrict registration to only a single CommandSet
5417        instance of each type, using type is a reasonably safe way to resolve the correct object instance
5418
5419        :param cmd_support_func: command support function. This could be a completer or namespace provider
5420        :param cmd_self: The `self` associated with the command or subcommand
5421        """
5422        # figure out what class the command support function was defined in
5423        func_class: Optional[Type[Any]] = get_defining_class(cmd_support_func)
5424
5425        # Was there a defining class identified? If so, is it a sub-class of CommandSet?
5426        if func_class is not None and issubclass(func_class, CommandSet):
5427            # Since the support function is provided as an unbound function, we need to locate the instance
5428            # of the CommandSet to pass in as `self` to emulate a bound method call.
5429            # We're searching for candidates that match the support function's defining class type in this order:
5430            #   1. Is the command's CommandSet a sub-class of the support function's class?
5431            #   2. Do any of the registered CommandSets in the Cmd2 application exactly match the type?
5432            #   3. Is there a registered CommandSet that is is the only matching subclass?
5433
5434            func_self: Optional[Union[CommandSet, 'Cmd']]
5435
5436            # check if the command's CommandSet is a sub-class of the support function's defining class
5437            if isinstance(cmd_self, func_class):
5438                # Case 1: Command's CommandSet is a sub-class of the support function's CommandSet
5439                func_self = cmd_self
5440            else:
5441                # Search all registered CommandSets
5442                func_self = None
5443                candidate_sets: List[CommandSet] = []
5444                for installed_cmd_set in self._installed_command_sets:
5445                    if type(installed_cmd_set) == func_class:
5446                        # Case 2: CommandSet is an exact type match for the function's CommandSet
5447                        func_self = installed_cmd_set
5448                        break
5449
5450                    # Add candidate for Case 3:
5451                    if isinstance(installed_cmd_set, func_class):
5452                        candidate_sets.append(installed_cmd_set)
5453                if func_self is None and len(candidate_sets) == 1:
5454                    # Case 3: There exists exactly 1 CommandSet that is a sub-class match of the function's CommandSet
5455                    func_self = candidate_sets[0]
5456            return func_self
5457        else:
5458            return self
5459