1# All windows write only unicode to the terminal -
2# that's what blessings does, so we match it.
3
4
5from typing import (
6    Optional,
7    IO,
8    Dict,
9    TypeVar,
10    Type,
11    Tuple,
12    Callable,
13    cast,
14    TextIO,
15    Union,
16    List,
17)
18from types import TracebackType
19
20import logging
21import re
22import sys
23
24import blessings
25
26from .formatstring import fmtstr, FmtStr
27from .formatstringarray import FSArray
28from .termhelpers import Cbreak
29
# Module-level logger; demo() and main() below attach a file handler to it.
logger = logging.getLogger(__name__)

# ESC D -- presumably the terminal "index" control (move down a line,
# scrolling when on the bottom line); TODO confirm against terminal docs.
SCROLL_DOWN = "\x1bD"
# CSI 1 G -- presumably moves the cursor to the first column of its line.
FIRST_COLUMN = "\x1b[1G"


# Used to annotate BaseWindow.__enter__ so it returns the caller's own type.
T = TypeVar("T", bound="BaseWindow")
37
38
class BaseWindow:
    """Shared plumbing for windows that render to a terminal via blessings

    Wraps an output stream in a ``blessings.Terminal`` and keeps a per-row
    cache of the lines most recently rendered. Subclasses provide
    ``render_to_terminal``.
    """

    def __init__(
        self, out_stream: Optional[IO] = None, hide_cursor: bool = True
    ) -> None:
        """Constructs a BaseWindow

        Args:
            out_stream (file): Defaults to sys.__stdout__
            hide_cursor (bool): Hides cursor while in context
        """
        # Pass self as a logging argument so its repr is only built when the
        # record is actually emitted.
        logger.debug("-------initializing Window object %r------", self)
        if out_stream is None:
            out_stream = sys.__stdout__
        self.t = blessings.Terminal(stream=out_stream, force_styling=True)
        self.out_stream = out_stream
        self.hide_cursor = hide_cursor
        # row number -> line last rendered on that row
        self._last_lines_by_row: Dict[int, Optional[FmtStr]] = {}
        # terminal size seen by the last on_terminal_size_change call
        self._last_rendered_width: Optional[int] = None
        self._last_rendered_height: Optional[int] = None

    def scroll_down(self) -> None:
        """Writes SCROLL_DOWN while the cursor is placed far down the screen"""
        logger.debug("sending scroll down message w/ cursor on bottom line")

        # since scroll-down only moves the screen if cursor is at bottom
        # (presumably the huge y is clamped to the last row by the terminal)
        with self.t.location(x=0, y=1000000):
            self.write(SCROLL_DOWN)  # TODO will blessings do this?

    def write(self, msg: str) -> None:
        """Writes msg to out_stream and flushes immediately"""
        self.out_stream.write(msg)
        self.out_stream.flush()

    def __enter__(self: T) -> T:
        """Hides the cursor (if hide_cursor is set) and returns self"""
        logger.debug("running BaseWindow.__enter__")
        if self.hide_cursor:
            self.write(self.t.hide_cursor)
        return self

    def __exit__(
        self,
        type: Optional[Type[BaseException]] = None,
        value: Optional[BaseException] = None,
        traceback: Optional[TracebackType] = None,
    ) -> None:
        """Restores the cursor if it was hidden on entry

        Returns None, so an exception raised inside the context propagates.
        """
        logger.debug("running BaseWindow.__exit__")
        if self.hide_cursor:
            self.write(self.t.normal_cursor)

    def on_terminal_size_change(self, height: int, width: int) -> None:
        """Drops the rendered-line cache and records the new terminal size"""
        # Changing the terminal size breaks the cache, because it
        # is unknown how the window size change affected scrolling / the cursor
        self._last_lines_by_row = {}
        self._last_rendered_width = width
        self._last_rendered_height = height

    def render_to_terminal(
        self, array: Union[FSArray, List[FmtStr]], cursor_pos: Tuple[int, int] = (0, 0)
    ) -> Optional[int]:
        """Renders array to the terminal; must be overridden by subclasses

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError

    def get_term_hw(self) -> Tuple[int, int]:
        """Returns current terminal height and width"""
        return self.t.height, self.t.width

    @property
    def width(self) -> int:
        "The current width of the terminal window"
        return self.t.width

    @property
    def height(self) -> int:
        "The current height of the terminal window"
        return self.t.height

    def array_from_text(self, msg: str) -> FSArray:
        """Returns a FSArray of the size of the window containing msg"""
        rows, columns = self.t.height, self.t.width
        return self.array_from_text_rc(msg, rows, columns)

    @classmethod
    def array_from_text_rc(cls, msg: str, rows: int, columns: int) -> FSArray:
        """Lays msg out character by character on a grid `columns` wide

        Each "\\r" or "\\n" moves the write position to the start of the
        next row. Layout stops once `rows * columns` cells have been
        consumed; any remaining text is dropped.

        Args:
            msg (str): Text to lay out.
            rows (int): Maximum number of rows to fill.
            columns (int): Width of the grid.

        Returns:
            The FSArray that was filled in.
        """
        arr = FSArray(0, columns)
        # i is the linear index of the next cell (row-major order)
        i = 0
        for c in msg:
            if i >= rows * columns:
                return arr
            elif c in "\r\n":
                # jump to the last cell of the current row; the increment
                # below then lands on the first cell of the next row
                i = ((i // columns) + 1) * columns - 1
            else:
                arr[i // arr.width, i % arr.width] = [fmtstr(c)]
            i += 1
        return arr

    def fmtstr_to_stdout_xform(self) -> Callable[[FmtStr], str]:
        """Returns the function used to turn a FmtStr into text for output"""

        def for_stdout(s: FmtStr) -> str:
            return str(s)

        return for_stdout
130
131
class FullscreenWindow(BaseWindow):
    """2D-text rendering window that disappears when its context is left

    FullscreenWindow will only render arrays the size of the terminal
    or smaller, and leaves no trace on exit (like top or vim). It never
    scrolls the terminal. Changing the terminal size doesn't do anything,
    but rendered arrays need to fit on the screen.

    Note:
        The context of the FullscreenWindow
        object must be entered before calling any of its methods.

        Within the context of FullscreenWindow, refrain from writing to
        its out_stream; cached writes will be inaccurate.
    """

    def __init__(
        self, out_stream: Optional[IO] = None, hide_cursor: bool = True
    ) -> None:
        """Constructs a FullscreenWindow

        Args:
            out_stream (file): Defaults to sys.__stdout__
            hide_cursor (bool): Hides cursor while in context
        """
        super().__init__(out_stream=out_stream, hide_cursor=hide_cursor)
        # context manager from blessings; entered/exited alongside this window
        self.fullscreen_ctx = self.t.fullscreen()

    def __enter__(self) -> "FullscreenWindow":
        """Enters the blessings fullscreen context, then the base context"""
        self.fullscreen_ctx.__enter__()
        return super().__enter__()

    def __exit__(
        self,
        type: Optional[Type[BaseException]] = None,
        value: Optional[BaseException] = None,
        traceback: Optional[TracebackType] = None,
    ) -> None:
        """Leaves the fullscreen context, then restores the cursor"""
        self.fullscreen_ctx.__exit__(type, value, traceback)
        super().__exit__(type, value, traceback)

    def render_to_terminal(
        self, array: Union[FSArray, List[FmtStr]], cursor_pos: Tuple[int, int] = (0, 0)
    ) -> None:
        """Renders array to terminal and places (0-indexed) cursor

        Args:
            array (FSArray): Grid of styled characters to be rendered.
            cursor_pos (tuple): (row, column) the cursor is moved to after
                rendering.

        * Rows are written only when they differ from what was last
          rendered on that row.
        * A line shorter than the terminal width is followed by a
          clear-to-end-of-line.
        * Lines wider than the terminal are written unmodified; nothing
          here truncates them.
        * If array has fewer rows than the terminal, remaining rows are
          cleared.
        * If array has more rows than the terminal, only the first
          `height` rows are written (no scroll).
        """
        # TODO there's a race condition here - these height and widths are
        # super fresh - they might change between the array being constructed
        # and rendered
        # Maybe the right behavior is to throw away the render
        # in the signal handler?
        height, width = self.height, self.width

        for_stdout = self.fmtstr_to_stdout_xform()
        # when the cursor is not hidden for the whole context, hide it just
        # for the duration of this render
        if not self.hide_cursor:
            self.write(self.t.hide_cursor)
        if height != self._last_rendered_height or width != self._last_rendered_width:
            self.on_terminal_size_change(height, width)

        current_lines_by_row: Dict[int, Optional[FmtStr]] = {}
        rows = list(range(height))
        rows_for_use = rows[: len(array)]
        rest_of_rows = rows[len(array) :]

        # rows which we have content for and don't require scrolling
        for row, line in zip(rows_for_use, array):
            current_lines_by_row[row] = line
            if line == self._last_lines_by_row.get(row, None):
                continue
            self.write(self.t.move(row, 0))
            self.write(for_stdout(line))
            if len(line) < width:
                self.write(self.t.clear_eol)

        # rows onscreen that we don't have content for
        for row in rest_of_rows:
            # with a non-empty cache, rows absent from it are skipped
            # (presumably because nothing was drawn there last time)
            if self._last_lines_by_row and row not in self._last_lines_by_row:
                continue
            self.write(self.t.move(row, 0))
            self.write(self.t.clear_eol)
            self.write(self.t.clear_bol)
            current_lines_by_row[row] = None

        # lazy logger args: the key reprs are only built if DEBUG is enabled,
        # which matters because this runs on every render
        logger.debug("lines in last lines by row: %r", self._last_lines_by_row.keys())
        logger.debug("lines in current lines by row: %r", current_lines_by_row.keys())
        self.write(self.t.move(*cursor_pos))
        self._last_lines_by_row = current_lines_by_row
        if not self.hide_cursor:
            self.write(self.t.normal_cursor)
231
232
class CursorAwareWindow(BaseWindow):
    """
    Renders to the normal terminal screen and
    can find the location of the cursor.

    Note:
        The context of the CursorAwareWindow
        object must be entered before calling any of its methods.

        Within the context of CursorAwareWindow, refrain from writing to
        its out_stream; cached writes will be inaccurate and calculating
        cursor depends on cursor not having moved since the last render.
        Only use the render_to_terminal interface for moving the cursor.
    """

    def __init__(
        self,
        out_stream: Optional[IO] = None,
        in_stream: Optional[IO] = None,
        keep_last_line: bool = False,
        hide_cursor: bool = True,
        extra_bytes_callback: Optional[Callable[[bytes], None]] = None,
    ) -> None:
        """Constructs a CursorAwareWindow

        Args:
            out_stream (file): Defaults to sys.__stdout__
            in_stream (file): Defaults to sys.__stdin__
            keep_last_line (bool): Causes the cursor to be moved down one line
                on leaving context
            hide_cursor (bool): Hides cursor while in context
            extra_bytes_callback (f(bytes) -> None): Will be called with extra
                bytes inadvertently read in get_cursor_position(). If not
                provided, a ValueError will be raised when this occurs.
        """
        super().__init__(out_stream=out_stream, hide_cursor=hide_cursor)
        if in_stream is None:
            in_stream = sys.__stdin__
        self.in_stream = in_stream
        self._last_cursor_column: Optional[int] = None
        self._last_cursor_row: Optional[int] = None
        self.keep_last_line = keep_last_line
        self.cbreak = Cbreak(self.in_stream)
        self.extra_bytes_callback = extra_bytes_callback

        # whether another SIGWINCH is queued up
        self.another_sigwinch = False

        # in the cursor query code of cursor diff
        self.in_get_cursor_diff = False

    def __enter__(self) -> "CursorAwareWindow":
        """Enters cbreak mode and records the cursor row as the top usable row"""
        self.cbreak.__enter__()
        self.top_usable_row, _ = self.get_cursor_position()
        self._orig_top_usable_row = self.top_usable_row
        logger.debug("initial top_usable_row: %d", self.top_usable_row)
        return super().__enter__()

    def __exit__(
        self,
        type: Optional[Type[BaseException]] = None,
        value: Optional[BaseException] = None,
        traceback: Optional[TracebackType] = None,
    ) -> None:
        """Clears from the cursor's line down, leaves cbreak, restores cursor"""
        if self.keep_last_line:
            # just moves cursor down if not on last line
            self.write(SCROLL_DOWN)

        self.write(FIRST_COLUMN)
        self.write(self.t.clear_eos)
        self.write(self.t.clear_eol)
        self.cbreak.__exit__(type, value, traceback)
        super().__exit__(type, value, traceback)

    def get_cursor_position(self) -> Tuple[int, int]:
        """Returns the terminal (row, column) of the cursor

        0-indexed, like blessings cursor positions

        Writes a cursor position query to out_stream, then reads in_stream
        one character at a time until the text read so far contains a
        response of the form ``CSI <row> ; <column> R``.

        Raises:
            ValueError: if in_stream.read(1) returns '' (stream is not
                blocking), or if input preceded the response and no
                extra_bytes_callback was provided.
        """
        # TODO would this be cleaner as a parameter?
        in_stream = self.in_stream

        query_cursor_position = "\x1b[6n"
        self.write(query_cursor_position)

        def retrying_read() -> str:
            """Reads one character, retrying when read raises OSError"""
            while True:
                try:
                    c = in_stream.read(1)
                    if c == "":
                        # interpolate what was read so far into the message
                        # (it used to be passed as a stray second argument)
                        raise ValueError(
                            "Stream should be blocking - shouldn't"
                            " return ''. Returned %r so far" % (resp,)
                        )
                    return c
                except OSError:
                    # apparently sometimes this happens: the only documented
                    # case is Terminal on a Ubuntu 17.10 VM on osx 10.13.
                    # see issue #732
                    logger.info("stdin.read(1) that should never error just errored.")
                    continue

        # compiled once here rather than looked up for every character read
        response_pattern = re.compile(
            r"(?P<extra>.*)"
            r"(?P<CSI>\x1b\[|\x9b)"
            r"(?P<row>\d+);(?P<column>\d+)R",
            re.DOTALL,
        )

        resp = ""
        while True:
            c = retrying_read()
            resp += c
            m = response_pattern.search(resp)
            if m:
                row = int(m.group("row"))
                col = int(m.group("column"))
                # anything read before the response was not meant for us
                extra = m.group("extra")
                if extra:
                    if self.extra_bytes_callback is not None:
                        self.extra_bytes_callback(
                            # TODO how do we know that this works?
                            extra.encode(cast(TextIO, in_stream).encoding)
                        )
                    else:
                        raise ValueError(
                            (
                                "Bytes preceding cursor position "
                                "query response thrown out:\n%r\n"
                                "Pass an extra_bytes_callback to "
                                "CursorAwareWindow to prevent this"
                            )
                            % (extra,)
                        )
                # the response is 1-indexed; callers get 0-indexed positions
                return (row - 1, col - 1)

    def get_cursor_vertical_diff(self) -> int:
        """Returns how far down the cursor moved since last render.

        Note:
            If another get_cursor_vertical_diff call is already in progress,
            immediately returns zero. (This situation is likely if
            get_cursor_vertical_diff is called from a SIGWINCH signal
            handler, since sigwinches can happen in rapid succession and
            terminal emulators seem not to respond to cursor position
            queries before the next sigwinch occurs.)
        """
        # Probably called by a SIGWINCH handler, and therefore
        # will do cursor querying until a SIGWINCH doesn't happen during
        # the query. Calls to the function from a signal handler COULD STILL
        # HAPPEN out of order -
        # they just can't interrupt the actual cursor query.
        if self.in_get_cursor_diff:
            self.another_sigwinch = True
            return 0

        cursor_dy = 0
        while True:
            self.in_get_cursor_diff = True
            self.another_sigwinch = False
            try:
                cursor_dy += self._get_cursor_vertical_diff_once()
            finally:
                # always clear the flag, even if the query raised; otherwise
                # every later call would take the early-return path above
                self.in_get_cursor_diff = False
            if not self.another_sigwinch:
                return cursor_dy

    def _get_cursor_vertical_diff_once(self) -> int:
        """Returns how far down the cursor moved.

        Queries the cursor row, compares it with the row recorded by the
        last render/query, shifts top_usable_row by as much of that
        difference as the loop bounds allow, and returns the remainder.
        """
        old_top_usable_row = self.top_usable_row
        row, col = self.get_cursor_position()
        if self._last_cursor_row is None:
            cursor_dy = 0
        else:
            cursor_dy = row - self._last_cursor_row
            logger.info("cursor moved %d lines down", cursor_dy)
            # NOTE(review): the two bounds differ (-1 vs 1); kept as-is
            while self.top_usable_row > -1 and cursor_dy > 0:
                self.top_usable_row += 1
                cursor_dy -= 1
            while self.top_usable_row > 1 and cursor_dy < 0:
                self.top_usable_row -= 1
                cursor_dy += 1
        logger.info(
            "top usable row changed from %d to %d",
            old_top_usable_row,
            self.top_usable_row,
        )
        logger.info("returning cursor dy of %d from curtsies", cursor_dy)
        self._last_cursor_row = row
        return cursor_dy

    def render_to_terminal(
        self, array: Union[FSArray, List[FmtStr]], cursor_pos: Tuple[int, int] = (0, 0)
    ) -> int:
        """Renders array to terminal, returns the number of lines scrolled offscreen

        Returns:
            Number of times scrolled

        Args:
          array (FSArray): Grid of styled characters to be rendered.
          cursor_pos (tuple): (row, column) of the cursor relative to the
            top of array; it is offset by top_usable_row before being sent
            to the terminal.

            If array received is of width too small, render it anyway

            if array received is of width too large, render it anyway

            if array received is of height too small, render it anyway

            if array received is of height too large, render it, scroll down,
            and render the rest of it, then return how much we scrolled down

        """
        for_stdout = self.fmtstr_to_stdout_xform()
        # caching of write and tc (avoiding the self. lookups etc) made
        # no significant performance difference here
        if not self.hide_cursor:
            self.write(self.t.hide_cursor)

        # TODO race condition here?
        height, width = self.t.height, self.t.width
        if height != self._last_rendered_height or width != self._last_rendered_width:
            self.on_terminal_size_change(height, width)

        current_lines_by_row: Dict[int, Optional[FmtStr]] = {}
        rows_for_use = list(range(self.top_usable_row, height))

        # rows which we have content for and don't require scrolling
        # TODO rename shared
        shared = min(len(array), len(rows_for_use))
        for row, line in zip(rows_for_use[:shared], array[:shared]):
            current_lines_by_row[row] = line
            if line == self._last_lines_by_row.get(row, None):
                continue
            self.write(self.t.move(row, 0))
            self.write(for_stdout(line))
            if len(line) < width:
                self.write(self.t.clear_eol)

        # rows already on screen that we don't have content for
        rest_of_lines = array[shared:]
        rest_of_rows = rows_for_use[shared:]
        for row in rest_of_rows:  # if array too small
            if self._last_lines_by_row and row not in self._last_lines_by_row:
                continue
            self.write(self.t.move(row, 0))
            self.write(self.t.clear_eol)
            # TODO probably not necessary - is first char cleared?
            self.write(self.t.clear_bol)
            current_lines_by_row[row] = None

        # lines for which we need to scroll down to render
        offscreen_scrolls = 0
        for line in rest_of_lines:  # if array too big
            self.scroll_down()
            # a scroll first uses up the rows above the window; only once
            # top_usable_row is 0 does it count as scrolled offscreen
            if self.top_usable_row > 0:
                self.top_usable_row -= 1
            else:
                offscreen_scrolls += 1
            # everything already drawn moved up one row
            current_lines_by_row = {k - 1: v for k, v in current_lines_by_row.items()}
            logger.debug("new top_usable_row: %d", self.top_usable_row)
            # since scrolling moves the cursor
            self.write(self.t.move(height - 1, 0))
            self.write(for_stdout(line))
            current_lines_by_row[height - 1] = line

        # lazy logger args: the key reprs are only built if DEBUG is enabled
        logger.debug("lines in last lines by row: %r", self._last_lines_by_row.keys())
        logger.debug("lines in current lines by row: %r", current_lines_by_row.keys())
        self._last_cursor_row = max(
            0, cursor_pos[0] - offscreen_scrolls + self.top_usable_row
        )
        self._last_cursor_column = cursor_pos[1]
        self.write(self.t.move(self._last_cursor_row, self._last_cursor_column))
        self._last_lines_by_row = current_lines_by_row
        if not self.hide_cursor:
            self.write(self.t.normal_cursor)
        return offscreen_scrolls
506
507
def demo() -> None:
    """Interactive demo: each keypress picks an array to render fullscreen

    Debug logging for this module is written to display.log.
    """
    log_handler = logging.FileHandler(filename="display.log")
    module_logger = logging.getLogger(__name__)
    module_logger.setLevel(logging.DEBUG)
    module_logger.addHandler(log_handler)
    from . import input

    with FullscreenWindow(sys.stdout) as window, input.Input(sys.stdin) as keys:
        rows, columns = window.t.height, window.t.width
        # key -> number of full-width rows (filled with that key) to render
        row_counts = {
            "a": rows,
            "s": rows - 1,
            "d": rows + 1,
            "f": rows - 2,
            "q": 1,
            "w": 1,
            "e": 1,
        }
        for key in keys:
            assert isinstance(key, str)
            if key == "":
                sys.exit()  # same as raise SystemExit()
            if key == "\x0c":  # ctrl-L
                for _ in range(rows):
                    window.write("\n")
                continue

            frame: Union[List[FmtStr], FSArray]
            if key == "h":
                frame = window.array_from_text("a for small array")
            elif key in row_counts:
                frame = [fmtstr(key * columns) for _ in range(row_counts[key])]
            else:
                frame = window.array_from_text("unknown command")
            window.render_to_terminal(frame)
546
547
def main() -> None:
    """Renders one terminal-sized array of row labels fullscreen, then exits

    Debug logging for this module is written to display.log.
    """
    log_handler = logging.FileHandler(filename="display.log")
    module_logger = logging.getLogger(__name__)
    module_logger.setLevel(logging.DEBUG)
    module_logger.addHandler(log_handler)
    print("this should be just off-screen")
    window = FullscreenWindow(sys.stdout)
    rows, columns = window.t.height, window.t.width
    with window:
        frame = []
        for row in range(rows):
            # repeat the row label, then cut it to the terminal width
            label = f".row{row!r}."
            frame.append(fmtstr((label * rows)[:columns]))
        window.render_to_terminal(frame)
558
559
# When this file is executed as the main module, start the interactive demo.
if __name__ == "__main__":
    demo()
562