1# -*- coding: utf-8 -*- 2""" 3Terminal-related utilities 4-------------------------- 5""" 6 7from __future__ import absolute_import, division, print_function 8 9import os 10import sys 11 12from plumbum import local 13 14from .progress import Progress 15from .termsize import get_terminal_size 16 17__all__ = ( 18 "readline", 19 "ask", 20 "choose", 21 "prompt", 22 "get_terminal_size", 23 "Progress", 24 "get_terminal_size", 25) 26 27 28def __dir__(): 29 return __all__ 30 31 32def readline(message=""): 33 """Gets a line of input from the user (stdin)""" 34 sys.stdout.write(message) 35 sys.stdout.flush() 36 return sys.stdin.readline() 37 38 39def ask(question, default=None): 40 """ 41 Presents the user with a yes/no question. 42 43 :param question: The question to ask 44 :param default: If ``None``, the user must answer. If ``True`` or ``False``, lack of response is 45 interpreted as the default option 46 47 :returns: the user's choice 48 """ 49 question = question.rstrip().rstrip("?").rstrip() + "?" 50 if default is None: 51 question += " (y/n) " 52 elif default: 53 question += " [Y/n] " 54 else: 55 question += " [y/N] " 56 57 while True: 58 try: 59 answer = readline(question).strip().lower() 60 except EOFError: 61 answer = None 62 if answer in ("y", "yes"): 63 return True 64 elif answer in ("n", "no"): 65 return False 66 elif not answer and default is not None: 67 return default 68 else: 69 sys.stdout.write("Invalid response, please try again\n") 70 71 72def choose(question, options, default=None): 73 """Prompts the user with a question and a set of options, from which the user needs to choose. 74 75 :param question: The question to ask 76 :param options: A set of options. It can be a list (of strings or two-tuples, mapping text 77 to returned-object) or a dict (mapping text to returned-object).`` 78 :param default: If ``None``, the user must answer. Otherwise, lack of response is interpreted 79 as this answer 80 81 :returns: The user's choice 82 83 Example:: 84 85 ans = choose("What is your favorite color?", ["blue", "yellow", "green"], default = "yellow") 86 # `ans` will be one of "blue", "yellow" or "green" 87 88 ans = choose("What is your favorite color?", 89 {"blue" : 0x0000ff, "yellow" : 0xffff00 , "green" : 0x00ff00}, default = 0x00ff00) 90 # this will display "blue", "yellow" and "green" but return a numerical value 91 """ 92 if hasattr(options, "items"): 93 options = options.items() 94 sys.stdout.write(question.rstrip() + "\n") 95 choices = {} 96 defindex = None 97 for i, item in enumerate(options): 98 i = i + 1 # python2.5 99 if isinstance(item, (tuple, list)) and len(item) == 2: 100 text = item[0] 101 val = item[1] 102 else: 103 text = item 104 val = item 105 choices[i] = val 106 if default is not None and default == val: 107 defindex = i 108 sys.stdout.write("(%d) %s\n" % (i, text)) 109 if default is not None: 110 if defindex is None: 111 msg = "Choice [{}]: ".format(default) 112 else: 113 msg = "Choice [%d]: " % (defindex,) 114 else: 115 msg = "Choice: " 116 while True: 117 try: 118 choice = readline(msg).strip() 119 except EOFError: 120 choice = "" 121 if not choice and default: 122 return default 123 try: 124 choice = int(choice) 125 if choice not in choices: 126 raise ValueError() 127 except ValueError: 128 sys.stdout.write("Invalid choice, please try again\n") 129 continue 130 return choices[choice] 131 132 133def prompt(question, type=str, default=NotImplemented, validator=lambda val: True): 134 """ 135 Presents the user with a validated question, keeps asking if validation does not pass. 136 137 :param question: The question to ask 138 :param type: The type of the answer, defaults to str 139 :param default: The default choice 140 :param validator: An extra validator called after type conversion, can raise ValueError or return False to trigger a retry. 141 142 :returns: the user's choice 143 """ 144 question = question.rstrip(" \t:") 145 if default is not NotImplemented: 146 question += " [{}]".format(default) 147 question += ": " 148 while True: 149 try: 150 ans = readline(question).strip() 151 except EOFError: 152 ans = "" 153 if not ans: 154 if default is not NotImplemented: 155 # sys.stdout.write("\b%s\n" % (default,)) 156 return default 157 else: 158 continue 159 try: 160 ans = type(ans) 161 except (TypeError, ValueError) as ex: 162 sys.stdout.write("Invalid value ({}), please try again\n".format(ex)) 163 continue 164 try: 165 valid = validator(ans) 166 except ValueError as ex: 167 sys.stdout.write("{}, please try again\n".format(ex)) 168 continue 169 if not valid: 170 sys.stdout.write("Value not in specified range, please try again\n") 171 continue 172 return ans 173 174 175def hexdump(data_or_stream, bytes_per_line=16, aggregate=True): 176 """Convert the given bytes (or a stream with a buffering ``read()`` method) to hexdump-formatted lines, 177 with possible aggregation of identical lines. Returns a generator of formatted lines. 178 """ 179 if hasattr(data_or_stream, "read"): 180 181 def read_chunk(): 182 while True: 183 buf = data_or_stream.read(bytes_per_line) 184 if not buf: 185 break 186 yield buf 187 188 else: 189 190 def read_chunk(): 191 for i in range(0, len(data_or_stream), bytes_per_line): 192 yield data_or_stream[i : i + bytes_per_line] 193 194 prev = None 195 skipped = False 196 for i, chunk in enumerate(read_chunk()): 197 hexd = " ".join("{:02x}".format(ord(ch)) for ch in chunk) 198 text = "".join(ch if 32 <= ord(ch) < 127 else "." for ch in chunk) 199 if aggregate and prev == chunk: 200 skipped = True 201 continue 202 prev = chunk 203 if skipped: 204 yield "*" 205 yield "{:06x} | {}| {}".format( 206 i * bytes_per_line, 207 hexd.ljust(bytes_per_line * 3, " "), 208 text, 209 ) 210 skipped = False 211 212 213def pager(rows, pagercmd=None): # pragma: no cover 214 """Opens a pager (e.g., ``less``) to display the given text. Requires a terminal. 215 216 :param rows: a ``bytes`` or a list/iterator of "rows" (``bytes``) 217 :param pagercmd: the pager program to run. Defaults to ``less -RSin`` 218 """ 219 if not pagercmd: 220 pagercmd = local["less"]["-RSin"] 221 if hasattr(rows, "splitlines"): 222 rows = rows.splitlines() 223 224 pg = pagercmd.popen(stdout=None, stderr=None) 225 try: 226 for row in rows: 227 line = "{}\n".format(row) 228 try: 229 pg.stdin.write(line) 230 pg.stdin.flush() 231 except IOError: 232 break 233 pg.stdin.close() 234 pg.wait() 235 finally: 236 try: 237 rows.close() 238 except Exception: 239 pass 240 if pg and pg.poll() is None: 241 try: 242 pg.terminate() 243 except Exception: 244 pass 245 os.system("reset") 246