1import functools
2import inspect
3import itertools
4from textwrap import TextWrapper, dedent
5from typing import (
6    Any,
7    Callable,
8    Iterable,
9    Iterator,
10    List,
11    NoReturn,
12    Optional,
13    Tuple,
14)
15
16from blessed import Terminal
17
18from .compat import link
19from .keys import (
20    BINDINGS,
21    EXIT_KEY,
22    HELP as HELP_KEY,
23    KEYS_BY_QUERYMODE,
24    Key,
25    MODES,
26    PAUSE_KEY,
27    PROCESS_CANCEL,
28    PROCESS_KILL,
29    PROCESS_PIN,
30)
31from .types import (
32    ActivityStats,
33    Column,
34    DBInfo,
35    Host,
36    IOCounter,
37    MemoryInfo,
38    QueryDisplayMode,
39    SelectableProcesses,
40    SystemInfo,
41    UI,
42)
43from . import colors, utils
44from .activities import sorted as sorted_processes
45
46
class line_counter:
    """Countdown of the lines still available for display.

    Each call to ``next()`` returns the current value and then decrements
    it by one. The counter never stops by itself and may go below zero.
    """

    def __init__(self, start: int) -> None:
        self.value = start

    def __repr__(self) -> str:
        return f"{type(self).__name__}({self.value})"

    def __next__(self) -> int:
        remaining = self.value
        self.value = remaining - 1
        return remaining
58
59
def shorten(term: Terminal, text: str, width: Optional[int] = None) -> str:
    r"""Return 'text' cut down to its first wrapped line.

    Wrapping is delegated to term.wrap() (with the given 'width' and a
    single line kept), which makes this a sequence-aware counterpart of
    textwrap.shorten(). The 'normal' sequence is appended to the result;
    an empty 'text' yields an empty string.

    >>> term = Terminal()
    >>> text = f"{term.green('hello')}, world"
    >>> text
    'hello, world'
    >>> shorten(term, text, 6)
    'hello,'
    >>> shorten(term, text, 3)
    'hel'
    >>> shorten(term, "", 3)
    ''
    """
    if text:
        first_line = term.wrap(text, width=width, max_lines=1)[0]
        return first_line + term.normal
    return ""
80
81
def limit(func: Callable[..., Iterable[str]]) -> Callable[..., None]:
    """View decorator handling screen height limit.

    The decorated view is a callable returning lines; the wrapper prints
    each of them, shortened to 'width' and followed by a 'clear_eol'
    sequence. Two extra keyword arguments are handled by the wrapper:

    - 'lines_counter': an optional counter (see 'line_counter') consumed
      once per printed line; printing stops when it reaches its last
      available line and nothing is printed if it is already exhausted;
    - 'width': the display width, forwarded to the view only if the view
      declares a 'width' parameter.

    >>> term = Terminal()

    >>> def view(term, n, *, prefix="line"):
    ...     for i in range(n):
    ...         yield f"{prefix} #{i}"

    >>> count = line_counter(2)
    >>> limit(view)(term, 3, lines_counter=count)
    line #0
    line #1
    >>> count
    line_counter(0)
    >>> count = line_counter(3)
    >>> limit(view)(term, 2, lines_counter=count)
    line #0
    line #1
    >>> count
    line_counter(1)
    >>> limit(view)(term, 3, prefix="row")
    row #0
    row #1
    row #2

    A single line is displayed with an EOL as well:
    >>> count = line_counter(10)
    >>> limit(view)(term, 1, lines_counter=count) or print("<--", end="")
    line #0
    <--
    >>> count
    line_counter(9)

    Nothing is displayed once the counter is exhausted:
    >>> count = line_counter(0)
    >>> limit(view)(term, 3, lines_counter=count)
    >>> count
    line_counter(0)
    """
    # The view's signature does not change between calls: inspect it once,
    # at decoration time, rather than on every rendering.
    accepts_width = "width" in inspect.signature(func).parameters

    @functools.wraps(func)
    def wrapper(term: Terminal, *args: Any, **kwargs: Any) -> None:
        counter = kwargs.pop("lines_counter", None)
        width = kwargs.pop("width", None)
        if accepts_width:
            kwargs["width"] = width
        # A previous view may have consumed all available lines already; in
        # that case, printing anything would overflow the screen. Counters
        # without a 'value' attribute are assumed to have room left.
        if counter is not None and getattr(counter, "value", 1) <= 0:
            return
        for line in func(term, *args, **kwargs):
            print(shorten(term, line, width) + term.clear_eol)
            # '<=' rather than '==' so that a counter which already went
            # past its last line still stops the output.
            if counter is not None and next(counter) <= 1:
                break

    return wrapper
130
131
@functools.singledispatch
def render(x: NoReturn) -> str:
    """Fallback implementation of the 'render' single-dispatch function.

    It always raises AssertionError, mentioning the type name of 'x'.
    """
    type_name = type(x).__name__
    raise AssertionError(f"not implemented for type '{type_name}'")
135
136
@render.register(MemoryInfo)
def render_meminfo(m: MemoryInfo) -> str:
    """Render memory usage as '<percent>% - <used>/<total>'.

    'used' and 'total' are formatted with utils.naturalsize().
    """
    used = utils.naturalsize(m.used)
    total = utils.naturalsize(m.total)
    return f"{m.percent}% - {used}/{total}"
141
142
@render.register(IOCounter)
def render_iocounter(i: IOCounter) -> str:
    """Render an I/O counter as '<bytes>/s - <count>/s'.

    The 'bytes' value is formatted with utils.naturalsize().
    """
    return f"{utils.naturalsize(i.bytes)}/s - {i.count}/s"
147
148
@limit
def help(term: Terminal, version: str, is_local: bool) -> Iterable[str]:
    """Yield the lines of the help menu.

    The menu is made of a project banner, the key bindings (those flagged
    'local_only' are left out unless 'is_local' is true), the "Mode" keys
    and a closing hint.
    """
    project_url = "https://github.com/dalibo/pg_activity"
    intro = dedent(
        f"""\
    {term.bold_green}pg_activity {version} - {link(term, project_url, project_url)}
    {term.normal}Released under PostgreSQL License.
    """
    )

    def key_mappings(keys: Iterable[Key]) -> Iterable[str]:
        # One "<key>: <description>" line per key, key name right-aligned.
        return (
            f"{term.bright_cyan}{(key.name or key.value).rjust(10)}{term.normal}"
            f": {key.description}"
            for key in keys
        )

    yield from intro.splitlines()
    yield ""

    bindings = BINDINGS if is_local else [b for b in BINDINGS if not b.local_only]
    yield from key_mappings(bindings)
    yield "Mode"
    yield from key_mappings(MODES)
    yield ""
    yield "Press any key to exit."
178
179
@limit
def header(
    term: Terminal,
    ui: UI,
    *,
    host: Host,
    dbinfo: DBInfo,
    pg_version: str,
    tps: int,
    active_connections: int,
    system_info: Optional[SystemInfo] = None,
) -> Iterator[str]:
    """Yield the lines of the window header.

    The first line holds the server version, host name, connection string
    and refresh time (plus the minimum duration when set). The second one
    shows database size, TPS, active connections and duration mode. When
    'system_info' is given, three more lines with memory, swap, load and
    I/O figures follow.
    """
    connection = f"{host.user}@{host.host}:{host.port}/{host.dbname}"
    title = [
        pg_version,
        f"{term.bold}{host.hostname}{term.normal}",
        f"{term.cyan}{connection}{term.normal}",
        f"Ref.: {ui.refresh_time}s",
    ]
    if ui.min_duration:
        title.append(f"Min. duration: {ui.min_duration}s")
    yield " - ".join(title)

    db_size = utils.naturalsize(dbinfo.total_size)
    size_growth = utils.naturalsize(dbinfo.size_ev)

    def render_columns(columns: List[List[str]], *, delimiter: str) -> Iterator[str]:
        # Each cell is followed by the delimiter and padded to the width of
        # the longest cell of its column (delimiter included).
        padded_widths = [
            max(map(len, column)) + len(delimiter) for column in columns
        ]
        trailing = delimiter.strip()
        for cells in itertools.zip_longest(*columns, fillvalue=""):
            line = " " + "".join(
                (cell + delimiter).ljust(padded)
                for padded, cell in zip(padded_widths, cells)
            )
            # Drop the padding and delimiter left after the last cell.
            yield line.rstrip().rstrip(trailing)

    # This row only relies on data which is always available.
    summary = [
        [f"Size: {db_size} - {size_growth}/s"],
        [f"TPS: {term.bold_green(str(tps))}"],
        [f"Active connections: {term.bold_green(str(active_connections))}"],
        [f"Duration mode: {term.bold_green(ui.duration_mode.name)}"],
    ]
    yield from render_columns(summary, delimiter=f" {term.bold_blue('⋅')} ")

    # System information is optional (only available in "local" mode).
    if system_info is None:
        return

    load = system_info.load
    resources = [
        f"Mem.: {render(system_info.memory)}",
        f"Swap: {render(system_info.swap)}",
        f"Load: {load.avg1:.2f} {load.avg5:.2f} {load.avg15:.2f}",
    ]
    io_figures = [
        f"IO Max: {system_info.max_iops}/s",
        f"Read:   {render(system_info.io_read)}",
        f"Write:  {render(system_info.io_write)}",
    ]
    yield from render_columns([resources, io_figures], delimiter=",   ")
250
251
@limit
def query_mode(term: Terminal, ui: UI) -> Iterator[str]:
    r"""Yield the title line for the current query mode.

    When the UI is paused, a centered "PAUSE" banner is produced instead of
    the upper-cased query mode name.

    >>> from pgactivity.types import QueryMode, UI

    >>> term = Terminal()
    >>> ui = UI.make(query_mode=QueryMode.blocking)
    >>> query_mode(term, ui)
                                    BLOCKING QUERIES
    >>> ui = UI.make(query_mode=QueryMode.activities, in_pause=True)
    >>> query_mode(term, ui)  # doctest: +NORMALIZE_WHITESPACE
                                    PAUSE
    """
    if not ui.in_pause:
        title = term.center(ui.query_mode.value.upper(), fillchar=" ").rstrip()
        line = term.green_bold(title)
    else:
        line = term.black_on_yellow(term.center("PAUSE", fillchar=" "))
    yield line
272
273
@limit
def columns_header(term: Terminal, ui: UI) -> Iterator[str]:
    """Yield the single line holding the columns titles.

    Each title is rendered in black on the color given by the column for
    the current sort key; the line is left-justified with spaces.
    """

    def styled_title(column: Column) -> str:
        background = getattr(term, f"black_on_{column.title_color(ui.sort_key)}")
        return f"{background}{column.title_render()}"

    titles = "".join(styled_title(column) for column in ui.columns())
    yield term.ljust(titles, fillchar=" ") + term.normal
282
283
def get_indent(ui: UI) -> str:
    """Return the indentation for the Query column.

    This is the concatenation of the header template of every column but
    "Query", each one formatted with a single space.

    >>> from pgactivity.types import Flag, UI
    >>> ui = UI.make(flag=Flag.CPU)
    >>> get_indent(ui)
    '                           '
    >>> ui = UI.make(flag=Flag.PID | Flag.DATABASE | Flag.APPNAME | Flag.RELATION)
    >>> get_indent(ui)
    '                                                             '
    """
    return "".join(
        column.template_h % " " for column in ui.columns() if column.name != "Query"
    )
300
301
def format_query(query: str, is_parallel_worker: bool) -> str:
    r"""Return 'query' cleaned up with utils.clean_str().

    Queries run by a parallel worker get a "\_ " prefix.

    >>> print(format_query("SELECT 1", True))
    \_ SELECT 1
    >>> format_query("SELECT   1", False)
    'SELECT 1'
    """
    cleaned = utils.clean_str(query)
    if is_parallel_worker:
        return r"\_ " + cleaned
    return "" + cleaned
312
313
@limit
def processes_rows(
    term: Terminal,
    ui: UI,
    processes: SelectableProcesses,
    width: Optional[int],
) -> Iterator[str]:
    """Yield the lines of the processes table.

    For each process, the value of every column but "query" is rendered
    with a color depending on whether the process is focused, pinned or
    neither. The query, if any, comes last and is laid out according to
    the query display mode of 'ui':

    - truncate: the query is cut to the space left after other columns;
    - wrap: the query is wrapped within that space, continuation lines
      being indented below the Query column;
    - wrap_noindent: continuation lines use the whole 'width'.

    When no room is left for the query after other columns, wrap_noindent
    is used so that wrapping never happens on a non-positive width.

    'width' defaults to the terminal width when None.
    """

    if width is None:
        width = term.width

    def text_append(value: str) -> None:
        # We also restore 'normal' style so that the next item does not
        # inherit previous one's style.
        text.append(value + term.normal)

    def cell(
        value: Any,
        column: Column,
    ) -> None:
        # 'color_type' and 'text' are bound in the loop below, once per
        # process, before this function gets called.
        color = getattr(term, colors.FIELD_BY_MODE[column.color(value)][color_type])
        text_append(f"{color}{column.render(value)}")

    focused, pinned = processes.focused, processes.pinned

    # Layout values only depend on 'ui' and 'width': compute them once
    # instead of for each process.
    indent = get_indent(ui) + " "
    dif = width - len(indent)

    query_display_mode = ui.query_display_mode
    if dif < 0 or (dif == 0 and query_display_mode == QueryDisplayMode.wrap):
        # Switch to wrap_noindent mode if terminal is too narrow. This
        # includes the case of no room at all in 'wrap' mode, in which
        # TextWrapper would otherwise reject a width of 0 with ValueError.
        query_display_mode = QueryDisplayMode.wrap_noindent

    for process in processes:
        if process.pid == focused:
            color_type = "cursor"
        elif process.pid in pinned:
            color_type = "yellow"
        else:
            color_type = "default"
        text: List[str] = []
        for column in ui.columns():
            field = column.key
            if field != "query":
                cell(getattr(process, field), column)

        if process.query is not None:
            query = format_query(process.query, process.is_parallel_worker)

            if query_display_mode == QueryDisplayMode.truncate:
                query_value = query[:dif]
            else:
                if query_display_mode == QueryDisplayMode.wrap_noindent:
                    if term.length(query.split(" ", 1)[0]) >= dif:
                        # Query too long to even start on the first line, wrap all
                        # lines.
                        query_lines = TextWrapper(width).wrap(query)
                    else:
                        # Only wrap subsequent lines.
                        wrapped_lines = TextWrapper(dif, drop_whitespace=False).wrap(
                            query
                        )
                        if wrapped_lines:
                            query_lines = [wrapped_lines[0]] + TextWrapper(width).wrap(
                                "".join(wrapped_lines[1:]).lstrip()
                            )
                        else:
                            query_lines = []
                    query_value = "\n".join(query_lines)
                else:
                    assert (
                        query_display_mode == QueryDisplayMode.wrap
                    ), f"unexpected mode {query_display_mode}"
                    wrapped_lines = TextWrapper(dif).wrap(query)
                    query_value = f"\n{indent}".join(wrapped_lines)

            cell(query_value, ui.column("query"))

        # A wrapped query spans several lines: yield them one by one so that
        # each of them is counted against the screen height.
        for line in ("".join(text) + term.normal).splitlines():
            yield line
395
396
def footer_message(term: Terminal, message: str, width: Optional[int] = None) -> None:
    """Print 'message', cut to 'width' characters and centered by the terminal.

    'width' defaults to the terminal width. No end-of-line is printed.
    """
    max_length = term.width if width is None else width
    centered = term.center(message[:max_length])
    print(centered + term.normal, end="")
401
402
def footer_help(term: Terminal, width: Optional[int] = None) -> None:
    """Print the footer line listing help keys.

    Entries are, in order: one per query mode (its keys but the last one,
    joined with "/"), then the pause, exit and help keys.
    """
    footer_values = [
        ("/".join(keys[:-1]), mode.value) for mode, keys in KEYS_BY_QUERYMODE.items()
    ]
    assert PAUSE_KEY.name is not None
    footer_values.extend(
        [
            (PAUSE_KEY.name, PAUSE_KEY.description),
            (EXIT_KEY.value, EXIT_KEY.description),
            (HELP_KEY, "help"),
        ]
    )
    render_footer(term, footer_values, width)
415
416
def render_footer(
    term: Terminal, footer_values: List[Tuple[str, str]], width: Optional[int]
) -> None:
    """Print a footer line made of equally sized "<key> <description>" columns.

    'footer_values' is a list of (key, description) pairs; descriptions are
    capitalized, cut or padded to fit their column and displayed in reverse
    cyan. A column too narrow to hold any description is rendered empty.
    'width' defaults to the terminal width. No end-of-line is printed.
    """
    if width is None:
        width = term.width
    ncols = len(footer_values)
    column_width = (width - ncols - 1) // ncols

    def render_column(key: str, desc: str) -> str:
        # Room left for the description once the key and a space are placed.
        available = column_width - term.length(key) - 1
        if available > 0:
            label = term.ljust(desc[:available], width=available, fillchar=" ")
            return f"{key} {term.cyan_reverse(label)}"
        return ""

    row = " ".join(
        render_column(key, description.capitalize())
        for key, description in footer_values
    )
    assert term.length(row) <= width, (term.length(row), width, ncols)
    filler = term.cyan_reverse(" ")
    print(term.ljust(row, width=width, fillchar=filler), end="")
437
438
def footer_interative_help(term: Terminal, width: Optional[int] = None) -> None:
    """Print the footer line listing the keys available in interactive mode.

    Entries are, in order: cancel, terminate, pin, "Other" (back to
    activities) and exit.
    """
    assert PROCESS_PIN.name is not None
    pin_entry = (PROCESS_PIN.name, PROCESS_PIN.description)
    exit_entry = (EXIT_KEY.value, EXIT_KEY.description)
    return render_footer(
        term,
        [
            (PROCESS_CANCEL, "cancel current query"),
            (PROCESS_KILL, "terminate current query"),
            pin_entry,
            ("Other", "back to activities"),
            exit_entry,
        ],
        width,
    )
450
451
def screen(
    term: Terminal,
    ui: UI,
    *,
    host: Host,
    dbinfo: DBInfo,
    pg_version: str,
    tps: int,
    active_connections: int,
    activity_stats: ActivityStats,
    message: Optional[str],
    render_header: bool = True,
    render_footer: bool = True,
    width: Optional[int] = None,
) -> None:
    """Display the screen.

    Processes from 'activity_stats' are sorted by the sort key of 'ui' (in
    reverse order), then the header (unless 'render_header' is false), the
    query mode title, the columns header and the processes rows are printed
    from the top of the terminal, sharing one counter of available lines.
    Remaining lines are cleared. Unless 'render_footer' is false, the last
    terminal line is reserved for the footer which shows, by priority,
    'message', the interactive help or the general help.
    """

    # 'activity_stats' is either the processes alone or a tuple of the
    # processes and system information.
    system_info: Optional[SystemInfo]
    if not isinstance(activity_stats, tuple):
        processes, system_info = activity_stats, None
    else:
        processes, system_info = activity_stats
    ordered = sorted_processes(processes, key=ui.sort_key, reverse=True)
    processes.set_items(ordered)

    print(term.home, end="")
    top_height = (term.height - 1) if render_footer else term.height
    lines_counter = line_counter(top_height)
    # Keyword arguments shared by all views.
    layout = {"lines_counter": lines_counter, "width": width}

    if render_header:
        header(
            term,
            ui,
            host=host,
            dbinfo=dbinfo,
            pg_version=pg_version,
            tps=tps,
            active_connections=active_connections,
            system_info=system_info,
            **layout,
        )

    query_mode(term, ui, **layout)
    columns_header(term, ui, **layout)
    processes_rows(term, ui, processes, **layout)

    # Clear remaining lines in screen until footer (or EOS)
    print(f"{term.clear_eol}\n" * lines_counter.value, end="")

    if not render_footer:
        return

    with term.location(x=0, y=top_height):
        if message is not None:
            footer_message(term, message, width)
        elif ui.interactive():
            footer_interative_help(term, width)
        else:
            footer_help(term, width)
515