1# coding=utf-8 2"""Variant on standard library's cmd with extra features. 3 4To use, simply import cmd2.Cmd instead of cmd.Cmd; use precisely as though you 5were using the standard library's cmd, while enjoying the extra features. 6 7Searchable command history (commands: "history") 8Run commands from file, save to file, edit commands in file 9Multi-line commands 10Special-character shortcut commands (beyond cmd's "?" and "!") 11Settable environment parameters 12Parsing commands with `argparse` argument parsers (flags) 13Redirection to file or paste buffer (clipboard) with > or >> 14Easy transcript-based testing of applications (see examples/example.py) 15Bash-style ``select`` available 16 17Note that redirection with > and | will only work if `self.poutput()` 18is used in place of `print`. 19 20- Catherine Devlin, Jan 03 2008 - catherinedevlin.blogspot.com 21 22Git repository on GitHub at https://github.com/python-cmd2/cmd2 23""" 24# This module has many imports, quite a few of which are only 25# infrequently utilized. To reduce the initial overhead of 26# import this module, many of these imports are lazy-loaded 27# i.e. we only import the module when we use it 28# For example, we don't import the 'traceback' module 29# until the pexcept() function is called and the debug 30# setting is True 31import argparse 32import cmd 33import functools 34import glob 35import inspect 36import os 37import pydoc 38import re 39import sys 40import threading 41from code import ( 42 InteractiveConsole, 43) 44from collections import ( 45 OrderedDict, 46 namedtuple, 47) 48from contextlib import ( 49 redirect_stdout, 50) 51from types import ( 52 FrameType, 53 ModuleType, 54) 55from typing import ( 56 Any, 57 Callable, 58 Dict, 59 Iterable, 60 List, 61 Mapping, 62 Optional, 63 Set, 64 TextIO, 65 Tuple, 66 Type, 67 TypeVar, 68 Union, 69 cast, 70) 71 72from . 
import ( 73 ansi, 74 argparse_completer, 75 argparse_custom, 76 constants, 77 plugin, 78 utils, 79) 80from .argparse_custom import ( 81 ChoicesProviderFunc, 82 CompleterFunc, 83 CompletionItem, 84) 85from .clipboard import ( 86 can_clip, 87 get_paste_buffer, 88 write_to_paste_buffer, 89) 90from .command_definition import ( 91 CommandFunc, 92 CommandSet, 93) 94from .constants import ( 95 CLASS_ATTR_DEFAULT_HELP_CATEGORY, 96 COMMAND_FUNC_PREFIX, 97 COMPLETER_FUNC_PREFIX, 98 HELP_FUNC_PREFIX, 99) 100from .decorators import ( 101 as_subcommand_to, 102 with_argparser, 103) 104from .exceptions import ( 105 Cmd2ShlexError, 106 CommandSetRegistrationError, 107 CompletionError, 108 EmbeddedConsoleExit, 109 EmptyStatement, 110 PassThroughException, 111 RedirectionError, 112 SkipPostcommandHooks, 113) 114from .history import ( 115 History, 116 HistoryItem, 117) 118from .parsing import ( 119 Macro, 120 MacroArg, 121 Statement, 122 StatementParser, 123 shlex_split, 124) 125from .rl_utils import ( 126 RlType, 127 rl_escape_prompt, 128 rl_get_point, 129 rl_get_prompt, 130 rl_set_prompt, 131 rl_type, 132 rl_warning, 133 vt100_support, 134) 135from .table_creator import ( 136 Column, 137 SimpleTable, 138) 139from .utils import ( 140 Settable, 141 get_defining_class, 142) 143 144# Set up readline 145if rl_type == RlType.NONE: # pragma: no cover 146 sys.stderr.write(ansi.style_warning(rl_warning)) 147else: 148 from .rl_utils import ( # type: ignore[attr-defined] 149 readline, 150 rl_force_redisplay, 151 ) 152 153 # Used by rlcompleter in Python console loaded by py command 154 orig_rl_delims = readline.get_completer_delims() 155 156 if rl_type == RlType.PYREADLINE: 157 158 # Save the original pyreadline display completion function since we need to override it and restore it 159 # noinspection PyProtectedMember,PyUnresolvedReferences 160 orig_pyreadline_display = readline.rl.mode._display_completions 161 162 elif rl_type == RlType.GNU: 163 164 # Get the readline lib so we can make 
class _SavedReadlineSettings:
    """Holder for the readline configuration that is stashed while another readline environment is active"""

    def __init__(self) -> None:
        # Nothing has been captured yet, so every field starts out at its "empty" value
        self.completer = None
        self.delims = ''
        self.basic_quotes: Optional[bytes] = None


class _SavedCmd2Env:
    """Holder for the cmd2 state that is stashed while an interactive Python shell is running"""

    def __init__(self) -> None:
        # readline-specific values live in their own container
        self.readline_settings = _SavedReadlineSettings()
        self.readline_module: Optional[ModuleType] = None

        # History lines captured from the cmd2 side
        self.history: List[str] = []

        # Standard streams that were in place before the switch
        self.sys_stdout: Optional[TextIO] = None
        self.sys_stdin: Optional[TextIO] = None


# Record of the three functions belonging to a disabled command. It is kept so the
# original functions can be put back when the command is enabled again.
DisabledCommand = namedtuple(
    'DisabledCommand',
    ('command_function', 'help_function', 'completer_function'),
)
    def __init__(
        self,
        completekey: str = 'tab',
        stdin: Optional[TextIO] = None,
        stdout: Optional[TextIO] = None,
        *,
        persistent_history_file: str = '',
        persistent_history_length: int = 1000,
        startup_script: str = '',
        silence_startup_script: bool = False,
        include_py: bool = False,
        include_ipy: bool = False,
        allow_cli_args: bool = True,
        transcript_files: Optional[List[str]] = None,
        allow_redirection: bool = True,
        multiline_commands: Optional[List[str]] = None,
        terminators: Optional[List[str]] = None,
        shortcuts: Optional[Dict[str, str]] = None,
        command_sets: Optional[Iterable[CommandSet]] = None,
        auto_load_commands: bool = True,
    ) -> None:
        """An easy but powerful framework for writing line-oriented command
        interpreters. Extends Python's cmd package.

        :param completekey: readline name of a completion key, default to Tab
        :param stdin: alternate input file object, if not specified, sys.stdin is used
        :param stdout: alternate output file object, if not specified, sys.stdout is used
        :param persistent_history_file: file path to load a persistent cmd2 command history from
        :param persistent_history_length: max number of history items to write
                                          to the persistent history file
        :param startup_script: file path to a script to execute at startup
        :param silence_startup_script: if ``True``, then the startup script's output will be
                                       suppressed. Anything written to stderr will still display.
        :param include_py: should the "py" command be included for an embedded Python shell
        :param include_ipy: should the "ipy" command be included for an embedded IPython shell
        :param allow_cli_args: if ``True``, then :meth:`cmd2.Cmd.__init__` will process command
                               line arguments as either commands to be run or, if ``-t`` or
                               ``--test`` are given, transcript files to run. This should be
                               set to ``False`` if your application parses its own command line
                               arguments.
        :param transcript_files: pass a list of transcript files to be run on initialization.
                                 This allows running transcript tests when ``allow_cli_args``
                                 is ``False``. If ``allow_cli_args`` is ``True`` this parameter
                                 is ignored.
        :param allow_redirection: If ``False``, prevent output redirection and piping to shell
                                  commands. This parameter prevents redirection and piping, but
                                  does not alter parsing behavior. A user can still type
                                  redirection and piping tokens, and they will be parsed as such
                                  but they won't do anything.
        :param multiline_commands: list of commands allowed to accept multi-line input
        :param terminators: list of characters that terminate a command. These are mainly
                            intended for terminating multiline commands, but will also
                            terminate single-line commands. If not supplied, the default
                            is a semicolon. If your app only contains single-line commands
                            and you want terminators to be treated as literals by the parser,
                            then set this to an empty list.
        :param shortcuts: dictionary containing shortcuts for commands. If not supplied,
                          then defaults to constants.DEFAULT_SHORTCUTS. If you do not want
                          any shortcuts, pass an empty dictionary.
        :param command_sets: Provide CommandSet instances to load during cmd2 initialization.
                             This allows CommandSets with custom constructor parameters to be
                             loaded. This also allows a set of CommandSets to be provided
                             when `auto_load_commands` is set to False
        :param auto_load_commands: If True, cmd2 will check for all subclasses of `CommandSet`
                                   that are currently loaded by Python and automatically
                                   instantiate and register all commands. If False, CommandSets
                                   must be manually installed with `register_command_set`.
        :raises ValueError: if the statement parser reports that any command name is invalid
        """
        # Check if py or ipy need to be disabled in this instance
        if not include_py:
            setattr(self, 'do_py', None)
        if not include_ipy:
            setattr(self, 'do_ipy', None)

        # Initialize the plugin system.
        # This needs to be done before the super class constructor is called below.
        self._initialize_plugin_system()

        # Call super class constructor
        super().__init__(completekey=completekey, stdin=stdin, stdout=stdout)

        # Attributes which should NOT be dynamically settable via the set command at runtime
        self.default_to_shell = False  # Attempt to run unrecognized commands as shell commands
        self.allow_redirection = allow_redirection  # Security setting to prevent redirection of stdout

        # Attributes which ARE dynamically settable via the set command at runtime
        self.always_show_hint = False
        self.debug = False
        self.echo = False
        self.editor = Cmd.DEFAULT_EDITOR
        self.feedback_to_output = False  # Do not include nonessentials in >, | output by default (things like timing)
        self.quiet = False  # Do not suppress nonessential output
        self.timing = False  # Prints elapsed time for each command

        # The maximum number of CompletionItems to display during tab completion. If the number of completion
        # suggestions exceeds this number, they will be displayed in the typical columnized format and will
        # not include the description value of the CompletionItems.
        self.max_completion_items = 50

        # A dictionary mapping settable names to their Settable instance
        self._settables: Dict[str, Settable] = dict()
        self._always_prefix_settables: bool = False

        # CommandSet containers
        self._installed_command_sets: Set[CommandSet] = set()
        self._cmd_to_command_sets: Dict[str, CommandSet] = {}

        # Called only after the settable and CommandSet containers above have been created
        self.build_settables()

        # Use as prompt for multiline commands on the 2nd+ line of input
        self.continuation_prompt = '> '

        # Allow access to your application in embedded Python shells and pyscripts via self
        self.self_in_py = False

        # Commands to exclude from the help menu and tab completion
        self.hidden_commands = ['eof', '_relative_run_script']

        # Initialize history
        self._persistent_history_length = persistent_history_length
        self._initialize_history(persistent_history_file)

        # Commands to exclude from the history command
        self.exclude_from_history = ['eof', 'history']

        # Dictionary of macro names and their values
        self.macros: Dict[str, Macro] = dict()

        # Keeps track of typed command history in the Python shell
        self._py_history: List[str] = []

        # The name by which Python environments refer to the PyBridge to call app commands
        self.py_bridge_name = 'app'

        # Defines app-specific variables/functions available in Python shells and pyscripts
        self.py_locals: Dict[str, Any] = dict()

        # True if running inside a Python shell or pyscript, False otherwise
        self._in_py = False

        # Parser configured with the terminators, multiline commands and shortcuts passed to this constructor
        self.statement_parser = StatementParser(
            terminators=terminators, multiline_commands=multiline_commands, shortcuts=shortcuts
        )

        # Stores results from the last command run to enable usage of results in Python shells and pyscripts
        self.last_result: Any = None

        # Used by run_script command to store current script dir as a LIFO queue to support _relative_run_script command
        self._script_dir: List[str] = []

        # Context manager used to protect critical sections in the main thread from stopping due to a KeyboardInterrupt
        self.sigint_protection = utils.ContextFlag()

        # If the current command created a process to pipe to, then this will be a ProcReader object.
        # Otherwise it will be None. It's used to know when a pipe process can be killed and/or waited upon.
        self._cur_pipe_proc_reader: Optional[utils.ProcReader] = None

        # Used to keep track of whether we are redirecting or piping output
        self._redirecting = False

        # Used to keep track of whether a continuation prompt is being displayed
        self._at_continuation_prompt = False

        # The multiline command currently being typed which is used to tab complete multiline commands.
        self._multiline_in_progress = ''

        # Set the header used for the help function's listing of documented functions
        self.doc_header = "Documented commands (use 'help -v' for verbose/'help <topic>' for details):"

        # The error that prints when no help information can be found
        self.help_error = "No help on {}"

        # The error that prints when a non-existent command is run
        self.default_error = "{} is not a recognized command, alias, or macro"

        # If non-empty, this string will be displayed if a broken pipe error occurs
        self.broken_pipe_warning = ''

        # Commands that will run at the beginning of the command loop
        self._startup_commands: List[str] = []

        # If a startup script is provided and exists, then execute it in the startup commands
        if startup_script:
            startup_script = os.path.abspath(os.path.expanduser(startup_script))
            if os.path.exists(startup_script):
                script_cmd = f"run_script {utils.quote_string(startup_script)}"
                if silence_startup_script:
                    # Silence the script by redirecting its output to the null device
                    script_cmd += f" {constants.REDIRECTION_OUTPUT} {os.devnull}"
                self._startup_commands.append(script_cmd)

        # Transcript files to run instead of interactive command loop
        self._transcript_files: Optional[List[str]] = None

        # Check for command line args
        if allow_cli_args:
            parser = argparse_custom.DEFAULT_ARGUMENT_PARSER()
            parser.add_argument('-t', '--test', action="store_true", help='Test against transcript(s) in FILE (wildcards OK)')
            callopts, callargs = parser.parse_known_args()

            # If transcript testing was called for, use other arguments as transcript files
            if callopts.test:
                self._transcript_files = callargs
            # If commands were supplied at invocation, then add them to the command queue
            elif callargs:
                self._startup_commands.extend(callargs)
        elif transcript_files:
            # The transcript_files argument is only consulted when allow_cli_args is False
            self._transcript_files = transcript_files

        # Set the pager(s) for use with the ppaged() method for displaying output using a pager
        if sys.platform.startswith('win'):
            self.pager = self.pager_chop = 'more'
        else:
            # Here is the meaning of the various flags we are using with the less command:
            # -S causes lines longer than the screen width to be chopped (truncated) rather than wrapped
            # -R causes ANSI "style" escape sequences to be output in raw form (i.e. colors are displayed)
            # -X disables sending the termcap initialization and deinitialization strings to the terminal
            # -F causes less to automatically exit if the entire file can be displayed on the first screen
            self.pager = 'less -RXF'
            self.pager_chop = 'less -SRXF'

        # This boolean flag determines whether or not the cmd2 application can interact with the clipboard
        self._can_clip = can_clip

        # This determines the value returned by cmdloop() when exiting the application
        self.exit_code = 0

        # This lock should be acquired before doing any asynchronous changes to the terminal to
        # ensure the updates to the terminal don't interfere with the input being typed or output
        # being printed by a command.
        self.terminal_lock = threading.RLock()

        # Commands that have been disabled from use. This is to support commands that are only available
        # during specific states of the application. This dictionary's keys are the command names and its
        # values are DisabledCommand objects.
        self.disabled_commands: Dict[str, DisabledCommand] = dict()

        # If any command has been categorized, then all other commands that haven't been categorized
        # will display under this section in the help output.
        self.default_category = 'Uncategorized'

        # The default key for sorting string results. Its default value performs a case-insensitive alphabetical sort.
        # If natural sorting is preferred, then set this to NATURAL_SORT_KEY.
        # cmd2 uses this key for sorting:
        #     command and category names
        #     alias, macro, settable, and shortcut names
        #     tab completion results when self.matches_sorted is False
        self.default_sort_key = Cmd.ALPHABETICAL_SORT_KEY

        ############################################################################################################
        # The following variables are used by tab completion functions. They are reset each time complete() is run
        # in _reset_completion_defaults() and it is up to completer functions to set them before returning results.
        ############################################################################################################

        # If True and a single match is returned to complete(), then a space will be appended
        # if the match appears at the end of the line
        self.allow_appended_space = True

        # If True and a single match is returned to complete(), then a closing quote
        # will be added if there is an unmatched opening quote
        self.allow_closing_quote = True

        # An optional hint which prints above tab completion suggestions
        self.completion_hint = ''

        # Normally cmd2 uses readline's formatter to columnize the list of completion suggestions.
        # If a custom format is preferred, write the formatted completions to this string. cmd2 will
        # then print it instead of the readline format. ANSI style sequences and newlines are supported
        # when using this value. Even when using formatted_completions, the full matches must still be returned
        # from your completer function. ArgparseCompleter writes its tab completion tables to this string.
        self.formatted_completions = ''

        # Used by complete() for readline tab completion
        self.completion_matches: List[str] = []

        # Use this list if you need to display tab completion suggestions that are different than the actual text
        # of the matches. For instance, if you are completing strings that contain a common delimiter and you only
        # want to display the final portion of the matches as the tab completion suggestions. The full matches
        # still must be returned from your completer function. For an example, look at path_complete() which
        # uses this to show only the basename of paths as the suggestions. delimiter_complete() also populates
        # this list. These are ignored if self.formatted_completions is populated.
        self.display_matches: List[str] = []

        # Used by functions like path_complete() and delimiter_complete() to properly
        # quote matches that are completed in a delimited fashion
        self.matches_delimited = False

        # Set to True before returning matches to complete() in cases where matches have already been sorted.
        # If False, then complete() will sort the matches using self.default_sort_key before they are displayed.
        # This does not affect self.formatted_completions.
        self.matches_sorted = False

        ############################################################################################################
        # The following code block loads CommandSets, verifies command names, and registers subcommands.
        # This block should appear after all attributes have been created since the registration code
        # depends on them and it's possible a module's on_register() method may need to access some.
        ############################################################################################################
        # Load modular commands
        if command_sets:
            for command_set in command_sets:
                self.register_command_set(command_set)

        if auto_load_commands:
            self._autoload_commands()

        # Verify commands don't have invalid names (like starting with a shortcut)
        for cur_cmd in self.get_all_commands():
            valid, errmsg = self.statement_parser.is_valid_command(cur_cmd)
            if not valid:
                raise ValueError(f"Invalid command name '{cur_cmd}': {errmsg}")

        # Add functions decorated to be subcommands
        self._register_subcommands(self)
534 By default, locates a CommandSet that is an exact type match but may optionally return all CommandSets that 535 are sub-classes of the provided type 536 :param commandset_type: CommandSet sub-class type to search for 537 :param subclass_match: If True, return all sub-classes of provided type, otherwise only search for exact match 538 :return: Matching CommandSets 539 """ 540 return [ 541 cmdset 542 for cmdset in self._installed_command_sets 543 if type(cmdset) == commandset_type or (subclass_match and isinstance(cmdset, commandset_type)) 544 ] 545 546 def find_commandset_for_command(self, command_name: str) -> Optional[CommandSet]: 547 """ 548 Finds the CommandSet that registered the command name 549 :param command_name: command name to search 550 :return: CommandSet that provided the command 551 """ 552 return self._cmd_to_command_sets.get(command_name) 553 554 def _autoload_commands(self) -> None: 555 """Load modular command definitions.""" 556 # Search for all subclasses of CommandSet, instantiate them if they weren't already provided in the constructor 557 all_commandset_defs = CommandSet.__subclasses__() 558 existing_commandset_types = [type(command_set) for command_set in self._installed_command_sets] 559 560 def load_commandset_by_type(commandset_types: List[Type[CommandSet]]) -> None: 561 for cmdset_type in commandset_types: 562 # check if the type has sub-classes. We will only auto-load leaf class types. 
563 subclasses = cmdset_type.__subclasses__() 564 if subclasses: 565 load_commandset_by_type(subclasses) 566 else: 567 init_sig = inspect.signature(cmdset_type.__init__) 568 if not ( 569 cmdset_type in existing_commandset_types 570 or len(init_sig.parameters) != 1 571 or 'self' not in init_sig.parameters 572 ): 573 cmdset = cmdset_type() 574 self.register_command_set(cmdset) 575 576 load_commandset_by_type(all_commandset_defs) 577 578 def register_command_set(self, cmdset: CommandSet) -> None: 579 """ 580 Installs a CommandSet, loading all commands defined in the CommandSet 581 582 :param cmdset: CommandSet to load 583 """ 584 existing_commandset_types = [type(command_set) for command_set in self._installed_command_sets] 585 if type(cmdset) in existing_commandset_types: 586 raise CommandSetRegistrationError('CommandSet ' + type(cmdset).__name__ + ' is already installed') 587 588 all_settables = self.settables 589 if self.always_prefix_settables: 590 if not cmdset.settable_prefix.strip(): 591 raise CommandSetRegistrationError('CommandSet settable prefix must not be empty') 592 for key in cmdset.settables.keys(): 593 prefixed_name = f'{cmdset.settable_prefix}.{key}' 594 if prefixed_name in all_settables: 595 raise CommandSetRegistrationError(f'Duplicate settable: {key}') 596 597 else: 598 for key in cmdset.settables.keys(): 599 if key in all_settables: 600 raise CommandSetRegistrationError(f'Duplicate settable {key} is already registered') 601 602 cmdset.on_register(self) 603 methods = inspect.getmembers( 604 cmdset, 605 predicate=lambda meth: isinstance(meth, Callable) # type: ignore[arg-type] 606 and hasattr(meth, '__name__') 607 and meth.__name__.startswith(COMMAND_FUNC_PREFIX), 608 ) 609 610 default_category = getattr(cmdset, CLASS_ATTR_DEFAULT_HELP_CATEGORY, None) 611 612 installed_attributes = [] 613 try: 614 for method_name, method in methods: 615 command = method_name[len(COMMAND_FUNC_PREFIX) :] 616 617 self._install_command_function(command, method, 
type(cmdset).__name__) 618 installed_attributes.append(method_name) 619 620 completer_func_name = COMPLETER_FUNC_PREFIX + command 621 cmd_completer = getattr(cmdset, completer_func_name, None) 622 if cmd_completer is not None: 623 self._install_completer_function(command, cmd_completer) 624 installed_attributes.append(completer_func_name) 625 626 help_func_name = HELP_FUNC_PREFIX + command 627 cmd_help = getattr(cmdset, help_func_name, None) 628 if cmd_help is not None: 629 self._install_help_function(command, cmd_help) 630 installed_attributes.append(help_func_name) 631 632 self._cmd_to_command_sets[command] = cmdset 633 634 if default_category and not hasattr(method, constants.CMD_ATTR_HELP_CATEGORY): 635 utils.categorize(method, default_category) 636 637 self._installed_command_sets.add(cmdset) 638 639 self._register_subcommands(cmdset) 640 cmdset.on_registered() 641 except Exception: 642 cmdset.on_unregister() 643 for attrib in installed_attributes: 644 delattr(self, attrib) 645 if cmdset in self._installed_command_sets: 646 self._installed_command_sets.remove(cmdset) 647 if cmdset in self._cmd_to_command_sets.values(): 648 self._cmd_to_command_sets = {key: val for key, val in self._cmd_to_command_sets.items() if val is not cmdset} 649 cmdset.on_unregistered() 650 raise 651 652 def _install_command_function(self, command: str, command_wrapper: Callable[..., Any], context: str = '') -> None: 653 cmd_func_name = COMMAND_FUNC_PREFIX + command 654 655 # Make sure command function doesn't share name with existing attribute 656 if hasattr(self, cmd_func_name): 657 raise CommandSetRegistrationError(f'Attribute already exists: {cmd_func_name} ({context})') 658 659 # Check if command has an invalid name 660 valid, errmsg = self.statement_parser.is_valid_command(command) 661 if not valid: 662 raise CommandSetRegistrationError(f"Invalid command name '{command}': {errmsg}") 663 664 # Check if command shares a name with an alias 665 if command in self.aliases: 666 
self.pwarning(f"Deleting alias '{command}' because it shares its name with a new command") 667 del self.aliases[command] 668 669 # Check if command shares a name with a macro 670 if command in self.macros: 671 self.pwarning(f"Deleting macro '{command}' because it shares its name with a new command") 672 del self.macros[command] 673 674 setattr(self, cmd_func_name, command_wrapper) 675 676 def _install_completer_function(self, cmd_name: str, cmd_completer: CompleterFunc) -> None: 677 completer_func_name = COMPLETER_FUNC_PREFIX + cmd_name 678 679 if hasattr(self, completer_func_name): 680 raise CommandSetRegistrationError(f'Attribute already exists: {completer_func_name}') 681 setattr(self, completer_func_name, cmd_completer) 682 683 def _install_help_function(self, cmd_name: str, cmd_help: Callable[..., None]) -> None: 684 help_func_name = HELP_FUNC_PREFIX + cmd_name 685 686 if hasattr(self, help_func_name): 687 raise CommandSetRegistrationError(f'Attribute already exists: {help_func_name}') 688 setattr(self, help_func_name, cmd_help) 689 690 def unregister_command_set(self, cmdset: CommandSet) -> None: 691 """ 692 Uninstalls a CommandSet and unloads all associated commands 693 694 :param cmdset: CommandSet to uninstall 695 """ 696 if cmdset in self._installed_command_sets: 697 self._check_uninstallable(cmdset) 698 cmdset.on_unregister() 699 self._unregister_subcommands(cmdset) 700 701 methods = inspect.getmembers( 702 cmdset, 703 predicate=lambda meth: isinstance(meth, Callable) # type: ignore[arg-type] 704 and hasattr(meth, '__name__') 705 and meth.__name__.startswith(COMMAND_FUNC_PREFIX), 706 ) 707 708 for method in methods: 709 cmd_name = method[0][len(COMMAND_FUNC_PREFIX) :] 710 711 # Enable the command before uninstalling it to make sure we remove both 712 # the real functions and the ones used by the DisabledCommand object. 
713 if cmd_name in self.disabled_commands: 714 self.enable_command(cmd_name) 715 716 if cmd_name in self._cmd_to_command_sets: 717 del self._cmd_to_command_sets[cmd_name] 718 719 delattr(self, COMMAND_FUNC_PREFIX + cmd_name) 720 721 if hasattr(self, COMPLETER_FUNC_PREFIX + cmd_name): 722 delattr(self, COMPLETER_FUNC_PREFIX + cmd_name) 723 if hasattr(self, HELP_FUNC_PREFIX + cmd_name): 724 delattr(self, HELP_FUNC_PREFIX + cmd_name) 725 726 cmdset.on_unregistered() 727 self._installed_command_sets.remove(cmdset) 728 729 def _check_uninstallable(self, cmdset: CommandSet) -> None: 730 methods = inspect.getmembers( 731 cmdset, 732 predicate=lambda meth: isinstance(meth, Callable) # type: ignore[arg-type] 733 and hasattr(meth, '__name__') 734 and meth.__name__.startswith(COMMAND_FUNC_PREFIX), 735 ) 736 737 for method in methods: 738 command_name = method[0][len(COMMAND_FUNC_PREFIX) :] 739 740 # Search for the base command function and verify it has an argparser defined 741 if command_name in self.disabled_commands: 742 command_func = self.disabled_commands[command_name].command_function 743 else: 744 command_func = self.cmd_func(command_name) 745 746 command_parser = cast(argparse.ArgumentParser, getattr(command_func, constants.CMD_ATTR_ARGPARSER, None)) 747 748 def check_parser_uninstallable(parser: argparse.ArgumentParser) -> None: 749 for action in parser._actions: 750 if isinstance(action, argparse._SubParsersAction): 751 for subparser in action.choices.values(): 752 attached_cmdset = getattr(subparser, constants.PARSER_ATTR_COMMANDSET, None) 753 if attached_cmdset is not None and attached_cmdset is not cmdset: 754 raise CommandSetRegistrationError( 755 'Cannot uninstall CommandSet when another CommandSet depends on it' 756 ) 757 check_parser_uninstallable(subparser) 758 break 759 760 if command_parser is not None: 761 check_parser_uninstallable(command_parser) 762 763 def _register_subcommands(self, cmdset: Union[CommandSet, 'Cmd']) -> None: 764 """ 765 Register 
subcommands with their base command 766 767 :param cmdset: CommandSet or cmd2.Cmd subclass containing subcommands 768 """ 769 if not (cmdset is self or cmdset in self._installed_command_sets): 770 raise CommandSetRegistrationError('Cannot register subcommands with an unregistered CommandSet') 771 772 # find methods that have the required attributes necessary to be recognized as a sub-command 773 methods = inspect.getmembers( 774 cmdset, 775 predicate=lambda meth: isinstance(meth, Callable) # type: ignore[arg-type] 776 and hasattr(meth, constants.SUBCMD_ATTR_NAME) 777 and hasattr(meth, constants.SUBCMD_ATTR_COMMAND) 778 and hasattr(meth, constants.CMD_ATTR_ARGPARSER), 779 ) 780 781 # iterate through all matching methods 782 for method_name, method in methods: 783 subcommand_name: str = getattr(method, constants.SUBCMD_ATTR_NAME) 784 full_command_name: str = getattr(method, constants.SUBCMD_ATTR_COMMAND) 785 subcmd_parser = getattr(method, constants.CMD_ATTR_ARGPARSER) 786 787 subcommand_valid, errmsg = self.statement_parser.is_valid_command(subcommand_name, is_subcommand=True) 788 if not subcommand_valid: 789 raise CommandSetRegistrationError(f'Subcommand {str(subcommand_name)} is not valid: {errmsg}') 790 791 command_tokens = full_command_name.split() 792 command_name = command_tokens[0] 793 subcommand_names = command_tokens[1:] 794 795 # Search for the base command function and verify it has an argparser defined 796 if command_name in self.disabled_commands: 797 command_func = self.disabled_commands[command_name].command_function 798 else: 799 command_func = self.cmd_func(command_name) 800 801 if command_func is None: 802 raise CommandSetRegistrationError( 803 f"Could not find command '{command_name}' needed by subcommand: {str(method)}" 804 ) 805 command_parser = getattr(command_func, constants.CMD_ATTR_ARGPARSER, None) 806 if command_parser is None: 807 raise CommandSetRegistrationError( 808 f"Could not find argparser for command '{command_name}' needed by 
subcommand: {str(method)}" 809 ) 810 811 def find_subcommand(action: argparse.ArgumentParser, subcmd_names: List[str]) -> argparse.ArgumentParser: 812 if not subcmd_names: 813 return action 814 cur_subcmd = subcmd_names.pop(0) 815 for sub_action in action._actions: 816 if isinstance(sub_action, argparse._SubParsersAction): 817 for choice_name, choice in sub_action.choices.items(): 818 if choice_name == cur_subcmd: 819 return find_subcommand(choice, subcmd_names) 820 break 821 raise CommandSetRegistrationError(f"Could not find subcommand '{full_command_name}'") 822 823 target_parser = find_subcommand(command_parser, subcommand_names) 824 825 for action in target_parser._actions: 826 if isinstance(action, argparse._SubParsersAction): 827 # Temporary workaround for avoiding subcommand help text repeatedly getting added to 828 # action._choices_actions. Until we have instance-specific parser objects, we will remove 829 # any existing subcommand which has the same name before replacing it. This problem is 830 # exercised when more than one cmd2.Cmd-based object is created and the same subcommands 831 # get added each time. Argparse overwrites the previous subcommand but keeps growing the help 832 # text which is shown by running something like 'alias -h'. 833 action.remove_parser(subcommand_name) # type: ignore[arg-type,attr-defined] 834 835 # Get the kwargs for add_parser() 836 add_parser_kwargs = getattr(method, constants.SUBCMD_ATTR_ADD_PARSER_KWARGS, {}) 837 838 # Set subcmd_parser as the parent to the parser we're creating to get its arguments 839 add_parser_kwargs['parents'] = [subcmd_parser] 840 841 # argparse only copies actions from a parent and not the following settings. 842 # To retain these settings, we will copy them from subcmd_parser and pass them 843 # as ArgumentParser constructor arguments to add_parser(). 
def _unregister_subcommands(self, cmdset: Union[CommandSet, 'Cmd']) -> None:
    """
    Unregister subcommands from their base command

    The command a subcommand attaches to is stored on the method as a whitespace-delimited path.
    As in ``_register_subcommands``, the first token is the base command and any remaining tokens
    name nested subcommands which are descended through to reach the parser the subcommand was
    attached to.

    :param cmdset: CommandSet containing subcommands
    :raises CommandSetRegistrationError: if cmdset is not registered, or if the base command or its
                                         argparser cannot be found
    """
    if not (cmdset is self or cmdset in self._installed_command_sets):
        raise CommandSetRegistrationError('Cannot unregister subcommands with an unregistered CommandSet')

    # find methods that have the required attributes necessary to be recognized as a sub-command
    methods = inspect.getmembers(
        cmdset,
        predicate=lambda meth: isinstance(meth, Callable)  # type: ignore[arg-type]
        and hasattr(meth, constants.SUBCMD_ATTR_NAME)
        and hasattr(meth, constants.SUBCMD_ATTR_COMMAND)
        and hasattr(meth, constants.CMD_ATTR_ARGPARSER),
    )

    # iterate through all matching methods
    for method_name, method in methods:
        subcommand_name = getattr(method, constants.SUBCMD_ATTR_NAME)
        full_command_name: str = getattr(method, constants.SUBCMD_ATTR_COMMAND)

        # Split the path the same way _register_subcommands does. Using the whole string as the
        # command name makes the lookup below fail for subcommands attached to a nested subcommand.
        command_tokens = full_command_name.split()
        command_name = command_tokens[0]
        subcommand_names = command_tokens[1:]

        # Search for the base command function and verify it has an argparser defined
        if command_name in self.disabled_commands:
            command_func = self.disabled_commands[command_name].command_function
        else:
            command_func = self.cmd_func(command_name)

        if command_func is None:  # pragma: no cover
            # This really shouldn't be possible since _register_subcommands would prevent this from happening
            # but keeping in case it does for some strange reason
            raise CommandSetRegistrationError(
                f"Could not find command '{command_name}' needed by subcommand: {str(method)}"
            )
        command_parser = getattr(command_func, constants.CMD_ATTR_ARGPARSER, None)
        if command_parser is None:  # pragma: no cover
            # This really shouldn't be possible since _register_subcommands would prevent this from happening
            # but keeping in case it does for some strange reason
            raise CommandSetRegistrationError(
                f"Could not find argparser for command '{command_name}' needed by subcommand: {str(method)}"
            )

        # Descend through any nested subcommands to reach the parser this subcommand was attached to
        target_parser: Optional[argparse.ArgumentParser] = command_parser
        for cur_subcmd in subcommand_names:
            next_parser = None
            for sub_action in target_parser._actions:
                if isinstance(sub_action, argparse._SubParsersAction):
                    next_parser = sub_action.choices.get(cur_subcmd)
                    break
            target_parser = next_parser
            if target_parser is None:
                break

        if target_parser is None:
            # An intermediate subcommand no longer exists (presumably it was removed earlier,
            # possibly by a previous iteration of this loop), so there is nothing left to detach.
            continue

        for action in target_parser._actions:
            if isinstance(action, argparse._SubParsersAction):
                action.remove_parser(subcommand_name)  # type: ignore[arg-type,attr-defined]
                break
def remove_settable(self, name: str) -> None:
    """
    Convenience method for removing a settable parameter from ``self.settables``

    Only settables stored directly on this object (``self._settables``) can be removed this way.

    :param name: name of the settable being removed
    :raises KeyError: if no Settable matches this name
    """
    try:
        del self._settables[name]
    except KeyError:
        # Suppress the low-level KeyError so only the descriptive one is reported
        raise KeyError(name + " is not a settable parameter") from None
self.add_settable(Settable('timing', bool, "Report execution times", self)) 1030 1031 # ----- Methods related to presenting output to the user ----- 1032 1033 @property 1034 def allow_style(self) -> ansi.AllowStyle: 1035 """Read-only property needed to support do_set when it reads allow_style""" 1036 return ansi.allow_style 1037 1038 @allow_style.setter 1039 def allow_style(self, new_val: ansi.AllowStyle) -> None: 1040 """Setter property needed to support do_set when it updates allow_style""" 1041 ansi.allow_style = new_val 1042 1043 def _completion_supported(self) -> bool: 1044 """Return whether tab completion is supported""" 1045 return self.use_rawinput and bool(self.completekey) and rl_type != RlType.NONE 1046 1047 @property 1048 def visible_prompt(self) -> str: 1049 """Read-only property to get the visible prompt with any ANSI style escape codes stripped. 1050 1051 Used by transcript testing to make it easier and more reliable when users are doing things like coloring the 1052 prompt using ANSI color codes. 1053 1054 :return: prompt stripped of any ANSI escape codes 1055 """ 1056 return ansi.strip_style(self.prompt) 1057 1058 def poutput(self, msg: Any = '', *, end: str = '\n') -> None: 1059 """Print message to self.stdout and appends a newline by default 1060 1061 Also handles BrokenPipeError exceptions for when a command's output has 1062 been piped to another process and that process terminates before the 1063 cmd2 command is finished executing. 1064 1065 :param msg: object to print 1066 :param end: string appended after the end of the message, default a newline 1067 """ 1068 try: 1069 ansi.style_aware_write(self.stdout, f"{msg}{end}") 1070 except BrokenPipeError: 1071 # This occurs if a command's output is being piped to another 1072 # process and that process closes before the command is 1073 # finished. If you would like your application to print a 1074 # warning message, then set the broken_pipe_warning attribute 1075 # to the message you want printed. 
def pwarning(self, msg: Any = '', *, end: str = '\n', apply_style: bool = True) -> None:
    """Print a warning by delegating to perror, using ansi.style_warning by default

    :param msg: object to print
    :param end: string appended after the end of the message, default a newline
    :param apply_style: If True, then ansi.style_warning will be applied to the message text. Set to False in cases
                        where the message text already has the desired style. Defaults to True.
    """
    # perror is always told not to style, otherwise it would apply the error style on top of ours
    styled_msg = ansi.style_warning(msg) if apply_style else msg
    self.perror(styled_msg, end=end, apply_style=False)
def pfeedback(self, msg: Any, *, end: str = '\n') -> None:
    """Print nonessential feedback, unless `quiet` is set.
    `feedback_to_output` decides whether it is sent through poutput or perror.

    :param msg: object to print
    :param end: string appended after the end of the message, default a newline
    """
    # Feedback has been silenced
    if self.quiet:
        return

    # Feedback is wanted as part of the normal output
    if self.feedback_to_output:
        self.poutput(msg, end=end)
        return

    # Otherwise send it through perror, without the error style
    self.perror(msg, end=end, apply_style=False)
1151 1152 :param msg: object to print 1153 :param end: string appended after the end of the message, default a newline 1154 :param chop: True -> causes lines longer than the screen width to be chopped (truncated) rather than wrapped 1155 - truncated text is still accessible by scrolling with the right & left arrow keys 1156 - chopping is ideal for displaying wide tabular data as is done in utilities like pgcli 1157 False -> causes lines longer than the screen width to wrap to the next line 1158 - wrapping is ideal when you want to keep users from having to use horizontal scrolling 1159 1160 WARNING: On Windows, the text always wraps regardless of what the chop argument is set to 1161 """ 1162 # msg can be any type, so convert to string before checking if it's blank 1163 msg_str = str(msg) 1164 1165 # Consider None to be no data to print 1166 if msg is None or msg_str == '': 1167 return 1168 1169 try: 1170 import subprocess 1171 1172 # Attempt to detect if we are not running within a fully functional terminal. 1173 # Don't try to use the pager when being run by a continuous integration system like Jenkins + pexpect. 1174 functional_terminal = False 1175 1176 if self.stdin.isatty() and self.stdout.isatty(): 1177 if sys.platform.startswith('win') or os.environ.get('TERM') is not None: 1178 functional_terminal = True 1179 1180 # Don't attempt to use a pager that can block if redirecting or running a script (either text or Python) 1181 # Also only attempt to use a pager if actually running in a real fully functional terminal 1182 if functional_terminal and not self._redirecting and not self.in_pyscript() and not self.in_script(): 1183 if ansi.allow_style == ansi.AllowStyle.NEVER: 1184 msg_str = ansi.strip_style(msg_str) 1185 msg_str += end 1186 1187 pager = self.pager 1188 if chop: 1189 pager = self.pager_chop 1190 1191 # Prevent KeyboardInterrupts while in the pager. The pager application will 1192 # still receive the SIGINT since it is in the same process group as us. 
def _reset_completion_defaults(self) -> None:
    """
    Puts every tab completion setting back to its default value
    Has to run each time readline performs tab completion
    """
    # Settings which default to enabled
    self.allow_appended_space = self.allow_closing_quote = True

    # String settings start out empty
    self.completion_hint = self.formatted_completions = ''

    # Each list setting gets its own new list, so these must not be chained
    self.completion_matches = []
    self.display_matches = []

    # Settings which default to disabled
    self.matches_delimited = self.matches_sorted = False

    # Install our match display function for whichever readline implementation is in use
    if rl_type == RlType.PYREADLINE:
        # noinspection PyUnresolvedReferences
        readline.rl.mode._display_completions = self._display_matches_pyreadline
    elif rl_type == RlType.GNU:
        readline.set_completion_display_matches_hook(self._display_matches_gnu_readline)
1229 1230 :param line: the current input line with leading whitespace removed 1231 :param begidx: the beginning index of the prefix text 1232 :param endidx: the ending index of the prefix text 1233 :return: A 2 item tuple where the items are 1234 **On Success** 1235 - tokens: list of unquoted tokens - this is generally the list needed for tab completion functions 1236 - raw_tokens: list of tokens with any quotes preserved = this can be used to know if a token was quoted 1237 or is missing a closing quote 1238 Both lists are guaranteed to have at least 1 item. The last item in both lists is the token being tab 1239 completed 1240 **On Failure** 1241 - Two empty lists 1242 """ 1243 import copy 1244 1245 unclosed_quote = '' 1246 quotes_to_try = copy.copy(constants.QUOTES) 1247 1248 tmp_line = line[:endidx] 1249 tmp_endidx = endidx 1250 1251 # Parse the line into tokens 1252 while True: 1253 try: 1254 initial_tokens = shlex_split(tmp_line[:tmp_endidx]) 1255 1256 # If the cursor is at an empty token outside of a quoted string, 1257 # then that is the token being completed. Add it to the list. 1258 if not unclosed_quote and begidx == tmp_endidx: 1259 initial_tokens.append('') 1260 break 1261 except ValueError as ex: 1262 # Make sure the exception was due to an unclosed quote and 1263 # we haven't exhausted the closing quotes to try 1264 if str(ex) == "No closing quotation" and quotes_to_try: 1265 # Add a closing quote and try to parse again 1266 unclosed_quote = quotes_to_try[0] 1267 quotes_to_try = quotes_to_try[1:] 1268 1269 tmp_line = line[:endidx] 1270 tmp_line += unclosed_quote 1271 tmp_endidx = endidx + 1 1272 else: # pragma: no cover 1273 # The parsing error is not caused by unclosed quotes. 1274 # Return empty lists since this means the line is malformed. 
def delimiter_complete(
    self,
    text: str,
    line: str,
    begidx: int,
    endidx: int,
    match_against: Iterable[str],
    delimiter: str,
) -> List[str]:
    """
    Tab completes against a collection of delimited strings, showing only the piece of each match
    which is currently being completed as its completion suggestion.

    This suits hierarchical strings which share a delimiter. Paths are the classic case: when
    completing inside /home/user, the suggestions are displayed as

        file.txt     program.c
        maps/        cmd2.py

    rather than as the full /home/user/... strings. Strings like company::department::name work
    the same way with :: as the delimiter.

    The full matches are returned, while the shortened forms are appended to self.display_matches.

    :param text: the string prefix we are attempting to match (all matches must begin with it)
    :param line: the current input line with leading whitespace removed
    :param begidx: the beginning index of the prefix text
    :param endidx: the ending index of the prefix text
    :param match_against: the list being matched against
    :param delimiter: what delimits each portion of the matches (ex: paths are delimited by a slash)
    :return: a list of possible tab completions
    """
    matches = self.basic_complete(text, line, begidx, endidx, match_against)

    # Nothing to display when nothing matched
    if not matches:
        return matches

    # Needed for proper quoting of matches with spaces
    self.matches_delimited = True

    # The portion being completed is the last delimited piece of what all matches have in common
    shared_prefix = os.path.commonprefix(matches)
    piece_index = max(len(shared_prefix.split(delimiter)) - 1, 0)

    for candidate in matches:
        piece = candidate.split(delimiter)[piece_index]

        # An empty piece is displayed as the delimiter itself
        self.display_matches.append(piece if piece else delimiter)

    return matches
def index_based_complete(
    self,
    text: str,
    line: str,
    begidx: int,
    endidx: int,
    index_dict: Mapping[int, Union[Iterable[str], CompleterFunc]],
    *,
    all_else: Optional[Union[Iterable[str], CompleterFunc]] = None,
) -> List[str]:
    """Tab completes according to the position of the token being completed.

    :param text: the string prefix we are attempting to match (all matches must begin with it)
    :param line: the current input line with leading whitespace removed
    :param begidx: the beginning index of the prefix text
    :param endidx: the ending index of the prefix text
    :param index_dict: dictionary whose structure is the following:
                       `keys` - 0-based token indexes into command line that determine which tokens perform tab
                                completion
                       `values` - there are two types of values:
                                  1. iterable list of strings to match against (dictionaries, lists, etc.)
                                  2. function that performs tab completion (ex: path_complete)
    :param all_else: an optional parameter for tab completing any token that isn't at an index in index_dict
    :return: a list of possible tab completions
    """
    # Get all tokens through the one being completed
    tokens, _ = self.tokens_for_completion(line, begidx, endidx)
    if not tokens:  # pragma: no cover
        return []

    # The token being completed is the last one
    token_index = len(tokens) - 1

    # Use the entry for this index if there is one, otherwise fall back to all_else
    source: Optional[Union[Iterable[str], CompleterFunc]] = (
        index_dict[token_index] if token_index in index_dict else all_else
    )

    # An Iterable is matched against directly
    if isinstance(source, Iterable):
        return self.basic_complete(text, line, begidx, endidx, source)

    # A function performs the tab completion itself
    if callable(source):
        return source(text, line, begidx, endidx)

    # Nothing to complete against
    return []
1501 # We are returning ~user strings that resolve to directories, 1502 # so don't append a space or quote in the case of a single result. 1503 self.allow_appended_space = False 1504 self.allow_closing_quote = False 1505 1506 users = [] 1507 1508 # Windows lacks the pwd module so we can't get a list of users. 1509 # Instead we will return a result once the user enters text that 1510 # resolves to an existing home directory. 1511 if sys.platform.startswith('win'): 1512 expanded_path = os.path.expanduser(text) 1513 if os.path.isdir(expanded_path): 1514 user = text 1515 if add_trailing_sep_if_dir: 1516 user += os.path.sep 1517 users.append(user) 1518 else: 1519 import pwd 1520 1521 # Iterate through a list of users from the password database 1522 for cur_pw in pwd.getpwall(): 1523 1524 # Check if the user has an existing home dir 1525 if os.path.isdir(cur_pw.pw_dir): 1526 1527 # Add a ~ to the user to match against text 1528 cur_user = '~' + cur_pw.pw_name 1529 if cur_user.startswith(text): 1530 if add_trailing_sep_if_dir: 1531 cur_user += os.path.sep 1532 users.append(cur_user) 1533 1534 return users 1535 1536 # Determine if a trailing separator should be appended to directory completions 1537 add_trailing_sep_if_dir = False 1538 if endidx == len(line) or (endidx < len(line) and line[endidx] != os.path.sep): 1539 add_trailing_sep_if_dir = True 1540 1541 # Used to replace cwd in the final results 1542 cwd = os.getcwd() 1543 cwd_added = False 1544 1545 # Used to replace expanded user path in final result 1546 orig_tilde_path = '' 1547 expanded_tilde_path = '' 1548 1549 # If the search text is blank, then search in the CWD for * 1550 if not text: 1551 search_str = os.path.join(os.getcwd(), '*') 1552 cwd_added = True 1553 else: 1554 # Purposely don't match any path containing wildcards 1555 wildcards = ['*', '?'] 1556 for wildcard in wildcards: 1557 if wildcard in text: 1558 return [] 1559 1560 # Start the search string 1561 search_str = text + '*' 1562 1563 # Handle 
tilde expansion and completion 1564 if text.startswith('~'): 1565 sep_index = text.find(os.path.sep, 1) 1566 1567 # If there is no slash, then the user is still completing the user after the tilde 1568 if sep_index == -1: 1569 return complete_users() 1570 1571 # Otherwise expand the user dir 1572 else: 1573 search_str = os.path.expanduser(search_str) 1574 1575 # Get what we need to restore the original tilde path later 1576 orig_tilde_path = text[:sep_index] 1577 expanded_tilde_path = os.path.expanduser(orig_tilde_path) 1578 1579 # If the search text does not have a directory, then use the cwd 1580 elif not os.path.dirname(text): 1581 search_str = os.path.join(os.getcwd(), search_str) 1582 cwd_added = True 1583 1584 # Set this to True for proper quoting of paths with spaces 1585 self.matches_delimited = True 1586 1587 # Find all matching path completions 1588 matches = glob.glob(search_str) 1589 1590 # Filter out results that don't belong 1591 if path_filter is not None: 1592 matches = [c for c in matches if path_filter(c)] 1593 1594 # Don't append a space or closing quote to directory 1595 if len(matches) == 1 and os.path.isdir(matches[0]): 1596 self.allow_appended_space = False 1597 self.allow_closing_quote = False 1598 1599 # Sort the matches before any trailing slashes are added 1600 matches.sort(key=self.default_sort_key) 1601 self.matches_sorted = True 1602 1603 # Build display_matches and add a slash to directories 1604 for index, cur_match in enumerate(matches): 1605 1606 # Display only the basename of this path in the tab completion suggestions 1607 self.display_matches.append(os.path.basename(cur_match)) 1608 1609 # Add a separator after directories if the next character isn't already a separator 1610 if os.path.isdir(cur_match) and add_trailing_sep_if_dir: 1611 matches[index] += os.path.sep 1612 self.display_matches[index] += os.path.sep 1613 1614 # Remove cwd if it was added to match the text readline expects 1615 if cwd_added: 1616 if cwd == 
def shell_cmd_complete(self, text: str, line: str, begidx: int, endidx: int, *, complete_blank: bool = False) -> List[str]:
    """Tab complete an executable, searching either the user's path or an explicitly typed location

    :param text: the string prefix we are attempting to match (all matches must begin with it)
    :param line: the current input line with leading whitespace removed
    :param begidx: the beginning index of the prefix text
    :param endidx: the ending index of the prefix text
    :param complete_blank: when True, an empty prefix completes to every shell command in the user's path.
                           When False (the default, matching Bash), an empty prefix yields no completions.
    :return: a list of possible tab completions
    """
    # Nothing has been typed yet and the caller did not ask for blank completion
    if not (complete_blank or text):
        return []

    # A leading tilde or any path separator means the user is typing a location rather than a bare command name
    looks_like_path = text.startswith('~') or os.path.sep in text
    if not looks_like_path:
        return utils.get_exes_in_path(text)

    def is_dir_or_executable(path: str) -> bool:
        # Directories are kept so the user can keep descending toward an executable
        return os.path.isdir(path) or os.access(path, os.X_OK)

    return self.path_complete(text, line, begidx, endidx, path_filter=is_dir_or_executable)
1668 _, raw_tokens = self.tokens_for_completion(line, begidx, endidx) 1669 if not raw_tokens: # pragma: no cover 1670 return [] 1671 1672 # Must at least have the command 1673 if len(raw_tokens) > 1: 1674 1675 # True when command line contains any redirection tokens 1676 has_redirection = False 1677 1678 # Keep track of state while examining tokens 1679 in_pipe = False 1680 in_file_redir = False 1681 do_shell_completion = False 1682 do_path_completion = False 1683 prior_token = None 1684 1685 for cur_token in raw_tokens: 1686 # Process redirection tokens 1687 if cur_token in constants.REDIRECTION_TOKENS: 1688 has_redirection = True 1689 1690 # Check if we are at a pipe 1691 if cur_token == constants.REDIRECTION_PIPE: 1692 # Do not complete bad syntax (e.g cmd | |) 1693 if prior_token == constants.REDIRECTION_PIPE: 1694 return [] 1695 1696 in_pipe = True 1697 in_file_redir = False 1698 1699 # Otherwise this is a file redirection token 1700 else: 1701 if prior_token in constants.REDIRECTION_TOKENS or in_file_redir: 1702 # Do not complete bad syntax (e.g cmd | >) (e.g cmd > blah >) 1703 return [] 1704 1705 in_pipe = False 1706 in_file_redir = True 1707 1708 # Only tab complete after redirection tokens if redirection is allowed 1709 elif self.allow_redirection: 1710 do_shell_completion = False 1711 do_path_completion = False 1712 1713 if prior_token == constants.REDIRECTION_PIPE: 1714 do_shell_completion = True 1715 elif in_pipe or prior_token in (constants.REDIRECTION_OUTPUT, constants.REDIRECTION_APPEND): 1716 do_path_completion = True 1717 1718 prior_token = cur_token 1719 1720 if do_shell_completion: 1721 return self.shell_cmd_complete(text, line, begidx, endidx) 1722 1723 elif do_path_completion: 1724 return self.path_complete(text, line, begidx, endidx) 1725 1726 # If there were redirection strings anywhere on the command line, then we 1727 # are no longer tab completing for the current command 1728 elif has_redirection: 1729 return [] 1730 1731 # Call the 
@staticmethod
def _pad_matches_to_display(matches_to_display: List[str]) -> Tuple[List[str], int]:  # pragma: no cover
    """Append trailing spaces to each tab completion suggestion so the columns are easier to read.

    readline and pyreadline separate suggestions with very little padding, which looks
    cramped, particularly when the suggestions themselves contain spaces.

    :param matches_to_display: the matches being padded
    :return: the padded matches and length of padding that was added
    """
    if rl_type == RlType.GNU:
        # readline already pads by 2, so 2 more gives a total of 4
        extra_width = 2
    elif rl_type == RlType.PYREADLINE:
        # pyreadline already pads by 1, so 3 more gives a total of 4
        extra_width = 3
    else:
        # No readline flavor we know how to pad for; hand the list back untouched
        return matches_to_display, 0

    filler = ' ' * extra_width
    padded = [entry + filler for entry in matches_to_display]
    return padded, extra_width
should show display_matches 1782 if self.display_matches: 1783 matches_to_display = self.display_matches 1784 1785 # Recalculate longest_match_length for display_matches 1786 longest_match_length = 0 1787 1788 for cur_match in matches_to_display: 1789 cur_length = ansi.style_aware_wcswidth(cur_match) 1790 if cur_length > longest_match_length: 1791 longest_match_length = cur_length 1792 else: 1793 matches_to_display = matches 1794 1795 # Add padding for visual appeal 1796 matches_to_display, padding_length = self._pad_matches_to_display(matches_to_display) 1797 longest_match_length += padding_length 1798 1799 # We will use readline's display function (rl_display_match_list()), so we 1800 # need to encode our string as bytes to place in a C array. 1801 encoded_substitution = bytes(substitution, encoding='utf-8') 1802 encoded_matches = [bytes(cur_match, encoding='utf-8') for cur_match in matches_to_display] 1803 1804 # rl_display_match_list() expects matches to be in argv format where 1805 # substitution is the first element, followed by the matches, and then a NULL. 
def _display_matches_pyreadline(self, matches: List[str]) -> None:  # pragma: no cover
    """Print the tab completion suggestions when pyreadline is the active readline flavor

    :param matches: the tab completion matches to display
    """
    # Nothing to do unless pyreadline is in use
    if rl_type != RlType.PYREADLINE:
        return

    # The hint goes out first when one exists and hints are always shown
    show_hint = bool(self.always_show_hint and self.completion_hint)
    if show_hint:
        readline.rl.mode.console.write('\n' + self.completion_hint)

    if not self.formatted_completions:
        # No pre-rendered output, so defer to pyreadline's own formatter.
        # Prefer display_matches over the raw matches when they were provided.
        candidates = self.display_matches if self.display_matches else matches

        # Add padding for visual appeal
        padded, _ = self._pad_matches_to_display(candidates)

        # The saved original display function also redraws the prompt and input line
        orig_pyreadline_display(padded)
        return

    # Pre-rendered completions are written as-is, separated from the input line by a blank line
    if not show_hint:
        readline.rl.mode.console.write('\n')
    readline.rl.mode.console.write('\n' + self.formatted_completions + '\n\n')

    # We wrote to the console ourselves, so the prompt and input line must be redrawn
    rl_force_redisplay()
quoted command token) 1893 if not command: 1894 return 1895 1896 expanded_line = statement.command_and_args 1897 1898 # We overwrote line with a properly formatted but fully stripped version 1899 # Restore the end spaces since line is only supposed to be lstripped when 1900 # passed to completer functions according to Python docs 1901 rstripped_len = len(line) - len(line.rstrip()) 1902 expanded_line += ' ' * rstripped_len 1903 1904 # Fix the index values if expanded_line has a different size than line 1905 if len(expanded_line) != len(line): 1906 diff = len(expanded_line) - len(line) 1907 begidx += diff 1908 endidx += diff 1909 1910 # Overwrite line to pass into completers 1911 line = expanded_line 1912 1913 # Get all tokens through the one being completed 1914 tokens, raw_tokens = self.tokens_for_completion(line, begidx, endidx) 1915 if not tokens: # pragma: no cover 1916 return 1917 1918 # Determine the completer function to use for the command's argument 1919 if custom_settings is None: 1920 # Check if a macro was entered 1921 if command in self.macros: 1922 completer_func = self.path_complete 1923 1924 # Check if a command was entered 1925 elif command in self.get_all_commands(): 1926 # Get the completer function for this command 1927 func_attr = getattr(self, constants.COMPLETER_FUNC_PREFIX + command, None) 1928 1929 if func_attr is not None: 1930 completer_func = func_attr 1931 else: 1932 # There's no completer function, next see if the command uses argparse 1933 func = self.cmd_func(command) 1934 argparser: Optional[argparse.ArgumentParser] = getattr(func, constants.CMD_ATTR_ARGPARSER, None) 1935 1936 if func is not None and argparser is not None: 1937 # Get arguments for complete() 1938 preserve_quotes = getattr(func, constants.CMD_ATTR_PRESERVE_QUOTES) 1939 cmd_set = self._cmd_to_command_sets[command] if command in self._cmd_to_command_sets else None 1940 1941 # Create the argparse completer 1942 completer_type = 
self._determine_ap_completer_type(argparser) 1943 completer = completer_type(argparser, self) 1944 1945 completer_func = functools.partial( 1946 completer.complete, tokens=raw_tokens[1:] if preserve_quotes else tokens[1:], cmd_set=cmd_set 1947 ) 1948 else: 1949 completer_func = self.completedefault # type: ignore[assignment] 1950 1951 # Not a recognized macro or command 1952 else: 1953 # Check if this command should be run as a shell command 1954 if self.default_to_shell and command in utils.get_exes_in_path(command): 1955 completer_func = self.path_complete 1956 else: 1957 completer_func = self.completedefault # type: ignore[assignment] 1958 1959 # Otherwise we are completing the command token or performing custom completion 1960 else: 1961 # Create the argparse completer 1962 completer_type = self._determine_ap_completer_type(custom_settings.parser) 1963 completer = completer_type(custom_settings.parser, self) 1964 1965 completer_func = functools.partial( 1966 completer.complete, tokens=raw_tokens if custom_settings.preserve_quotes else tokens, cmd_set=None 1967 ) 1968 1969 # Text we need to remove from completions later 1970 text_to_remove = '' 1971 1972 # Get the token being completed with any opening quote preserved 1973 raw_completion_token = raw_tokens[-1] 1974 1975 # Used for adding quotes to the completion token 1976 completion_token_quote = '' 1977 1978 # Check if the token being completed has an opening quote 1979 if raw_completion_token and raw_completion_token[0] in constants.QUOTES: 1980 1981 # Since the token is still being completed, we know the opening quote is unclosed. 1982 # Save the quote so we can add a matching closing quote later. 1983 completion_token_quote = raw_completion_token[0] 1984 1985 # readline still performs word breaks after a quote. Therefore something like quoted search 1986 # text with a space would have resulted in begidx pointing to the middle of the token we 1987 # we want to complete. 
Figure out where that token actually begins and save the beginning 1988 # portion of it that was not part of the text readline gave us. We will remove it from the 1989 # completions later since readline expects them to start with the original text. 1990 actual_begidx = line[:endidx].rfind(tokens[-1]) 1991 1992 if actual_begidx != begidx: 1993 text_to_remove = line[actual_begidx:begidx] 1994 1995 # Adjust text and where it begins so the completer routines 1996 # get unbroken search text to complete on. 1997 text = text_to_remove + text 1998 begidx = actual_begidx 1999 2000 # Attempt tab completion for redirection first, and if that isn't occurring, 2001 # call the completer function for the current command 2002 self.completion_matches = self._redirect_complete(text, line, begidx, endidx, completer_func) 2003 2004 if self.completion_matches: 2005 2006 # Eliminate duplicates 2007 self.completion_matches = utils.remove_duplicates(self.completion_matches) 2008 self.display_matches = utils.remove_duplicates(self.display_matches) 2009 2010 if not self.display_matches: 2011 # Since self.display_matches is empty, set it to self.completion_matches 2012 # before we alter them. That way the suggestions will reflect how we parsed 2013 # the token being completed and not how readline did. 2014 import copy 2015 2016 self.display_matches = copy.copy(self.completion_matches) 2017 2018 # Check if we need to add an opening quote 2019 if not completion_token_quote: 2020 2021 add_quote = False 2022 2023 # This is the tab completion text that will appear on the command line. 2024 common_prefix = os.path.commonprefix(self.completion_matches) 2025 2026 if self.matches_delimited: 2027 # Check if any portion of the display matches appears in the tab completion 2028 display_prefix = os.path.commonprefix(self.display_matches) 2029 2030 # For delimited matches, we check for a space in what appears before the display 2031 # matches (common_prefix) as well as in the display matches themselves. 
def complete(  # type: ignore[override]
    self, text: str, state: int, custom_settings: Optional[utils.CustomCompletionSettings] = None
) -> Optional[str]:
    """Override of cmd's complete method which returns the next possible completion for 'text'

    This completer function is called by readline as complete(text, state), for state in 0, 1, 2, …,
    until it returns a non-string value. It should return the next possible completion starting with text.

    All of the matching work is done on the call where state is 0. The results are stored in
    self.completion_matches and every call (including the first) simply indexes into that list.

    Since readline suppresses any exception raised in completer functions, they can be difficult to debug.
    Therefore this function wraps the actual tab completion logic and prints to stderr any exception that
    occurs before returning control to readline. A CompletionError is treated as a message for the user:
    its text, if any, is written to stdout and no completion is returned.

    :param text: the current word that user is typing
    :param state: non-negative integer
    :param custom_settings: used when not tab completing the main command line
    :return: the next possible completion for text or None
    """
    # noinspection PyBroadException
    try:
        if state == 0:
            # First call for this tab press: start from a clean slate and compute every match
            self._reset_completion_defaults()

            # Check if we are completing a multiline command
            if self._at_continuation_prompt:
                # lstrip and prepend the previously typed portion of this multiline command
                lstripped_previous = self._multiline_in_progress.lstrip().replace(constants.LINE_FEED, ' ')
                line = lstripped_previous + readline.get_line_buffer()

                # Increment the indexes to account for the prepended text
                begidx = len(lstripped_previous) + readline.get_begidx()
                endidx = len(lstripped_previous) + readline.get_endidx()
            else:
                # lstrip the original line
                orig_line = readline.get_line_buffer()
                line = orig_line.lstrip()
                num_stripped = len(orig_line) - len(line)

                # Calculate new indexes for the stripped line. If the cursor is at a position before the end of a
                # line of spaces, then the following math could result in negative indexes. Enforce a max of 0.
                begidx = max(readline.get_begidx() - num_stripped, 0)
                endidx = max(readline.get_endidx() - num_stripped, 0)

            # Shortcuts are not word break characters when tab completing. Therefore shortcuts become part
            # of the text variable if there isn't a word break, like a space, after it. We need to remove it
            # from text and update the indexes. This only applies if we are at the beginning of the command line.
            shortcut_to_restore = ''
            if begidx == 0 and custom_settings is None:
                for (shortcut, _) in self.statement_parser.shortcuts:
                    if text.startswith(shortcut):
                        # Save the shortcut to restore later
                        shortcut_to_restore = shortcut

                        # Adjust text and where it begins
                        text = text[len(shortcut_to_restore) :]
                        begidx += len(shortcut_to_restore)
                        break
                else:
                    # This is the for loop's else clause: it only runs when the loop finished without a break.
                    # No shortcut was found. Complete the command token.
                    parser = argparse_custom.DEFAULT_ARGUMENT_PARSER(add_help=False)
                    parser.add_argument(
                        'command',
                        metavar="COMMAND",
                        help="command, alias, or macro name",
                        choices=self._get_commands_aliases_and_macros_for_completion(),
                    )
                    custom_settings = utils.CustomCompletionSettings(parser)

            # Populates self.completion_matches (and related display state) as a side effect
            self._perform_completion(text, line, begidx, endidx, custom_settings)

            # Check if we need to restore a shortcut in the tab completions
            # so it doesn't get erased from the command line
            if shortcut_to_restore:
                self.completion_matches = [shortcut_to_restore + match for match in self.completion_matches]

            # If we have one result and we are at the end of the line, then add a space if allowed
            if len(self.completion_matches) == 1 and endidx == len(line) and self.allow_appended_space:
                self.completion_matches[0] += ' '

            # Sort matches if they haven't already been sorted
            if not self.matches_sorted:
                self.completion_matches.sort(key=self.default_sort_key)
                self.display_matches.sort(key=self.default_sort_key)
                self.matches_sorted = True

        # Running past the end of the list is how readline is told there are no more matches
        try:
            return self.completion_matches[state]
        except IndexError:
            return None

    except CompletionError as ex:
        # Don't print error and redraw the prompt unless the error has length
        err_str = str(ex)
        if err_str:
            if ex.apply_style:
                err_str = ansi.style_error(err_str)
            ansi.style_aware_write(sys.stdout, '\n' + err_str + '\n')
            rl_force_redisplay()
        return None
    except Exception as ex:
        # Insert a newline so the exception doesn't print in the middle of the command line being tab completed
        self.perror()
        self.pexcept(ex)
        rl_force_redisplay()
        return None
def get_help_topics(self) -> List[str]:
    """Return the names of the help topics that are neither hidden nor disabled

    A help topic is any callable attribute whose name starts with the help function prefix;
    the prefix is stripped from the returned names.
    """
    prefix = constants.HELP_FUNC_PREFIX
    prefix_len = len(prefix)

    # First pass: collect every callable help function, with the prefix removed
    candidates: List[str] = []
    for attr_name in self.get_names():
        if not attr_name.startswith(prefix):
            continue
        if callable(getattr(self, attr_name)):
            candidates.append(attr_name[prefix_len:])

    # Second pass: drop topics belonging to hidden or disabled commands
    visible: List[str] = []
    for topic in candidates:
        if topic in self.hidden_commands or topic in self.disabled_commands:
            continue
        visible.append(topic)

    return visible
def precmd(self, statement: Union[Statement, str]) -> Statement:
    """Hook method executed just before the command is executed by
    :meth:`~cmd2.Cmd.onecmd` and after adding it to history.

    The default implementation changes nothing: a Statement is handed back as-is and
    any other input is wrapped in a Statement.

    :param statement: subclass of str which also contains the parsed input
    :return: a potentially modified version of the input Statement object

    See :meth:`~cmd2.Cmd.register_postparsing_hook` and
    :meth:`~cmd2.Cmd.register_precmd_hook` for more robust ways
    to run hooks before the command is executed. See
    :ref:`features/hooks:Postparsing Hooks` and
    :ref:`features/hooks:Precommand Hooks` for more information.
    """
    # Already parsed input passes straight through
    if isinstance(statement, Statement):
        return statement

    return Statement(statement)
def preloop(self) -> None:
    """Hook method executed once when the :meth:`~.cmd2.Cmd.cmdloop()`
    method is called.

    The default implementation does nothing; override it to add behavior.

    See :meth:`~cmd2.Cmd.register_preloop_hook` for a more robust way
    to run hooks before the command loop begins. See
    :ref:`features/hooks:Application Lifecycle Hooks` for more information.
    """
    return None
def onecmd_plus_hooks(
    self, line: str, *, add_to_history: bool = True, raise_keyboard_interrupt: bool = False, py_bridge_call: bool = False
) -> bool:
    """Top-level function called by cmdloop() to handle parsing a line and running the command and all of its hooks.

    The steps run in this order: parse the line into a Statement, postparsing hooks, output
    redirection setup, precommand hooks, precmd(), onecmd(), postcommand hooks, postcmd(),
    output restoration, and finally the command finalization hooks. The finalization hooks
    run even when an earlier step raised or requested a stop.

    Most exceptions are handled here rather than propagated. The ones that can escape are a
    KeyboardInterrupt (only when raise_keyboard_interrupt is True and stop is not already True)
    and the exception wrapped by a PassThroughException.

    :param line: command line to run
    :param add_to_history: If True, then add this command to history. Defaults to True.
    :param raise_keyboard_interrupt: if True, then KeyboardInterrupt exceptions will be raised if stop isn't already
                                     True. This is used when running commands in a loop to be able to stop the whole
                                     loop and not just the current command. Defaults to False.
    :param py_bridge_call: This should only ever be set to True by PyBridge to signify the beginning
                           of an app() call from Python. It is used to enable/disable the storage of the
                           command's stdout.
    :return: True if running of commands should stop
    """
    # Lazy import; only needed to time the command
    import datetime

    stop = False
    # Stays None if parsing fails, which is why the finalization hooks accept an Optional[Statement]
    statement = None

    try:
        # Convert the line into a Statement
        statement = self._input_line_to_statement(line)

        # call the postparsing hooks
        postparsing_data = plugin.PostparsingData(False, statement)
        for postparsing_func in self._postparsing_hooks:
            postparsing_data = postparsing_func(postparsing_data)
            if postparsing_data.stop:
                # A hook asked to stop, so the remaining postparsing hooks are skipped
                break

        # unpack the postparsing_data object
        statement = postparsing_data.statement
        stop = postparsing_data.stop
        if stop:
            # we should not run the command, but
            # we need to run the finalization hooks
            raise EmptyStatement

        # Remains None if _redirect_output() raises, so the finally block knows there is nothing to restore
        redir_saved_state: Optional[utils.RedirectionSavedState] = None

        try:
            # Get sigint protection while we set up redirection
            with self.sigint_protection:
                if py_bridge_call:
                    # Start saving command's stdout at this point
                    self.stdout.pause_storage = False  # type: ignore[attr-defined]

                redir_saved_state = self._redirect_output(statement)

            timestart = datetime.datetime.now()

            # precommand hooks
            precmd_data = plugin.PrecommandData(statement)
            for precmd_func in self._precmd_hooks:
                precmd_data = precmd_func(precmd_data)
            statement = precmd_data.statement

            # call precmd() for compatibility with cmd.Cmd
            statement = self.precmd(statement)

            # go run the command function
            stop = self.onecmd(statement, add_to_history=add_to_history)

            # postcommand hooks
            postcmd_data = plugin.PostcommandData(stop, statement)
            for postcmd_func in self._postcmd_hooks:
                postcmd_data = postcmd_func(postcmd_data)

            # retrieve the final value of stop, ignoring any statement modification from the hooks
            stop = postcmd_data.stop

            # call postcmd() for compatibility with cmd.Cmd
            stop = self.postcmd(stop, statement)

            if self.timing:
                self.pfeedback(f'Elapsed: {datetime.datetime.now() - timestart}')
        finally:
            # Get sigint protection while we restore stuff
            with self.sigint_protection:
                if redir_saved_state is not None:
                    self._restore_output(statement, redir_saved_state)

                if py_bridge_call:
                    # Stop saving command's stdout before command finalization hooks run
                    self.stdout.pause_storage = True  # type: ignore[attr-defined]
    except (SkipPostcommandHooks, EmptyStatement):
        # Don't do anything, but do allow command finalization hooks to run
        pass
    except Cmd2ShlexError as ex:
        self.perror(f"Invalid syntax: {ex}")
    except RedirectionError as ex:
        self.perror(ex)
    except KeyboardInterrupt as ex:
        # Swallowed unless the caller asked for it to propagate and we are not already stopping
        if raise_keyboard_interrupt and not stop:
            raise ex
    except SystemExit as ex:
        # Record the requested exit code and stop instead of letting the exit propagate
        self.exit_code = ex.code
        stop = True
    except PassThroughException as ex:
        # Unwrap and raise the exception it carries
        raise ex.wrapped_ex
    except Exception as ex:
        self.pexcept(ex)
    finally:
        # The finalization hooks get the same exception handling as the command itself
        try:
            stop = self._run_cmdfinalization_hooks(stop, statement)
        except KeyboardInterrupt as ex:
            if raise_keyboard_interrupt and not stop:
                raise ex
        except SystemExit as ex:
            self.exit_code = ex.code
            stop = True
        except PassThroughException as ex:
            raise ex.wrapped_ex
        except Exception as ex:
            self.pexcept(ex)

    return stop
def runcmds_plus_hooks(
    self,
    cmds: Union[List[HistoryItem], List[str]],
    *,
    add_to_history: bool = True,
    stop_on_keyboard_interrupt: bool = False,
) -> bool:
    """
    Run a batch of commands non-interactively, e.g. the contents of a text script or a history replay.

    When echo is True, the prompt followed by the command line is printed before each command runs.

    :param cmds: commands to run, given either as HistoryItems or as plain strings
    :param add_to_history: If True, then add these commands to history. Defaults to True.
    :param stop_on_keyboard_interrupt: if True, then Ctrl-C abandons the rest of cmds instead of only
                                       the command that was running. Use this when the commands form a
                                       group, like a text script. Defaults to False.
    :return: True if running of commands should stop
    """
    for entry in cmds:
        # HistoryItems are replayed using the text that was originally typed
        text = entry.raw if isinstance(entry, HistoryItem) else entry

        if self.echo:
            self.poutput(f'{self.prompt}{text}')

        try:
            should_stop = self.onecmd_plus_hooks(
                text, add_to_history=add_to_history, raise_keyboard_interrupt=stop_on_keyboard_interrupt
            )
        except KeyboardInterrupt as ex:
            if not stop_on_keyboard_interrupt:
                # Only the interrupted command is abandoned
                continue

            # Report the interrupt and give up on the remaining commands
            self.perror(ex)
            return False

        if should_stop:
            return True

    return False
We can't refactor it because we need to retain 2514 backwards compatibility with the standard library version of cmd. 2515 2516 :param line: the line being parsed 2517 :return: the completed Statement 2518 :raises: Cmd2ShlexError if a shlex error occurs (e.g. No closing quotation) 2519 :raises: EmptyStatement when the resulting Statement is blank 2520 """ 2521 while True: 2522 try: 2523 statement = self.statement_parser.parse(line) 2524 if statement.multiline_command and statement.terminator: 2525 # we have a completed multiline command, we are done 2526 break 2527 if not statement.multiline_command: 2528 # it's not a multiline command, but we parsed it ok 2529 # so we are done 2530 break 2531 except Cmd2ShlexError: 2532 # we have unclosed quotation marks, lets parse only the command 2533 # and see if it's a multiline 2534 statement = self.statement_parser.parse_command_only(line) 2535 if not statement.multiline_command: 2536 # not a multiline command, so raise the exception 2537 raise 2538 2539 # if we get here we must have: 2540 # - a multiline command with no terminator 2541 # - a multiline command with unclosed quotation marks 2542 try: 2543 self._at_continuation_prompt = True 2544 2545 # Save the command line up to this point for tab completion 2546 self._multiline_in_progress = line + '\n' 2547 2548 nextline = self._read_command_line(self.continuation_prompt) 2549 if nextline == 'eof': 2550 # they entered either a blank line, or we hit an EOF 2551 # for some other reason. 
Turn the literal 'eof' 2552 # into a blank line, which serves as a command 2553 # terminator 2554 nextline = '\n' 2555 self.poutput(nextline) 2556 line = f'{self._multiline_in_progress}{nextline}' 2557 except KeyboardInterrupt: 2558 self.poutput('^C') 2559 statement = self.statement_parser.parse('') 2560 break 2561 finally: 2562 self._at_continuation_prompt = False 2563 2564 if not statement.command: 2565 raise EmptyStatement 2566 return statement 2567 2568 def _input_line_to_statement(self, line: str) -> Statement: 2569 """ 2570 Parse the user's input line and convert it to a Statement, ensuring that all macros are also resolved 2571 2572 :param line: the line being parsed 2573 :return: parsed command line as a Statement 2574 :raises: Cmd2ShlexError if a shlex error occurs (e.g. No closing quotation) 2575 :raises: EmptyStatement when the resulting Statement is blank 2576 """ 2577 used_macros = [] 2578 orig_line = None 2579 2580 # Continue until all macros are resolved 2581 while True: 2582 # Make sure all input has been read and convert it to a Statement 2583 statement = self._complete_statement(line) 2584 2585 # Save the fully entered line if this is the first loop iteration 2586 if orig_line is None: 2587 orig_line = statement.raw 2588 2589 # Check if this command matches a macro and wasn't already processed to avoid an infinite loop 2590 if statement.command in self.macros.keys() and statement.command not in used_macros: 2591 used_macros.append(statement.command) 2592 resolve_result = self._resolve_macro(statement) 2593 if resolve_result is None: 2594 raise EmptyStatement 2595 line = resolve_result 2596 else: 2597 break 2598 2599 # This will be true when a macro was used 2600 if orig_line != statement.raw: 2601 # Build a Statement that contains the resolved macro line 2602 # but the originally typed line for its raw member. 
2603 statement = Statement( 2604 statement.args, 2605 raw=orig_line, 2606 command=statement.command, 2607 arg_list=statement.arg_list, 2608 multiline_command=statement.multiline_command, 2609 terminator=statement.terminator, 2610 suffix=statement.suffix, 2611 pipe_to=statement.pipe_to, 2612 output=statement.output, 2613 output_to=statement.output_to, 2614 ) 2615 return statement 2616 2617 def _resolve_macro(self, statement: Statement) -> Optional[str]: 2618 """ 2619 Resolve a macro and return the resulting string 2620 2621 :param statement: the parsed statement from the command line 2622 :return: the resolved macro or None on error 2623 """ 2624 if statement.command not in self.macros.keys(): 2625 raise KeyError(f"{statement.command} is not a macro") 2626 2627 macro = self.macros[statement.command] 2628 2629 # Make sure enough arguments were passed in 2630 if len(statement.arg_list) < macro.minimum_arg_count: 2631 plural = '' if macro.minimum_arg_count == 1 else 's' 2632 self.perror(f"The macro '{statement.command}' expects at least {macro.minimum_arg_count} argument{plural}") 2633 return None 2634 2635 # Resolve the arguments in reverse and read their values from statement.argv since those 2636 # are unquoted. Macro args should have been quoted when the macro was created. 
def _redirect_output(self, statement: Statement) -> utils.RedirectionSavedState:
    """Set up a command's output redirection for >, >>, and |.

    self.stdout and sys.stdout are only replaced once the new stream is completely prepared.
    If preparation fails, every stream opened here is closed and stdout is left untouched, since
    the caller only restores stdout when this method returns a saved state.

    :param statement: a parsed statement from the user
    :return: a utils.RedirectionSavedState holding the stdout values, pipe process reader, and
             redirecting flag which were in effect before this call, for use by _restore_output()
    :raises: RedirectionError if an error occurs trying to pipe or redirect
    """
    import io
    import subprocess

    # Initialize the redirection saved state
    redir_saved_state = utils.RedirectionSavedState(
        cast(TextIO, self.stdout), sys.stdout, self._cur_pipe_proc_reader, self._redirecting
    )

    # The ProcReader for this command
    cmd_pipe_proc_reader: Optional[utils.ProcReader] = None

    if not self.allow_redirection:
        # Don't return since we set some state variables at the end of the function
        pass

    elif statement.pipe_to:
        # Create a pipe with read and write sides
        read_fd, write_fd = os.pipe()

        # Open each side of the pipe
        subproc_stdin = io.open(read_fd, 'r')
        new_stdout: TextIO = cast(TextIO, io.open(write_fd, 'w'))

        # Create pipe process in a separate group to isolate our signals from it. If a Ctrl-C event occurs,
        # our sigint handler will forward it only to the most recent pipe process. This makes sure pipe
        # processes close in the right order (most recent first).
        kwargs: Dict[str, Any] = {}
        if sys.platform == 'win32':
            kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP
        else:
            kwargs['start_new_session'] = True

        # Attempt to run the pipe process in the user's preferred shell instead of the default behavior of using sh.
        shell = os.environ.get("SHELL")
        if shell:
            kwargs['executable'] = shell

        # For any stream that is a StdSim, we will use a pipe so we can capture its output
        try:
            proc = subprocess.Popen(  # type: ignore[call-overload]
                statement.pipe_to,
                stdin=subproc_stdin,
                stdout=subprocess.PIPE if isinstance(self.stdout, utils.StdSim) else self.stdout,  # type: ignore[unreachable]
                stderr=subprocess.PIPE if isinstance(sys.stderr, utils.StdSim) else sys.stderr,  # type: ignore[unreachable]
                shell=True,
                **kwargs,
            )
        except OSError as ex:
            # The shell itself could not be started (e.g. SHELL points to a missing executable).
            # Close both ends of the pipe so their file descriptors are not leaked.
            subproc_stdin.close()
            new_stdout.close()
            raise RedirectionError(f'Failed to start pipe process because: {ex}') from ex

        # Popen was called with shell=True so the user can chain pipe commands and redirect their output
        # like: !ls -l | grep user | wc -l > out.txt. But this makes it difficult to know if the pipe process
        # started OK, since the shell itself always starts. Therefore, we will wait a short time and check
        # if the pipe process is still running.
        try:
            proc.wait(0.2)
        except subprocess.TimeoutExpired:
            pass

        # Check if the pipe process already exited
        if proc.returncode is not None:
            subproc_stdin.close()
            new_stdout.close()
            raise RedirectionError(f'Pipe process exited with code {proc.returncode} before command could run')
        else:
            redir_saved_state.redirecting = True  # type: ignore[unreachable]
            cmd_pipe_proc_reader = utils.ProcReader(proc, cast(TextIO, self.stdout), sys.stderr)
            sys.stdout = self.stdout = new_stdout

    elif statement.output:
        import tempfile

        if (not statement.output_to) and (not self._can_clip):
            raise RedirectionError("Cannot redirect to paste buffer; missing 'pyperclip' and/or pyperclip dependencies")

        # Redirecting to a file
        elif statement.output_to:
            # statement.output can only contain REDIRECTION_APPEND or REDIRECTION_OUTPUT
            mode = 'a' if statement.output == constants.REDIRECTION_APPEND else 'w'
            try:
                # Use line buffering
                new_stdout = cast(TextIO, open(utils.strip_quotes(statement.output_to), mode=mode, buffering=1))
            except OSError as ex:
                raise RedirectionError(f'Failed to redirect because: {ex}') from ex

            redir_saved_state.redirecting = True
            sys.stdout = self.stdout = new_stdout

        # Redirecting to a paste buffer
        else:
            new_stdout = cast(TextIO, tempfile.TemporaryFile(mode="w+"))

            if statement.output == constants.REDIRECTION_APPEND:
                # Seed the temporary file with the current clipboard contents before it becomes stdout.
                # If reading the clipboard fails, the real stdout must still be in place.
                try:
                    new_stdout.write(get_paste_buffer())
                    new_stdout.flush()
                except Exception:
                    new_stdout.close()
                    raise

            redir_saved_state.redirecting = True
            sys.stdout = self.stdout = new_stdout

    # These are updated regardless of whether the command redirected
    self._cur_pipe_proc_reader = cmd_pipe_proc_reader
    self._redirecting = redir_saved_state.redirecting

    return redir_saved_state
def cmd_func(self, command: str) -> Optional[CommandFunc]:
    """
    Look up the function which implements a command

    :param command: the name of the command
    :return: the command's function, or None if no callable implements the command

    :Example:

    >>> helpfunc = self.cmd_func('help')

    helpfunc now contains a reference to the ``do_help`` method
    """
    method_name = self._cmd_func_name(command)
    if not method_name:
        return None
    return cast(Optional[CommandFunc], getattr(self, method_name))

def _cmd_func_name(self, command: str) -> str:
    """Build the name of the method which implements a command.

    :param command: command to look up method name which implements it
    :return: method name which implements the given command, or an empty string
             if this object has no callable attribute by that name
    """
    candidate = constants.COMMAND_FUNC_PREFIX + command
    if callable(getattr(self, candidate, None)):
        return candidate
    return ''
def default(self, statement: Statement) -> Optional[bool]:  # type: ignore[override]
    """Handle a command which has no do_* method implementing it.

    When default_to_shell is set, the command and its arguments are passed to do_shell().
    Otherwise the default_error message is printed.

    :param statement: Statement object with parsed input
    :return: whatever do_shell() returns when the command is sent to the shell, otherwise None
    """
    if not self.default_to_shell:
        # Set apply_style to False so default_error's style is not overridden
        self.perror(self.default_error.format(statement.command), apply_style=False)
        return None

    # The command runs as a shell command, so it follows the history rule for 'shell'
    if 'shell' not in self.exclude_from_history:
        self.history.append(statement)

    # noinspection PyTypeChecker
    return self.do_shell(statement.command_and_args)
2887 2888 :param prompt: prompt to display to user 2889 :param history: optional list of strings to use for up-arrow history. If completion_mode is 2890 CompletionMode.COMMANDS and this is None, then cmd2's command list history will 2891 be used. The passed in history will not be edited. It is the caller's responsibility 2892 to add the returned input to history if desired. Defaults to None. 2893 :param completion_mode: tells what type of tab completion to support. Tab completion only works when 2894 self.use_rawinput is True and sys.stdin is a terminal. Defaults to 2895 CompletionMode.NONE. 2896 2897 The following optional settings apply when completion_mode is CompletionMode.CUSTOM: 2898 2899 :param preserve_quotes: if True, then quoted tokens will keep their quotes when processed by 2900 ArgparseCompleter. This is helpful in cases when you're tab completing 2901 flag-like tokens (e.g. -o, --option) and you don't want them to be 2902 treated as argparse flags when quoted. Set this to True if you plan 2903 on passing the string to argparse with the tokens still quoted. 
2904 2905 A maximum of one of these should be provided: 2906 2907 :param choices: iterable of accepted values for single argument 2908 :param choices_provider: function that provides choices for single argument 2909 :param completer: tab completion function that provides choices for single argument 2910 :param parser: an argument parser which supports the tab completion of multiple arguments 2911 2912 :return: the line read from stdin with all trailing new lines removed 2913 :raises: any exceptions raised by input() and stdin.readline() 2914 """ 2915 readline_configured = False 2916 saved_completer: Optional[CompleterFunc] = None 2917 saved_history: Optional[List[str]] = None 2918 2919 def configure_readline() -> None: 2920 """Configure readline tab completion and history""" 2921 nonlocal readline_configured 2922 nonlocal saved_completer 2923 nonlocal saved_history 2924 nonlocal parser 2925 2926 if readline_configured: # pragma: no cover 2927 return 2928 2929 # Configure tab completion 2930 if self._completion_supported(): 2931 saved_completer = readline.get_completer() 2932 2933 # Disable completion 2934 if completion_mode == utils.CompletionMode.NONE: 2935 # noinspection PyUnusedLocal 2936 def complete_none(text: str, state: int) -> Optional[str]: # pragma: no cover 2937 return None 2938 2939 complete_func = complete_none 2940 2941 # Complete commands 2942 elif completion_mode == utils.CompletionMode.COMMANDS: 2943 complete_func = self.complete 2944 2945 # Set custom completion settings 2946 else: 2947 if parser is None: 2948 parser = argparse_custom.DEFAULT_ARGUMENT_PARSER(add_help=False) 2949 parser.add_argument( 2950 'arg', 2951 suppress_tab_hint=True, 2952 choices=choices, # type: ignore[arg-type] 2953 choices_provider=choices_provider, 2954 completer=completer, 2955 ) 2956 2957 custom_settings = utils.CustomCompletionSettings(parser, preserve_quotes=preserve_quotes) 2958 complete_func = functools.partial(self.complete, custom_settings=custom_settings) 2959 
2960 readline.set_completer(complete_func) 2961 2962 # Overwrite history if not completing commands or new history was provided 2963 if completion_mode != utils.CompletionMode.COMMANDS or history is not None: 2964 saved_history = [] 2965 for i in range(1, readline.get_current_history_length() + 1): 2966 # noinspection PyArgumentList 2967 saved_history.append(readline.get_history_item(i)) 2968 2969 readline.clear_history() 2970 if history is not None: 2971 for item in history: 2972 readline.add_history(item) 2973 2974 readline_configured = True 2975 2976 def restore_readline() -> None: 2977 """Restore readline tab completion and history""" 2978 nonlocal readline_configured 2979 if not readline_configured: # pragma: no cover 2980 return 2981 2982 if self._completion_supported(): 2983 readline.set_completer(saved_completer) 2984 2985 if saved_history is not None: 2986 readline.clear_history() 2987 for item in saved_history: 2988 readline.add_history(item) 2989 2990 readline_configured = False 2991 2992 # Check we are reading from sys.stdin 2993 if self.use_rawinput: 2994 if sys.stdin.isatty(): 2995 try: 2996 # Deal with the vagaries of readline and ANSI escape codes 2997 escaped_prompt = rl_escape_prompt(prompt) 2998 2999 with self.sigint_protection: 3000 configure_readline() 3001 line = input(escaped_prompt) 3002 finally: 3003 with self.sigint_protection: 3004 restore_readline() 3005 else: 3006 line = input() 3007 if self.echo: 3008 sys.stdout.write(f'{prompt}{line}\n') 3009 3010 # Otherwise read from self.stdin 3011 else: 3012 if self.stdin.isatty(): 3013 # on a tty, print the prompt first, then read the line 3014 self.poutput(prompt, end='') 3015 self.stdout.flush() 3016 line = self.stdin.readline() 3017 if len(line) == 0: 3018 line = 'eof' 3019 else: 3020 # we are reading from a pipe, read the line to see if there is 3021 # anything there, if so, then decide whether to print the 3022 # prompt or not 3023 line = self.stdin.readline() 3024 if len(line): 3025 # we 
def _read_command_line(self, prompt: str) -> str:
    """
    Read a command line from the appropriate stdin

    The terminal lock is released while the line is being read and is always acquired again
    before this method exits.

    :param prompt: prompt to display to user
    :return: command line text or 'eof' if an EOFError was caught
    :raises: whatever exceptions are raised by input() except for EOFError
    """
    try:
        # Command line is about to be drawn. Allow asynchronous changes to the terminal.
        try:
            self.terminal_lock.release()
        except RuntimeError:
            # terminal_lock may not be locked, in which case there is nothing to release
            pass

        entered = self.read_input(prompt, completion_mode=utils.CompletionMode.COMMANDS)
    except EOFError:
        entered = 'eof'
    finally:
        # Command line is gone. Do not allow asynchronous changes to the terminal.
        self.terminal_lock.acquire()

    return entered
3070 readline_settings.basic_quotes = cast(bytes, ctypes.cast(rl_basic_quote_characters, ctypes.c_void_p).value) 3071 rl_basic_quote_characters.value = None 3072 3073 readline_settings.completer = readline.get_completer() 3074 readline.set_completer(self.complete) 3075 3076 # Set the readline word delimiters for completion 3077 completer_delims = " \t\n" 3078 completer_delims += ''.join(constants.QUOTES) 3079 completer_delims += ''.join(constants.REDIRECTION_CHARS) 3080 completer_delims += ''.join(self.statement_parser.terminators) 3081 3082 readline_settings.delims = readline.get_completer_delims() 3083 readline.set_completer_delims(completer_delims) 3084 3085 # Enable tab completion 3086 readline.parse_and_bind(self.completekey + ": complete") 3087 3088 return readline_settings 3089 3090 def _restore_readline(self, readline_settings: _SavedReadlineSettings) -> None: 3091 """ 3092 Called at end of command loop to restore saved readline settings 3093 3094 :param readline_settings: the readline settings to restore 3095 """ 3096 if self._completion_supported(): 3097 3098 # Restore what we changed in readline 3099 readline.set_completer(readline_settings.completer) 3100 readline.set_completer_delims(readline_settings.delims) 3101 3102 if rl_type == RlType.GNU: 3103 readline.set_completion_display_matches_hook(None) 3104 rl_basic_quote_characters.value = readline_settings.basic_quotes 3105 elif rl_type == RlType.PYREADLINE: 3106 # noinspection PyUnresolvedReferences 3107 readline.rl.mode._display_completions = orig_pyreadline_display 3108 3109 def _cmdloop(self) -> None: 3110 """Repeatedly issue a prompt, accept input, parse an initial prefix 3111 off the received input, and dispatch to action methods, passing them 3112 the remainder of the line as argument. 3113 3114 This serves the same role as cmd.cmdloop(). 
3115 """ 3116 saved_readline_settings = None 3117 3118 try: 3119 # Get sigint protection while we set up readline for cmd2 3120 with self.sigint_protection: 3121 saved_readline_settings = self._set_up_cmd2_readline() 3122 3123 # Run startup commands 3124 stop = self.runcmds_plus_hooks(self._startup_commands) 3125 self._startup_commands.clear() 3126 3127 while not stop: 3128 # Get commands from user 3129 try: 3130 line = self._read_command_line(self.prompt) 3131 except KeyboardInterrupt: 3132 self.poutput('^C') 3133 line = '' 3134 3135 # Run the command along with all associated pre and post hooks 3136 stop = self.onecmd_plus_hooks(line) 3137 finally: 3138 # Get sigint protection while we restore readline settings 3139 with self.sigint_protection: 3140 if saved_readline_settings is not None: 3141 self._restore_readline(saved_readline_settings) 3142 3143 ############################################################# 3144 # Parsers and functions for alias command and subcommands 3145 ############################################################# 3146 3147 # Top-level parser for alias 3148 alias_description = "Manage aliases\n" "\n" "An alias is a command that enables replacement of a word by another string." 
@as_subcommand_to('alias', 'create', alias_create_parser, help=alias_create_description.lower())
def _alias_create(self, args: argparse.Namespace) -> None:
    """Create or overwrite an alias

    self.last_result is set to True if the alias was stored and False if it was rejected.

    :param args: parsed arguments providing name, command, and command_args
    """
    self.last_result = False

    # Validate the alias name
    valid, errmsg = self.statement_parser.is_valid_command(args.name)
    if not valid:
        self.perror(f"Invalid alias name: {errmsg}")
        return

    if args.name in self.get_all_commands():
        self.perror("Alias cannot have the same name as a command")
        return

    if args.name in self.macros:
        self.perror("Alias cannot have the same name as a macro")
        return

    # Unquote redirection and terminator tokens.
    # Build a new list instead of extending constants.REDIRECTION_TOKENS in place. That list is shared
    # module state, so extending it would permanently add the terminators to it on every call.
    tokens_to_unquote = [*constants.REDIRECTION_TOKENS, *self.statement_parser.terminators]
    utils.unquote_specific_tokens(args.command_args, tokens_to_unquote)

    # Build the alias value string
    value = args.command
    if args.command_args:
        value += ' ' + ' '.join(args.command_args)

    # Set the alias
    result = "overwritten" if args.name in self.aliases else "created"
    self.poutput(f"Alias '{args.name}' {result}")

    self.aliases[args.name] = value
    self.last_result = True
self.perror("Either --all or alias name(s) must be specified") 3250 self.last_result = False 3251 else: 3252 for cur_name in utils.remove_duplicates(args.names): 3253 if cur_name in self.aliases: 3254 del self.aliases[cur_name] 3255 self.poutput(f"Alias '{cur_name}' deleted") 3256 else: 3257 self.perror(f"Alias '{cur_name}' does not exist") 3258 3259 # alias -> list 3260 alias_list_help = "list aliases" 3261 alias_list_description = ( 3262 "List specified aliases in a reusable form that can be saved to a startup\n" 3263 "script to preserve aliases across sessions\n" 3264 "\n" 3265 "Without arguments, all aliases will be listed." 3266 ) 3267 3268 alias_list_parser = argparse_custom.DEFAULT_ARGUMENT_PARSER(description=alias_list_description) 3269 alias_list_parser.add_argument( 3270 'names', 3271 nargs=argparse.ZERO_OR_MORE, 3272 help='alias(es) to list', 3273 choices_provider=_get_alias_completion_items, 3274 descriptive_header=_alias_completion_table.generate_header(), 3275 ) 3276 3277 @as_subcommand_to('alias', 'list', alias_list_parser, help=alias_list_help) 3278 def _alias_list(self, args: argparse.Namespace) -> None: 3279 """List some or all aliases as 'alias create' commands""" 3280 self.last_result = {} # Dict[alias_name, alias_value] 3281 3282 tokens_to_quote = constants.REDIRECTION_TOKENS 3283 tokens_to_quote.extend(self.statement_parser.terminators) 3284 3285 if args.names: 3286 to_list = utils.remove_duplicates(args.names) 3287 else: 3288 to_list = sorted(self.aliases, key=self.default_sort_key) 3289 3290 not_found: List[str] = [] 3291 for name in to_list: 3292 if name not in self.aliases: 3293 not_found.append(name) 3294 continue 3295 3296 # Quote redirection and terminator tokens for the 'alias create' command 3297 tokens = shlex_split(self.aliases[name]) 3298 command = tokens[0] 3299 command_args = tokens[1:] 3300 utils.quote_specific_tokens(command_args, tokens_to_quote) 3301 3302 val = command 3303 if command_args: 3304 val += ' ' + ' 
'.join(command_args) 3305 3306 self.poutput(f"alias create {name} {val}") 3307 self.last_result[name] = val 3308 3309 for name in not_found: 3310 self.perror(f"Alias '{name}' not found") 3311 3312 ############################################################# 3313 # Parsers and functions for macro command and subcommands 3314 ############################################################# 3315 3316 # Top-level parser for macro 3317 macro_description = "Manage macros\n" "\n" "A macro is similar to an alias, but it can contain argument placeholders." 3318 macro_epilog = "See also:\n" " alias" 3319 macro_parser = argparse_custom.DEFAULT_ARGUMENT_PARSER(description=macro_description, epilog=macro_epilog) 3320 macro_subparsers = macro_parser.add_subparsers(dest='subcommand', metavar='SUBCOMMAND') 3321 macro_subparsers.required = True 3322 3323 # Preserve quotes since we are passing strings to other commands 3324 @with_argparser(macro_parser, preserve_quotes=True) 3325 def do_macro(self, args: argparse.Namespace) -> None: 3326 """Manage macros""" 3327 # Call handler for whatever subcommand was selected 3328 handler = args.cmd2_handler.get() 3329 handler(args) 3330 3331 # macro -> create 3332 macro_create_help = "create or overwrite a macro" 3333 macro_create_description = "Create or overwrite a macro" 3334 3335 macro_create_epilog = ( 3336 "A macro is similar to an alias, but it can contain argument placeholders.\n" 3337 "Arguments are expressed when creating a macro using {#} notation where {1}\n" 3338 "means the first argument.\n" 3339 "\n" 3340 "The following creates a macro called my_macro that expects two arguments:\n" 3341 "\n" 3342 " macro create my_macro make_dinner --meat {1} --veggie {2}\n" 3343 "\n" 3344 "When the macro is called, the provided arguments are resolved and the\n" 3345 "assembled command is run. 
For example:\n" 3346 "\n" 3347 " my_macro beef broccoli ---> make_dinner --meat beef --veggie broccoli\n" 3348 "\n" 3349 "Notes:\n" 3350 " To use the literal string {1} in your command, escape it this way: {{1}}.\n" 3351 "\n" 3352 " Extra arguments passed to a macro are appended to resolved command.\n" 3353 "\n" 3354 " An argument number can be repeated in a macro. In the following example the\n" 3355 " first argument will populate both {1} instances.\n" 3356 "\n" 3357 " macro create ft file_taxes -p {1} -q {2} -r {1}\n" 3358 "\n" 3359 " To quote an argument in the resolved command, quote it during creation.\n" 3360 "\n" 3361 " macro create backup !cp \"{1}\" \"{1}.orig\"\n" 3362 "\n" 3363 " If you want to use redirection, pipes, or terminators in the value of the\n" 3364 " macro, then quote them.\n" 3365 "\n" 3366 " macro create show_results print_results -type {1} \"|\" less\n" 3367 "\n" 3368 " Because macros do not resolve until after hitting Enter, tab completion\n" 3369 " will only complete paths while typing a macro." 
3370 ) 3371 3372 macro_create_parser = argparse_custom.DEFAULT_ARGUMENT_PARSER( 3373 description=macro_create_description, epilog=macro_create_epilog 3374 ) 3375 macro_create_parser.add_argument('name', help='name of this macro') 3376 macro_create_parser.add_argument( 3377 'command', help='what the macro resolves to', choices_provider=_get_commands_aliases_and_macros_for_completion 3378 ) 3379 macro_create_parser.add_argument( 3380 'command_args', nargs=argparse.REMAINDER, help='arguments to pass to command', completer=path_complete 3381 ) 3382 3383 @as_subcommand_to('macro', 'create', macro_create_parser, help=macro_create_help) 3384 def _macro_create(self, args: argparse.Namespace) -> None: 3385 """Create or overwrite a macro""" 3386 self.last_result = False 3387 3388 # Validate the macro name 3389 valid, errmsg = self.statement_parser.is_valid_command(args.name) 3390 if not valid: 3391 self.perror(f"Invalid macro name: {errmsg}") 3392 return 3393 3394 if args.name in self.get_all_commands(): 3395 self.perror("Macro cannot have the same name as a command") 3396 return 3397 3398 if args.name in self.aliases: 3399 self.perror("Macro cannot have the same name as an alias") 3400 return 3401 3402 # Unquote redirection and terminator tokens 3403 tokens_to_unquote = constants.REDIRECTION_TOKENS 3404 tokens_to_unquote.extend(self.statement_parser.terminators) 3405 utils.unquote_specific_tokens(args.command_args, tokens_to_unquote) 3406 3407 # Build the macro value string 3408 value = args.command 3409 if args.command_args: 3410 value += ' ' + ' '.join(args.command_args) 3411 3412 # Find all normal arguments 3413 arg_list = [] 3414 normal_matches = re.finditer(MacroArg.macro_normal_arg_pattern, value) 3415 max_arg_num = 0 3416 arg_nums = set() 3417 3418 while True: 3419 try: 3420 cur_match = normal_matches.__next__() 3421 3422 # Get the number string between the braces 3423 cur_num_str = re.findall(MacroArg.digit_pattern, cur_match.group())[0] 3424 cur_num = 
int(cur_num_str) 3425 if cur_num < 1: 3426 self.perror("Argument numbers must be greater than 0") 3427 return 3428 3429 arg_nums.add(cur_num) 3430 if cur_num > max_arg_num: 3431 max_arg_num = cur_num 3432 3433 arg_list.append(MacroArg(start_index=cur_match.start(), number_str=cur_num_str, is_escaped=False)) 3434 3435 except StopIteration: 3436 break 3437 3438 # Make sure the argument numbers are continuous 3439 if len(arg_nums) != max_arg_num: 3440 self.perror(f"Not all numbers between 1 and {max_arg_num} are present in the argument placeholders") 3441 return 3442 3443 # Find all escaped arguments 3444 escaped_matches = re.finditer(MacroArg.macro_escaped_arg_pattern, value) 3445 3446 while True: 3447 try: 3448 cur_match = escaped_matches.__next__() 3449 3450 # Get the number string between the braces 3451 cur_num_str = re.findall(MacroArg.digit_pattern, cur_match.group())[0] 3452 3453 arg_list.append(MacroArg(start_index=cur_match.start(), number_str=cur_num_str, is_escaped=True)) 3454 except StopIteration: 3455 break 3456 3457 # Set the macro 3458 result = "overwritten" if args.name in self.macros else "created" 3459 self.poutput(f"Macro '{args.name}' {result}") 3460 3461 self.macros[args.name] = Macro(name=args.name, value=value, minimum_arg_count=max_arg_num, arg_list=arg_list) 3462 self.last_result = True 3463 3464 # macro -> delete 3465 macro_delete_help = "delete macros" 3466 macro_delete_description = "Delete specified macros or all macros if --all is used" 3467 macro_delete_parser = argparse_custom.DEFAULT_ARGUMENT_PARSER(description=macro_delete_description) 3468 macro_delete_parser.add_argument('-a', '--all', action='store_true', help="delete all macros") 3469 macro_delete_parser.add_argument( 3470 'names', 3471 nargs=argparse.ZERO_OR_MORE, 3472 help='macro(s) to delete', 3473 choices_provider=_get_macro_completion_items, 3474 descriptive_header=_macro_completion_table.generate_header(), 3475 ) 3476 3477 @as_subcommand_to('macro', 'delete', 
macro_delete_parser, help=macro_delete_help) 3478 def _macro_delete(self, args: argparse.Namespace) -> None: 3479 """Delete macros""" 3480 self.last_result = True 3481 3482 if args.all: 3483 self.macros.clear() 3484 self.poutput("All macros deleted") 3485 elif not args.names: 3486 self.perror("Either --all or macro name(s) must be specified") 3487 self.last_result = False 3488 else: 3489 for cur_name in utils.remove_duplicates(args.names): 3490 if cur_name in self.macros: 3491 del self.macros[cur_name] 3492 self.poutput(f"Macro '{cur_name}' deleted") 3493 else: 3494 self.perror(f"Macro '{cur_name}' does not exist") 3495 3496 # macro -> list 3497 macro_list_help = "list macros" 3498 macro_list_description = ( 3499 "List specified macros in a reusable form that can be saved to a startup script\n" 3500 "to preserve macros across sessions\n" 3501 "\n" 3502 "Without arguments, all macros will be listed." 3503 ) 3504 3505 macro_list_parser = argparse_custom.DEFAULT_ARGUMENT_PARSER(description=macro_list_description) 3506 macro_list_parser.add_argument( 3507 'names', 3508 nargs=argparse.ZERO_OR_MORE, 3509 help='macro(s) to list', 3510 choices_provider=_get_macro_completion_items, 3511 descriptive_header=_macro_completion_table.generate_header(), 3512 ) 3513 3514 @as_subcommand_to('macro', 'list', macro_list_parser, help=macro_list_help) 3515 def _macro_list(self, args: argparse.Namespace) -> None: 3516 """List some or all macros as 'macro create' commands""" 3517 self.last_result = {} # Dict[macro_name, macro_value] 3518 3519 tokens_to_quote = constants.REDIRECTION_TOKENS 3520 tokens_to_quote.extend(self.statement_parser.terminators) 3521 3522 if args.names: 3523 to_list = utils.remove_duplicates(args.names) 3524 else: 3525 to_list = sorted(self.macros, key=self.default_sort_key) 3526 3527 not_found: List[str] = [] 3528 for name in to_list: 3529 if name not in self.macros: 3530 not_found.append(name) 3531 continue 3532 3533 # Quote redirection and terminator tokens for 
the 'macro create' command 3534 tokens = shlex_split(self.macros[name].value) 3535 command = tokens[0] 3536 command_args = tokens[1:] 3537 utils.quote_specific_tokens(command_args, tokens_to_quote) 3538 3539 val = command 3540 if command_args: 3541 val += ' ' + ' '.join(command_args) 3542 3543 self.poutput(f"macro create {name} {val}") 3544 self.last_result[name] = val 3545 3546 for name in not_found: 3547 self.perror(f"Macro '{name}' not found") 3548 3549 def complete_help_command(self, text: str, line: str, begidx: int, endidx: int) -> List[str]: 3550 """Completes the command argument of help""" 3551 3552 # Complete token against topics and visible commands 3553 topics = set(self.get_help_topics()) 3554 visible_commands = set(self.get_visible_commands()) 3555 strs_to_match = list(topics | visible_commands) 3556 return self.basic_complete(text, line, begidx, endidx, strs_to_match) 3557 3558 def complete_help_subcommands( 3559 self, text: str, line: str, begidx: int, endidx: int, arg_tokens: Dict[str, List[str]] 3560 ) -> List[str]: 3561 """Completes the subcommands argument of help""" 3562 3563 # Make sure we have a command whose subcommands we will complete 3564 command = arg_tokens['command'][0] 3565 if not command: 3566 return [] 3567 3568 # Check if this command uses argparse 3569 func = self.cmd_func(command) 3570 argparser = getattr(func, constants.CMD_ATTR_ARGPARSER, None) 3571 if func is None or argparser is None: 3572 return [] 3573 3574 completer = argparse_completer.DEFAULT_AP_COMPLETER(argparser, self) 3575 return completer.complete_subcommand_help(text, line, begidx, endidx, arg_tokens['subcommands']) 3576 3577 help_parser = argparse_custom.DEFAULT_ARGUMENT_PARSER( 3578 description="List available commands or provide " "detailed help for a specific command" 3579 ) 3580 help_parser.add_argument( 3581 '-v', '--verbose', action='store_true', help="print a list of all commands with descriptions of each" 3582 ) 3583 help_parser.add_argument( 3584 
'command', nargs=argparse.OPTIONAL, help="command to retrieve help for", completer=complete_help_command 3585 ) 3586 help_parser.add_argument( 3587 'subcommands', nargs=argparse.REMAINDER, help="subcommand(s) to retrieve help for", completer=complete_help_subcommands 3588 ) 3589 3590 # Get rid of cmd's complete_help() functions so ArgparseCompleter will complete the help command 3591 if getattr(cmd.Cmd, 'complete_help', None) is not None: 3592 delattr(cmd.Cmd, 'complete_help') 3593 3594 @with_argparser(help_parser) 3595 def do_help(self, args: argparse.Namespace) -> None: 3596 """List available commands or provide detailed help for a specific command""" 3597 self.last_result = True 3598 3599 if not args.command or args.verbose: 3600 self._help_menu(args.verbose) 3601 3602 else: 3603 # Getting help for a specific command 3604 func = self.cmd_func(args.command) 3605 help_func = getattr(self, constants.HELP_FUNC_PREFIX + args.command, None) 3606 argparser = getattr(func, constants.CMD_ATTR_ARGPARSER, None) 3607 3608 # If the command function uses argparse, then use argparse's help 3609 if func is not None and argparser is not None: 3610 completer = argparse_completer.DEFAULT_AP_COMPLETER(argparser, self) 3611 3612 # Set end to blank so the help output matches how it looks when "command -h" is used 3613 self.poutput(completer.format_help(args.subcommands), end='') 3614 3615 # If there is a help func delegate to do_help 3616 elif help_func is not None: 3617 super().do_help(args.command) 3618 3619 # If there's no help_func __doc__ then format and output it 3620 elif func is not None and func.__doc__ is not None: 3621 self.poutput(pydoc.getdoc(func)) 3622 3623 # If there is no help information then print an error 3624 else: 3625 err_msg = self.help_error.format(args.command) 3626 3627 # Set apply_style to False so help_error's style is not overridden 3628 self.perror(err_msg, apply_style=False) 3629 self.last_result = False 3630 3631 def print_topics(self, header: str, 
cmds: Optional[List[str]], cmdlen: int, maxcol: int) -> None: 3632 """ 3633 Print groups of commands and topics in columns and an optional header 3634 Override of cmd's print_topics() to handle headers with newlines, ANSI style sequences, and wide characters 3635 3636 :param header: string to print above commands being printed 3637 :param cmds: list of topics to print 3638 :param cmdlen: unused, even by cmd's version 3639 :param maxcol: max number of display columns to fit into 3640 """ 3641 if cmds: 3642 self.poutput(header) 3643 if self.ruler: 3644 divider = utils.align_left('', fill_char=self.ruler, width=ansi.widest_line(header)) 3645 self.poutput(divider) 3646 self.columnize(cmds, maxcol - 1) 3647 self.poutput() 3648 3649 def columnize(self, str_list: Optional[List[str]], display_width: int = 80) -> None: 3650 """Display a list of single-line strings as a compact set of columns. 3651 Override of cmd's print_topics() to handle strings with ANSI style sequences and wide characters 3652 3653 Each column is only as wide as necessary. 3654 Columns are separated by two spaces (one was not legible enough). 
3655 """ 3656 if not str_list: 3657 self.poutput("<empty>") 3658 return 3659 3660 nonstrings = [i for i in range(len(str_list)) if not isinstance(str_list[i], str)] 3661 if nonstrings: 3662 raise TypeError(f"str_list[i] not a string for i in {nonstrings}") 3663 size = len(str_list) 3664 if size == 1: 3665 self.poutput(str_list[0]) 3666 return 3667 # Try every row count from 1 upwards 3668 for nrows in range(1, len(str_list)): 3669 ncols = (size + nrows - 1) // nrows 3670 colwidths = [] 3671 totwidth = -2 3672 for col in range(ncols): 3673 colwidth = 0 3674 for row in range(nrows): 3675 i = row + nrows * col 3676 if i >= size: 3677 break 3678 x = str_list[i] 3679 colwidth = max(colwidth, ansi.style_aware_wcswidth(x)) 3680 colwidths.append(colwidth) 3681 totwidth += colwidth + 2 3682 if totwidth > display_width: 3683 break 3684 if totwidth <= display_width: 3685 break 3686 else: 3687 nrows = len(str_list) 3688 ncols = 1 3689 colwidths = [0] 3690 for row in range(nrows): 3691 texts = [] 3692 for col in range(ncols): 3693 i = row + nrows * col 3694 if i >= size: 3695 x = "" 3696 else: 3697 x = str_list[i] 3698 texts.append(x) 3699 while texts and not texts[-1]: 3700 del texts[-1] 3701 for col in range(len(texts)): 3702 texts[col] = utils.align_left(texts[col], width=colwidths[col]) 3703 self.poutput(" ".join(texts)) 3704 3705 def _help_menu(self, verbose: bool = False) -> None: 3706 """Show a list of commands which help can be displayed for""" 3707 cmds_cats, cmds_doc, cmds_undoc, help_topics = self._build_command_info() 3708 3709 if not cmds_cats: 3710 # No categories found, fall back to standard behavior 3711 self.poutput(self.doc_leader) 3712 self._print_topics(self.doc_header, cmds_doc, verbose) 3713 else: 3714 # Categories found, Organize all commands by category 3715 self.poutput(self.doc_leader) 3716 self.poutput(self.doc_header, end="\n\n") 3717 for category in sorted(cmds_cats.keys(), key=self.default_sort_key): 3718 self._print_topics(category, 
cmds_cats[category], verbose) 3719 self._print_topics(self.default_category, cmds_doc, verbose) 3720 3721 self.print_topics(self.misc_header, help_topics, 15, 80) 3722 self.print_topics(self.undoc_header, cmds_undoc, 15, 80) 3723 3724 def _build_command_info(self) -> Tuple[Dict[str, List[str]], List[str], List[str], List[str]]: 3725 # Get a sorted list of help topics 3726 help_topics = sorted(self.get_help_topics(), key=self.default_sort_key) 3727 # Get a sorted list of visible command names 3728 visible_commands = sorted(self.get_visible_commands(), key=self.default_sort_key) 3729 cmds_doc: List[str] = [] 3730 cmds_undoc: List[str] = [] 3731 cmds_cats: Dict[str, List[str]] = {} 3732 for command in visible_commands: 3733 func = self.cmd_func(command) 3734 has_help_func = False 3735 3736 if command in help_topics: 3737 # Prevent the command from showing as both a command and help topic in the output 3738 help_topics.remove(command) 3739 3740 # Non-argparse commands can have help_functions for their documentation 3741 if not hasattr(func, constants.CMD_ATTR_ARGPARSER): 3742 has_help_func = True 3743 3744 if hasattr(func, constants.CMD_ATTR_HELP_CATEGORY): 3745 category: str = getattr(func, constants.CMD_ATTR_HELP_CATEGORY) 3746 cmds_cats.setdefault(category, []) 3747 cmds_cats[category].append(command) 3748 elif func.__doc__ or has_help_func: 3749 cmds_doc.append(command) 3750 else: 3751 cmds_undoc.append(command) 3752 return cmds_cats, cmds_doc, cmds_undoc, help_topics 3753 3754 def _print_topics(self, header: str, cmds: List[str], verbose: bool) -> None: 3755 """Customized version of print_topics that can switch between verbose or traditional output""" 3756 import io 3757 3758 if cmds: 3759 if not verbose: 3760 self.print_topics(header, cmds, 15, 80) 3761 else: 3762 # Find the widest command 3763 widest = max([ansi.style_aware_wcswidth(command) for command in cmds]) 3764 3765 # Define the table structure 3766 name_column = Column('', width=max(widest, 20)) 3767 
desc_column = Column('', width=80) 3768 3769 topic_table = SimpleTable([name_column, desc_column], divider_char=self.ruler) 3770 3771 # Build the topic table 3772 table_str_buf = io.StringIO() 3773 if header: 3774 table_str_buf.write(header + "\n") 3775 3776 divider = topic_table.generate_divider() 3777 if divider: 3778 table_str_buf.write(divider + "\n") 3779 3780 # Try to get the documentation string for each command 3781 topics = self.get_help_topics() 3782 for command in cmds: 3783 cmd_func = self.cmd_func(command) 3784 doc: Optional[str] 3785 3786 # Non-argparse commands can have help_functions for their documentation 3787 if not hasattr(cmd_func, constants.CMD_ATTR_ARGPARSER) and command in topics: 3788 help_func = getattr(self, constants.HELP_FUNC_PREFIX + command) 3789 result = io.StringIO() 3790 3791 # try to redirect system stdout 3792 with redirect_stdout(result): 3793 # save our internal stdout 3794 stdout_orig = self.stdout 3795 try: 3796 # redirect our internal stdout 3797 self.stdout = cast(TextIO, result) 3798 help_func() 3799 finally: 3800 # restore internal stdout 3801 self.stdout = stdout_orig 3802 doc = result.getvalue() 3803 3804 else: 3805 doc = cmd_func.__doc__ 3806 3807 # Attempt to locate the first documentation block 3808 cmd_desc = '' 3809 if doc: 3810 found_first = False 3811 for doc_line in doc.splitlines(): 3812 stripped_line = doc_line.strip() 3813 3814 # Don't include :param type lines 3815 if stripped_line.startswith(':'): 3816 if found_first: 3817 break 3818 elif stripped_line: 3819 if found_first: 3820 cmd_desc += "\n" 3821 cmd_desc += stripped_line 3822 found_first = True 3823 elif found_first: 3824 break 3825 3826 # Add this command to the table 3827 table_row = topic_table.generate_data_row([command, cmd_desc]) 3828 table_str_buf.write(table_row + '\n') 3829 3830 self.poutput(table_str_buf.getvalue()) 3831 3832 shortcuts_parser = argparse_custom.DEFAULT_ARGUMENT_PARSER(description="List available shortcuts") 3833 3834 
@with_argparser(shortcuts_parser) 3835 def do_shortcuts(self, _: argparse.Namespace) -> None: 3836 """List available shortcuts""" 3837 # Sort the shortcut tuples by name 3838 sorted_shortcuts = sorted(self.statement_parser.shortcuts, key=lambda x: self.default_sort_key(x[0])) 3839 result = "\n".join('{}: {}'.format(sc[0], sc[1]) for sc in sorted_shortcuts) 3840 self.poutput(f"Shortcuts for other commands:\n{result}") 3841 self.last_result = True 3842 3843 eof_parser = argparse_custom.DEFAULT_ARGUMENT_PARSER( 3844 description="Called when Ctrl-D is pressed", epilog=INTERNAL_COMMAND_EPILOG 3845 ) 3846 3847 @with_argparser(eof_parser) 3848 def do_eof(self, _: argparse.Namespace) -> Optional[bool]: 3849 """ 3850 Called when Ctrl-D is pressed and calls quit with no arguments. 3851 This can be overridden if quit should be called differently. 3852 """ 3853 self.poutput() 3854 3855 # self.last_result will be set by do_quit() 3856 # noinspection PyTypeChecker 3857 return self.do_quit('') 3858 3859 quit_parser = argparse_custom.DEFAULT_ARGUMENT_PARSER(description="Exit this application") 3860 3861 @with_argparser(quit_parser) 3862 def do_quit(self, _: argparse.Namespace) -> Optional[bool]: 3863 """Exit this application""" 3864 # Return True to stop the command loop 3865 self.last_result = True 3866 return True 3867 3868 def select(self, opts: Union[str, List[str], List[Tuple[Any, Optional[str]]]], prompt: str = 'Your choice? ') -> Any: 3869 """Presents a numbered menu to the user. Modeled after 3870 the bash shell's SELECT. Returns the item chosen. 
3871 3872 Argument ``opts`` can be: 3873 3874 | a single string -> will be split into one-word options 3875 | a list of strings -> will be offered as options 3876 | a list of tuples -> interpreted as (value, text), so 3877 that the return value can differ from 3878 the text advertised to the user""" 3879 local_opts: Union[List[str], List[Tuple[Any, Optional[str]]]] 3880 if isinstance(opts, str): 3881 local_opts = cast(List[Tuple[Any, Optional[str]]], list(zip(opts.split(), opts.split()))) 3882 else: 3883 local_opts = opts 3884 fulloptions: List[Tuple[Any, Optional[str]]] = [] 3885 for opt in local_opts: 3886 if isinstance(opt, str): 3887 fulloptions.append((opt, opt)) 3888 else: 3889 try: 3890 fulloptions.append((opt[0], opt[1])) 3891 except IndexError: 3892 fulloptions.append((opt[0], opt[0])) 3893 for (idx, (_, text)) in enumerate(fulloptions): 3894 self.poutput(' %2d. %s' % (idx + 1, text)) 3895 3896 while True: 3897 try: 3898 response = self.read_input(prompt) 3899 except EOFError: 3900 response = '' 3901 self.poutput() 3902 except KeyboardInterrupt as ex: 3903 self.poutput('^C') 3904 raise ex 3905 3906 if not response: 3907 continue 3908 3909 try: 3910 choice = int(response) 3911 if choice < 1: 3912 raise IndexError 3913 return fulloptions[choice - 1][0] 3914 except (ValueError, IndexError): 3915 self.poutput(f"'{response}' isn't a valid choice. 
Pick a number between 1 and {len(fulloptions)}:") 3916 3917 def complete_set_value( 3918 self, text: str, line: str, begidx: int, endidx: int, arg_tokens: Dict[str, List[str]] 3919 ) -> List[str]: 3920 """Completes the value argument of set""" 3921 param = arg_tokens['param'][0] 3922 try: 3923 settable = self.settables[param] 3924 except KeyError: 3925 raise CompletionError(param + " is not a settable parameter") 3926 3927 # Create a parser with a value field based on this settable 3928 settable_parser = argparse_custom.DEFAULT_ARGUMENT_PARSER(parents=[Cmd.set_parser_parent]) 3929 3930 # Settables with choices list the values of those choices instead of the arg name 3931 # in help text and this shows in tab completion hints. Set metavar to avoid this. 3932 arg_name = 'value' 3933 settable_parser.add_argument( 3934 arg_name, 3935 metavar=arg_name, 3936 help=settable.description, 3937 choices=settable.choices, # type: ignore[arg-type] 3938 choices_provider=settable.choices_provider, 3939 completer=settable.completer, 3940 ) 3941 3942 completer = argparse_completer.DEFAULT_AP_COMPLETER(settable_parser, self) 3943 3944 # Use raw_tokens since quotes have been preserved 3945 _, raw_tokens = self.tokens_for_completion(line, begidx, endidx) 3946 return completer.complete(text, line, begidx, endidx, raw_tokens[1:]) 3947 3948 # When tab completing value, we recreate the set command parser with a value argument specific to 3949 # the settable being edited. To make this easier, define a parent parser with all the common elements. 3950 set_description = ( 3951 "Set a settable parameter or show current settings of parameters\n" 3952 "Call without arguments for a list of all settable parameters with their values.\n" 3953 "Call with just param to view that parameter's value." 
3954 ) 3955 set_parser_parent = argparse_custom.DEFAULT_ARGUMENT_PARSER(description=set_description, add_help=False) 3956 set_parser_parent.add_argument( 3957 'param', 3958 nargs=argparse.OPTIONAL, 3959 help='parameter to set or view', 3960 choices_provider=_get_settable_completion_items, 3961 descriptive_header=_settable_completion_table.generate_header(), 3962 ) 3963 3964 # Create the parser for the set command 3965 set_parser = argparse_custom.DEFAULT_ARGUMENT_PARSER(parents=[set_parser_parent]) 3966 set_parser.add_argument( 3967 'value', nargs=argparse.OPTIONAL, help='new value for settable', completer=complete_set_value, suppress_tab_hint=True 3968 ) 3969 3970 # Preserve quotes so users can pass in quoted empty strings and flags (e.g. -h) as the value 3971 @with_argparser(set_parser, preserve_quotes=True) 3972 def do_set(self, args: argparse.Namespace) -> None: 3973 """Set a settable parameter or show current settings of parameters""" 3974 self.last_result = False 3975 3976 if not self.settables: 3977 self.pwarning("There are no settable parameters") 3978 return 3979 3980 if args.param: 3981 try: 3982 settable = self.settables[args.param] 3983 except KeyError: 3984 self.perror(f"Parameter '{args.param}' not supported (type 'set' for list of parameters).") 3985 return 3986 3987 if args.value: 3988 # Try to update the settable's value 3989 try: 3990 orig_value = settable.get_value() 3991 new_value = settable.set_value(utils.strip_quotes(args.value)) 3992 # noinspection PyBroadException 3993 except Exception as ex: 3994 self.perror(f"Error setting {args.param}: {ex}") 3995 else: 3996 self.poutput(f"{args.param} - was: {orig_value!r}\nnow: {new_value!r}") 3997 self.last_result = True 3998 return 3999 4000 # Show one settable 4001 to_show = [args.param] 4002 else: 4003 # Show all settables 4004 to_show = list(self.settables.keys()) 4005 4006 # Define the table structure 4007 name_label = 'Name' 4008 max_name_width = max([ansi.style_aware_wcswidth(param) for param 
in to_show]) 4009 max_name_width = max(max_name_width, ansi.style_aware_wcswidth(name_label)) 4010 4011 cols: List[Column] = [ 4012 Column(name_label, width=max_name_width), 4013 Column('Value', width=30), 4014 Column('Description', width=60), 4015 ] 4016 4017 table = SimpleTable(cols, divider_char=self.ruler) 4018 self.poutput(table.generate_header()) 4019 4020 # Build the table and populate self.last_result 4021 self.last_result = {} # Dict[settable_name, settable_value] 4022 4023 for param in sorted(to_show, key=self.default_sort_key): 4024 settable = self.settables[param] 4025 row_data = [param, settable.get_value(), settable.description] 4026 self.poutput(table.generate_data_row(row_data)) 4027 self.last_result[param] = settable.get_value() 4028 4029 shell_parser = argparse_custom.DEFAULT_ARGUMENT_PARSER(description="Execute a command as if at the OS prompt") 4030 shell_parser.add_argument('command', help='the command to run', completer=shell_cmd_complete) 4031 shell_parser.add_argument( 4032 'command_args', nargs=argparse.REMAINDER, help='arguments to pass to command', completer=path_complete 4033 ) 4034 4035 # Preserve quotes since we are passing these strings to the shell 4036 @with_argparser(shell_parser, preserve_quotes=True) 4037 def do_shell(self, args: argparse.Namespace) -> None: 4038 """Execute a command as if at the OS prompt""" 4039 import signal 4040 import subprocess 4041 4042 kwargs: Dict[str, Any] = dict() 4043 4044 # Set OS-specific parameters 4045 if sys.platform.startswith('win'): 4046 # Windows returns STATUS_CONTROL_C_EXIT when application stopped by Ctrl-C 4047 ctrl_c_ret_code = 0xC000013A 4048 else: 4049 # On POSIX, Popen() returns -SIGINT when application stopped by Ctrl-C 4050 ctrl_c_ret_code = signal.SIGINT.value * -1 4051 4052 # On POSIX with shell=True, Popen() defaults to /bin/sh as the shell. 4053 # sh reports an incorrect return code for some applications when Ctrl-C is pressed within that 4054 # application (e.g. less). 
Since sh received the SIGINT, it sets the return code to reflect being 4055 # closed by SIGINT even though less did not exit upon a Ctrl-C press. In the same situation, other 4056 # shells like bash and zsh report the actual return code of less. Therefore we will try to run the 4057 # user's preferred shell which most likely will be something other than sh. This also allows the user 4058 # to run builtin commands of their preferred shell. 4059 shell = os.environ.get("SHELL") 4060 if shell: 4061 kwargs['executable'] = shell 4062 4063 # Create a list of arguments to shell 4064 tokens = [args.command] + args.command_args 4065 4066 # Expand ~ where needed 4067 utils.expand_user_in_tokens(tokens) 4068 expanded_command = ' '.join(tokens) 4069 4070 # Prevent KeyboardInterrupts while in the shell process. The shell process will 4071 # still receive the SIGINT since it is in the same process group as us. 4072 with self.sigint_protection: 4073 # For any stream that is a StdSim, we will use a pipe so we can capture its output 4074 proc = subprocess.Popen( # type: ignore[call-overload] 4075 expanded_command, 4076 stdout=subprocess.PIPE if isinstance(self.stdout, utils.StdSim) else self.stdout, # type: ignore[unreachable] 4077 stderr=subprocess.PIPE if isinstance(sys.stderr, utils.StdSim) else sys.stderr, # type: ignore[unreachable] 4078 shell=True, 4079 **kwargs, 4080 ) 4081 4082 proc_reader = utils.ProcReader(proc, cast(TextIO, self.stdout), sys.stderr) # type: ignore[arg-type] 4083 proc_reader.wait() 4084 4085 # Save the return code of the application for use in a pyscript 4086 self.last_result = proc.returncode 4087 4088 # If the process was stopped by Ctrl-C, then inform the caller by raising a KeyboardInterrupt. 4089 # This is to support things like stop_on_keyboard_interrupt in run_cmds_plus_hooks(). 
@staticmethod
def _reset_py_display() -> None:
    """
    Put the sys module's interactive display state back to interpreter defaults.

    The py and ipy consoles both adopt whatever prompt and hook settings already exist in
    the sys module when they start. If an ipy console ran earlier, a py console started
    afterwards would inherit its prompts and exception formatting. Removing the prompts and
    restoring the default hooks forces the Python console to create its own settings.

    IPython always overwrites these settings itself, so this only needs to be called
    before creating a Python console.
    """
    # Drop any prompts that were set; a prompt that was never set is simply skipped
    for prompt_name in ('ps1', 'ps2', 'ps3'):
        sys.__dict__.pop(prompt_name, None)

    # Point the display and exception hooks back at the interpreter's originals
    sys.displayhook = sys.__displayhook__
    sys.excepthook = sys.__excepthook__
def _set_up_py_shell_env(self, interp: InteractiveConsole) -> _SavedCmd2Env:
    """
    Set up interactive Python shell environment

    Saves cmd2's readline history, completer settings, and ``sys.stdout``/``sys.stdin`` into a
    ``_SavedCmd2Env`` and replaces them with settings for the Python console.

    :param interp: the console whose namespace receives the rlcompleter-based tab completer
    :return: Class containing saved up cmd2 environment
    """
    cmd2_env = _SavedCmd2Env()

    # Set up readline for Python shell
    if rl_type != RlType.NONE:
        # Save cmd2 history (readline history items are 1-based)
        for i in range(1, readline.get_current_history_length() + 1):
            # noinspection PyArgumentList
            cmd2_env.history.append(readline.get_history_item(i))

        readline.clear_history()

        # Restore py's history
        for item in self._py_history:
            readline.add_history(item)

        if self._completion_supported():
            # Set up tab completion for the Python console
            # rlcompleter relies on the default settings of the Python readline module
            if rl_type == RlType.GNU:
                cmd2_env.readline_settings.basic_quotes = cast(
                    bytes, ctypes.cast(rl_basic_quote_characters, ctypes.c_void_p).value
                )
                rl_basic_quote_characters.value = orig_rl_basic_quotes

                if 'gnureadline' in sys.modules:
                    # rlcompleter imports readline by name, so it won't use gnureadline
                    # Force rlcompleter to use gnureadline instead so it has our settings and history
                    if 'readline' in sys.modules:
                        cmd2_env.readline_module = sys.modules['readline']

                    sys.modules['readline'] = sys.modules['gnureadline']

            cmd2_env.readline_settings.delims = readline.get_completer_delims()
            readline.set_completer_delims(orig_rl_delims)

            # rlcompleter will not need cmd2's custom display function
            # This will be restored by cmd2 the next time complete() is called
            if rl_type == RlType.GNU:
                readline.set_completion_display_matches_hook(None)
            elif rl_type == RlType.PYREADLINE:
                # noinspection PyUnresolvedReferences
                readline.rl.mode._display_completions = orig_pyreadline_display

            # Save off the current completer and set a new one in the Python console
            # Make sure it tab completes from its locals() dictionary
            cmd2_env.readline_settings.completer = readline.get_completer()
            interp.runcode("from rlcompleter import Completer")  # type: ignore[arg-type]
            interp.runcode("import readline")  # type: ignore[arg-type]
            interp.runcode("readline.set_completer(Completer(locals()).complete)")  # type: ignore[arg-type]

    # Set up sys module for the Python console
    self._reset_py_display()

    # Route the console's I/O through this app's streams, remembering the originals
    cmd2_env.sys_stdout = sys.stdout
    sys.stdout = self.stdout  # type: ignore[assignment]

    cmd2_env.sys_stdin = sys.stdin
    sys.stdin = self.stdin  # type: ignore[assignment]

    return cmd2_env
def _restore_cmd2_env(self, cmd2_env: _SavedCmd2Env) -> None:
    """
    Restore cmd2 environment after exiting an interactive Python shell

    Puts back ``sys.stdout``/``sys.stdin``, stores the Python console's readline history in
    ``self._py_history``, and restores cmd2's readline history and completion settings.

    :param cmd2_env: the environment settings to restore
    """
    sys.stdout = cmd2_env.sys_stdout  # type: ignore[assignment]
    sys.stdin = cmd2_env.sys_stdin  # type: ignore[assignment]

    # Set up readline for cmd2
    if rl_type != RlType.NONE:
        # Save py's history (readline history items are 1-based)
        self._py_history.clear()
        for i in range(1, readline.get_current_history_length() + 1):
            # noinspection PyArgumentList
            self._py_history.append(readline.get_history_item(i))

        readline.clear_history()

        # Restore cmd2's history
        for item in cmd2_env.history:
            readline.add_history(item)

        if self._completion_supported():
            # Restore cmd2's tab completion settings
            readline.set_completer(cmd2_env.readline_settings.completer)
            readline.set_completer_delims(cmd2_env.readline_settings.delims)

            if rl_type == RlType.GNU:
                rl_basic_quote_characters.value = cmd2_env.readline_settings.basic_quotes

                if 'gnureadline' in sys.modules:
                    # Restore what the readline module pointed to
                    # (None means no 'readline' entry was saved during setup)
                    if cmd2_env.readline_module is None:
                        del sys.modules['readline']
                    else:
                        sys.modules['readline'] = cmd2_env.readline_module
def _run_python(self, *, pyscript: Optional[str] = None) -> Optional[bool]:
    """
    Called by do_py() and do_run_pyscript().
    If pyscript is None, then this function runs an interactive Python shell.
    Otherwise, it runs the pyscript file.

    ``self.last_result`` is set to True once the interpreter has been created and is left
    False when the call is rejected or the script file cannot be read.

    :param pyscript: optional path to a pyscript file to run. This is intended only to be used by do_run_pyscript()
                     after it sets up sys.argv for the script. (Defaults to None)
    :return: True if running of commands should stop
    """
    self.last_result = False

    def py_quit() -> None:
        """Function callable from the interactive Python console to exit that environment"""
        raise EmbeddedConsoleExit

    from .py_bridge import (
        PyBridge,
    )

    py_bridge = PyBridge(self)
    saved_sys_path = None

    if self.in_pyscript():
        self.perror("Recursively entering interactive Python shells is not allowed")
        return None

    try:
        self._in_py = True
        py_code_to_run = ''

        # Make a copy of self.py_locals for the locals dictionary in the Python environment we are creating.
        # This is to prevent pyscripts from editing it. (e.g. locals().clear()). It also ensures a pyscript's
        # environment won't be filled with data from a previously run pyscript. Only make a shallow copy since
        # it's OK for py_locals to contain objects which are editable in a pyscript.
        local_vars = self.py_locals.copy()
        local_vars[self.py_bridge_name] = py_bridge
        local_vars['quit'] = py_quit
        local_vars['exit'] = py_quit

        if self.self_in_py:
            local_vars['self'] = self

        # Handle case where we were called by do_run_pyscript()
        if pyscript is not None:
            # Read the script file
            expanded_filename = os.path.expanduser(pyscript)

            try:
                with open(expanded_filename) as f:
                    py_code_to_run = f.read()
            except OSError as ex:
                self.perror(f"Error reading script file '{expanded_filename}': {ex}")
                return None

            local_vars['__name__'] = '__main__'
            local_vars['__file__'] = expanded_filename

            # Place the script's directory at sys.path[0] just as Python does when executing a script
            saved_sys_path = list(sys.path)
            sys.path.insert(0, os.path.dirname(os.path.abspath(expanded_filename)))

        else:
            # This is the default name chosen by InteractiveConsole when no locals are passed in
            local_vars['__name__'] = '__console__'

        # Create the Python interpreter
        self.last_result = True
        interp = InteractiveConsole(locals=local_vars)

        # Check if we are running a script. Decide on whether a script was requested rather than
        # on whether its text is non-empty: an empty script file must run (as a no-op) instead of
        # unexpectedly dropping the user into an interactive shell.
        if pyscript is not None:
            # noinspection PyBroadException
            try:
                interp.runcode(py_code_to_run)  # type: ignore[arg-type]
            except BaseException:
                # We don't care about any exception that happened in the Python code
                pass

        # Otherwise we will open an interactive Python shell
        else:
            cprt = 'Type "help", "copyright", "credits" or "license" for more information.'
            instructions = (
                'Use `Ctrl-D` (Unix) / `Ctrl-Z` (Windows), `quit()`, `exit()` to exit.\n'
                f'Run CLI commands with: {self.py_bridge_name}("command ...")'
            )
            banner = f"Python {sys.version} on {sys.platform}\n{cprt}\n\n{instructions}\n"

            saved_cmd2_env = None

            # noinspection PyBroadException
            try:
                # Get sigint protection while we set up the Python shell environment
                with self.sigint_protection:
                    saved_cmd2_env = self._set_up_py_shell_env(interp)

                # Since quit() or exit() raise an EmbeddedConsoleExit, interact() exits before printing
                # the exitmsg. Therefore we will not provide it one and print it manually later.
                interp.interact(banner=banner, exitmsg='')
            except BaseException:
                # We don't care about any exception that happened in the interactive console
                pass
            finally:
                # Get sigint protection while we restore cmd2 environment settings
                with self.sigint_protection:
                    # Only restore if setup got far enough to save something
                    if saved_cmd2_env is not None:
                        self._restore_cmd2_env(saved_cmd2_env)
                self.poutput("Now exiting Python shell...")

    finally:
        with self.sigint_protection:
            # sys.path is only altered when a script was run
            if saved_sys_path is not None:
                sys.path = saved_sys_path
            self._in_py = False

    return py_bridge.stop
py_parser = argparse_custom.DEFAULT_ARGUMENT_PARSER(description="Run an interactive Python shell")

@with_argparser(py_parser)
def do_py(self, _: argparse.Namespace) -> Optional[bool]:
    """
    Run an interactive Python shell
    :return: True if running of commands should stop
    """
    # self.last_result will be set by _run_python()
    return self._run_python()

run_pyscript_parser = argparse_custom.DEFAULT_ARGUMENT_PARSER(description="Run a Python script file inside the console")
run_pyscript_parser.add_argument('script_path', help='path to the script file', completer=path_complete)
run_pyscript_parser.add_argument(
    'script_arguments', nargs=argparse.REMAINDER, help='arguments to pass to script', completer=path_complete
)

@with_argparser(run_pyscript_parser)
def do_run_pyscript(self, args: argparse.Namespace) -> Optional[bool]:
    """
    Run a Python script file inside the console

    ``sys.argv`` is replaced with the script path and its arguments while the script runs
    and is put back afterwards, even if running the script raises.

    :param args: parsed arguments; ``script_path`` is the file to run and ``script_arguments``
                 is the list of tokens placed after it in ``sys.argv``
    :return: True if running of commands should stop
    """
    self.last_result = False

    # Expand ~ before placing this path in sys.argv just as a shell would
    args.script_path = os.path.expanduser(args.script_path)

    # Add some protection against accidentally running a non-Python file. This happens when users
    # mix up run_script and run_pyscript.
    if not args.script_path.endswith('.py'):
        self.pwarning(f"'{args.script_path}' does not have a .py extension")
        selection = self.select('Yes No', 'Continue to try to run it as a Python script? ')
        if selection != 'Yes':
            return None

    # Save current command line arguments
    orig_args = sys.argv

    try:
        # Overwrite sys.argv to allow the script to take command line arguments
        sys.argv = [args.script_path] + args.script_arguments

        # self.last_result will be set by _run_python()
        py_return = self._run_python(pyscript=args.script_path)
    finally:
        # Restore command line arguments to original state
        sys.argv = orig_args

    return py_return
ipython_parser = argparse_custom.DEFAULT_ARGUMENT_PARSER(description="Run an interactive IPython shell")

# noinspection PyPackageRequirements
@with_argparser(ipython_parser)
def do_ipy(self, _: argparse.Namespace) -> Optional[bool]:  # pragma: no cover
    """
    Enter an interactive IPython shell

    ``self.last_result`` is left False when IPython cannot be imported or when this is called
    from within a pyscript; otherwise it is set to True before the shell starts.

    :return: True if running of commands should stop
    """
    self.last_result = False

    # Detect whether IPython is installed
    try:
        import traitlets.config.loader as TraitletsLoader  # type: ignore[import]
        from IPython import (  # type: ignore[import]
            start_ipython,
        )
        from IPython.terminal.interactiveshell import (  # type: ignore[import]
            TerminalInteractiveShell,
        )
        from IPython.terminal.ipapp import (  # type: ignore[import]
            TerminalIPythonApp,
        )
    except ImportError:
        self.perror("IPython package is not installed")
        return None

    from .py_bridge import (
        PyBridge,
    )

    if self.in_pyscript():
        self.perror("Recursively entering interactive Python shells is not allowed")
        return None

    self.last_result = True

    try:
        self._in_py = True
        py_bridge = PyBridge(self)

        # Make a copy of self.py_locals for the locals dictionary in the IPython environment we are creating.
        # This is to prevent ipy from editing it. (e.g. locals().clear()). Only make a shallow copy since
        # it's OK for py_locals to contain objects which are editable in ipy.
        local_vars = self.py_locals.copy()
        local_vars[self.py_bridge_name] = py_bridge
        if self.self_in_py:
            local_vars['self'] = self

        # Configure IPython
        config = TraitletsLoader.Config()
        config.InteractiveShell.banner2 = (
            'Entering an IPython shell. Type exit, quit, or Ctrl-D to exit.\n'
            f'Run CLI commands with: {self.py_bridge_name}("command ...")\n'
        )

        # Start IPython
        start_ipython(config=config, argv=[], user_ns=local_vars)
        self.poutput("Now exiting IPython shell...")

        # The IPython application is a singleton and won't be recreated next time
        # this function runs. That's a problem since the contents of local_vars
        # may need to be changed. Therefore we must destroy all instances of the
        # relevant classes.
        TerminalIPythonApp.clear_instance()
        TerminalInteractiveShell.clear_instance()

        return py_bridge.stop
    finally:
        # Always leave "in Python" mode, even if IPython raised
        self._in_py = False
history_description = "View, run, edit, save, or clear previously entered commands"

# Parser for the ``history`` command
history_parser = argparse_custom.DEFAULT_ARGUMENT_PARSER(description=history_description)

# Actions: argparse allows at most one of these per invocation
history_action_group = history_parser.add_mutually_exclusive_group()
history_action_group.add_argument('-r', '--run', action='store_true', help='run selected history items')
history_action_group.add_argument('-e', '--edit', action='store_true', help='edit and then run selected history items')
history_action_group.add_argument(
    '-o', '--output_file', metavar='FILE', help='output commands to a script file, implies -s', completer=path_complete
)
history_action_group.add_argument(
    '-t',
    '--transcript',
    metavar='TRANSCRIPT_FILE',
    help='output commands and results to a transcript file,\nimplies -s',
    completer=path_complete,
)
history_action_group.add_argument('-c', '--clear', action='store_true', help='clear all history')

# Formatting flags: these affect how history items are displayed
history_format_group = history_parser.add_argument_group(title='formatting')
history_format_group.add_argument(
    '-s', '--script', action='store_true', help='output commands in script format, i.e. without command\n' 'numbers'
)
history_format_group.add_argument(
    '-x',
    '--expanded',
    action='store_true',
    help='output fully parsed commands with any aliases and\n' 'macros expanded, instead of typed commands',
)
history_format_group.add_argument(
    '-v',
    '--verbose',
    action='store_true',
    help='display history and include expanded commands if they\n' 'differ from the typed command',
)
history_format_group.add_argument(
    '-a', '--all', action='store_true', help='display all commands, including ones persisted from\n' 'previous sessions'
)

# Help text for the optional positional argument that selects which history items to act on
history_arg_help = (
    "empty all history items\n"
    "a one history item by number\n"
    "a..b, a:b, a:, ..b items by indices (inclusive)\n"
    "string items containing string\n"
    "/regex/ items matching regular expression"
)
history_parser.add_argument('arg', nargs=argparse.OPTIONAL, help=history_arg_help)
@with_argparser(history_parser)
def do_history(self, args: argparse.Namespace) -> Optional[bool]:
    """
    View, run, edit, save, or clear previously entered commands

    ``self.last_result`` is False on entry and is set to True when the requested action
    succeeds. When history is only displayed, it is set to the retrieved history items.

    :param args: parsed arguments from ``history_parser``
    :return: True if running of commands should stop
    """

    def script_line(item: HistoryItem) -> str:
        """Return the text written to a script file for one history item."""
        # Multiline commands are written in their expanded form; others exactly as typed
        return item.expanded if item.statement.multiline_command else item.raw

    self.last_result = False

    # -v must be used alone with no other options
    if args.verbose:
        if args.clear or args.edit or args.output_file or args.run or args.transcript or args.expanded or args.script:
            self.poutput("-v cannot be used with any other options")
            self.poutput(self.history_parser.format_usage())
            return None

    # -s and -x can only be used if none of these options are present: [-c -r -e -o -t]
    if (args.script or args.expanded) and (args.clear or args.edit or args.output_file or args.run or args.transcript):
        self.poutput("-s and -x cannot be used with -c, -r, -e, -o, or -t")
        self.poutput(self.history_parser.format_usage())
        return None

    if args.clear:
        self.last_result = True

        # Clear command and readline history
        self.history.clear()

        if self.persistent_history_file:
            try:
                os.remove(self.persistent_history_file)
            except FileNotFoundError:
                # Nothing was persisted yet, so there is nothing to remove
                pass
            except OSError as ex:
                self.perror(f"Error removing history file '{self.persistent_history_file}': {ex}")
                self.last_result = False
                return None

        if rl_type != RlType.NONE:
            readline.clear_history()
        return None

    # If an argument was supplied, then retrieve partial contents of the history, otherwise retrieve it all
    history = self._get_history(args)

    if args.run:
        if not args.arg:
            self.perror("Cowardly refusing to run all previously entered commands.")
            self.perror("If this is what you want to do, specify '1:' as the range of history.")
        else:
            stop = self.runcmds_plus_hooks(list(history.values()))
            self.last_result = True
            return stop
    elif args.edit:
        import tempfile

        fd, fname = tempfile.mkstemp(suffix='.txt', text=True)
        fobj: TextIO
        try:
            # Writing happens inside the try so the temporary file is removed even if the write fails
            with os.fdopen(fd, 'w') as fobj:
                for command in history.values():
                    fobj.write(f'{script_line(command)}\n')

            self.run_editor(fname)

            # self.last_result will be set by do_run_script()
            # noinspection PyTypeChecker
            return self.do_run_script(utils.quote_string(fname))
        finally:
            os.remove(fname)
    elif args.output_file:
        full_path = os.path.abspath(os.path.expanduser(args.output_file))
        try:
            with open(full_path, 'w') as fobj:
                for item in history.values():
                    fobj.write(f"{script_line(item)}\n")
            plural = '' if len(history) == 1 else 's'
        except OSError as ex:
            self.perror(f"Error saving history file '{full_path}': {ex}")
        else:
            self.pfeedback(f"{len(history)} command{plural} saved to {full_path}")
            self.last_result = True
    elif args.transcript:
        # self.last_result will be set by _generate_transcript()
        self._generate_transcript(list(history.values()), args.transcript)
    else:
        # Display the history items retrieved
        for idx, hi in history.items():
            self.poutput(hi.pr(idx, script=args.script, expanded=args.expanded, verbose=args.verbose))
        self.last_result = history
    return None
def _get_history(self, args: argparse.Namespace) -> 'OrderedDict[int, HistoryItem]':
    """Select the history items described by ``args.arg``, or the whole history if it is empty.

    The result is a dictionary of history items keyed by their 1-based index in ascending order.
    """
    query = args.arg

    if not query:
        # Get a copy of the history so it doesn't get mutated while we are using it
        return self.history.span(':', args.all)

    # A plain integer selects exactly one item; a ValueError falls through to the searches below
    try:
        index = int(query)
        return OrderedDict({index: self.history.get(index)})
    except ValueError:
        pass

    if '..' in query or ':' in query:
        # Get a slice of history
        return self.history.span(query, args.all)

    if query.startswith(r'/') and query.endswith(r'/'):
        return self.history.regex_search(query, args.all)

    return self.history.str_search(query, args.all)
def _initialize_history(self, hist_file: str) -> None:
    """Initialize history using history related attributes

    Always creates a fresh ``self.history``. When a usable ``hist_file`` is given, its contents
    are loaded, ``self.persistent_history_file`` is set to its absolute path, readline history
    is populated, and ``self._persist_history`` is registered to run at exit.

    :param hist_file: optional path to persistent history file. If specified, then history from
                      previous sessions will be included. Additionally, all history will be written
                      to this file when the application exits.
    """
    import json
    import lzma

    self.history = History()
    # with no persistent history, nothing else in this method is relevant
    if not hist_file:
        self.persistent_history_file = hist_file
        return

    hist_file = os.path.abspath(os.path.expanduser(hist_file))

    # On Windows, trying to open a directory throws a permission
    # error, not a `IsADirectoryError`. So we'll check it ourselves.
    # NOTE(review): this early return (and the two below) leave self.persistent_history_file
    # untouched - confirm the attribute is given a value before this method is called
    if os.path.isdir(hist_file):
        self.perror(f"Persistent history file '{hist_file}' is a directory")
        return

    # Create the directory for the history file if it doesn't already exist
    hist_file_dir = os.path.dirname(hist_file)
    try:
        os.makedirs(hist_file_dir, exist_ok=True)
    except OSError as ex:
        self.perror(f"Error creating persistent history file directory '{hist_file_dir}': {ex}")
        return

    # Read and process history file
    try:
        with open(hist_file, 'rb') as fobj:
            compressed_bytes = fobj.read()
        history_json = lzma.decompress(compressed_bytes).decode(encoding='utf-8')
        self.history = History.from_json(history_json)
    except FileNotFoundError:
        # Just use an empty history
        pass
    except OSError as ex:
        self.perror(f"Cannot read persistent history file '{hist_file}': {ex}")
        return
    except (json.JSONDecodeError, lzma.LZMAError, KeyError, UnicodeDecodeError, ValueError) as ex:
        # The file exists but its contents are unusable; continue with the empty history
        self.perror(
            f"Error processing persistent history file '{hist_file}': {ex}\n"
            f"The history file will be recreated when this application exits."
        )

    self.history.start_session()
    self.persistent_history_file = hist_file

    # populate readline history
    if rl_type != RlType.NONE:
        last = None
        for item in self.history:
            # Break the command into its individual lines
            for line in item.raw.splitlines():
                # readline only adds a single entry for multiple sequential identical lines
                # so we emulate that behavior here
                if line != last:
                    readline.add_history(line)
                    last = line

    # register a function to write history at save
    # if the history file is in plain text format from 0.9.12 or lower
    # this will fail, and the history in the plain text file will be lost
    import atexit

    atexit.register(self._persist_history)
def _persist_history(self) -> None:
    """Save the history to the persistent history file as LZMA-compressed JSON.

    Does nothing when no persistent history file is configured. The history is truncated to
    ``self._persistent_history_length`` before it is written. A failure to write the file is
    reported with ``self.perror`` rather than raised.
    """
    import lzma

    target_path = self.persistent_history_file
    if not target_path:
        return

    self.history.truncate(self._persistent_history_length)
    try:
        # Serialize and compress in memory first, then write everything in one call
        payload = lzma.compress(self.history.to_json().encode(encoding='utf-8'))

        with open(target_path, 'wb') as hist_fobj:
            hist_fobj.write(payload)
    except OSError as ex:
        self.perror(f"Cannot write persistent history file '{self.persistent_history_file}': {ex}")
def _generate_transcript(self, history: Union[List[HistoryItem], List[str]], transcript_file: str) -> None:
    """Generate a transcript file from a given history of commands

    Each command is run with its output captured, and the prompt-prefixed command followed by
    its output (with ``/`` escaped) is written to the transcript file. ``self.last_result`` is
    True only when the file was written.

    :param history: commands to run, either as HistoryItem objects or as plain strings
    :param transcript_file: path of the transcript file to write
    """
    self.last_result = False

    # Validate the transcript file path to make sure directory exists and write access is available
    transcript_path = os.path.abspath(os.path.expanduser(transcript_file))
    transcript_dir = os.path.dirname(transcript_path)
    if not os.path.isdir(transcript_dir) or not os.access(transcript_dir, os.W_OK):
        self.perror(f"'{transcript_dir}' is not a directory or you don't have write access")
        return

    commands_run = 0
    try:
        with self.sigint_protection:
            # Disable echo while we manually redirect stdout to a StringIO buffer
            saved_echo = self.echo
            saved_stdout = self.stdout
            self.echo = False

        # The problem with supporting regular expressions in transcripts
        # is that they shouldn't be processed in the command, just the output.
        # In addition, when we generate a transcript, any slashes in the output
        # are not really intended to indicate regular expressions, so they should
        # be escaped.
        #
        # We have to jump through some hoops here in order to catch the commands
        # separately from the output and escape the slashes in the output.
        transcript = ''
        for history_item in history:
            # build the command, complete with prompts. When we replay
            # the transcript, we look for the prompts to separate
            # the command from the output
            first = True
            command = ''
            if isinstance(history_item, HistoryItem):
                history_item = history_item.raw
            for line in history_item.splitlines():
                if first:
                    command += f"{self.prompt}{line}\n"
                    first = False
                else:
                    command += f"{self.continuation_prompt}{line}\n"
            transcript += command

            # Use a StdSim object to capture output
            stdsim = utils.StdSim(cast(TextIO, self.stdout))
            self.stdout = cast(TextIO, stdsim)

            # then run the command and let the output go into our buffer
            try:
                stop = self.onecmd_plus_hooks(history_item, raise_keyboard_interrupt=True)
            except KeyboardInterrupt as ex:
                # Treat Ctrl-C during a command as a request to stop generating the transcript
                self.perror(ex)
                stop = True

            commands_run += 1

            # add the regex-escaped output to the transcript
            transcript += stdsim.getvalue().replace('/', r'\/')

            # check if we are supposed to stop
            if stop:
                break
    finally:
        with self.sigint_protection:
            # Restore altered attributes to their original state
            self.echo = saved_echo
            self.stdout = cast(TextIO, saved_stdout)

    # Check if all commands ran
    if commands_run < len(history):
        self.pwarning(f"Command {commands_run} triggered a stop and ended transcript generation early")

    # finally, we can write the transcript out to the file
    try:
        with open(transcript_path, 'w') as fout:
            fout.write(transcript)
    except OSError as ex:
        self.perror(f"Error saving transcript file '{transcript_path}': {ex}")
    else:
        # and let the user know what we did
        if commands_run == 1:
            plural = 'command and its output'
        else:
            plural = 'commands and their outputs'
        self.pfeedback(f"{commands_run} {plural} saved to transcript file '{transcript_path}'")
        self.last_result = True
edit_description = (
    "Run a text editor and optionally open a file with it\n"
    "\n"
    "The editor used is determined by a settable parameter. To set it:\n"
    "\n"
    " set editor (program-name)"
)

# Parser for the ``edit`` command; the file path may be omitted
edit_parser = argparse_custom.DEFAULT_ARGUMENT_PARSER(description=edit_description)
edit_parser.add_argument(
    'file_path', nargs=argparse.OPTIONAL, help="optional path to a file to open in editor", completer=path_complete
)

@with_argparser(edit_parser)
def do_edit(self, args: argparse.Namespace) -> None:
    """Run a text editor and optionally open a file with it

    :param args: parsed arguments; ``file_path`` is the optional file to open
    """

    # self.last_result will be set by do_shell() which is called by run_editor()
    self.run_editor(args.file_path)
def run_editor(self, file_path: Optional[str] = None) -> None:
    """
    Launch the configured text editor, optionally opening a file in it

    The editor program (and the file, if given) are ~-expanded, quoted, and handed to
    do_shell() as a single command line.

    :param file_path: optional path of the file to edit. Defaults to None.
    :raises: EnvironmentError if self.editor is not set
    """
    if not self.editor:
        raise EnvironmentError("Please use 'set editor' to specify your text editing program of choice.")

    # Collect the quoted pieces of the command line, editor first
    pieces = [utils.quote_string(os.path.expanduser(self.editor))]
    if file_path:
        pieces.append(utils.quote_string(os.path.expanduser(file_path)))

    # noinspection PyTypeChecker
    self.do_shell(" ".join(pieces))

@property
def _current_script_dir(self) -> Optional[str]:
    """Directory of the script currently being run: the last entry of _script_dir, or None if it is empty."""
    return self._script_dir[-1] if self._script_dir else None
run_script_description = (
    "Run commands in script file that is encoded as either ASCII or UTF-8 text\n"
    "\n"
    "Script should contain one command per line, just like the command would be\n"
    "typed in the console.\n"
    "\n"
    "If the -t/--transcript flag is used, this command instead records\n"
    "the output of the script commands to a transcript for testing purposes.\n"
)

run_script_parser = argparse_custom.DEFAULT_ARGUMENT_PARSER(description=run_script_description)
run_script_parser.add_argument(
    '-t',
    '--transcript',
    metavar='TRANSCRIPT_FILE',
    help='record the output of the script as a transcript file',
    completer=path_complete,
)
run_script_parser.add_argument('script_path', help="path to the script file", completer=path_complete)

@with_argparser(run_script_parser)
def do_run_script(self, args: argparse.Namespace) -> Optional[bool]:
    """Run commands in script file that is encoded as either ASCII or UTF-8 text.

    ``self.last_result`` is False on entry and is set to True when the script is empty or when
    its commands were run. While the script runs, its directory is pushed onto
    ``self._script_dir``; it is popped again afterwards.

    :param args: parsed arguments; ``script_path`` is the script to run and ``transcript`` is
                 an optional transcript file to generate instead of just running the commands
    :return: True if running of commands should stop
    """
    self.last_result = False
    expanded_path = os.path.abspath(os.path.expanduser(args.script_path))

    # Add some protection against accidentally running a Python file. This happens when users
    # mix up run_script and run_pyscript.
    if expanded_path.endswith('.py'):
        self.pwarning(f"'{expanded_path}' appears to be a Python file")
        selection = self.select('Yes No', 'Continue to try to run it as a text script? ')
        if selection != 'Yes':
            return None

    try:
        # An empty file is not an error, so just return
        if os.path.getsize(expanded_path) == 0:
            self.last_result = True
            return None

        # Make sure the file is ASCII or UTF-8 encoded text
        if not utils.is_text_file(expanded_path):
            self.perror(f"'{expanded_path}' is not an ASCII or UTF-8 encoded text file")
            return None

        # Read all lines of the script
        with open(expanded_path, encoding='utf-8') as target:
            script_commands = target.read().splitlines()
    except OSError as ex:
        self.perror(f"Problem accessing script from '{expanded_path}': {ex}")
        return None

    # Remember the stack depth so the finally block can tell whether the append below happened
    orig_script_dir_count = len(self._script_dir)

    try:
        self._script_dir.append(os.path.dirname(expanded_path))

        if args.transcript:
            # self.last_result will be set by _generate_transcript()
            self._generate_transcript(script_commands, os.path.expanduser(args.transcript))
        else:
            stop = self.runcmds_plus_hooks(script_commands, stop_on_keyboard_interrupt=True)
            self.last_result = True
            return stop

    finally:
        with self.sigint_protection:
            # Check if a script dir was added before an exception occurred
            if orig_script_dir_count != len(self._script_dir):
                self._script_dir.pop()
    return None
4932 ) 4933 4934 relative_run_script_epilog = "Notes:\n" " This command is intended to only be used within text file scripts." 4935 4936 relative_run_script_parser = argparse_custom.DEFAULT_ARGUMENT_PARSER( 4937 description=relative_run_script_description, epilog=relative_run_script_epilog 4938 ) 4939 relative_run_script_parser.add_argument('file_path', help='a file path pointing to a script') 4940 4941 @with_argparser(relative_run_script_parser) 4942 def do__relative_run_script(self, args: argparse.Namespace) -> Optional[bool]: 4943 """ 4944 Run commands in script file that is encoded as either ASCII or UTF-8 text 4945 4946 :return: True if running of commands should stop 4947 """ 4948 file_path = args.file_path 4949 # NOTE: Relative path is an absolute path, it is just relative to the current script directory 4950 relative_path = os.path.join(self._current_script_dir or '', file_path) 4951 4952 # self.last_result will be set by do_run_script() 4953 # noinspection PyTypeChecker 4954 return self.do_run_script(utils.quote_string(relative_path)) 4955 4956 def _run_transcript_tests(self, transcript_paths: List[str]) -> None: 4957 """Runs transcript tests for provided file(s). 4958 4959 This is called when either -t is provided on the command line or the transcript_files argument is provided 4960 during construction of the cmd2.Cmd instance. 
4961 4962 :param transcript_paths: list of transcript test file paths 4963 """ 4964 import time 4965 import unittest 4966 4967 import cmd2 4968 4969 from .transcript import ( 4970 Cmd2TestCase, 4971 ) 4972 4973 class TestMyAppCase(Cmd2TestCase): 4974 cmdapp = self 4975 4976 # Validate that there is at least one transcript file 4977 transcripts_expanded = utils.files_from_glob_patterns(transcript_paths, access=os.R_OK) 4978 if not transcripts_expanded: 4979 self.perror('No test files found - nothing to test') 4980 self.exit_code = 1 4981 return 4982 4983 verinfo = ".".join(map(str, sys.version_info[:3])) 4984 num_transcripts = len(transcripts_expanded) 4985 plural = '' if len(transcripts_expanded) == 1 else 's' 4986 self.poutput(ansi.style(utils.align_center(' cmd2 transcript test ', fill_char='='), bold=True)) 4987 self.poutput(f'platform {sys.platform} -- Python {verinfo}, cmd2-{cmd2.__version__}, readline-{rl_type}') 4988 self.poutput(f'cwd: {os.getcwd()}') 4989 self.poutput(f'cmd2 app: {sys.argv[0]}') 4990 self.poutput(ansi.style(f'collected {num_transcripts} transcript{plural}', bold=True)) 4991 4992 setattr(self.__class__, 'testfiles', transcripts_expanded) 4993 sys.argv = [sys.argv[0]] # the --test argument upsets unittest.main() 4994 testcase = TestMyAppCase() 4995 stream = cast(TextIO, utils.StdSim(sys.stderr)) 4996 # noinspection PyTypeChecker 4997 runner = unittest.TextTestRunner(stream=stream) 4998 start_time = time.time() 4999 test_results = runner.run(testcase) 5000 execution_time = time.time() - start_time 5001 if test_results.wasSuccessful(): 5002 ansi.style_aware_write(sys.stderr, stream.read()) 5003 finish_msg = f' {num_transcripts} transcript{plural} passed in {execution_time:.3f} seconds ' 5004 finish_msg = ansi.style_success(utils.align_center(finish_msg, fill_char='=')) 5005 self.poutput(finish_msg) 5006 else: 5007 # Strip off the initial traceback which isn't particularly useful for end users 5008 error_str = stream.read() 5009 end_of_trace = 
error_str.find('AssertionError:') 5010 file_offset = error_str[end_of_trace:].find('File ') 5011 start = end_of_trace + file_offset 5012 5013 # But print the transcript file name and line number followed by what was expected and what was observed 5014 self.perror(error_str[start:]) 5015 5016 # Return a failure error code to support automated transcript-based testing 5017 self.exit_code = 1 5018 5019 def async_alert(self, alert_msg: str, new_prompt: Optional[str] = None) -> None: # pragma: no cover 5020 """ 5021 Display an important message to the user while they are at a command line prompt. 5022 To the user it appears as if an alert message is printed above the prompt and their current input 5023 text and cursor location is left alone. 5024 5025 IMPORTANT: This function will not print an alert unless it can acquire self.terminal_lock to ensure 5026 a prompt is onscreen. Therefore it is best to acquire the lock before calling this function 5027 to guarantee the alert prints and to avoid raising a RuntimeError. 5028 5029 :param alert_msg: the message to display to the user 5030 :param new_prompt: If you also want to change the prompt that is displayed, then include it here. 5031 See async_update_prompt() docstring for guidance on updating a prompt. 5032 :raises RuntimeError: if called while another thread holds `terminal_lock` 5033 """ 5034 if not (vt100_support and self.use_rawinput): 5035 return 5036 5037 # Sanity check that can't fail if self.terminal_lock was acquired before calling this function 5038 if self.terminal_lock.acquire(blocking=False): 5039 5040 # Windows terminals tend to flicker when we redraw the prompt and input lines. 5041 # To reduce how often this occurs, only update terminal if there are changes. 
5042 update_terminal = False 5043 5044 if alert_msg: 5045 alert_msg += '\n' 5046 update_terminal = True 5047 5048 if new_prompt is not None: 5049 self.prompt = new_prompt 5050 5051 # Check if the prompt to display has changed from what's currently displayed 5052 cur_onscreen_prompt = rl_get_prompt() 5053 new_onscreen_prompt = self.continuation_prompt if self._at_continuation_prompt else self.prompt 5054 5055 if new_onscreen_prompt != cur_onscreen_prompt: 5056 update_terminal = True 5057 5058 if update_terminal: 5059 import shutil 5060 5061 # Generate the string which will replace the current prompt and input lines with the alert 5062 terminal_str = ansi.async_alert_str( 5063 terminal_columns=shutil.get_terminal_size().columns, 5064 prompt=cur_onscreen_prompt, 5065 line=readline.get_line_buffer(), 5066 cursor_offset=rl_get_point(), 5067 alert_msg=alert_msg, 5068 ) 5069 if rl_type == RlType.GNU: 5070 sys.stderr.write(terminal_str) 5071 sys.stderr.flush() 5072 elif rl_type == RlType.PYREADLINE: 5073 # noinspection PyUnresolvedReferences 5074 readline.rl.mode.console.write(terminal_str) 5075 5076 # Update Readline's prompt before we redraw it 5077 rl_set_prompt(new_onscreen_prompt) 5078 5079 # Redraw the prompt and input lines below the alert 5080 rl_force_redisplay() 5081 5082 self.terminal_lock.release() 5083 5084 else: 5085 raise RuntimeError("another thread holds terminal_lock") 5086 5087 def async_update_prompt(self, new_prompt: str) -> None: # pragma: no cover 5088 """ 5089 Update the command line prompt while the user is still typing at it. This is good for alerting the user to 5090 system changes dynamically in between commands. For instance you could alter the color of the prompt to 5091 indicate a system status or increase a counter to report an event. If you do alter the actual text of the 5092 prompt, it is best to keep the prompt the same width as what's on screen. Otherwise the user's input text will 5093 be shifted and the update will not be seamless. 
5094 5095 IMPORTANT: This function will not update the prompt unless it can acquire self.terminal_lock to ensure 5096 a prompt is onscreen. Therefore it is best to acquire the lock before calling this function 5097 to guarantee the prompt changes and to avoid raising a RuntimeError. 5098 5099 If user is at a continuation prompt while entering a multiline command, the onscreen prompt will 5100 not change. However self.prompt will still be updated and display immediately after the multiline 5101 line command completes. 5102 5103 :param new_prompt: what to change the prompt to 5104 :raises RuntimeError: if called while another thread holds `terminal_lock` 5105 """ 5106 self.async_alert('', new_prompt) 5107 5108 @staticmethod 5109 def set_window_title(title: str) -> None: # pragma: no cover 5110 """ 5111 Set the terminal window title. 5112 5113 NOTE: This function writes to stderr. Therefore if you call this during a command run by a pyscript, 5114 the string which updates the title will appear in that command's CommandResult.stderr data. 
5115 5116 :param title: the new window title 5117 """ 5118 if not vt100_support: 5119 return 5120 5121 try: 5122 sys.stderr.write(ansi.set_title(title)) 5123 sys.stderr.flush() 5124 except AttributeError: 5125 # Debugging in Pycharm has issues with setting terminal title 5126 pass 5127 5128 def enable_command(self, command: str) -> None: 5129 """ 5130 Enable a command by restoring its functions 5131 5132 :param command: the command being enabled 5133 """ 5134 # If the commands is already enabled, then return 5135 if command not in self.disabled_commands: 5136 return 5137 5138 help_func_name = constants.HELP_FUNC_PREFIX + command 5139 completer_func_name = constants.COMPLETER_FUNC_PREFIX + command 5140 5141 # Restore the command function to its original value 5142 dc = self.disabled_commands[command] 5143 setattr(self, self._cmd_func_name(command), dc.command_function) 5144 5145 # Restore the help function to its original value 5146 if dc.help_function is None: 5147 delattr(self, help_func_name) 5148 else: 5149 setattr(self, help_func_name, dc.help_function) 5150 5151 # Restore the completer function to its original value 5152 if dc.completer_function is None: 5153 delattr(self, completer_func_name) 5154 else: 5155 setattr(self, completer_func_name, dc.completer_function) 5156 5157 # Remove the disabled command entry 5158 del self.disabled_commands[command] 5159 5160 def enable_category(self, category: str) -> None: 5161 """ 5162 Enable an entire category of commands 5163 5164 :param category: the category to enable 5165 """ 5166 for cmd_name in list(self.disabled_commands): 5167 func = self.disabled_commands[cmd_name].command_function 5168 if getattr(func, constants.CMD_ATTR_HELP_CATEGORY, None) == category: 5169 self.enable_command(cmd_name) 5170 5171 def disable_command(self, command: str, message_to_print: str) -> None: 5172 """ 5173 Disable a command and overwrite its functions 5174 5175 :param command: the command being disabled 5176 :param message_to_print: 
what to print when this command is run or help is called on it while disabled 5177 5178 The variable cmd2.COMMAND_NAME can be used as a placeholder for the name of the 5179 command being disabled. 5180 ex: message_to_print = f"{cmd2.COMMAND_NAME} is currently disabled" 5181 """ 5182 # If the commands is already disabled, then return 5183 if command in self.disabled_commands: 5184 return 5185 5186 # Make sure this is an actual command 5187 command_function = self.cmd_func(command) 5188 if command_function is None: 5189 raise AttributeError(f"'{command}' does not refer to a command") 5190 5191 help_func_name = constants.HELP_FUNC_PREFIX + command 5192 completer_func_name = constants.COMPLETER_FUNC_PREFIX + command 5193 5194 # Add the disabled command record 5195 self.disabled_commands[command] = DisabledCommand( 5196 command_function=command_function, 5197 help_function=getattr(self, help_func_name, None), 5198 completer_function=getattr(self, completer_func_name, None), 5199 ) 5200 5201 # Overwrite the command and help functions to print the message 5202 new_func = functools.partial( 5203 self._report_disabled_command_usage, message_to_print=message_to_print.replace(constants.COMMAND_NAME, command) 5204 ) 5205 setattr(self, self._cmd_func_name(command), new_func) 5206 setattr(self, help_func_name, new_func) 5207 5208 # Set the completer to a function that returns a blank list 5209 setattr(self, completer_func_name, lambda *args, **kwargs: []) 5210 5211 def disable_category(self, category: str, message_to_print: str) -> None: 5212 """Disable an entire category of commands. 5213 5214 :param category: the category to disable 5215 :param message_to_print: what to print when anything in this category is run or help is called on it 5216 while disabled. The variable cmd2.COMMAND_NAME can be used as a placeholder for the name 5217 of the command being disabled. 
5218 ex: message_to_print = f"{cmd2.COMMAND_NAME} is currently disabled" 5219 """ 5220 all_commands = self.get_all_commands() 5221 5222 for cmd_name in all_commands: 5223 func = self.cmd_func(cmd_name) 5224 if getattr(func, constants.CMD_ATTR_HELP_CATEGORY, None) == category: 5225 self.disable_command(cmd_name, message_to_print) 5226 5227 def _report_disabled_command_usage(self, *_args: Any, message_to_print: str, **_kwargs: Any) -> None: 5228 """ 5229 Report when a disabled command has been run or had help called on it 5230 5231 :param args: not used 5232 :param message_to_print: the message reporting that the command is disabled 5233 :param kwargs: not used 5234 """ 5235 # Set apply_style to False so message_to_print's style is not overridden 5236 self.perror(message_to_print, apply_style=False) 5237 5238 def cmdloop(self, intro: Optional[str] = None) -> int: # type: ignore[override] 5239 """This is an outer wrapper around _cmdloop() which deals with extra features provided by cmd2. 5240 5241 _cmdloop() provides the main loop equivalent to cmd.cmdloop(). This is a wrapper around that which deals with 5242 the following extra features provided by cmd2: 5243 - transcript testing 5244 - intro banner 5245 - exit code 5246 5247 :param intro: if provided this overrides self.intro and serves as the intro banner printed once at start 5248 """ 5249 # cmdloop() expects to be run in the main thread to support extensive use of KeyboardInterrupts throughout the 5250 # other built-in functions. You are free to override cmdloop, but much of cmd2's features will be limited. 
5251 if not threading.current_thread() is threading.main_thread(): 5252 raise RuntimeError("cmdloop must be run in the main thread") 5253 5254 # Register a SIGINT signal handler for Ctrl+C 5255 import signal 5256 5257 original_sigint_handler = signal.getsignal(signal.SIGINT) 5258 signal.signal(signal.SIGINT, self.sigint_handler) 5259 5260 # Grab terminal lock before the command line prompt has been drawn by readline 5261 self.terminal_lock.acquire() 5262 5263 # Always run the preloop first 5264 for func in self._preloop_hooks: 5265 func() 5266 self.preloop() 5267 5268 # If transcript-based regression testing was requested, then do that instead of the main loop 5269 if self._transcript_files is not None: 5270 self._run_transcript_tests([os.path.expanduser(tf) for tf in self._transcript_files]) 5271 else: 5272 # If an intro was supplied in the method call, allow it to override the default 5273 if intro is not None: 5274 self.intro = intro 5275 5276 # Print the intro, if there is one, right after the preloop 5277 if self.intro is not None: 5278 self.poutput(self.intro) 5279 5280 # And then call _cmdloop() to enter the main loop 5281 self._cmdloop() 5282 5283 # Run the postloop() no matter what 5284 for func in self._postloop_hooks: 5285 func() 5286 self.postloop() 5287 5288 # Release terminal lock now that postloop code should have stopped any terminal updater threads 5289 # This will also zero the lock count in case cmdloop() is called again 5290 self.terminal_lock.release() 5291 5292 # Restore the original signal handler 5293 signal.signal(signal.SIGINT, original_sigint_handler) 5294 5295 return self.exit_code 5296 5297 ### 5298 # 5299 # plugin related functions 5300 # 5301 ### 5302 def _initialize_plugin_system(self) -> None: 5303 """Initialize the plugin system""" 5304 self._preloop_hooks: List[Callable[[], None]] = [] 5305 self._postloop_hooks: List[Callable[[], None]] = [] 5306 self._postparsing_hooks: List[Callable[[plugin.PostparsingData], 
plugin.PostparsingData]] = [] 5307 self._precmd_hooks: List[Callable[[plugin.PrecommandData], plugin.PrecommandData]] = [] 5308 self._postcmd_hooks: List[Callable[[plugin.PostcommandData], plugin.PostcommandData]] = [] 5309 self._cmdfinalization_hooks: List[Callable[[plugin.CommandFinalizationData], plugin.CommandFinalizationData]] = [] 5310 5311 @classmethod 5312 def _validate_callable_param_count(cls, func: Callable[..., Any], count: int) -> None: 5313 """Ensure a function has the given number of parameters.""" 5314 signature = inspect.signature(func) 5315 # validate that the callable has the right number of parameters 5316 nparam = len(signature.parameters) 5317 if nparam != count: 5318 plural = '' if nparam == 1 else 's' 5319 raise TypeError(f'{func.__name__} has {nparam} positional argument{plural}, expected {count}') 5320 5321 @classmethod 5322 def _validate_prepostloop_callable(cls, func: Callable[[], None]) -> None: 5323 """Check parameter and return types for preloop and postloop hooks.""" 5324 cls._validate_callable_param_count(func, 0) 5325 # make sure there is no return notation 5326 signature = inspect.signature(func) 5327 if signature.return_annotation is not None: 5328 raise TypeError(f"{func.__name__} must declare return a return type of 'None'") 5329 5330 def register_preloop_hook(self, func: Callable[[], None]) -> None: 5331 """Register a function to be called at the beginning of the command loop.""" 5332 self._validate_prepostloop_callable(func) 5333 self._preloop_hooks.append(func) 5334 5335 def register_postloop_hook(self, func: Callable[[], None]) -> None: 5336 """Register a function to be called at the end of the command loop.""" 5337 self._validate_prepostloop_callable(func) 5338 self._postloop_hooks.append(func) 5339 5340 @classmethod 5341 def _validate_postparsing_callable(cls, func: Callable[[plugin.PostparsingData], plugin.PostparsingData]) -> None: 5342 """Check parameter and return types for postparsing hooks""" 5343 
cls._validate_callable_param_count(cast(Callable[..., Any], func), 1) 5344 signature = inspect.signature(func) 5345 _, param = list(signature.parameters.items())[0] 5346 if param.annotation != plugin.PostparsingData: 5347 raise TypeError(f"{func.__name__} must have one parameter declared with type 'cmd2.plugin.PostparsingData'") 5348 if signature.return_annotation != plugin.PostparsingData: 5349 raise TypeError(f"{func.__name__} must declare return a return type of 'cmd2.plugin.PostparsingData'") 5350 5351 def register_postparsing_hook(self, func: Callable[[plugin.PostparsingData], plugin.PostparsingData]) -> None: 5352 """Register a function to be called after parsing user input but before running the command""" 5353 self._validate_postparsing_callable(func) 5354 self._postparsing_hooks.append(func) 5355 5356 CommandDataType = TypeVar('CommandDataType') 5357 5358 @classmethod 5359 def _validate_prepostcmd_hook( 5360 cls, func: Callable[[CommandDataType], CommandDataType], data_type: Type[CommandDataType] 5361 ) -> None: 5362 """Check parameter and return types for pre and post command hooks.""" 5363 signature = inspect.signature(func) 5364 # validate that the callable has the right number of parameters 5365 cls._validate_callable_param_count(cast(Callable[..., Any], func), 1) 5366 # validate the parameter has the right annotation 5367 paramname = list(signature.parameters.keys())[0] 5368 param = signature.parameters[paramname] 5369 if param.annotation != data_type: 5370 raise TypeError(f'argument 1 of {func.__name__} has incompatible type {param.annotation}, expected {data_type}') 5371 # validate the return value has the right annotation 5372 if signature.return_annotation == signature.empty: 5373 raise TypeError(f'{func.__name__} does not have a declared return type, expected {data_type}') 5374 if signature.return_annotation != data_type: 5375 raise TypeError( 5376 f'{func.__name__} has incompatible return type {signature.return_annotation}, expected ' 
f'{data_type}' 5377 ) 5378 5379 def register_precmd_hook(self, func: Callable[[plugin.PrecommandData], plugin.PrecommandData]) -> None: 5380 """Register a hook to be called before the command function.""" 5381 self._validate_prepostcmd_hook(func, plugin.PrecommandData) 5382 self._precmd_hooks.append(func) 5383 5384 def register_postcmd_hook(self, func: Callable[[plugin.PostcommandData], plugin.PostcommandData]) -> None: 5385 """Register a hook to be called after the command function.""" 5386 self._validate_prepostcmd_hook(func, plugin.PostcommandData) 5387 self._postcmd_hooks.append(func) 5388 5389 @classmethod 5390 def _validate_cmdfinalization_callable( 5391 cls, func: Callable[[plugin.CommandFinalizationData], plugin.CommandFinalizationData] 5392 ) -> None: 5393 """Check parameter and return types for command finalization hooks.""" 5394 cls._validate_callable_param_count(func, 1) 5395 signature = inspect.signature(func) 5396 _, param = list(signature.parameters.items())[0] 5397 if param.annotation != plugin.CommandFinalizationData: 5398 raise TypeError(f"{func.__name__} must have one parameter declared with type {plugin.CommandFinalizationData}") 5399 if signature.return_annotation != plugin.CommandFinalizationData: 5400 raise TypeError("{func.__name__} must declare return a return type of {plugin.CommandFinalizationData}") 5401 5402 def register_cmdfinalization_hook( 5403 self, func: Callable[[plugin.CommandFinalizationData], plugin.CommandFinalizationData] 5404 ) -> None: 5405 """Register a hook to be called after a command is completed, whether it completes successfully or not.""" 5406 self._validate_cmdfinalization_callable(func) 5407 self._cmdfinalization_hooks.append(func) 5408 5409 def _resolve_func_self( 5410 self, 5411 cmd_support_func: Callable[..., Any], 5412 cmd_self: Union[CommandSet, 'Cmd', None], 5413 ) -> Optional[object]: 5414 """ 5415 Attempt to resolve a candidate instance to pass as 'self' for an unbound class method that was 5416 used when 
defining command's argparse object. Since we restrict registration to only a single CommandSet 5417 instance of each type, using type is a reasonably safe way to resolve the correct object instance 5418 5419 :param cmd_support_func: command support function. This could be a completer or namespace provider 5420 :param cmd_self: The `self` associated with the command or subcommand 5421 """ 5422 # figure out what class the command support function was defined in 5423 func_class: Optional[Type[Any]] = get_defining_class(cmd_support_func) 5424 5425 # Was there a defining class identified? If so, is it a sub-class of CommandSet? 5426 if func_class is not None and issubclass(func_class, CommandSet): 5427 # Since the support function is provided as an unbound function, we need to locate the instance 5428 # of the CommandSet to pass in as `self` to emulate a bound method call. 5429 # We're searching for candidates that match the support function's defining class type in this order: 5430 # 1. Is the command's CommandSet a sub-class of the support function's class? 5431 # 2. Do any of the registered CommandSets in the Cmd2 application exactly match the type? 5432 # 3. Is there a registered CommandSet that is is the only matching subclass? 
5433 5434 func_self: Optional[Union[CommandSet, 'Cmd']] 5435 5436 # check if the command's CommandSet is a sub-class of the support function's defining class 5437 if isinstance(cmd_self, func_class): 5438 # Case 1: Command's CommandSet is a sub-class of the support function's CommandSet 5439 func_self = cmd_self 5440 else: 5441 # Search all registered CommandSets 5442 func_self = None 5443 candidate_sets: List[CommandSet] = [] 5444 for installed_cmd_set in self._installed_command_sets: 5445 if type(installed_cmd_set) == func_class: 5446 # Case 2: CommandSet is an exact type match for the function's CommandSet 5447 func_self = installed_cmd_set 5448 break 5449 5450 # Add candidate for Case 3: 5451 if isinstance(installed_cmd_set, func_class): 5452 candidate_sets.append(installed_cmd_set) 5453 if func_self is None and len(candidate_sets) == 1: 5454 # Case 3: There exists exactly 1 CommandSet that is a sub-class match of the function's CommandSet 5455 func_self = candidate_sets[0] 5456 return func_self 5457 else: 5458 return self 5459