1# coding=utf-8 2"""Shared utility functions""" 3import argparse 4import collections 5import functools 6import glob 7import inspect 8import itertools 9import os 10import re 11import subprocess 12import sys 13import threading 14import unicodedata 15from enum import ( 16 Enum, 17) 18from typing import ( 19 TYPE_CHECKING, 20 Any, 21 Callable, 22 Dict, 23 Iterable, 24 List, 25 Optional, 26 TextIO, 27 Type, 28 TypeVar, 29 Union, 30 cast, 31) 32 33from . import ( 34 constants, 35) 36from .argparse_custom import ( 37 ChoicesProviderFunc, 38 CompleterFunc, 39) 40 41if TYPE_CHECKING: # pragma: no cover 42 import cmd2 # noqa: F401 43 44 PopenTextIO = subprocess.Popen[bytes] 45 46else: 47 PopenTextIO = subprocess.Popen 48 49 50_T = TypeVar('_T') 51 52 53def is_quoted(arg: str) -> bool: 54 """ 55 Checks if a string is quoted 56 57 :param arg: the string being checked for quotes 58 :return: True if a string is quoted 59 """ 60 return len(arg) > 1 and arg[0] == arg[-1] and arg[0] in constants.QUOTES 61 62 63def quote_string(arg: str) -> str: 64 """Quote a string""" 65 if '"' in arg: 66 quote = "'" 67 else: 68 quote = '"' 69 70 return quote + arg + quote 71 72 73def quote_string_if_needed(arg: str) -> str: 74 """Quote a string if it contains spaces and isn't already quoted""" 75 if is_quoted(arg) or ' ' not in arg: 76 return arg 77 78 return quote_string(arg) 79 80 81def strip_quotes(arg: str) -> str: 82 """Strip outer quotes from a string. 83 84 Applies to both single and double quotes. 85 86 :param arg: string to strip outer quotes from 87 :return: same string with potentially outer quotes stripped 88 """ 89 if is_quoted(arg): 90 arg = arg[1:-1] 91 return arg 92 93 94def str_to_bool(val: str) -> bool: 95 """Converts a string to a boolean based on its value. 96 97 :param val: string being converted 98 :return: boolean value expressed in the string 99 :raises: ValueError if the string does not contain a value corresponding to a boolean value 100 """ 101 if isinstance(val, str): 102 if val.capitalize() == str(True): 103 return True 104 elif val.capitalize() == str(False): 105 return False 106 raise ValueError("must be True or False (case-insensitive)") 107 108 109class Settable: 110 """Used to configure an attribute to be settable via the set command in the CLI""" 111 112 def __init__( 113 self, 114 name: str, 115 val_type: Union[Type[Any], Callable[[Any], Any]], 116 description: str, 117 settable_object: object, 118 *, 119 settable_attrib_name: Optional[str] = None, 120 onchange_cb: Optional[Callable[[str, _T, _T], Any]] = None, 121 choices: Optional[Iterable[Any]] = None, 122 choices_provider: Optional[ChoicesProviderFunc] = None, 123 completer: Optional[CompleterFunc] = None, 124 ) -> None: 125 """ 126 Settable Initializer 127 128 :param name: name of the instance attribute being made settable 129 :param val_type: callable used to cast the string value from the command line into its proper type and 130 even validate its value. Setting this to bool provides tab completion for true/false and 131 validation using str_to_bool(). The val_type function should raise an exception if it fails. 132 This exception will be caught and printed by Cmd.do_set(). 133 :param description: string describing this setting 134 :param settable_object: object to which the instance attribute belongs (e.g. self) 135 :param settable_attrib_name: name which displays to the user in the output of the set command. 136 Defaults to `name` if not specified. 137 :param onchange_cb: optional function or method to call when the value of this settable is altered 138 by the set command. (e.g. onchange_cb=self.debug_changed) 139 140 Cmd.do_set() passes the following 3 arguments to onchange_cb: 141 param_name: str - name of the changed parameter 142 old_value: Any - the value before being changed 143 new_value: Any - the value after being changed 144 145 The following optional settings provide tab completion for a parameter's values. They correspond to the 146 same settings in argparse-based tab completion. A maximum of one of these should be provided. 147 148 :param choices: iterable of accepted values 149 :param choices_provider: function that provides choices for this argument 150 :param completer: tab completion function that provides choices for this argument 151 """ 152 if val_type == bool: 153 154 def get_bool_choices(_) -> List[str]: # type: ignore[no-untyped-def] 155 """Used to tab complete lowercase boolean values""" 156 return ['true', 'false'] 157 158 val_type = str_to_bool 159 choices_provider = cast(ChoicesProviderFunc, get_bool_choices) 160 161 self.name = name 162 self.val_type = val_type 163 self.description = description 164 self.settable_obj = settable_object 165 self.settable_attrib_name = settable_attrib_name if settable_attrib_name is not None else name 166 self.onchange_cb = onchange_cb 167 self.choices = choices 168 self.choices_provider = choices_provider 169 self.completer = completer 170 171 def get_value(self) -> Any: 172 """ 173 Get the value of the settable attribute 174 :return: 175 """ 176 return getattr(self.settable_obj, self.settable_attrib_name) 177 178 def set_value(self, value: Any) -> Any: 179 """ 180 Set the settable attribute on the specified destination object 181 :param value: New value to set 182 :return: New value that the attribute was set to 183 """ 184 # Run the value through its type function to handle any conversion or validation 185 new_value = self.val_type(value) 186 187 # Make sure new_value is a valid choice 188 if self.choices is not None and new_value not in self.choices: 189 choices_str = ', '.join(map(repr, self.choices)) 190 raise ValueError(f"invalid choice: {new_value!r} (choose from {choices_str})") 191 192 # Try to update the settable's value 193 orig_value = self.get_value() 194 setattr(self.settable_obj, self.settable_attrib_name, new_value) 195 196 # Check if we need to call an onchange callback 197 if orig_value != new_value and self.onchange_cb: 198 self.onchange_cb(self.name, orig_value, new_value) 199 return new_value 200 201 202def is_text_file(file_path: str) -> bool: 203 """Returns if a file contains only ASCII or UTF-8 encoded text and isn't empty. 204 205 :param file_path: path to the file being checked 206 :return: True if the file is a non-empty text file, otherwise False 207 :raises OSError if file can't be read 208 """ 209 import codecs 210 211 expanded_path = os.path.abspath(os.path.expanduser(file_path.strip())) 212 valid_text_file = False 213 214 # Only need to check for utf-8 compliance since that covers ASCII, too 215 try: 216 with codecs.open(expanded_path, encoding='utf-8', errors='strict') as f: 217 # Make sure the file has only utf-8 text and is not empty 218 if sum(1 for _ in f) > 0: 219 valid_text_file = True 220 except OSError: 221 raise 222 except UnicodeDecodeError: 223 # Not UTF-8 224 pass 225 226 return valid_text_file 227 228 229def remove_duplicates(list_to_prune: List[_T]) -> List[_T]: 230 """Removes duplicates from a list while preserving order of the items. 231 232 :param list_to_prune: the list being pruned of duplicates 233 :return: The pruned list 234 """ 235 temp_dict: collections.OrderedDict[_T, Any] = collections.OrderedDict() 236 for item in list_to_prune: 237 temp_dict[item] = None 238 239 return list(temp_dict.keys()) 240 241 242def norm_fold(astr: str) -> str: 243 """Normalize and casefold Unicode strings for saner comparisons. 244 245 :param astr: input unicode string 246 :return: a normalized and case-folded version of the input string 247 """ 248 return unicodedata.normalize('NFC', astr).casefold() 249 250 251def alphabetical_sort(list_to_sort: Iterable[str]) -> List[str]: 252 """Sorts a list of strings alphabetically. 253 254 For example: ['a1', 'A11', 'A2', 'a22', 'a3'] 255 256 To sort a list in place, don't call this method, which makes a copy. Instead, do this: 257 258 my_list.sort(key=norm_fold) 259 260 :param list_to_sort: the list being sorted 261 :return: the sorted list 262 """ 263 return sorted(list_to_sort, key=norm_fold) 264 265 266def try_int_or_force_to_lower_case(input_str: str) -> Union[int, str]: 267 """ 268 Tries to convert the passed-in string to an integer. If that fails, it converts it to lower case using norm_fold. 269 :param input_str: string to convert 270 :return: the string as an integer or a lower case version of the string 271 """ 272 try: 273 return int(input_str) 274 except ValueError: 275 return norm_fold(input_str) 276 277 278def natural_keys(input_str: str) -> List[Union[int, str]]: 279 """ 280 Converts a string into a list of integers and strings to support natural sorting (see natural_sort). 281 282 For example: natural_keys('abc123def') -> ['abc', '123', 'def'] 283 :param input_str: string to convert 284 :return: list of strings and integers 285 """ 286 return [try_int_or_force_to_lower_case(substr) for substr in re.split(r'(\d+)', input_str)] 287 288 289def natural_sort(list_to_sort: Iterable[str]) -> List[str]: 290 """ 291 Sorts a list of strings case insensitively as well as numerically. 292 293 For example: ['a1', 'A2', 'a3', 'A11', 'a22'] 294 295 To sort a list in place, don't call this method, which makes a copy. Instead, do this: 296 297 my_list.sort(key=natural_keys) 298 299 :param list_to_sort: the list being sorted 300 :return: the list sorted naturally 301 """ 302 return sorted(list_to_sort, key=natural_keys) 303 304 305def quote_specific_tokens(tokens: List[str], tokens_to_quote: List[str]) -> None: 306 """ 307 Quote specific tokens in a list 308 309 :param tokens: token list being edited 310 :param tokens_to_quote: the tokens, which if present in tokens, to quote 311 """ 312 for i, token in enumerate(tokens): 313 if token in tokens_to_quote: 314 tokens[i] = quote_string(token) 315 316 317def unquote_specific_tokens(tokens: List[str], tokens_to_unquote: List[str]) -> None: 318 """ 319 Unquote specific tokens in a list 320 321 :param tokens: token list being edited 322 :param tokens_to_unquote: the tokens, which if present in tokens, to unquote 323 """ 324 for i, token in enumerate(tokens): 325 unquoted_token = strip_quotes(token) 326 if unquoted_token in tokens_to_unquote: 327 tokens[i] = unquoted_token 328 329 330def expand_user(token: str) -> str: 331 """ 332 Wrap os.expanduser() to support expanding ~ in quoted strings 333 :param token: the string to expand 334 """ 335 if token: 336 if is_quoted(token): 337 quote_char = token[0] 338 token = strip_quotes(token) 339 else: 340 quote_char = '' 341 342 token = os.path.expanduser(token) 343 344 # Restore the quotes even if not needed to preserve what the user typed 345 if quote_char: 346 token = quote_char + token + quote_char 347 348 return token 349 350 351def expand_user_in_tokens(tokens: List[str]) -> None: 352 """ 353 Call expand_user() on all tokens in a list of strings 354 :param tokens: tokens to expand 355 """ 356 for index, _ in enumerate(tokens): 357 tokens[index] = expand_user(tokens[index]) 358 359 360def find_editor() -> Optional[str]: 361 """ 362 Used to set cmd2.Cmd.DEFAULT_EDITOR. If EDITOR env variable is set, that will be used. 363 Otherwise the function will look for a known editor in directories specified by PATH env variable. 364 :return: Default editor or None 365 """ 366 editor = os.environ.get('EDITOR') 367 if not editor: 368 if sys.platform[:3] == 'win': 369 editors = ['code.cmd', 'notepad++.exe', 'notepad.exe'] 370 else: 371 editors = ['vim', 'vi', 'emacs', 'nano', 'pico', 'joe', 'code', 'subl', 'atom', 'gedit', 'geany', 'kate'] 372 373 # Get a list of every directory in the PATH environment variable and ignore symbolic links 374 env_path = os.getenv('PATH') 375 if env_path is None: 376 paths = [] 377 else: 378 paths = [p for p in env_path.split(os.path.pathsep) if not os.path.islink(p)] 379 380 for editor, path in itertools.product(editors, paths): 381 editor_path = os.path.join(path, editor) 382 if os.path.isfile(editor_path) and os.access(editor_path, os.X_OK): 383 if sys.platform[:3] == 'win': 384 # Remove extension from Windows file names 385 editor = os.path.splitext(editor)[0] 386 break 387 else: 388 editor = None 389 390 return editor 391 392 393def files_from_glob_pattern(pattern: str, access: int = os.F_OK) -> List[str]: 394 """Return a list of file paths based on a glob pattern. 395 396 Only files are returned, not directories, and optionally only files for which the user has a specified access to. 397 398 :param pattern: file name or glob pattern 399 :param access: file access type to verify (os.* where * is F_OK, R_OK, W_OK, or X_OK) 400 :return: list of files matching the name or glob pattern 401 """ 402 return [f for f in glob.glob(pattern) if os.path.isfile(f) and os.access(f, access)] 403 404 405def files_from_glob_patterns(patterns: List[str], access: int = os.F_OK) -> List[str]: 406 """Return a list of file paths based on a list of glob patterns. 407 408 Only files are returned, not directories, and optionally only files for which the user has a specified access to. 409 410 :param patterns: list of file names and/or glob patterns 411 :param access: file access type to verify (os.* where * is F_OK, R_OK, W_OK, or X_OK) 412 :return: list of files matching the names and/or glob patterns 413 """ 414 files = [] 415 for pattern in patterns: 416 matches = files_from_glob_pattern(pattern, access=access) 417 files.extend(matches) 418 return files 419 420 421def get_exes_in_path(starts_with: str) -> List[str]: 422 """Returns names of executables in a user's path 423 424 :param starts_with: what the exes should start with. leave blank for all exes in path. 425 :return: a list of matching exe names 426 """ 427 # Purposely don't match any executable containing wildcards 428 wildcards = ['*', '?'] 429 for wildcard in wildcards: 430 if wildcard in starts_with: 431 return [] 432 433 # Get a list of every directory in the PATH environment variable and ignore symbolic links 434 env_path = os.getenv('PATH') 435 if env_path is None: 436 paths = [] 437 else: 438 paths = [p for p in env_path.split(os.path.pathsep) if not os.path.islink(p)] 439 440 # Use a set to store exe names since there can be duplicates 441 exes_set = set() 442 443 # Find every executable file in the user's path that matches the pattern 444 for path in paths: 445 full_path = os.path.join(path, starts_with) 446 matches = files_from_glob_pattern(full_path + '*', access=os.X_OK) 447 448 for match in matches: 449 exes_set.add(os.path.basename(match)) 450 451 return list(exes_set) 452 453 454class StdSim: 455 """ 456 Class to simulate behavior of sys.stdout or sys.stderr. 457 Stores contents in internal buffer and optionally echos to the inner stream it is simulating. 458 """ 459 460 def __init__( 461 self, 462 inner_stream: Union[TextIO, 'StdSim'], 463 *, 464 echo: bool = False, 465 encoding: str = 'utf-8', 466 errors: str = 'replace', 467 ) -> None: 468 """ 469 StdSim Initializer 470 471 :param inner_stream: the wrapped stream. Should be a TextIO or StdSim instance. 472 :param echo: if True, then all input will be echoed to inner_stream 473 :param encoding: codec for encoding/decoding strings (defaults to utf-8) 474 :param errors: how to handle encoding/decoding errors (defaults to replace) 475 """ 476 self.inner_stream = inner_stream 477 self.echo = echo 478 self.encoding = encoding 479 self.errors = errors 480 self.pause_storage = False 481 self.buffer = ByteBuf(self) 482 483 def write(self, s: str) -> None: 484 """ 485 Add str to internal bytes buffer and if echo is True, echo contents to inner stream 486 487 :param s: String to write to the stream 488 """ 489 if not isinstance(s, str): 490 raise TypeError(f'write() argument must be str, not {type(s)}') 491 492 if not self.pause_storage: 493 self.buffer.byte_buf += s.encode(encoding=self.encoding, errors=self.errors) 494 if self.echo: 495 self.inner_stream.write(s) 496 497 def getvalue(self) -> str: 498 """Get the internal contents as a str""" 499 return self.buffer.byte_buf.decode(encoding=self.encoding, errors=self.errors) 500 501 def getbytes(self) -> bytes: 502 """Get the internal contents as bytes""" 503 return bytes(self.buffer.byte_buf) 504 505 def read(self, size: Optional[int] = -1) -> str: 506 """ 507 Read from the internal contents as a str and then clear them out 508 509 :param size: Number of bytes to read from the stream 510 """ 511 if size is None or size == -1: 512 result = self.getvalue() 513 self.clear() 514 else: 515 result = self.buffer.byte_buf[:size].decode(encoding=self.encoding, errors=self.errors) 516 self.buffer.byte_buf = self.buffer.byte_buf[size:] 517 518 return result 519 520 def readbytes(self) -> bytes: 521 """Read from the internal contents as bytes and then clear them out""" 522 result = self.getbytes() 523 self.clear() 524 return result 525 526 def clear(self) -> None: 527 """Clear the internal contents""" 528 self.buffer.byte_buf.clear() 529 530 def isatty(self) -> bool: 531 """StdSim only considered an interactive stream if `echo` is True and `inner_stream` is a tty.""" 532 if self.echo: 533 return self.inner_stream.isatty() 534 else: 535 return False 536 537 @property 538 def line_buffering(self) -> bool: 539 """ 540 Handle when the inner stream doesn't have a line_buffering attribute which is the case 541 when running unit tests because pytest sets stdout to a pytest EncodedFile object. 542 """ 543 try: 544 return bool(self.inner_stream.line_buffering) 545 except AttributeError: 546 return False 547 548 def __getattr__(self, item: str) -> Any: 549 if item in self.__dict__: 550 return self.__dict__[item] 551 else: 552 return getattr(self.inner_stream, item) 553 554 555class ByteBuf: 556 """ 557 Used by StdSim to write binary data and stores the actual bytes written 558 """ 559 560 # Used to know when to flush the StdSim 561 NEWLINES = [b'\n', b'\r'] 562 563 def __init__(self, std_sim_instance: StdSim) -> None: 564 self.byte_buf = bytearray() 565 self.std_sim_instance = std_sim_instance 566 567 def write(self, b: bytes) -> None: 568 """Add bytes to internal bytes buffer and if echo is True, echo contents to inner stream.""" 569 if not isinstance(b, bytes): 570 raise TypeError(f'a bytes-like object is required, not {type(b)}') 571 if not self.std_sim_instance.pause_storage: 572 self.byte_buf += b 573 if self.std_sim_instance.echo: 574 self.std_sim_instance.inner_stream.buffer.write(b) 575 576 # Since StdSim wraps TextIO streams, we will flush the stream if line buffering is on 577 # and the bytes being written contain a new line character. This is helpful when StdSim 578 # is being used to capture output of a shell command because it causes the output to print 579 # to the screen more often than if we waited for the stream to flush its buffer. 580 if self.std_sim_instance.line_buffering: 581 if any(newline in b for newline in ByteBuf.NEWLINES): 582 self.std_sim_instance.flush() 583 584 585class ProcReader: 586 """ 587 Used to capture stdout and stderr from a Popen process if any of those were set to subprocess.PIPE. 588 If neither are pipes, then the process will run normally and no output will be captured. 589 """ 590 591 def __init__(self, proc: PopenTextIO, stdout: Union[StdSim, TextIO], stderr: Union[StdSim, TextIO]) -> None: 592 """ 593 ProcReader initializer 594 :param proc: the Popen process being read from 595 :param stdout: the stream to write captured stdout 596 :param stderr: the stream to write captured stderr 597 """ 598 self._proc = proc 599 self._stdout = stdout 600 self._stderr = stderr 601 602 self._out_thread = threading.Thread(name='out_thread', target=self._reader_thread_func, kwargs={'read_stdout': True}) 603 604 self._err_thread = threading.Thread(name='err_thread', target=self._reader_thread_func, kwargs={'read_stdout': False}) 605 606 # Start the reader threads for pipes only 607 if self._proc.stdout is not None: 608 self._out_thread.start() 609 if self._proc.stderr is not None: 610 self._err_thread.start() 611 612 def send_sigint(self) -> None: 613 """Send a SIGINT to the process similar to if <Ctrl>+C were pressed""" 614 import signal 615 616 if sys.platform.startswith('win'): 617 # cmd2 started the Windows process in a new process group. Therefore we must send 618 # a CTRL_BREAK_EVENT since CTRL_C_EVENT signals cannot be generated for process groups. 619 self._proc.send_signal(signal.CTRL_BREAK_EVENT) 620 else: 621 # Since cmd2 uses shell=True in its Popen calls, we need to send the SIGINT to 622 # the whole process group to make sure it propagates further than the shell 623 try: 624 group_id = os.getpgid(self._proc.pid) 625 os.killpg(group_id, signal.SIGINT) 626 except ProcessLookupError: 627 return 628 629 def terminate(self) -> None: 630 """Terminate the process""" 631 self._proc.terminate() 632 633 def wait(self) -> None: 634 """Wait for the process to finish""" 635 if self._out_thread.is_alive(): 636 self._out_thread.join() 637 if self._err_thread.is_alive(): 638 self._err_thread.join() 639 640 # Handle case where the process ended before the last read could be done. 641 # This will return None for the streams that weren't pipes. 642 out, err = self._proc.communicate() 643 644 if out: 645 self._write_bytes(self._stdout, out) 646 if err: 647 self._write_bytes(self._stderr, err) 648 649 def _reader_thread_func(self, read_stdout: bool) -> None: 650 """ 651 Thread function that reads a stream from the process 652 :param read_stdout: if True, then this thread deals with stdout. Otherwise it deals with stderr. 653 """ 654 if read_stdout: 655 read_stream = self._proc.stdout 656 write_stream = self._stdout 657 else: 658 read_stream = self._proc.stderr 659 write_stream = self._stderr 660 661 # The thread should have been started only if this stream was a pipe 662 assert read_stream is not None 663 664 # Run until process completes 665 while self._proc.poll() is None: 666 # noinspection PyUnresolvedReferences 667 available = read_stream.peek() # type: ignore[attr-defined] 668 if available: 669 read_stream.read(len(available)) 670 self._write_bytes(write_stream, available) 671 672 @staticmethod 673 def _write_bytes(stream: Union[StdSim, TextIO], to_write: bytes) -> None: 674 """ 675 Write bytes to a stream 676 :param stream: the stream being written to 677 :param to_write: the bytes being written 678 """ 679 try: 680 stream.buffer.write(to_write) 681 except BrokenPipeError: 682 # This occurs if output is being piped to a process that closed 683 pass 684 685 686class ContextFlag: 687 """A context manager which is also used as a boolean flag value within the default sigint handler. 688 689 Its main use is as a flag to prevent the SIGINT handler in cmd2 from raising a KeyboardInterrupt 690 while a critical code section has set the flag to True. Because signal handling is always done on the 691 main thread, this class is not thread-safe since there is no need. 692 """ 693 694 def __init__(self) -> None: 695 # When this flag has a positive value, it is considered set. 696 # When it is 0, it is not set. It should never go below 0. 697 self.__count = 0 698 699 def __bool__(self) -> bool: 700 return self.__count > 0 701 702 def __enter__(self) -> None: 703 self.__count += 1 704 705 def __exit__(self, *args: Any) -> None: 706 self.__count -= 1 707 if self.__count < 0: 708 raise ValueError("count has gone below 0") 709 710 711class RedirectionSavedState: 712 """Created by each command to store information required to restore state after redirection""" 713 714 def __init__( 715 self, 716 self_stdout: Union[StdSim, TextIO], 717 sys_stdout: Union[StdSim, TextIO], 718 pipe_proc_reader: Optional[ProcReader], 719 saved_redirecting: bool, 720 ) -> None: 721 """ 722 RedirectionSavedState initializer 723 :param self_stdout: saved value of Cmd.stdout 724 :param sys_stdout: saved value of sys.stdout 725 :param pipe_proc_reader: saved value of Cmd._cur_pipe_proc_reader 726 :param saved_redirecting: saved value of Cmd._redirecting 727 """ 728 # Tells if command is redirecting 729 self.redirecting = False 730 731 # Used to restore values after redirection ends 732 self.saved_self_stdout = self_stdout 733 self.saved_sys_stdout = sys_stdout 734 735 # Used to restore values after command ends regardless of whether the command redirected 736 self.saved_pipe_proc_reader = pipe_proc_reader 737 self.saved_redirecting = saved_redirecting 738 739 740class TextAlignment(Enum): 741 """Horizontal text alignment""" 742 743 LEFT = 1 744 CENTER = 2 745 RIGHT = 3 746 747 748def align_text( 749 text: str, 750 alignment: TextAlignment, 751 *, 752 fill_char: str = ' ', 753 width: Optional[int] = None, 754 tab_width: int = 4, 755 truncate: bool = False, 756) -> str: 757 """ 758 Align text for display within a given width. Supports characters with display widths greater than 1. 759 ANSI style sequences do not count toward the display width. If text has line breaks, then each line is aligned 760 independently. 761 762 There are convenience wrappers around this function: align_left(), align_center(), and align_right() 763 764 :param text: text to align (can contain multiple lines) 765 :param alignment: how to align the text 766 :param fill_char: character that fills the alignment gap. Defaults to space. (Cannot be a line breaking character) 767 :param width: display width of the aligned text. Defaults to width of the terminal. 768 :param tab_width: any tabs in the text will be replaced with this many spaces. if fill_char is a tab, then it will 769 be converted to one space. 770 :param truncate: if True, then each line will be shortened to fit within the display width. The truncated 771 portions are replaced by a '…' character. Defaults to False. 772 :return: aligned text 773 :raises: TypeError if fill_char is more than one character (not including ANSI style sequences) 774 :raises: ValueError if text or fill_char contains an unprintable character 775 :raises: ValueError if width is less than 1 776 """ 777 import io 778 import shutil 779 780 from . import ( 781 ansi, 782 ) 783 784 if width is None: 785 width = shutil.get_terminal_size().columns 786 787 if width < 1: 788 raise ValueError("width must be at least 1") 789 790 # Convert tabs to spaces 791 text = text.replace('\t', ' ' * tab_width) 792 fill_char = fill_char.replace('\t', ' ') 793 794 # Save fill_char with no styles for use later 795 stripped_fill_char = ansi.strip_style(fill_char) 796 if len(stripped_fill_char) != 1: 797 raise TypeError("Fill character must be exactly one character long") 798 799 fill_char_width = ansi.style_aware_wcswidth(fill_char) 800 if fill_char_width == -1: 801 raise (ValueError("Fill character is an unprintable character")) 802 803 # Isolate the style chars before and after the fill character. We will use them when building sequences of 804 # of fill characters. Instead of repeating the style characters for each fill character, we'll wrap each sequence. 805 fill_char_style_begin, fill_char_style_end = fill_char.split(stripped_fill_char) 806 807 if text: 808 lines = text.splitlines() 809 else: 810 lines = [''] 811 812 text_buf = io.StringIO() 813 814 # ANSI style sequences that may affect future lines will be cancelled by the fill_char's style. 815 # To avoid this, we save the state of a line's style so we can restore it when beginning the next line. 816 # This also allows the lines to be used independently and still have their style. TableCreator does this. 817 aggregate_styles = '' 818 819 for index, line in enumerate(lines): 820 if index > 0: 821 text_buf.write('\n') 822 823 if truncate: 824 line = truncate_line(line, width) 825 826 line_width = ansi.style_aware_wcswidth(line) 827 if line_width == -1: 828 raise (ValueError("Text to align contains an unprintable character")) 829 830 # Get the styles in this line 831 line_styles = get_styles_in_text(line) 832 833 # Calculate how wide each side of filling needs to be 834 if line_width >= width: 835 # Don't return here even though the line needs no fill chars. 836 # There may be styles sequences to restore. 837 total_fill_width = 0 838 else: 839 total_fill_width = width - line_width 840 841 if alignment == TextAlignment.LEFT: 842 left_fill_width = 0 843 right_fill_width = total_fill_width 844 elif alignment == TextAlignment.CENTER: 845 left_fill_width = total_fill_width // 2 846 right_fill_width = total_fill_width - left_fill_width 847 else: 848 left_fill_width = total_fill_width 849 right_fill_width = 0 850 851 # Determine how many fill characters are needed to cover the width 852 left_fill = (left_fill_width // fill_char_width) * stripped_fill_char 853 right_fill = (right_fill_width // fill_char_width) * stripped_fill_char 854 855 # In cases where the fill character display width didn't divide evenly into 856 # the gap being filled, pad the remainder with space. 857 left_fill += ' ' * (left_fill_width - ansi.style_aware_wcswidth(left_fill)) 858 right_fill += ' ' * (right_fill_width - ansi.style_aware_wcswidth(right_fill)) 859 860 # Don't allow styles in fill characters and text to affect one another 861 if fill_char_style_begin or fill_char_style_end or aggregate_styles or line_styles: 862 if left_fill: 863 left_fill = ansi.TextStyle.RESET_ALL + fill_char_style_begin + left_fill + fill_char_style_end 864 left_fill += ansi.TextStyle.RESET_ALL 865 866 if right_fill: 867 right_fill = ansi.TextStyle.RESET_ALL + fill_char_style_begin + right_fill + fill_char_style_end 868 right_fill += ansi.TextStyle.RESET_ALL 869 870 # Write the line and restore any styles from previous lines 871 text_buf.write(left_fill + aggregate_styles + line + right_fill) 872 873 # Update the aggregate with styles in this line 874 aggregate_styles += ''.join(line_styles.values()) 875 876 return text_buf.getvalue() 877 878 879def align_left( 880 text: str, *, fill_char: str = ' ', width: Optional[int] = None, tab_width: int = 4, truncate: bool = False 881) -> str: 882 """ 883 Left align text for display within a given width. Supports characters with display widths greater than 1. 884 ANSI style sequences do not count toward the display width. If text has line breaks, then each line is aligned 885 independently. 886 887 :param text: text to left align (can contain multiple lines) 888 :param fill_char: character that fills the alignment gap. Defaults to space. (Cannot be a line breaking character) 889 :param width: display width of the aligned text. Defaults to width of the terminal. 890 :param tab_width: any tabs in the text will be replaced with this many spaces. if fill_char is a tab, then it will 891 be converted to one space. 892 :param truncate: if True, then text will be shortened to fit within the display width. The truncated portion is 893 replaced by a '…' character. Defaults to False. 894 :return: left-aligned text 895 :raises: TypeError if fill_char is more than one character (not including ANSI style sequences) 896 :raises: ValueError if text or fill_char contains an unprintable character 897 :raises: ValueError if width is less than 1 898 """ 899 return align_text(text, TextAlignment.LEFT, fill_char=fill_char, width=width, tab_width=tab_width, truncate=truncate) 900 901 902def align_center( 903 text: str, *, fill_char: str = ' ', width: Optional[int] = None, tab_width: int = 4, truncate: bool = False 904) -> str: 905 """ 906 Center text for display within a given width. Supports characters with display widths greater than 1. 907 ANSI style sequences do not count toward the display width. If text has line breaks, then each line is aligned 908 independently. 909 910 :param text: text to center (can contain multiple lines) 911 :param fill_char: character that fills the alignment gap. Defaults to space. (Cannot be a line breaking character) 912 :param width: display width of the aligned text. Defaults to width of the terminal. 913 :param tab_width: any tabs in the text will be replaced with this many spaces. if fill_char is a tab, then it will 914 be converted to one space. 915 :param truncate: if True, then text will be shortened to fit within the display width. The truncated portion is 916 replaced by a '…' character. Defaults to False. 917 :return: centered text 918 :raises: TypeError if fill_char is more than one character (not including ANSI style sequences) 919 :raises: ValueError if text or fill_char contains an unprintable character 920 :raises: ValueError if width is less than 1 921 """ 922 return align_text(text, TextAlignment.CENTER, fill_char=fill_char, width=width, tab_width=tab_width, truncate=truncate) 923 924 925def align_right( 926 text: str, *, fill_char: str = ' ', width: Optional[int] = None, tab_width: int = 4, truncate: bool = False 927) -> str: 928 """ 929 Right align text for display within a given width. Supports characters with display widths greater than 1. 930 ANSI style sequences do not count toward the display width. If text has line breaks, then each line is aligned 931 independently. 932 933 :param text: text to right align (can contain multiple lines) 934 :param fill_char: character that fills the alignment gap. Defaults to space. (Cannot be a line breaking character) 935 :param width: display width of the aligned text. Defaults to width of the terminal. 936 :param tab_width: any tabs in the text will be replaced with this many spaces. if fill_char is a tab, then it will 937 be converted to one space. 938 :param truncate: if True, then text will be shortened to fit within the display width. The truncated portion is 939 replaced by a '…' character. Defaults to False. 940 :return: right-aligned text 941 :raises: TypeError if fill_char is more than one character (not including ANSI style sequences) 942 :raises: ValueError if text or fill_char contains an unprintable character 943 :raises: ValueError if width is less than 1 944 """ 945 return align_text(text, TextAlignment.RIGHT, fill_char=fill_char, width=width, tab_width=tab_width, truncate=truncate) 946 947 948def truncate_line(line: str, max_width: int, *, tab_width: int = 4) -> str: 949 """ 950 Truncate a single line to fit within a given display width. Any portion of the string that is truncated 951 is replaced by a '…' character. Supports characters with display widths greater than 1. ANSI style sequences 952 do not count toward the display width. 953 954 If there are ANSI style sequences in the string after where truncation occurs, this function will append them 955 to the returned string. 956 957 This is done to prevent issues caused in cases like: truncate_line(Fg.BLUE + hello + Fg.RESET, 3) 958 In this case, "hello" would be truncated before Fg.RESET resets the color from blue. Appending the remaining style 959 sequences makes sure the style is in the same state had the entire string been printed. align_text() relies on this 960 behavior when preserving style over multiple lines. 961 962 :param line: text to truncate 963 :param max_width: the maximum display width the resulting string is allowed to have 964 :param tab_width: any tabs in the text will be replaced with this many spaces 965 :return: line that has a display width less than or equal to width 966 :raises: ValueError if text contains an unprintable character like a newline 967 :raises: ValueError if max_width is less than 1 968 """ 969 import io 970 971 from . import ( 972 ansi, 973 ) 974 975 # Handle tabs 976 line = line.replace('\t', ' ' * tab_width) 977 978 if ansi.style_aware_wcswidth(line) == -1: 979 raise (ValueError("text contains an unprintable character")) 980 981 if max_width < 1: 982 raise ValueError("max_width must be at least 1") 983 984 if ansi.style_aware_wcswidth(line) <= max_width: 985 return line 986 987 # Find all style sequences in the line 988 styles = get_styles_in_text(line) 989 990 # Add characters one by one and preserve all style sequences 991 done = False 992 index = 0 993 total_width = 0 994 truncated_buf = io.StringIO() 995 996 while not done: 997 # Check if a style sequence is at this index. These don't count toward display width. 998 if index in styles: 999 truncated_buf.write(styles[index]) 1000 style_len = len(styles[index]) 1001 styles.pop(index) 1002 index += style_len 1003 continue 1004 1005 char = line[index] 1006 char_width = ansi.style_aware_wcswidth(char) 1007 1008 # This char will make the text too wide, add the ellipsis instead 1009 if char_width + total_width >= max_width: 1010 char = constants.HORIZONTAL_ELLIPSIS 1011 char_width = ansi.style_aware_wcswidth(char) 1012 done = True 1013 1014 total_width += char_width 1015 truncated_buf.write(char) 1016 index += 1 1017 1018 # Append remaining style sequences from original string 1019 truncated_buf.write(''.join(styles.values())) 1020 1021 return truncated_buf.getvalue() 1022 1023 1024def get_styles_in_text(text: str) -> Dict[int, str]: 1025 """ 1026 Return an OrderedDict containing all ANSI style sequences found in a string 1027 1028 The structure of the dictionary is: 1029 key: index where sequences begins 1030 value: ANSI style sequence found at index in text 1031 1032 Keys are in ascending order 1033 1034 :param text: text to search for style sequences 1035 """ 1036 from . import ( 1037 ansi, 1038 ) 1039 1040 start = 0 1041 styles = collections.OrderedDict() 1042 1043 while True: 1044 match = ansi.ANSI_STYLE_RE.search(text, start) 1045 if match is None: 1046 break 1047 styles[match.start()] = match.group() 1048 start += len(match.group()) 1049 1050 return styles 1051 1052 1053def categorize(func: Union[Callable[..., Any], Iterable[Callable[..., Any]]], category: str) -> None: 1054 """Categorize a function. 1055 1056 The help command output will group the passed function under the 1057 specified category heading 1058 1059 :param func: function or list of functions to categorize 1060 :param category: category to put it in 1061 1062 :Example: 1063 1064 >>> import cmd2 1065 >>> class MyApp(cmd2.Cmd): 1066 >>> def do_echo(self, arglist): 1067 >>> self.poutput(' '.join(arglist) 1068 >>> 1069 >>> cmd2.utils.categorize(do_echo, "Text Processing") 1070 1071 For an alternative approach to categorizing commands using a decorator, see 1072 :func:`~cmd2.decorators.with_category` 1073 """ 1074 if isinstance(func, Iterable): 1075 for item in func: 1076 setattr(item, constants.CMD_ATTR_HELP_CATEGORY, category) 1077 else: 1078 if inspect.ismethod(func): 1079 setattr(func.__func__, constants.CMD_ATTR_HELP_CATEGORY, category) # type: ignore[attr-defined] 1080 else: 1081 setattr(func, constants.CMD_ATTR_HELP_CATEGORY, category) 1082 1083 1084def get_defining_class(meth: Callable[..., Any]) -> Optional[Type[Any]]: 1085 """ 1086 Attempts to resolve the class that defined a method. 1087 1088 Inspired by implementation published here: 1089 https://stackoverflow.com/a/25959545/1956611 1090 1091 :param meth: method to inspect 1092 :return: class type in which the supplied method was defined. None if it couldn't be resolved. 1093 """ 1094 if isinstance(meth, functools.partial): 1095 return get_defining_class(meth.func) 1096 if inspect.ismethod(meth) or ( 1097 inspect.isbuiltin(meth) 1098 and getattr(meth, '__self__') is not None 1099 and getattr(meth.__self__, '__class__') # type: ignore[attr-defined] 1100 ): 1101 for cls in inspect.getmro(meth.__self__.__class__): # type: ignore[attr-defined] 1102 if meth.__name__ in cls.__dict__: 1103 return cls 1104 meth = getattr(meth, '__func__', meth) # fallback to __qualname__ parsing 1105 if inspect.isfunction(meth): 1106 cls = getattr(inspect.getmodule(meth), meth.__qualname__.split('.<locals>', 1)[0].rsplit('.', 1)[0]) 1107 if isinstance(cls, type): 1108 return cls 1109 return cast(type, getattr(meth, '__objclass__', None)) # handle special descriptor objects 1110 1111 1112class CompletionMode(Enum): 1113 """Enum for what type of tab completion to perform in cmd2.Cmd.read_input()""" 1114 1115 # Tab completion will be disabled during read_input() call 1116 # Use of custom up-arrow history supported 1117 NONE = 1 1118 1119 # read_input() will tab complete cmd2 commands and their arguments 1120 # cmd2's command line history will be used for up arrow if history is not provided. 1121 # Otherwise use of custom up-arrow history supported. 1122 COMMANDS = 2 1123 1124 # read_input() will tab complete based on one of its following parameters: 1125 # choices, choices_provider, completer, parser 1126 # Use of custom up-arrow history supported 1127 CUSTOM = 3 1128 1129 1130class CustomCompletionSettings: 1131 """Used by cmd2.Cmd.complete() to tab complete strings other than command arguments""" 1132 1133 def __init__(self, parser: argparse.ArgumentParser, *, preserve_quotes: bool = False) -> None: 1134 """ 1135 Initializer 1136 1137 :param parser: arg parser defining format of string being tab completed 1138 :param preserve_quotes: if True, then quoted tokens will keep their quotes when processed by 1139 ArgparseCompleter. This is helpful in cases when you're tab completing 1140 flag-like tokens (e.g. -o, --option) and you don't want them to be 1141 treated as argparse flags when quoted. Set this to True if you plan 1142 on passing the string to argparse with the tokens still quoted. 1143 """ 1144 self.parser = parser 1145 self.preserve_quotes = preserve_quotes 1146