# All windows write only unicode to the terminal -
# that's what blessings does, so we match it.


from typing import (
    Optional,
    IO,
    Dict,
    TypeVar,
    Type,
    Tuple,
    Callable,
    cast,
    TextIO,
    Union,
    List,
)
from types import TracebackType

import logging
import re
import sys

import blessings

from .formatstring import fmtstr, FmtStr
from .formatstringarray import FSArray
from .termhelpers import Cbreak

logger = logging.getLogger(__name__)

SCROLL_DOWN = "\x1bD"
FIRST_COLUMN = "\x1b[1G"

# Response to the "\x1b[6n" cursor position query: optional leading bytes
# ("extra"), a CSI introducer, then "<row>;<column>R". Compiled once because
# get_cursor_position() re-runs the search after every character read.
_CURSOR_POSITION_RESPONSE = re.compile(
    r"(?P<extra>.*)"
    r"(?P<CSI>\x1b\[|\x9b)"
    r"(?P<row>\d+);(?P<column>\d+)R",
    re.DOTALL,
)


T = TypeVar("T", bound="BaseWindow")


class BaseWindow:
    """Shared state and helpers for the terminal window classes.

    Holds the blessings Terminal, the output stream, and the cache of the
    lines last written to each terminal row. Subclasses implement
    render_to_terminal().
    """

    def __init__(
        self, out_stream: Optional[IO] = None, hide_cursor: bool = True
    ) -> None:
        """Constructs a BaseWindow

        Args:
            out_stream (file): Defaults to sys.__stdout__
            hide_cursor (bool): Hides cursor while in context
        """
        logger.debug("-------initializing Window object %r------", self)
        if out_stream is None:
            out_stream = sys.__stdout__
        self.t = blessings.Terminal(stream=out_stream, force_styling=True)
        self.out_stream = out_stream
        self.hide_cursor = hide_cursor
        # row number -> line last rendered there (None for a cleared row)
        self._last_lines_by_row: Dict[int, Optional[FmtStr]] = {}
        self._last_rendered_width: Optional[int] = None
        self._last_rendered_height: Optional[int] = None

    def scroll_down(self) -> None:
        """Writes SCROLL_DOWN with the cursor temporarily on the bottom line."""
        logger.debug("sending scroll down message w/ cursor on bottom line")

        # since scroll-down only moves the screen if cursor is at bottom
        with self.t.location(x=0, y=1000000):
            self.write(SCROLL_DOWN)  # TODO will blessings do this?

    def write(self, msg: str) -> None:
        """Writes msg to out_stream and flushes immediately."""
        self.out_stream.write(msg)
        self.out_stream.flush()

    def __enter__(self: T) -> T:
        """Hides the cursor (if hide_cursor is set) and returns self."""
        logger.debug("running BaseWindow.__enter__")
        if self.hide_cursor:
            self.write(self.t.hide_cursor)
        return self

    def __exit__(
        self,
        type: Optional[Type[BaseException]] = None,
        value: Optional[BaseException] = None,
        traceback: Optional[TracebackType] = None,
    ) -> None:
        """Restores the normal cursor if it was hidden on entry."""
        logger.debug("running BaseWindow.__exit__")
        if self.hide_cursor:
            self.write(self.t.normal_cursor)

    def on_terminal_size_change(self, height: int, width: int) -> None:
        """Drops the rendered-line cache and records the new dimensions."""
        # Changing the terminal size breaks the cache, because it
        # is unknown how the window size change affected scrolling / the cursor
        self._last_lines_by_row = {}
        self._last_rendered_width = width
        self._last_rendered_height = height

    def render_to_terminal(
        self, array: Union[FSArray, List[FmtStr]], cursor_pos: Tuple[int, int] = (0, 0)
    ) -> Optional[int]:
        """Renders array to the terminal; must be implemented by subclasses."""
        raise NotImplementedError

    def get_term_hw(self) -> Tuple[int, int]:
        """Returns current terminal height and width"""
        return self.t.height, self.t.width

    @property
    def width(self) -> int:
        "The current width of the terminal window"
        return self.t.width

    @property
    def height(self) -> int:
        "The current height of the terminal window"
        return self.t.height

    def array_from_text(self, msg: str) -> FSArray:
        """Returns a FSArray of the size of the window containing msg"""
        rows, columns = self.t.height, self.t.width
        return self.array_from_text_rc(msg, rows, columns)

    @classmethod
    def array_from_text_rc(cls, msg: str, rows: int, columns: int) -> FSArray:
        """Lays msg out into an FSArray of at most rows x columns cells.

        Characters fill cells left to right, top to bottom. A "\\r" or
        "\\n" moves to the start of the next row. Characters that do not fit
        within rows * columns cells are dropped.
        """
        arr = FSArray(0, columns)
        i = 0  # index of the next cell to fill, counted in row-major order
        for c in msg:
            if i >= rows * columns:
                return arr
            elif c in "\r\n":
                # jump to the last cell of the current row; the increment
                # below then lands on the first cell of the next row
                i = ((i // columns) + 1) * columns - 1
            else:
                arr[i // arr.width, i % arr.width] = [fmtstr(c)]
            i += 1
        return arr

    def fmtstr_to_stdout_xform(self) -> Callable[[FmtStr], str]:
        """Returns the function used to turn a FmtStr into text for output."""

        def for_stdout(s: FmtStr) -> str:
            return str(s)

        return for_stdout


class FullscreenWindow(BaseWindow):
    """2D-text rendering window that disappears when its context is left

    FullscreenWindow will only render arrays the size of the terminal
    or smaller, and leaves no trace on exit (like top or vim). It never
    scrolls the terminal. Changing the terminal size doesn't do anything,
    but rendered arrays need to fit on the screen.

    Note:
        The context of the FullscreenWindow
        object must be entered before calling any of its methods.

        Within the context of FullscreenWindow, refrain from writing to
        its out_stream; cached writes will be inaccurate.
    """

    def __init__(
        self, out_stream: Optional[IO] = None, hide_cursor: bool = True
    ) -> None:
        """Constructs a FullscreenWindow

        Args:
            out_stream (file): Defaults to sys.__stdout__
            hide_cursor (bool): Hides cursor while in context
        """
        super().__init__(out_stream=out_stream, hide_cursor=hide_cursor)
        self.fullscreen_ctx = self.t.fullscreen()

    def __enter__(self) -> "FullscreenWindow":
        """Enters the terminal's fullscreen context, then hides the cursor."""
        self.fullscreen_ctx.__enter__()
        return super().__enter__()

    def __exit__(
        self,
        type: Optional[Type[BaseException]] = None,
        value: Optional[BaseException] = None,
        traceback: Optional[TracebackType] = None,
    ) -> None:
        """Leaves the fullscreen context and restores the cursor."""
        try:
            self.fullscreen_ctx.__exit__(type, value, traceback)
        finally:
            # restore the cursor even if leaving fullscreen raised
            super().__exit__(type, value, traceback)

    def render_to_terminal(
        self, array: Union[FSArray, List[FmtStr]], cursor_pos: Tuple[int, int] = (0, 0)
    ) -> None:
        """Renders array to terminal and places (0-indexed) cursor

        Args:
          array (FSArray): Grid of styled characters to be rendered.
          cursor_pos (tuple): (row, column) to leave the cursor at.

        * If array received is of width too small, render it anyway
        * If array received is of width too large,
        * render the renderable portion
        * If array received is of height too small, render it anyway
        * If array received is of height too large,
        * render the renderable portion (no scroll)
        """
        # TODO there's a race condition here - these height and widths are
        # super fresh - they might change between the array being constructed
        # and rendered
        # Maybe the right behavior is to throw away the render
        # in the signal handler?
        height, width = self.height, self.width

        for_stdout = self.fmtstr_to_stdout_xform()
        if not self.hide_cursor:
            # the cursor is normally visible: hide it only while drawing
            self.write(self.t.hide_cursor)
        if height != self._last_rendered_height or width != self._last_rendered_width:
            self.on_terminal_size_change(height, width)

        current_lines_by_row: Dict[int, Optional[FmtStr]] = {}
        rows = list(range(height))
        rows_for_use = rows[: len(array)]
        rest_of_rows = rows[len(array) :]

        # rows which we have content for and don't require scrolling
        for row, line in zip(rows_for_use, array):
            current_lines_by_row[row] = line
            if line == self._last_lines_by_row.get(row):
                continue  # already on screen, skip the write
            self.write(self.t.move(row, 0))
            self.write(for_stdout(line))
            if len(line) < width:
                self.write(self.t.clear_eol)

        # rows onscreen that we don't have content for
        for row in rest_of_rows:
            if self._last_lines_by_row and row not in self._last_lines_by_row:
                continue
            self.write(self.t.move(row, 0))
            self.write(self.t.clear_eol)
            self.write(self.t.clear_bol)
            current_lines_by_row[row] = None

        logger.debug("lines in last lines by row: %r", self._last_lines_by_row.keys())
        logger.debug("lines in current lines by row: %r", current_lines_by_row.keys())
        self.write(self.t.move(*cursor_pos))
        self._last_lines_by_row = current_lines_by_row
        if not self.hide_cursor:
            self.write(self.t.normal_cursor)


class CursorAwareWindow(BaseWindow):
    """
    Renders to the normal terminal screen and
    can find the location of the cursor.

    Note:
        The context of the CursorAwareWindow
        object must be entered before calling any of its methods.

        Within the context of CursorAwareWindow, refrain from writing to
        its out_stream; cached writes will be inaccurate and calculating
        cursor depends on cursor not having moved since the last render.
        Only use the render_to_terminal interface for moving the cursor.
    """

    def __init__(
        self,
        out_stream: Optional[IO] = None,
        in_stream: Optional[IO] = None,
        keep_last_line: bool = False,
        hide_cursor: bool = True,
        extra_bytes_callback: Optional[Callable[[bytes], None]] = None,
    ) -> None:
        """Constructs a CursorAwareWindow

        Args:
            out_stream (file): Defaults to sys.__stdout__
            in_stream (file): Defaults to sys.__stdin__
            keep_last_line (bool): Causes the cursor to be moved down one line
                on leaving context
            hide_cursor (bool): Hides cursor while in context
            extra_bytes_callback (f(bytes) -> None): Will be called with extra
                bytes inadvertently read in get_cursor_position(). If not
                provided, a ValueError will be raised when this occurs.
        """
        super().__init__(out_stream=out_stream, hide_cursor=hide_cursor)
        if in_stream is None:
            in_stream = sys.__stdin__
        self.in_stream = in_stream
        self._last_cursor_column: Optional[int] = None
        self._last_cursor_row: Optional[int] = None
        self.keep_last_line = keep_last_line
        self.cbreak = Cbreak(self.in_stream)
        self.extra_bytes_callback = extra_bytes_callback

        # whether another SIGWINCH is queued up
        self.another_sigwinch = False

        # in the cursor query code of cursor diff
        self.in_get_cursor_diff = False

    def __enter__(self) -> "CursorAwareWindow":
        """Enters cbreak mode and records the row the cursor starts on."""
        self.cbreak.__enter__()
        try:
            self.top_usable_row, _ = self.get_cursor_position()
        except BaseException:
            # `with` never calls __exit__ when __enter__ raises, so undo the
            # cbreak change here rather than leave the terminal in cbreak mode
            self.cbreak.__exit__(*sys.exc_info())
            raise
        self._orig_top_usable_row = self.top_usable_row
        logger.debug("initial top_usable_row: %d", self.top_usable_row)
        return super().__enter__()

    def __exit__(
        self,
        type: Optional[Type[BaseException]] = None,
        value: Optional[BaseException] = None,
        traceback: Optional[TracebackType] = None,
    ) -> None:
        """Clears from the cursor's row down, then leaves cbreak mode and
        restores the cursor."""
        try:
            if self.keep_last_line:
                # just moves cursor down if not on last line
                self.write(SCROLL_DOWN)

            self.write(FIRST_COLUMN)
            self.write(self.t.clear_eos)
            self.write(self.t.clear_eol)
        finally:
            # terminal mode and cursor are restored even if a write failed
            try:
                self.cbreak.__exit__(type, value, traceback)
            finally:
                super().__exit__(type, value, traceback)

    def get_cursor_position(self) -> Tuple[int, int]:
        """Returns the terminal (row, column) of the cursor

        0-indexed, like blessings cursor positions

        Raises:
            ValueError: if in_stream returns '' from a read, or if bytes
                precede the cursor position response and no
                extra_bytes_callback was provided.
        """
        # TODO would this be cleaner as a parameter?
        in_stream = self.in_stream

        query_cursor_position = "\x1b[6n"
        self.write(query_cursor_position)

        def retrying_read() -> str:
            while True:
                try:
                    c = in_stream.read(1)
                except OSError:
                    # apparently sometimes this happens: the only documented
                    # case is Terminal on a Ubuntu 17.10 VM on osx 10.13.
                    # see issue #732
                    logger.info("stdin.read(1) that should never error just errored.")
                    continue
                if c == "":
                    raise ValueError(
                        "Stream should be blocking - shouldn't"
                        " return ''. Returned %r so far" % (resp,)
                    )
                return c

        resp = ""
        while True:
            resp += retrying_read()
            m = _CURSOR_POSITION_RESPONSE.search(resp)
            if m is None:
                continue  # response not complete yet, keep reading

            row = int(m.group("row"))
            col = int(m.group("column"))
            extra = m.group("extra")
            if extra:
                if self.extra_bytes_callback is not None:
                    self.extra_bytes_callback(
                        # TODO how do we know that this works?
                        extra.encode(cast(TextIO, in_stream).encoding)
                    )
                else:
                    raise ValueError(
                        (
                            "Bytes preceding cursor position "
                            "query response thrown out:\n%r\n"
                            "Pass an extra_bytes_callback to "
                            "CursorAwareWindow to prevent this"
                        )
                        % (extra,)
                    )
            # the terminal reports 1-indexed positions
            return (row - 1, col - 1)

    def get_cursor_vertical_diff(self) -> int:
        """Returns how far down the cursor moved since last render.

        Note:
            If another get_cursor_vertical_diff call is already in progress,
            immediately returns zero. (This situation is likely if
            get_cursor_vertical_diff is called from a SIGWINCH signal
            handler, since sigwinches can happen in rapid succession and
            terminal emulators seem not to respond to cursor position
            queries before the next sigwinch occurs.)
        """
        # Probably called by a SIGWINCH handler, and therefore
        # will do cursor querying until a SIGWINCH doesn't happen during
        # the query. Calls to the function from a signal handler COULD STILL
        # HAPPEN out of order -
        # they just can't interrupt the actual cursor query.
        if self.in_get_cursor_diff:
            self.another_sigwinch = True
            return 0

        cursor_dy = 0
        while True:
            self.in_get_cursor_diff = True
            self.another_sigwinch = False
            try:
                cursor_dy += self._get_cursor_vertical_diff_once()
            finally:
                # if the query raises, a flag left set would make every
                # later call return 0 without querying
                self.in_get_cursor_diff = False
            if not self.another_sigwinch:
                return cursor_dy

    def _get_cursor_vertical_diff_once(self) -> int:
        """Returns how far down the cursor moved.

        Queries the cursor position, shifts top_usable_row by the vertical
        movement since the last recorded cursor row, and returns whatever
        part of that movement was not absorbed into top_usable_row.
        """
        old_top_usable_row = self.top_usable_row
        row, col = self.get_cursor_position()
        if self._last_cursor_row is None:
            cursor_dy = 0
        else:
            cursor_dy = row - self._last_cursor_row
            logger.info("cursor moved %d lines down", cursor_dy)
            # NOTE(review): the bounds are asymmetric (> -1 going down,
            # > 1 going up) - kept as found; confirm this is intended.
            while self.top_usable_row > -1 and cursor_dy > 0:
                self.top_usable_row += 1
                cursor_dy -= 1
            while self.top_usable_row > 1 and cursor_dy < 0:
                self.top_usable_row -= 1
                cursor_dy += 1
        logger.info(
            "top usable row changed from %d to %d",
            old_top_usable_row,
            self.top_usable_row,
        )
        logger.info("returning cursor dy of %d from curtsies", cursor_dy)
        self._last_cursor_row = row
        return cursor_dy

    def render_to_terminal(
        self, array: Union[FSArray, List[FmtStr]], cursor_pos: Tuple[int, int] = (0, 0)
    ) -> int:
        """Renders array to terminal, returns the number of lines scrolled offscreen

        Returns:
            Number of times scrolled

        Args:
          array (FSArray): Grid of styled characters to be rendered.
          cursor_pos (tuple): (row, column) of the cursor, relative to the
            top of the rendered array.

            If array received is of width too small, render it anyway

            if array received is of width too large, render it anyway

            if array received is of height too small, render it anyway

            if array received is of height too large, render it, scroll down,
            and render the rest of it, then return how much we scrolled down

        """
        for_stdout = self.fmtstr_to_stdout_xform()
        # caching of write and tc (avoiding the self. lookups etc) made
        # no significant performance difference here
        if not self.hide_cursor:
            # the cursor is normally visible: hide it only while drawing
            self.write(self.t.hide_cursor)

        # TODO race condition here?
        height, width = self.t.height, self.t.width
        if height != self._last_rendered_height or width != self._last_rendered_width:
            self.on_terminal_size_change(height, width)

        current_lines_by_row: Dict[int, Optional[FmtStr]] = {}
        rows_for_use = list(range(self.top_usable_row, height))

        # rows which we have content for and don't require scrolling
        # TODO rename shared
        shared = min(len(array), len(rows_for_use))
        for row, line in zip(rows_for_use[:shared], array[:shared]):
            current_lines_by_row[row] = line
            if line == self._last_lines_by_row.get(row):
                continue  # already on screen, skip the write
            self.write(self.t.move(row, 0))
            self.write(for_stdout(line))
            if len(line) < width:
                self.write(self.t.clear_eol)

        # rows already on screen that we don't have content for
        rest_of_lines = array[shared:]
        rest_of_rows = rows_for_use[shared:]
        for row in rest_of_rows:  # if array too small
            if self._last_lines_by_row and row not in self._last_lines_by_row:
                continue
            self.write(self.t.move(row, 0))
            self.write(self.t.clear_eol)
            # TODO probably not necessary - is first char cleared?
            self.write(self.t.clear_bol)
            current_lines_by_row[row] = None

        # lines for which we need to scroll down to render
        offscreen_scrolls = 0
        for line in rest_of_lines:  # if array too big
            self.scroll_down()
            if self.top_usable_row > 0:
                self.top_usable_row -= 1
            else:
                offscreen_scrolls += 1
            # every cached line moved up one row with the scroll
            current_lines_by_row = {k - 1: v for k, v in current_lines_by_row.items()}
            logger.debug("new top_usable_row: %d", self.top_usable_row)
            # since scrolling moves the cursor
            self.write(self.t.move(height - 1, 0))
            self.write(for_stdout(line))
            current_lines_by_row[height - 1] = line

        logger.debug("lines in last lines by row: %r", self._last_lines_by_row.keys())
        logger.debug("lines in current lines by row: %r", current_lines_by_row.keys())
        self._last_cursor_row = max(
            0, cursor_pos[0] - offscreen_scrolls + self.top_usable_row
        )
        self._last_cursor_column = cursor_pos[1]
        self.write(self.t.move(self._last_cursor_row, self._last_cursor_column))
        self._last_lines_by_row = current_lines_by_row
        if not self.hide_cursor:
            self.write(self.t.normal_cursor)
        return offscreen_scrolls


def demo() -> None:
    """Interactive demo: renders a test array for each key read from stdin."""
    handler = logging.FileHandler(filename="display.log")
    logging.getLogger(__name__).setLevel(logging.DEBUG)
    logging.getLogger(__name__).addHandler(handler)
    # imported by class name so the builtin input() is not shadowed
    from .input import Input

    with FullscreenWindow(sys.stdout) as w:
        with Input(sys.stdin) as input_generator:
            rows, columns = w.t.height, w.t.width
            for c in input_generator:
                assert isinstance(c, str)
                a: Union[List[FmtStr], FSArray]
                if c == "":
                    sys.exit()  # same as raise SystemExit()
                elif c == "h":
                    a = w.array_from_text("a for small array")
                elif c == "a":
                    a = [fmtstr(c * columns) for _ in range(rows)]
                elif c == "s":
                    a = [fmtstr(c * columns) for _ in range(rows - 1)]
                elif c == "d":
                    a = [fmtstr(c * columns) for _ in range(rows + 1)]
                elif c == "f":
                    a = [fmtstr(c * columns) for _ in range(rows - 2)]
                elif c == "q":
                    a = [fmtstr(c * columns) for _ in range(1)]
                elif c == "w":
                    a = [fmtstr(c * columns) for _ in range(1)]
                elif c == "e":
                    a = [fmtstr(c * columns) for _ in range(1)]
                elif c == "\x0c":  # ctrl-L
                    for _ in range(rows):
                        w.write("\n")
                    continue
                else:
                    a = w.array_from_text("unknown command")
                w.render_to_terminal(a)


def main() -> None:
    """Renders one full screen of row-labelled text in a FullscreenWindow."""
    handler = logging.FileHandler(filename="display.log")
    logging.getLogger(__name__).setLevel(logging.DEBUG)
    logging.getLogger(__name__).addHandler(handler)
    print("this should be just off-screen")
    w = FullscreenWindow(sys.stdout)
    rows, columns = w.t.height, w.t.width
    with w:
        a = [fmtstr(((f".row{row!r}.") * rows)[:columns]) for row in range(rows)]
        w.render_to_terminal(a)


if __name__ == "__main__":
    demo()