1import sys
2from threading import Event, RLock, Thread
3from typing import IO, Any, Callable, List, Optional
4
5from . import get_console
6from .console import Console, ConsoleRenderable, RenderableType, RenderHook
7from .control import Control
8from .file_proxy import FileProxy
9from .jupyter import JupyterMixin
10from .live_render import LiveRender, VerticalOverflowMethod
11from .screen import Screen
12from .text import Text
13
14
class _RefreshThread(Thread):
    """Daemon thread that periodically calls ``refresh()`` on a live display.

    Args:
        live (Live): Object to refresh; its ``_lock`` is held around each
            ``refresh()`` call.
        refresh_per_second (float): Number of refreshes to attempt per second.
    """

    def __init__(self, live: "Live", refresh_per_second: float) -> None:
        self.live = live
        self.refresh_per_second = refresh_per_second
        # Set by stop() to end the loop in run().
        self.done = Event()
        super().__init__(daemon=True)

    def stop(self) -> None:
        """Signal the thread to finish; does not wait for it to exit."""
        self.done.set()

    def run(self) -> None:
        """Refresh the live display once per interval until stopped."""
        while True:
            # wait() doubles as the sleep between refreshes and returns True
            # as soon as stop() has been called.
            if self.done.wait(1 / self.refresh_per_second):
                break
            with self.live._lock:
                # stop() may have been called while we were waiting for the
                # lock; skip the refresh and let the next wait() exit the loop.
                if self.done.is_set():
                    continue
                self.live.refresh()
32
33
class Live(JupyterMixin, RenderHook):
    """Renders an auto-updating live display of any given renderable.

    Args:
        renderable (RenderableType, optional): The renderable to live display. Defaults to displaying nothing.
        console (Console, optional): Optional Console instance. Default will use an internal Console instance writing to stdout.
        screen (bool, optional): Enable alternate screen mode. Defaults to False.
        auto_refresh (bool, optional): Enable auto refresh. If disabled, you will need to call `refresh()` or `update()` with refresh flag. Defaults to True
        refresh_per_second (float, optional): Number of times per second to refresh the live display. Must be greater than 0. Defaults to 4.
        transient (bool, optional): Clear the renderable on exit (has no effect when screen=True). Defaults to False.
        redirect_stdout (bool, optional): Enable redirection of stdout, so ``print`` may be used. Defaults to True.
        redirect_stderr (bool, optional): Enable redirection of stderr. Defaults to True.
        vertical_overflow (VerticalOverflowMethod, optional): How to handle renderable when it is too tall for the console. Defaults to "ellipsis".
        get_renderable (Callable[[], RenderableType], optional): Optional callable to get renderable. Defaults to None.
    """

    def __init__(
        self,
        renderable: Optional[RenderableType] = None,
        *,
        console: Optional[Console] = None,
        screen: bool = False,
        auto_refresh: bool = True,
        refresh_per_second: float = 4,
        transient: bool = False,
        redirect_stdout: bool = True,
        redirect_stderr: bool = True,
        vertical_overflow: VerticalOverflowMethod = "ellipsis",
        get_renderable: Optional[Callable[[], RenderableType]] = None,
    ) -> None:
        # NOTE: kept as an assert so existing callers still see AssertionError;
        # be aware that asserts are skipped when Python runs with -O.
        assert refresh_per_second > 0, "refresh_per_second must be > 0"
        self._renderable = renderable
        self.console = console if console is not None else get_console()
        self._screen = screen
        # Set from console.set_alt_screen() in start() when screen mode is requested.
        self._alt_screen = False

        self._redirect_stdout = redirect_stdout
        self._redirect_stderr = redirect_stderr
        # Original streams, saved while redirection is active so they can be restored.
        self._restore_stdout: Optional[IO[str]] = None
        self._restore_stderr: Optional[IO[str]] = None

        # Re-entrant: refresh() takes the lock and is also called from methods
        # that already hold it (start, stop, update).
        self._lock = RLock()
        self.ipy_widget: Optional[Any] = None
        self.auto_refresh = auto_refresh
        self._started: bool = False
        # Screen mode always behaves as transient.
        self.transient = True if screen else transient

        self._refresh_thread: Optional[_RefreshThread] = None
        self.refresh_per_second = refresh_per_second

        self.vertical_overflow = vertical_overflow
        self._get_renderable = get_renderable
        self._live_render = LiveRender(
            self.get_renderable(), vertical_overflow=vertical_overflow
        )

    @property
    def is_started(self) -> bool:
        """Check if live display has been started."""
        return self._started

    def get_renderable(self) -> RenderableType:
        """Get the current renderable, without any screen wrapping.

        Returns:
            RenderableType: Result of the ``get_renderable`` callable if one was
                given, otherwise the stored renderable. Falsy values (including
                None) are replaced with an empty string.
        """
        renderable = (
            self._get_renderable()
            if self._get_renderable is not None
            else self._renderable
        )
        return renderable or ""

    def start(self, refresh: bool = False) -> None:
        """Start live rendering display.

        Does nothing if the display is already started.

        Args:
            refresh (bool, optional): Also refresh. Defaults to False.
        """
        with self._lock:
            if self._started:
                return
            self.console.set_live(self)
            self._started = True
            if self._screen:
                self._alt_screen = self.console.set_alt_screen(True)
            self.console.show_cursor(False)
            self._enable_redirect_io()
            # Register as a render hook so process_renderables() is applied to
            # console output while the display is live.
            self.console.push_render_hook(self)
            if refresh:
                self.refresh()
            if self.auto_refresh:
                self._refresh_thread = _RefreshThread(self, self.refresh_per_second)
                self._refresh_thread.start()

    def stop(self) -> None:
        """Stop live rendering display.

        Does nothing if the display is not started. Otherwise renders a final
        frame (outside alt-screen / Jupyter), restores stdout / stderr and the
        cursor, and waits for the refresh thread to exit.
        """
        with self._lock:
            if not self._started:
                return
            self.console.clear_live()
            self._started = False
            try:
                # Signal the refresh thread whenever one exists. Gating this on
                # the public, mutable ``auto_refresh`` flag would leave the
                # thread running if the flag was cleared after start(), and the
                # join() below would then block forever.
                if self._refresh_thread is not None:
                    self._refresh_thread.stop()
                # allow it to fully render on the last even if overflow
                # NOTE: this overwrites the configured value and nothing in this
                # class restores it, so a restarted display keeps "visible".
                self.vertical_overflow = "visible"
                if not self._alt_screen and not self.console.is_jupyter:
                    self.refresh()

            finally:
                # Always undo the console / stream state set up in start(),
                # even if the final refresh raised.
                self._disable_redirect_io()
                self.console.pop_render_hook()
                if not self._alt_screen and self.console.is_terminal:
                    self.console.line()
                self.console.show_cursor(True)
                if self._alt_screen:
                    self.console.set_alt_screen(False)

        # Join only after releasing the lock: the refresh thread acquires the
        # same lock around each refresh, so joining while holding it could
        # deadlock.
        if self._refresh_thread is not None:
            self._refresh_thread.join()
            self._refresh_thread = None
        if self.transient and not self._alt_screen:
            self.console.control(self._live_render.restore_cursor())
        if self.ipy_widget is not None:  # pragma: no cover
            if self.transient:
                self.ipy_widget.close()
            else:
                # jupyter last refresh must occur after console pop render hook
                # (the original author noted the reason for this is unclear)
                self.refresh()

    def __enter__(self) -> "Live":
        # Only render immediately when an explicit renderable was supplied.
        self.start(refresh=self._renderable is not None)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        self.stop()

    def _enable_redirect_io(self) -> None:
        """Enable redirecting of stdout / stderr.

        Only applies when the console is a terminal. A stream that is already
        a FileProxy is left alone, so it is neither wrapped twice nor recorded
        for restoring.
        """
        if self.console.is_terminal:
            if self._redirect_stdout and not isinstance(sys.stdout, FileProxy):  # type: ignore
                self._restore_stdout = sys.stdout
                sys.stdout = FileProxy(self.console, sys.stdout)
            if self._redirect_stderr and not isinstance(sys.stderr, FileProxy):  # type: ignore
                self._restore_stderr = sys.stderr
                sys.stderr = FileProxy(self.console, sys.stderr)

    def _disable_redirect_io(self) -> None:
        """Disable redirecting of stdout / stderr.

        Restores whichever streams were saved by ``_enable_redirect_io``.
        """
        # Compare against None rather than relying on truthiness, so a saved
        # stream object that happens to be falsy is still restored.
        if self._restore_stdout is not None:
            sys.stdout = self._restore_stdout
            self._restore_stdout = None
        if self._restore_stderr is not None:
            sys.stderr = self._restore_stderr
            self._restore_stderr = None

    @property
    def renderable(self) -> RenderableType:
        """Get the renderable that is being displayed

        Returns:
            RenderableType: Displayed renderable, wrapped in a Screen when the
                alternate screen is active.
        """
        renderable = self.get_renderable()
        return Screen(renderable) if self._alt_screen else renderable

    def update(self, renderable: RenderableType, *, refresh: bool = False) -> None:
        """Update the renderable that is being displayed

        Args:
            renderable (RenderableType): New renderable to use.
            refresh (bool, optional): Refresh the display. Defaults to False.
        """
        with self._lock:
            self._renderable = renderable
            if refresh:
                self.refresh()

    def refresh(self) -> None:
        """Update the display of the Live Render."""
        with self._lock:
            self._live_render.set_renderable(self.renderable)
            if self.console.is_jupyter:  # pragma: no cover
                try:
                    from IPython.display import display
                    from ipywidgets import Output
                except ImportError:
                    import warnings

                    warnings.warn('install "ipywidgets" for Jupyter support')
                else:
                    # Create the output widget once and reuse it for every refresh.
                    if self.ipy_widget is None:
                        self.ipy_widget = Output()
                        display(self.ipy_widget)

                    with self.ipy_widget:
                        self.ipy_widget.clear_output(wait=True)
                        self.console.print(self._live_render.renderable)
            elif (self.console.is_terminal and not self.console.is_dumb_terminal) or (
                not self._started and not self.transient
            ):
                # Two cases share the same action:
                #  * a real (non-dumb) terminal, or
                #  * the display is finished and not transient, so files or
                #    dumb terminals get to see the final result.
                # Printing an argument-less Control goes through the console;
                # while this object is registered as a render hook,
                # process_renderables() adds the live render to that output.
                with self.console:
                    self.console.print(Control())

    def process_renderables(
        self, renderables: List[ConsoleRenderable]
    ) -> List[ConsoleRenderable]:
        """Process renderables to restore cursor and display progress.

        Args:
            renderables (List[ConsoleRenderable]): Renderables about to be
                written by the console.

        Returns:
            List[ConsoleRenderable]: The renderables, with cursor positioning
                and the live render added where applicable.
        """
        self._live_render.vertical_overflow = self.vertical_overflow
        if self.console.is_interactive:
            # lock needs acquiring as user can modify live_render renderable at any time unlike in Progress.
            with self._lock:
                reset = (
                    Control.home()
                    if self._alt_screen
                    else self._live_render.position_cursor()
                )
                # Reposition first, then the regular output, then the live
                # render after it.
                renderables = [
                    reset,
                    *renderables,
                    self._live_render,
                ]
        elif (
            not self._started and not self.transient
        ):  # if it is finished render the final output for files or dumb_terminals
            renderables = [*renderables, self._live_render]

        return renderables
263
264
if __name__ == "__main__":  # pragma: no cover
    # Demo: shows a continuously updated table of random exchange rates in a
    # Live display, while occasionally logging other renderables through the
    # same console.
    import random
    import time
    from itertools import cycle
    from typing import Dict, Tuple

    from .align import Align
    from .console import Console
    from .live import Live
    from .panel import Panel
    from .rule import Rule
    from .syntax import Syntax
    from .table import Table

    console = Console()

    syntax = Syntax(
        '''def loop_last(values: Iterable[T]) -> Iterable[Tuple[bool, T]]:
    """Iterate and generate a tuple with a flag for last value."""
    iter_values = iter(values)
    try:
        previous_value = next(iter_values)
    except StopIteration:
        return
    for value in iter_values:
        yield False, previous_value
        previous_value = value
    yield True, previous_value''',
        "python",
        line_numbers=True,
    )

    sample_table = Table("foo", "bar", "baz")
    sample_table.add_row("1", "2", "3")

    progress_renderables = [
        # These are two separate messages; without the comma between them the
        # adjacent literals were silently concatenated into one string.
        "You can make the terminal shorter and taller to see the live table hide",
        "Text may be printed while the progress bars are rendering.",
        Panel("In fact, [i]any[/i] renderable will work"),
        "Such as [magenta]tables[/]...",
        sample_table,
        "Pretty printed structures...",
        {"type": "example", "text": "Pretty printed"},
        "Syntax...",
        syntax,
        Rule("Give it a try!"),
    ]

    examples = cycle(progress_renderables)

    exchanges = [
        "SGD",
        "MYR",
        "EUR",
        "USD",
        "AUD",
        "JPY",
        "CNH",
        "HKD",
        "CAD",
        "INR",
        "DKK",
        "GBP",
        "RUB",
        "NZD",
        "MXN",
        "IDR",
        "TWD",
        "THB",
        "VND",
    ]
    with Live(console=console) as live_table:
        exchange_rate_dict: Dict[Tuple[str, str], float] = {}

        for index in range(100):
            select_exchange = exchanges[index % len(exchanges)]

            for exchange in exchanges:
                if exchange == select_exchange:
                    continue
                time.sleep(0.4)
                # randint is inclusive, so this logs an example roughly once
                # every 11 iterations.
                if random.randint(0, 10) < 1:
                    console.log(next(examples))
                exchange_rate_dict[(select_exchange, exchange)] = 200 / (
                    (random.random() * 320) + 1
                )
                # Keep the table bounded: drop the oldest entry (dicts preserve
                # insertion order) once there are more rows than other currencies.
                if len(exchange_rate_dict) > len(exchanges) - 1:
                    exchange_rate_dict.pop(next(iter(exchange_rate_dict)))
                rates_table = Table(title="Exchange Rates")

                rates_table.add_column("Source Currency")
                rates_table.add_column("Destination Currency")
                rates_table.add_column("Exchange Rate")

                for (source, dest), exchange_rate in exchange_rate_dict.items():
                    rates_table.add_row(
                        source,
                        dest,
                        Text(
                            f"{exchange_rate:.4f}",
                            style="red" if exchange_rate < 1.0 else "green",
                        ),
                    )

                live_table.update(Align.center(rates_table))
370