1# -*- coding: utf-8 -*-
3Terminal-related utilities
7from __future__ import absolute_import, division, print_function
9import os
10import sys
12from plumbum import local
14from .progress import Progress
15from .termsize import get_terminal_size
17__all__ = (
18    "readline",
19    "ask",
20    "choose",
21    "prompt",
22    "get_terminal_size",
23    "Progress",
24    "get_terminal_size",
28def __dir__():
29    return __all__
32def readline(message=""):
33    """Gets a line of input from the user (stdin)"""
34    sys.stdout.write(message)
35    sys.stdout.flush()
36    return sys.stdin.readline()
39def ask(question, default=None):
40    """
41    Presents the user with a yes/no question.
43    :param question: The question to ask
44    :param default: If ``None``, the user must answer. If ``True`` or ``False``, lack of response is
45                    interpreted as the default option
47    :returns: the user's choice
48    """
49    question = question.rstrip().rstrip("?").rstrip() + "?"
50    if default is None:
51        question += " (y/n) "
52    elif default:
53        question += " [Y/n] "
54    else:
55        question += " [y/N] "
57    while True:
58        try:
59            answer = readline(question).strip().lower()
60        except EOFError:
61            answer = None
62        if answer in ("y", "yes"):
63            return True
64        elif answer in ("n", "no"):
65            return False
66        elif not answer and default is not None:
67            return default
68        else:
69            sys.stdout.write("Invalid response, please try again\n")
72def choose(question, options, default=None):
73    """Prompts the user with a question and a set of options, from which the user needs to choose.
75    :param question: The question to ask
76    :param options: A set of options. It can be a list (of strings or two-tuples, mapping text
77                    to returned-object) or a dict (mapping text to returned-object).``
78    :param default: If ``None``, the user must answer. Otherwise, lack of response is interpreted
79                    as this answer
81    :returns: The user's choice
83    Example::
85        ans = choose("What is your favorite color?", ["blue", "yellow", "green"], default = "yellow")
86        # `ans` will be one of "blue", "yellow" or "green"
88        ans = choose("What is your favorite color?",
89                {"blue" : 0x0000ff, "yellow" : 0xffff00 , "green" : 0x00ff00}, default = 0x00ff00)
90        # this will display "blue", "yellow" and "green" but return a numerical value
91    """
92    if hasattr(options, "items"):
93        options = options.items()
94    sys.stdout.write(question.rstrip() + "\n")
95    choices = {}
96    defindex = None
97    for i, item in enumerate(options):
98        i = i + 1  # python2.5
99        if isinstance(item, (tuple, list)) and len(item) == 2:
100            text = item[0]
101            val = item[1]
102        else:
103            text = item
104            val = item
105        choices[i] = val
106        if default is not None and default == val:
107            defindex = i
108        sys.stdout.write("(%d) %s\n" % (i, text))
109    if default is not None:
110        if defindex is None:
111            msg = "Choice [{}]: ".format(default)
112        else:
113            msg = "Choice [%d]: " % (defindex,)
114    else:
115        msg = "Choice: "
116    while True:
117        try:
118            choice = readline(msg).strip()
119        except EOFError:
120            choice = ""
121        if not choice and default:
122            return default
123        try:
124            choice = int(choice)
125            if choice not in choices:
126                raise ValueError()
127        except ValueError:
128            sys.stdout.write("Invalid choice, please try again\n")
129            continue
130        return choices[choice]
133def prompt(question, type=str, default=NotImplemented, validator=lambda val: True):
134    """
135    Presents the user with a validated question, keeps asking if validation does not pass.
137    :param question: The question to ask
138    :param type: The type of the answer, defaults to str
139    :param default: The default choice
140    :param validator: An extra validator called after type conversion, can raise ValueError or return False to trigger a retry.
142    :returns: the user's choice
143    """
144    question = question.rstrip(" \t:")
145    if default is not NotImplemented:
146        question += " [{}]".format(default)
147    question += ": "
148    while True:
149        try:
150            ans = readline(question).strip()
151        except EOFError:
152            ans = ""
153        if not ans:
154            if default is not NotImplemented:
155                # sys.stdout.write("\b%s\n" % (default,))
156                return default
157            else:
158                continue
159        try:
160            ans = type(ans)
161        except (TypeError, ValueError) as ex:
162            sys.stdout.write("Invalid value ({}), please try again\n".format(ex))
163            continue
164        try:
165            valid = validator(ans)
166        except ValueError as ex:
167            sys.stdout.write("{}, please try again\n".format(ex))
168            continue
169        if not valid:
170            sys.stdout.write("Value not in specified range, please try again\n")
171            continue
172        return ans
175def hexdump(data_or_stream, bytes_per_line=16, aggregate=True):
176    """Convert the given bytes (or a stream with a buffering ``read()`` method) to hexdump-formatted lines,
177    with possible aggregation of identical lines. Returns a generator of formatted lines.
178    """
179    if hasattr(data_or_stream, "read"):
181        def read_chunk():
182            while True:
183                buf = data_or_stream.read(bytes_per_line)
184                if not buf:
185                    break
186                yield buf
188    else:
190        def read_chunk():
191            for i in range(0, len(data_or_stream), bytes_per_line):
192                yield data_or_stream[i : i + bytes_per_line]
194    prev = None
195    skipped = False
196    for i, chunk in enumerate(read_chunk()):
197        hexd = " ".join("{:02x}".format(ord(ch)) for ch in chunk)
198        text = "".join(ch if 32 <= ord(ch) < 127 else "." for ch in chunk)
199        if aggregate and prev == chunk:
200            skipped = True
201            continue
202        prev = chunk
203        if skipped:
204            yield "*"
205        yield "{:06x} | {}| {}".format(
206            i * bytes_per_line,
207            hexd.ljust(bytes_per_line * 3, " "),
208            text,
209        )
210        skipped = False
213def pager(rows, pagercmd=None):  # pragma: no cover
214    """Opens a pager (e.g., ``less``) to display the given text. Requires a terminal.
216    :param rows: a ``bytes`` or a list/iterator of "rows" (``bytes``)
217    :param pagercmd: the pager program to run. Defaults to ``less -RSin``
218    """
219    if not pagercmd:
220        pagercmd = local["less"]["-RSin"]
221    if hasattr(rows, "splitlines"):
222        rows = rows.splitlines()
224    pg = pagercmd.popen(stdout=None, stderr=None)
225    try:
226        for row in rows:
227            line = "{}\n".format(row)
228            try:
229                pg.stdin.write(line)
230                pg.stdin.flush()
231            except IOError:
232                break
233        pg.stdin.close()
234        pg.wait()
235    finally:
236        try:
237            rows.close()
238        except Exception:
239            pass
240        if pg and pg.poll() is None:
241            try:
242                pg.terminate()
243            except Exception:
244                pass
245            os.system("reset")