1# coding=utf-8
2"""Shared utility functions"""
3import argparse
4import collections
5import functools
6import glob
7import inspect
8import itertools
9import os
10import re
11import subprocess
12import sys
13import threading
14import unicodedata
15from enum import (
16    Enum,
17)
18from typing import (
19    TYPE_CHECKING,
20    Any,
21    Callable,
22    Dict,
23    Iterable,
24    List,
25    Optional,
26    TextIO,
27    Type,
28    TypeVar,
29    Union,
30    cast,
31)
32
33from . import (
34    constants,
35)
36from .argparse_custom import (
37    ChoicesProviderFunc,
38    CompleterFunc,
39)
40
if TYPE_CHECKING:  # pragma: no cover
    import cmd2  # noqa: F401

    # Static type checkers see the subscripted form: a Popen whose pipes carry bytes
    PopenTextIO = subprocess.Popen[bytes]

else:
    # At runtime the plain class is used instead of the subscripted form
    # NOTE(review): presumably because subscripting Popen is not supported on every
    # Python version this module targets - confirm
    PopenTextIO = subprocess.Popen


# Generic type variable used by annotations in this module (e.g. remove_duplicates)
_T = TypeVar('_T')
51
52
def is_quoted(arg: str) -> bool:
    """
    Determine whether a string is wrapped in a matching pair of quote characters

    :param arg: the string being checked for quotes
    :return: True if arg is at least two characters long and its first and last characters
             are the same character from constants.QUOTES
    """
    # A single character cannot serve as both the opening and the closing quote
    if len(arg) < 2:
        return False

    opening = arg[0]
    return opening == arg[-1] and opening in constants.QUOTES
61
62
def quote_string(arg: str) -> str:
    """
    Wrap a string in quotes

    Double quotes are used unless the string itself contains a double quote,
    in which case single quotes are used.

    :param arg: the string to quote
    :return: the string surrounded by the chosen quote character
    """
    quote = "'" if '"' in arg else '"'
    return f"{quote}{arg}{quote}"
71
72
def quote_string_if_needed(arg: str) -> str:
    """
    Quote a string only when it contains a space and is not already quoted

    :param arg: the string to quote if necessary
    :return: arg wrapped in quotes when quoting was needed, otherwise arg unchanged
    """
    needs_quotes = ' ' in arg and not is_quoted(arg)
    return quote_string(arg) if needs_quotes else arg
79
80
def strip_quotes(arg: str) -> str:
    """Remove the outer pair of quotes from a string.

    Applies to both single and double quotes. A string which is not quoted,
    as determined by is_quoted(), is returned unchanged.

    :param arg: string to strip outer quotes from
    :return: the string without its surrounding quotes, or the same string if it was not quoted
    """
    return arg[1:-1] if is_quoted(arg) else arg
92
93
def str_to_bool(val: str) -> bool:
    """Convert the text 'true' or 'false' (in any letter case) into the matching boolean.

    :param val: string being converted
    :return: boolean value expressed in the string
    :raises: ValueError if val is not a string or does not spell out a boolean value
    """
    if isinstance(val, str):
        normalized = val.capitalize()

        # str(True) is 'True' and str(False) is 'False', which is the form capitalize() produces
        for flag in (True, False):
            if normalized == str(flag):
                return flag

    raise ValueError("must be True or False (case-insensitive)")
107
108
class Settable:
    """Describes an attribute which can be read and changed via the set command in the CLI"""

    def __init__(
        self,
        name: str,
        val_type: Union[Type[Any], Callable[[Any], Any]],
        description: str,
        settable_object: object,
        *,
        settable_attrib_name: Optional[str] = None,
        onchange_cb: Optional[Callable[[str, _T, _T], Any]] = None,
        choices: Optional[Iterable[Any]] = None,
        choices_provider: Optional[ChoicesProviderFunc] = None,
        completer: Optional[CompleterFunc] = None,
    ) -> None:
        """
        Settable Initializer

        :param name: name of the settable. It is passed to onchange_cb as the parameter name and is also
                     used as the attribute name when settable_attrib_name is not provided.
        :param val_type: callable used to cast the string value from the command line into its proper type and
                         even validate its value. Passing bool is a special case: it is replaced with
                         str_to_bool() and choices_provider is overridden with one offering 'true' and 'false'.
                         The val_type function should raise an exception if it fails.
                         This exception will be caught and printed by Cmd.do_set().
        :param description: string describing this setting
        :param settable_object: object to which the instance attribute belongs (e.g. self)
        :param settable_attrib_name: name of the attribute on settable_object which get_value() reads and
                                     set_value() writes. Defaults to `name` if not specified.
        :param onchange_cb: optional function or method to call when the value of this settable is altered
                            by set_value(). (e.g. onchange_cb=self.debug_changed)

                            It is called with the following 3 arguments:
                                param_name: str - name of the changed parameter
                                old_value: Any - the value before being changed
                                new_value: Any - the value after being changed

        The following optional settings provide tab completion for a parameter's values. They correspond to the
        same settings in argparse-based tab completion. A maximum of one of these should be provided.

        :param choices: iterable of accepted values. set_value() rejects any converted value not found in it.
        :param choices_provider: function that provides choices for this argument
        :param completer: tab completion function that provides choices for this argument
        """
        if val_type == bool:

            def bool_choices_provider(_) -> List[str]:  # type: ignore[no-untyped-def]
                """Supplies the lowercase boolean values offered during tab completion"""
                return ['true', 'false']

            # Booleans arrive from the command line as text, so convert them with str_to_bool
            val_type = str_to_bool
            choices_provider = cast(ChoicesProviderFunc, bool_choices_provider)

        if settable_attrib_name is None:
            settable_attrib_name = name

        self.name = name
        self.val_type = val_type
        self.description = description
        self.settable_obj = settable_object
        self.settable_attrib_name = settable_attrib_name
        self.onchange_cb = onchange_cb
        self.choices = choices
        self.choices_provider = choices_provider
        self.completer = completer

    def get_value(self) -> Any:
        """
        Read the current value of the settable attribute
        :return: value of the attribute named settable_attrib_name on the settable object
        """
        return getattr(self.settable_obj, self.settable_attrib_name)

    def set_value(self, value: Any) -> Any:
        """
        Convert a value with val_type, validate it against choices and store it on the settable object
        :param value: New value to set
        :return: New value that the attribute was set to
        :raises: ValueError if choices were provided and the converted value is not one of them
        """
        # The type function performs any conversion or validation and may raise
        converted = self.val_type(value)

        # Reject values which are not among the accepted choices
        if self.choices is not None and converted not in self.choices:
            allowed = ', '.join(repr(choice) for choice in self.choices)
            raise ValueError(f"invalid choice: {converted!r} (choose from {allowed})")

        # Remember the old value before overwriting it so the callback can report both
        previous = self.get_value()
        setattr(self.settable_obj, self.settable_attrib_name, converted)

        # Only notify when the value actually changed and a callback was registered
        if previous != converted:
            if self.onchange_cb:
                self.onchange_cb(self.name, previous, converted)

        return converted
200
201
def is_text_file(file_path: str) -> bool:
    """Returns if a file contains only ASCII or UTF-8 encoded text and isn't empty.

    Surrounding whitespace is stripped from the path and a leading ~ is expanded before the file is opened.

    :param file_path: path to the file being checked
    :return: True if the file is a non-empty text file, otherwise False
    :raises OSError if file can't be read
    """
    expanded_path = os.path.abspath(os.path.expanduser(file_path.strip()))

    # Only need to check for utf-8 compliance since that covers ASCII, too.
    # An OSError raised while opening or reading is deliberately left to propagate to the caller.
    try:
        with open(expanded_path, encoding='utf-8', errors='strict') as f:
            # The whole file must be consumed so that every byte gets decoded and validated.
            # A file with no lines at all is empty and therefore not considered a text file.
            return sum(1 for _ in f) > 0
    except UnicodeDecodeError:
        # Not UTF-8
        return False
227
228
def remove_duplicates(list_to_prune: List[_T]) -> List[_T]:
    """Drop repeated items from a list, keeping the first occurrence of each in its original position.

    Items are used as dictionary keys, so they must be hashable.

    :param list_to_prune: the list being pruned of duplicates
    :return: a new list containing each distinct item once
    """
    # Keys of an OrderedDict are unique and remember the order in which they were first inserted
    return list(collections.OrderedDict.fromkeys(list_to_prune))
240
241
def norm_fold(astr: str) -> str:
    """Put a Unicode string into NFC form and casefold it so comparisons behave sanely.

    :param astr: input unicode string
    :return: a normalized and case-folded version of the input string
    """
    composed = unicodedata.normalize('NFC', astr)
    return composed.casefold()
249
250
def alphabetical_sort(list_to_sort: Iterable[str]) -> List[str]:
    """Return a new list with the strings in alphabetical order, compared via norm_fold.

    Example of a sorted result: ['a1', 'A11', 'A2', 'a22', 'a3']

    To sort a list in place, don't call this method, which makes a copy. Instead, do this:

    my_list.sort(key=norm_fold)

    :param list_to_sort: the strings being sorted
    :return: the sorted list
    """
    ordered = list(list_to_sort)
    ordered.sort(key=norm_fold)
    return ordered
264
265
def try_int_or_force_to_lower_case(input_str: str) -> Union[int, str]:
    """
    Convert a string to an integer when possible, otherwise case-fold it with norm_fold.
    :param input_str: string to convert
    :return: the string as an integer or a normalized, case-folded version of the string
    """
    try:
        as_int = int(input_str)
    except ValueError:
        # Not an integer literal, so fall back to the case-insensitive text form
        return norm_fold(input_str)
    return as_int
276
277
def natural_keys(input_str: str) -> List[Union[int, str]]:
    """
    Break a string into alternating text and number pieces to support natural sorting (see natural_sort).

    Runs of digits become integers and everything else is case-folded text.

    For example: natural_keys('abc123def') -> ['abc', 123, 'def']
    :param input_str: string to convert
    :return: list of strings and integers
    """
    # The capturing group makes re.split() keep the digit runs in its result
    pieces = re.split(r'(\d+)', input_str)

    keys: List[Union[int, str]] = []
    for piece in pieces:
        keys.append(try_int_or_force_to_lower_case(piece))
    return keys
287
288
def natural_sort(list_to_sort: Iterable[str]) -> List[str]:
    """
    Return a new list of the strings ordered case insensitively and with embedded numbers compared by value.

    Example of a sorted result: ['a1', 'A2', 'a3', 'A11', 'a22']

    To sort a list in place, don't call this method, which makes a copy. Instead, do this:

    my_list.sort(key=natural_keys)

    :param list_to_sort: the strings being sorted
    :return: the list sorted naturally
    """
    ordered = list(list_to_sort)
    ordered.sort(key=natural_keys)
    return ordered
303
304
def quote_specific_tokens(tokens: List[str], tokens_to_quote: List[str]) -> None:
    """
    Quote, in place, every token which appears in a given collection

    :param tokens: token list being edited
    :param tokens_to_quote: the tokens, which if present in tokens, to quote
    """
    for position, current in enumerate(tokens):
        if current not in tokens_to_quote:
            continue
        tokens[position] = quote_string(current)
315
316
def unquote_specific_tokens(tokens: List[str], tokens_to_unquote: List[str]) -> None:
    """
    Remove, in place, the outer quotes from every token whose unquoted form appears in a given collection

    :param tokens: token list being edited
    :param tokens_to_unquote: the tokens, which if present in tokens, to unquote
    """
    for position, current in enumerate(tokens):
        bare = strip_quotes(current)
        if bare not in tokens_to_unquote:
            continue
        tokens[position] = bare
328
329
def expand_user(token: str) -> str:
    """
    Wrap os.path.expanduser() to support expanding ~ in quoted strings
    :param token: the string to expand
    :return: the expanded string, still wrapped in its original quotes if it had any.
             An empty token is returned as is.
    """
    if not token:
        return token

    # Take the quotes off before expanding because expanduser() only handles a leading ~
    quote_char = ''
    if is_quoted(token):
        quote_char = token[0]
        token = token[1:-1]

    expanded = os.path.expanduser(token)

    # Restore the quotes even if not needed to preserve what the user typed
    return quote_char + expanded + quote_char
349
350
def expand_user_in_tokens(tokens: List[str]) -> None:
    """
    Replace, in place, each token in a list with the result of calling expand_user() on it
    :param tokens: tokens to expand
    """
    for position, current in enumerate(tokens):
        tokens[position] = expand_user(current)
358
359
def find_editor() -> Optional[str]:
    """
    Used to set cmd2.Cmd.DEFAULT_EDITOR. If EDITOR env variable is set, that will be used.
    Otherwise the function will look for a known editor in directories specified by PATH env variable.
    :return: Default editor or None
    """
    from_env = os.environ.get('EDITOR')
    if from_env:
        return from_env

    on_windows = sys.platform[:3] == 'win'
    if on_windows:
        candidates = ['code.cmd', 'notepad++.exe', 'notepad.exe']
    else:
        candidates = ['vim', 'vi', 'emacs', 'nano', 'pico', 'joe', 'code', 'subl', 'atom', 'gedit', 'geany', 'kate']

    # Collect every directory in the PATH environment variable, skipping entries which are symbolic links
    env_path = os.getenv('PATH')
    search_dirs = [] if env_path is None else [d for d in env_path.split(os.path.pathsep) if not os.path.islink(d)]

    # product() walks all directories for the first candidate before moving on to the next one,
    # so the order of the candidate list decides which editor is preferred
    for candidate, directory in itertools.product(candidates, search_dirs):
        candidate_path = os.path.join(directory, candidate)
        if os.path.isfile(candidate_path) and os.access(candidate_path, os.X_OK):
            if on_windows:
                # Remove extension from Windows file names
                return os.path.splitext(candidate)[0]
            return candidate

    return None
391
392
def files_from_glob_pattern(pattern: str, access: int = os.F_OK) -> List[str]:
    """Collect the paths of the files matched by a glob pattern.

    Only files are returned, not directories, and optionally only files for which the user has a specified access to.

    :param pattern: file name or glob pattern
    :param access: file access type to verify (os.* where * is F_OK, R_OK, W_OK, or X_OK)
    :return: list of files matching the name or glob pattern
    """
    matching_files = []
    for candidate in glob.glob(pattern):
        if os.path.isfile(candidate) and os.access(candidate, access):
            matching_files.append(candidate)
    return matching_files
403
404
def files_from_glob_patterns(patterns: List[str], access: int = os.F_OK) -> List[str]:
    """Collect the paths of the files matched by any of several glob patterns.

    Only files are returned, not directories, and optionally only files for which the user has a specified access to.
    Results are concatenated in pattern order, so a file matched by more than one pattern appears more than once.

    :param patterns: list of file names and/or glob patterns
    :param access: file access type to verify (os.* where * is F_OK, R_OK, W_OK, or X_OK)
    :return: list of files matching the names and/or glob patterns
    """
    return [match for pattern in patterns for match in files_from_glob_pattern(pattern, access=access)]
419
420
def get_exes_in_path(starts_with: str) -> List[str]:
    """Find the names of executables located in the directories of the user's PATH

    :param starts_with: what the exes should start with. leave blank for all exes in path.
    :return: a list of matching exe names without duplicates and in no particular order
    """
    # Purposely don't match any executable containing wildcards
    if any(wildcard in starts_with for wildcard in ('*', '?')):
        return []

    # Collect every directory in the PATH environment variable, skipping entries which are symbolic links
    env_path = os.getenv('PATH')
    search_dirs = [] if env_path is None else [d for d in env_path.split(os.path.pathsep) if not os.path.islink(d)]

    # The same exe name can exist in several directories, so gather the names in a set
    exe_names = set()
    for directory in search_dirs:
        pattern = os.path.join(directory, starts_with) + '*'
        exe_names.update(os.path.basename(match) for match in files_from_glob_pattern(pattern, access=os.X_OK))

    return list(exe_names)
452
453
class StdSim:
    """
    Class to simulate behavior of sys.stdout or sys.stderr.
    Stores contents in internal buffer and optionally echos to the inner stream it is simulating.
    Attributes which StdSim does not define itself are looked up on the inner stream.
    """

    def __init__(
        self,
        inner_stream: Union[TextIO, 'StdSim'],
        *,
        echo: bool = False,
        encoding: str = 'utf-8',
        errors: str = 'replace',
    ) -> None:
        """
        StdSim Initializer

        :param inner_stream: the wrapped stream. Should be a TextIO or StdSim instance.
        :param echo: if True, then all input will be echoed to inner_stream
        :param encoding: codec for encoding/decoding strings (defaults to utf-8)
        :param errors: how to handle encoding/decoding errors (defaults to replace)
        """
        self.inner_stream = inner_stream
        self.echo = echo
        self.encoding = encoding
        self.errors = errors

        # While True, written data is not stored in the internal buffer (echoing is unaffected)
        self.pause_storage = False

        # Binary counterpart of this stream. It owns the bytearray holding everything captured.
        self.buffer = ByteBuf(self)

    def write(self, s: str) -> None:
        """
        Add str to internal bytes buffer and if echo is True, echo contents to inner stream

        :param s: String to write to the stream
        :raises: TypeError if s is not a str
        """
        if not isinstance(s, str):
            raise TypeError(f'write() argument must be str, not {type(s)}')

        if not self.pause_storage:
            self.buffer.byte_buf += s.encode(encoding=self.encoding, errors=self.errors)
        if self.echo:
            self.inner_stream.write(s)

    def getvalue(self) -> str:
        """Get the internal contents as a str"""
        return self.buffer.byte_buf.decode(encoding=self.encoding, errors=self.errors)

    def getbytes(self) -> bytes:
        """Get the internal contents as bytes"""
        return bytes(self.buffer.byte_buf)

    def read(self, size: Optional[int] = -1) -> str:
        """
        Read from the internal contents as a str and then clear them out

        :param size: Number of bytes to read from the stream. If None or negative, everything
                     stored so far is read.
        :return: the decoded bytes which were removed from the internal buffer
        """
        # Follow the io convention that any negative size means "read everything"
        if size is None or size < 0:
            result = self.getvalue()
            self.clear()
        else:
            # size counts bytes, not characters, so a multi-byte character can be cut in two.
            # How the pieces decode is governed by the errors setting.
            result = self.buffer.byte_buf[:size].decode(encoding=self.encoding, errors=self.errors)
            self.buffer.byte_buf = self.buffer.byte_buf[size:]

        return result

    def readbytes(self) -> bytes:
        """Read from the internal contents as bytes and then clear them out"""
        result = self.getbytes()
        self.clear()
        return result

    def clear(self) -> None:
        """Clear the internal contents"""
        self.buffer.byte_buf.clear()

    def isatty(self) -> bool:
        """StdSim only considered an interactive stream if `echo` is True and `inner_stream` is a tty."""
        if self.echo:
            return self.inner_stream.isatty()
        else:
            return False

    @property
    def line_buffering(self) -> bool:
        """
        Handle when the inner stream doesn't have a line_buffering attribute which is the case
        when running unit tests because pytest sets stdout to a pytest EncodedFile object.
        """
        try:
            return bool(self.inner_stream.line_buffering)
        except AttributeError:
            return False

    def __getattr__(self, item: str) -> Any:
        """
        Delegate lookups of attributes which StdSim does not have to the inner stream

        Python only calls this method after normal attribute lookup has failed, so anything stored
        on the instance never reaches it.

        :param item: name of the attribute being looked up
        :return: the inner stream's attribute of the same name
        :raises: AttributeError if neither StdSim nor the inner stream has the attribute
        """
        # If inner_stream itself is missing (e.g. an instance created without __init__ while being
        # copied or unpickled), delegating would look up inner_stream again and recurse without end.
        if item == 'inner_stream':
            raise AttributeError(f"{type(self).__name__!r} object has no attribute {item!r}")

        return getattr(self.inner_stream, item)
554
class ByteBuf:
    """
    Binary buffer of a StdSim. Receives raw bytes written to the StdSim and holds everything captured.
    """

    # Byte sequences whose presence in written data triggers a flush of the StdSim
    NEWLINES = [b'\n', b'\r']

    def __init__(self, std_sim_instance: StdSim) -> None:
        """
        ByteBuf initializer
        :param std_sim_instance: the StdSim which owns this buffer
        """
        self.std_sim_instance = std_sim_instance
        self.byte_buf = bytearray()

    def write(self, b: bytes) -> None:
        """
        Add bytes to internal bytes buffer and if echo is True, echo contents to inner stream.

        :param b: the bytes to write
        :raises: TypeError if b is not a bytes instance
        """
        if not isinstance(b, bytes):
            raise TypeError(f'a bytes-like object is required, not {type(b)}')

        owner = self.std_sim_instance
        if not owner.pause_storage:
            self.byte_buf += b

        # Nothing more to do unless the data should also reach the wrapped stream
        if not owner.echo:
            return

        owner.inner_stream.buffer.write(b)

        # Since StdSim wraps TextIO streams, we will flush the stream if line buffering is on
        # and the bytes being written contain a new line character. This is helpful when StdSim
        # is being used to capture output of a shell command because it causes the output to print
        # to the screen more often than if we waited for the stream to flush its buffer.
        if owner.line_buffering and any(newline in b for newline in ByteBuf.NEWLINES):
            owner.flush()
583
584
class ProcReader:
    """
    Used to capture stdout and stderr from a Popen process if any of those were set to subprocess.PIPE.
    If neither are pipes, then the process will run normally and no output will be captured.
    """

    def __init__(self, proc: PopenTextIO, stdout: Union[StdSim, TextIO], stderr: Union[StdSim, TextIO]) -> None:
        """
        ProcReader initializer

        A reader thread is started for each of the process's streams which is a pipe.

        :param proc: the Popen process being read from
        :param stdout: the stream to write captured stdout
        :param stderr: the stream to write captured stderr
        """
        self._proc = proc
        self._stdout = stdout
        self._stderr = stderr

        # Both thread objects always exist, but each is only started when its stream is a pipe
        self._out_thread = threading.Thread(
            name='out_thread',
            target=self._reader_thread_func,
            kwargs=dict(read_stdout=True),
        )
        self._err_thread = threading.Thread(
            name='err_thread',
            target=self._reader_thread_func,
            kwargs=dict(read_stdout=False),
        )

        # Start the reader threads for pipes only
        if self._proc.stdout is not None:
            self._out_thread.start()
        if self._proc.stderr is not None:
            self._err_thread.start()

    def send_sigint(self) -> None:
        """Send a SIGINT to the process similar to if <Ctrl>+C were pressed"""
        import signal

        if not sys.platform.startswith('win'):
            # Since cmd2 uses shell=True in its Popen calls, we need to send the SIGINT to
            # the whole process group to make sure it propagates further than the shell
            try:
                group_id = os.getpgid(self._proc.pid)
                os.killpg(group_id, signal.SIGINT)
            except ProcessLookupError:
                # The process is already gone, so there is nothing to interrupt
                pass
            return

        # cmd2 started the Windows process in a new process group. Therefore we must send
        # a CTRL_BREAK_EVENT since CTRL_C_EVENT signals cannot be generated for process groups.
        self._proc.send_signal(signal.CTRL_BREAK_EVENT)

    def terminate(self) -> None:
        """Terminate the process"""
        self._proc.terminate()

    def wait(self) -> None:
        """Wait for the process to finish"""
        # Threads which were never started are not alive and are skipped
        for reader in (self._out_thread, self._err_thread):
            if reader.is_alive():
                reader.join()

        # Handle case where the process ended before the last read could be done.
        # This will return None for the streams that weren't pipes.
        remaining_out, remaining_err = self._proc.communicate()

        if remaining_out:
            self._write_bytes(self._stdout, remaining_out)
        if remaining_err:
            self._write_bytes(self._stderr, remaining_err)

    def _reader_thread_func(self, read_stdout: bool) -> None:
        """
        Thread function that reads a stream from the process
        :param read_stdout: if True, then this thread deals with stdout. Otherwise it deals with stderr.
        """
        if read_stdout:
            source, destination = self._proc.stdout, self._stdout
        else:
            source, destination = self._proc.stderr, self._stderr

        # The thread should have been started only if this stream was a pipe
        assert source is not None

        # Run until process completes
        while self._proc.poll() is None:
            # noinspection PyUnresolvedReferences
            pending = source.peek()  # type: ignore[attr-defined]
            if not pending:
                continue

            # peek() does not advance the stream, so consume exactly the bytes which were seen
            source.read(len(pending))
            self._write_bytes(destination, pending)

    @staticmethod
    def _write_bytes(stream: Union[StdSim, TextIO], to_write: bytes) -> None:
        """
        Write bytes to a stream
        :param stream: the stream being written to
        :param to_write: the bytes being written
        """
        try:
            stream.buffer.write(to_write)
        except BrokenPipeError:
            # This occurs if output is being piped to a process that closed
            pass
684
685
class ContextFlag:
    """A context manager which doubles as a boolean flag value within the default sigint handler.

    Its main use is as a flag to prevent the SIGINT handler in cmd2 from raising a KeyboardInterrupt
    while a critical code section has set the flag to True. Because signal handling is always done on the
    main thread, this class is not thread-safe since there is no need.

    The flag may be entered more than once. It stays set until every entered context has exited.
    """

    def __init__(self) -> None:
        # Number of contexts currently entered.
        # A positive value means the flag is set and 0 means it is not. It should never go below 0.
        self.__count = 0

    def __bool__(self) -> bool:
        """The flag is truthy while at least one context is entered"""
        return self.__count > 0

    def __enter__(self) -> None:
        """Enter a context, which sets the flag"""
        self.__count += 1

    def __exit__(self, *args: Any) -> None:
        """
        Leave a context. The flag is no longer set once every entered context has been left.
        :raises: ValueError if more contexts were exited than entered
        """
        remaining = self.__count - 1
        self.__count = remaining
        if remaining < 0:
            raise ValueError("count has gone below 0")
709
710
class RedirectionSavedState:
    """Holds what each command needs in order to restore state once redirection is over"""

    def __init__(
        self,
        self_stdout: Union[StdSim, TextIO],
        sys_stdout: Union[StdSim, TextIO],
        pipe_proc_reader: Optional[ProcReader],
        saved_redirecting: bool,
    ) -> None:
        """
        RedirectionSavedState initializer
        :param self_stdout: saved value of Cmd.stdout
        :param sys_stdout: saved value of sys.stdout
        :param pipe_proc_reader: saved value of Cmd._cur_pipe_proc_reader
        :param saved_redirecting: saved value of Cmd._redirecting
        """
        # Restored after the command ends, whether or not the command redirected
        self.saved_redirecting = saved_redirecting
        self.saved_pipe_proc_reader = pipe_proc_reader

        # Restored after redirection ends
        self.saved_sys_stdout = sys_stdout
        self.saved_self_stdout = self_stdout

        # Tells if command is redirecting. Always starts out False.
        self.redirecting = False
738
739
class TextAlignment(Enum):
    """Horizontal text alignment"""

    # align_text() places all fill characters to the right of the text
    LEFT = 1
    # align_text() splits the fill between both sides. An odd leftover column goes to the right side.
    CENTER = 2
    # align_text() places all fill characters to the left of the text
    RIGHT = 3
746
747
def align_text(
    text: str,
    alignment: TextAlignment,
    *,
    fill_char: str = ' ',
    width: Optional[int] = None,
    tab_width: int = 4,
    truncate: bool = False,
) -> str:
    """
    Align text for display within a given width. Supports characters with display widths greater than 1.
    ANSI style sequences do not count toward the display width. If text has line breaks, then each line is aligned
    independently.

    There are convenience wrappers around this function: align_left(), align_center(), and align_right()

    :param text: text to align (can contain multiple lines)
    :param alignment: how to align the text
    :param fill_char: character that fills the alignment gap. Defaults to space. (Cannot be a line breaking character)
                      It may be wrapped in ANSI style sequences.
    :param width: display width of the aligned text. Defaults to width of the terminal.
    :param tab_width: any tabs in the text will be replaced with this many spaces. if fill_char is a tab, then it will
                      be converted to one space.
    :param truncate: if True, then each line will be shortened to fit within the display width. The truncated
                     portions are replaced by a '…' character. Defaults to False.
    :return: aligned text
    :raises: TypeError if fill_char is more than one character (not including ANSI style sequences)
    :raises: ValueError if text or fill_char contains an unprintable character
    :raises: ValueError if width is less than 1
    """
    import io
    import shutil

    from . import (
        ansi,
    )

    if width is None:
        width = shutil.get_terminal_size().columns

    if width < 1:
        raise ValueError("width must be at least 1")

    # Convert tabs to spaces
    text = text.replace('\t', ' ' * tab_width)
    fill_char = fill_char.replace('\t', ' ')

    # Save fill_char with no styles for use later
    stripped_fill_char = ansi.strip_style(fill_char)
    if len(stripped_fill_char) != 1:
        raise TypeError("Fill character must be exactly one character long")

    fill_char_width = ansi.style_aware_wcswidth(fill_char)
    if fill_char_width == -1:
        raise ValueError("Fill character is an unprintable character")

    # Isolate the style chars before and after the fill character. We will use them when building sequences
    # of fill characters. Instead of repeating the style characters for each fill character, we'll wrap each sequence.
    #
    # The fill character can also occur inside its own style sequences (e.g. a styled 'm' or a styled digit),
    # so simply splitting on it may produce more than two pieces. Instead, look for the occurrence where
    # everything before it and everything after it consists of nothing but style sequences.
    for char_index, char in enumerate(fill_char):
        if char != stripped_fill_char:
            continue

        style_before = fill_char[:char_index]
        style_after = fill_char[char_index + 1 :]
        if not ansi.strip_style(style_before) and not ansi.strip_style(style_after):
            fill_char_style_begin, fill_char_style_end = style_before, style_after
            break
    else:
        # No clean split was found, so fall back to a plain split on the fill character
        fill_char_style_begin, fill_char_style_end = fill_char.split(stripped_fill_char)

    if text:
        lines = text.splitlines()
    else:
        lines = ['']

    text_buf = io.StringIO()

    # ANSI style sequences that may affect future lines will be cancelled by the fill_char's style.
    # To avoid this, we save the state of a line's style so we can restore it when beginning the next line.
    # This also allows the lines to be used independently and still have their style. TableCreator does this.
    aggregate_styles = ''

    for index, line in enumerate(lines):
        if index > 0:
            text_buf.write('\n')

        if truncate:
            line = truncate_line(line, width)

        line_width = ansi.style_aware_wcswidth(line)
        if line_width == -1:
            raise ValueError("Text to align contains an unprintable character")

        # Get the styles in this line
        line_styles = get_styles_in_text(line)

        # Calculate how wide each side of filling needs to be
        if line_width >= width:
            # Don't return here even though the line needs no fill chars.
            # There may be styles sequences to restore.
            total_fill_width = 0
        else:
            total_fill_width = width - line_width

        if alignment == TextAlignment.LEFT:
            left_fill_width = 0
            right_fill_width = total_fill_width
        elif alignment == TextAlignment.CENTER:
            # When the gap is odd, the extra column goes to the right side
            left_fill_width = total_fill_width // 2
            right_fill_width = total_fill_width - left_fill_width
        else:
            left_fill_width = total_fill_width
            right_fill_width = 0

        # Determine how many fill characters are needed to cover the width
        left_fill = (left_fill_width // fill_char_width) * stripped_fill_char
        right_fill = (right_fill_width // fill_char_width) * stripped_fill_char

        # In cases where the fill character display width didn't divide evenly into
        # the gap being filled, pad the remainder with space.
        left_fill += ' ' * (left_fill_width - ansi.style_aware_wcswidth(left_fill))
        right_fill += ' ' * (right_fill_width - ansi.style_aware_wcswidth(right_fill))

        # Don't allow styles in fill characters and text to affect one another
        if fill_char_style_begin or fill_char_style_end or aggregate_styles or line_styles:
            if left_fill:
                left_fill = ansi.TextStyle.RESET_ALL + fill_char_style_begin + left_fill + fill_char_style_end
            left_fill += ansi.TextStyle.RESET_ALL

            if right_fill:
                right_fill = ansi.TextStyle.RESET_ALL + fill_char_style_begin + right_fill + fill_char_style_end
            right_fill += ansi.TextStyle.RESET_ALL

        # Write the line and restore any styles from previous lines
        text_buf.write(left_fill + aggregate_styles + line + right_fill)

        # Update the aggregate with styles in this line
        aggregate_styles += ''.join(line_styles.values())

    return text_buf.getvalue()
877
878
def align_left(
    text: str, *, fill_char: str = ' ', width: Optional[int] = None, tab_width: int = 4, truncate: bool = False
) -> str:
    """
    Left-justify text inside a given display width by delegating to align_text() with TextAlignment.LEFT.

    Characters wider than one column are supported and ANSI style sequences are not counted when measuring
    display width. When the text spans several lines, every line is aligned on its own.

    :param text: text to left align (can contain multiple lines)
    :param fill_char: character used to fill the alignment gap. Defaults to space. (Cannot be a line breaking
                      character)
    :param width: display width of the aligned text. Defaults to width of the terminal.
    :param tab_width: number of spaces that replace each tab in the text. A tab given as fill_char is
                      converted to one space.
    :param truncate: if True, text that does not fit within the display width is shortened and the removed
                     portion is replaced by a '…' character. Defaults to False.
    :return: left-aligned text
    :raises: TypeError if fill_char is more than one character (not including ANSI style sequences)
    :raises: ValueError if text or fill_char contains an unprintable character
    :raises: ValueError if width is less than 1
    """
    return align_text(
        text,
        TextAlignment.LEFT,
        fill_char=fill_char,
        width=width,
        tab_width=tab_width,
        truncate=truncate,
    )
900
901
def align_center(
    text: str, *, fill_char: str = ' ', width: Optional[int] = None, tab_width: int = 4, truncate: bool = False
) -> str:
    """
    Center text inside a given display width by delegating to align_text() with TextAlignment.CENTER.

    Characters wider than one column are supported and ANSI style sequences are not counted when measuring
    display width. When the text spans several lines, every line is centered on its own.

    :param text: text to center (can contain multiple lines)
    :param fill_char: character used to fill the alignment gap. Defaults to space. (Cannot be a line breaking
                      character)
    :param width: display width of the aligned text. Defaults to width of the terminal.
    :param tab_width: number of spaces that replace each tab in the text. A tab given as fill_char is
                      converted to one space.
    :param truncate: if True, text that does not fit within the display width is shortened and the removed
                     portion is replaced by a '…' character. Defaults to False.
    :return: centered text
    :raises: TypeError if fill_char is more than one character (not including ANSI style sequences)
    :raises: ValueError if text or fill_char contains an unprintable character
    :raises: ValueError if width is less than 1
    """
    return align_text(
        text,
        TextAlignment.CENTER,
        fill_char=fill_char,
        width=width,
        tab_width=tab_width,
        truncate=truncate,
    )
923
924
def align_right(
    text: str, *, fill_char: str = ' ', width: Optional[int] = None, tab_width: int = 4, truncate: bool = False
) -> str:
    """
    Right-justify text inside a given display width by delegating to align_text() with TextAlignment.RIGHT.

    Characters wider than one column are supported and ANSI style sequences are not counted when measuring
    display width. When the text spans several lines, every line is aligned on its own.

    :param text: text to right align (can contain multiple lines)
    :param fill_char: character used to fill the alignment gap. Defaults to space. (Cannot be a line breaking
                      character)
    :param width: display width of the aligned text. Defaults to width of the terminal.
    :param tab_width: number of spaces that replace each tab in the text. A tab given as fill_char is
                      converted to one space.
    :param truncate: if True, text that does not fit within the display width is shortened and the removed
                     portion is replaced by a '…' character. Defaults to False.
    :return: right-aligned text
    :raises: TypeError if fill_char is more than one character (not including ANSI style sequences)
    :raises: ValueError if text or fill_char contains an unprintable character
    :raises: ValueError if width is less than 1
    """
    return align_text(
        text,
        TextAlignment.RIGHT,
        fill_char=fill_char,
        width=width,
        tab_width=tab_width,
        truncate=truncate,
    )
946
947
def truncate_line(line: str, max_width: int, *, tab_width: int = 4) -> str:
    """
    Shorten a single line so that it fits within a display width, marking the cut with a '…' character.

    Characters wider than one column are supported and ANSI style sequences are not counted when measuring
    display width.

    Style sequences located after the truncation point are not discarded; they are appended to the result.
    Consider truncate_line(Fg.BLUE + hello + Fg.RESET, 3): "hello" is cut before Fg.RESET would have turned
    the blue color off. Emitting the leftover sequences leaves the terminal style in the same state it would
    have had if the whole string were printed. align_text() depends on this when it carries styles across
    multiple lines.

    :param line: text to truncate
    :param max_width: the maximum display width the resulting string is allowed to have
    :param tab_width: any tabs in the text will be replaced with this many spaces
    :return: line that has a display width less than or equal to width
    :raises: ValueError if text contains an unprintable character like a newline
    :raises: ValueError if max_width is less than 1
    """
    import io

    from . import (
        ansi,
    )

    # Tabs have no fixed display width, so expand them before measuring anything
    line = line.replace('\t', ' ' * tab_width)

    line_width = ansi.style_aware_wcswidth(line)
    if line_width == -1:
        raise ValueError("text contains an unprintable character")

    if max_width < 1:
        raise ValueError("max_width must be at least 1")

    # Nothing to do when the line already fits
    if line_width <= max_width:
        return line

    # Style sequences keyed by the index at which they start. Entries are removed as they are emitted
    # so that whatever remains afterwards is exactly the set of sequences past the truncation point.
    pending_styles = get_styles_in_text(line)

    output = io.StringIO()
    used_width = 0
    pos = 0

    while True:
        # A style sequence starting here is copied verbatim and does not consume any display width
        style_seq = pending_styles.pop(pos, None)
        if style_seq is not None:
            output.write(style_seq)
            pos += len(style_seq)
            continue

        glyph = line[pos]
        glyph_width = ansi.style_aware_wcswidth(glyph)

        # The line is known to be too wide, so the last available column is reserved for the ellipsis
        if used_width + glyph_width >= max_width:
            output.write(constants.HORIZONTAL_ELLIPSIS)
            break

        output.write(glyph)
        used_width += glyph_width
        pos += 1

    # Emit the style sequences that followed the truncation point
    output.write(''.join(pending_styles.values()))

    return output.getvalue()
1022
1023
def get_styles_in_text(text: str) -> Dict[int, str]:
    """
    Return an OrderedDict containing all ANSI style sequences found in a string

    The structure of the dictionary is:
        key: index where sequences begins
        value: ANSI style sequence found at index in text

    Keys are in ascending order

    :param text: text to search for style sequences
    :return: ordered mapping of start index to the style sequence that begins there. Empty if text
             contains no style sequences.
    """
    from . import (
        ansi,
    )

    styles: Dict[int, str] = collections.OrderedDict()

    # finditer() yields non-overlapping matches from left to right and resumes scanning at the end of each
    # match. The previous hand-written loop advanced its cursor by the match length only, so the same
    # sequence was found again and again and a search could restart in the middle of a matched sequence.
    for match in ansi.ANSI_STYLE_RE.finditer(text):
        styles[match.start()] = match.group()

    return styles
1051
1052
def categorize(func: Union[Callable[..., Any], Iterable[Callable[..., Any]]], category: str) -> None:
    """Categorize a function.

    The help command output will group the passed function under the
    specified category heading

    The category is stored as an attribute on the function object. Bound methods do not accept new
    attributes, so for a bound method the attribute is set on its underlying function. This applies both
    to a single callable and to every item of an iterable of callables.

    :param func: function or list of functions to categorize
    :param category: category to put it in

    :Example:

    >>> import cmd2
    >>> class MyApp(cmd2.Cmd):
    >>>   def do_echo(self, arglist):
    >>>     self.poutput(' '.join(arglist))
    >>>
    >>>   cmd2.utils.categorize(do_echo, "Text Processing")

    For an alternative approach to categorizing commands using a decorator, see
    :func:`~cmd2.decorators.with_category`
    """
    funcs: Iterable[Callable[..., Any]] = func if isinstance(func, Iterable) else (func,)

    for item in funcs:
        # Setting an attribute on a bound method raises AttributeError, so tag the wrapped function instead
        target = item.__func__ if inspect.ismethod(item) else item  # type: ignore[attr-defined]
        setattr(target, constants.CMD_ATTR_HELP_CATEGORY, category)
1082
1083
def get_defining_class(meth: Callable[..., Any]) -> Optional[Type[Any]]:
    """
    Attempts to resolve the class that defined a method.

    Inspired by implementation published here:
        https://stackoverflow.com/a/25959545/1956611

    Resolution strategy, in order:
        1. functools.partial objects are unwrapped and the wrapped callable is resolved instead.
        2. For bound methods (and builtin methods bound to an object), the MRO of the bound object's class
           is searched for the first class whose own namespace contains the method name.
        3. For plain functions, the owner is looked up by walking the dotted path taken from __qualname__
           starting at the function's module. Nested classes (e.g. ``Outer.Inner.method``) are supported.
        4. Otherwise the ``__objclass__`` attribute used by descriptor objects is consulted.

    :param meth: method to inspect
    :return: class type in which the supplied method was defined. None if it couldn't be resolved.
    """
    if isinstance(meth, functools.partial):
        return get_defining_class(meth.func)
    if inspect.ismethod(meth) or (
        inspect.isbuiltin(meth)
        and getattr(meth, '__self__', None) is not None
        and getattr(meth.__self__, '__class__', None)  # type: ignore[attr-defined]
    ):
        for cls in inspect.getmro(meth.__self__.__class__):  # type: ignore[attr-defined]
            if meth.__name__ in cls.__dict__:
                return cls
        meth = getattr(meth, '__func__', meth)  # fallback to __qualname__ parsing
    if inspect.isfunction(meth):
        # Drop everything from the first '.<locals>' on, then drop the function's own name.
        # What is left is the dotted path of the would-be owner relative to the module.
        owner_path = meth.__qualname__.split('.<locals>', 1)[0].rsplit('.', 1)[0]

        # getattr() does not understand dotted names, so resolve one component at a time. Using a default
        # keeps an unresolvable path (or an unknown module) from raising instead of yielding None.
        owner: Any = inspect.getmodule(meth)
        for part in owner_path.split('.'):
            owner = getattr(owner, part, None)
            if owner is None:
                break

        if isinstance(owner, type):
            return owner
    return cast(Optional[Type[Any]], getattr(meth, '__objclass__', None))  # handle special descriptor objects
1110
1111
class CompletionMode(Enum):
    """Selects the kind of tab completion cmd2.Cmd.read_input() performs"""

    # No tab completion takes place while read_input() runs.
    # A custom up-arrow history may be used.
    NONE = 1

    # cmd2 commands and their arguments are tab completed by read_input().
    # When no history is provided, up arrow uses cmd2's command line history.
    # When one is provided, that custom up-arrow history is used.
    COMMANDS = 2

    # Tab completion is driven by one of these read_input() parameters:
    #     choices, choices_provider, completer, parser
    # A custom up-arrow history may be used.
    CUSTOM = 3
1128
1129
class CustomCompletionSettings:
    """Settings consumed by cmd2.Cmd.complete() when tab completing strings that are not command arguments"""

    def __init__(self, parser: argparse.ArgumentParser, *, preserve_quotes: bool = False) -> None:
        """
        Store the settings on the instance

        :param parser: arg parser that describes the format of the string being tab completed
        :param preserve_quotes: if True, quoted tokens retain their quotes when ArgparseCompleter processes
                                them. This helps when tab completing flag-like tokens (e.g. -o, --option)
                                that should not be treated as argparse flags while quoted. Set this to True
                                if the string will be passed to argparse with the tokens still quoted.
        """
        # Both values are kept exactly as given; no validation or copying is performed here
        self.parser, self.preserve_quotes = parser, preserve_quotes
1146