1import functools 2import inspect 3import itertools 4from textwrap import TextWrapper, dedent 5from typing import ( 6 Any, 7 Callable, 8 Iterable, 9 Iterator, 10 List, 11 NoReturn, 12 Optional, 13 Tuple, 14) 15 16from blessed import Terminal 17 18from .compat import link 19from .keys import ( 20 BINDINGS, 21 EXIT_KEY, 22 HELP as HELP_KEY, 23 KEYS_BY_QUERYMODE, 24 Key, 25 MODES, 26 PAUSE_KEY, 27 PROCESS_CANCEL, 28 PROCESS_KILL, 29 PROCESS_PIN, 30) 31from .types import ( 32 ActivityStats, 33 Column, 34 DBInfo, 35 Host, 36 IOCounter, 37 MemoryInfo, 38 QueryDisplayMode, 39 SelectableProcesses, 40 SystemInfo, 41 UI, 42) 43from . import colors, utils 44from .activities import sorted as sorted_processes 45 46 47class line_counter: 48 def __init__(self, start: int) -> None: 49 self.value = start 50 51 def __repr__(self) -> str: 52 return f"{self.__class__.__name__}({self.value})" 53 54 def __next__(self) -> int: 55 current_value = self.value 56 self.value -= 1 57 return current_value 58 59 60def shorten(term: Terminal, text: str, width: Optional[int] = None) -> str: 61 r"""Truncate 'text' to fit in the given 'width' (or term.width). 62 63 This is similar to textwrap.shorten() but sequence-aware. 64 65 >>> term = Terminal() 66 >>> text = f"{term.green('hello')}, world" 67 >>> text 68 'hello, world' 69 >>> shorten(term, text, 6) 70 'hello,' 71 >>> shorten(term, text, 3) 72 'hel' 73 >>> shorten(term, "", 3) 74 '' 75 """ 76 if not text: 77 return "" 78 wrapped = term.wrap(text, width=width, max_lines=1) 79 return wrapped[0] + term.normal 80 81 82def limit(func: Callable[..., Iterable[str]]) -> Callable[..., None]: 83 """View decorator handling screen height limit. 84 85 >>> term = Terminal() 86 87 >>> def view(term, n, *, prefix="line"): 88 ... for i in range(n): 89 ... yield f"{prefix} #{i}" 90 91 >>> count = line_counter(2) 92 >>> limit(view)(term, 3, lines_counter=count) 93 line #0 94 line #1 95 >>> count 96 line_counter(0) 97 >>> count = line_counter(3) 98 >>> limit(view)(term, 2, lines_counter=count) 99 line #0 100 line #1 101 >>> count 102 line_counter(1) 103 >>> limit(view)(term, 3, prefix="row") 104 row #0 105 row #1 106 row #2 107 108 A single line is displayed with an EOL as well: 109 >>> count = line_counter(10) 110 >>> limit(view)(term, 1, lines_counter=count) or print("<--", end="") 111 line #0 112 <-- 113 >>> count 114 line_counter(9) 115 """ 116 117 @functools.wraps(func) 118 def wrapper(term: Terminal, *args: Any, **kwargs: Any) -> None: 119 counter = kwargs.pop("lines_counter", None) 120 width = kwargs.pop("width", None) 121 signature = inspect.signature(func) 122 if "width" in signature.parameters: 123 kwargs["width"] = width 124 for line in func(term, *args, **kwargs): 125 print(shorten(term, line, width) + term.clear_eol) 126 if counter is not None and next(counter) == 1: 127 break 128 129 return wrapper 130 131 132@functools.singledispatch 133def render(x: NoReturn) -> str: 134 raise AssertionError(f"not implemented for type '{type(x).__name__}'") 135 136 137@render.register(MemoryInfo) 138def render_meminfo(m: MemoryInfo) -> str: 139 used, total = utils.naturalsize(m.used), utils.naturalsize(m.total) 140 return f"{m.percent}% - {used}/{total}" 141 142 143@render.register(IOCounter) 144def render_iocounter(i: IOCounter) -> str: 145 hbytes = utils.naturalsize(i.bytes) 146 return f"{hbytes}/s - {i.count}/s" 147 148 149@limit 150def help(term: Terminal, version: str, is_local: bool) -> Iterable[str]: 151 """Render help menu.""" 152 project_url = "https://github.com/dalibo/pg_activity" 153 intro = dedent( 154 f"""\ 155 {term.bold_green}pg_activity {version} - {link(term, project_url, project_url)} 156 {term.normal}Released under PostgreSQL License. 157 """ 158 ) 159 160 def key_mappings(keys: Iterable[Key]) -> Iterable[str]: 161 for key in keys: 162 key_name = key.name or key.value 163 yield f"{term.bright_cyan}{key_name.rjust(10)}{term.normal}: {key.description}" 164 165 footer = "Press any key to exit." 166 for line in intro.splitlines(): 167 yield line 168 yield "" 169 170 bindings = BINDINGS 171 if not is_local: 172 bindings = [b for b in bindings if not b.local_only] 173 yield from key_mappings(bindings) 174 yield "Mode" 175 yield from key_mappings(MODES) 176 yield "" 177 yield footer 178 179 180@limit 181def header( 182 term: Terminal, 183 ui: UI, 184 *, 185 host: Host, 186 dbinfo: DBInfo, 187 pg_version: str, 188 tps: int, 189 active_connections: int, 190 system_info: Optional[SystemInfo] = None, 191) -> Iterator[str]: 192 """Return window header lines.""" 193 pg_host = f"{host.user}@{host.host}:{host.port}/{host.dbname}" 194 yield ( 195 " - ".join( 196 [ 197 pg_version, 198 f"{term.bold}{host.hostname}{term.normal}", 199 f"{term.cyan}{pg_host}{term.normal}", 200 f"Ref.: {ui.refresh_time}s", 201 ] 202 + ([f"Min. duration: {ui.min_duration}s"] if ui.min_duration else []) 203 ) 204 ) 205 206 total_size = utils.naturalsize(dbinfo.total_size) 207 size_ev = utils.naturalsize(dbinfo.size_ev) 208 209 def render_columns(columns: List[List[str]], *, delimiter: str) -> Iterator[str]: 210 column_widths = [ 211 max(len(column_row) for column_row in column) for column in columns 212 ] 213 214 def indent(text: str) -> str: 215 return " " + text 216 217 for row in itertools.zip_longest(*columns, fillvalue=""): 218 yield indent( 219 "".join( 220 (cell + delimiter).ljust(width + len(delimiter)) 221 for width, cell in zip(column_widths, row) 222 ) 223 ).rstrip().rstrip(delimiter.strip()) 224 225 # First row is always displayed, as underlying data is always available. 226 columns = [ 227 [f"Size: {total_size} - {size_ev}/s"], 228 [f"TPS: {term.bold_green(str(tps))}"], 229 [f"Active connections: {term.bold_green(str(active_connections))}"], 230 [f"Duration mode: {term.bold_green(ui.duration_mode.name)}"], 231 ] 232 yield from render_columns(columns, delimiter=f" {term.bold_blue('⋅')} ") 233 234 # System information, only available in "local" mode. 235 if system_info is not None: 236 load = system_info.load 237 system_columns = [ 238 [ 239 f"Mem.: {render(system_info.memory)}", 240 f"Swap: {render(system_info.swap)}", 241 f"Load: {load.avg1:.2f} {load.avg5:.2f} {load.avg15:.2f}", 242 ], 243 [ 244 f"IO Max: {system_info.max_iops}/s", 245 f"Read: {render(system_info.io_read)}", 246 f"Write: {render(system_info.io_write)}", 247 ], 248 ] 249 yield from render_columns(system_columns, delimiter=", ") 250 251 252@limit 253def query_mode(term: Terminal, ui: UI) -> Iterator[str]: 254 r"""Display query mode title. 255 256 >>> from pgactivity.types import QueryMode, UI 257 258 >>> term = Terminal() 259 >>> ui = UI.make(query_mode=QueryMode.blocking) 260 >>> query_mode(term, ui) 261 BLOCKING QUERIES 262 >>> ui = UI.make(query_mode=QueryMode.activities, in_pause=True) 263 >>> query_mode(term, ui) # doctest: +NORMALIZE_WHITESPACE 264 PAUSE 265 """ 266 if ui.in_pause: 267 yield term.black_on_yellow(term.center("PAUSE", fillchar=" ")) 268 else: 269 yield term.green_bold( 270 term.center(ui.query_mode.value.upper(), fillchar=" ").rstrip() 271 ) 272 273 274@limit 275def columns_header(term: Terminal, ui: UI) -> Iterator[str]: 276 """Yield columns header lines.""" 277 htitles = [] 278 for column in ui.columns(): 279 color = getattr(term, f"black_on_{column.title_color(ui.sort_key)}") 280 htitles.append(f"{color}{column.title_render()}") 281 yield term.ljust("".join(htitles), fillchar=" ") + term.normal 282 283 284def get_indent(ui: UI) -> str: 285 """Return identation for Query column. 286 287 >>> from pgactivity.types import Flag, UI 288 >>> ui = UI.make(flag=Flag.CPU) 289 >>> get_indent(ui) 290 ' ' 291 >>> ui = UI.make(flag=Flag.PID | Flag.DATABASE | Flag.APPNAME | Flag.RELATION) 292 >>> get_indent(ui) 293 ' ' 294 """ 295 indent = "" 296 for column in ui.columns(): 297 if column.name != "Query": 298 indent += column.template_h % " " 299 return indent 300 301 302def format_query(query: str, is_parallel_worker: bool) -> str: 303 r"""Return the query string formatted. 304 305 >>> print(format_query("SELECT 1", True)) 306 \_ SELECT 1 307 >>> format_query("SELECT 1", False) 308 'SELECT 1' 309 """ 310 prefix = r"\_ " if is_parallel_worker else "" 311 return prefix + utils.clean_str(query) 312 313 314@limit 315def processes_rows( 316 term: Terminal, 317 ui: UI, 318 processes: SelectableProcesses, 319 width: Optional[int], 320) -> Iterator[str]: 321 """Display table rows with processes information.""" 322 323 if width is None: 324 width = term.width 325 326 def text_append(value: str) -> None: 327 # We also restore 'normal' style so that the next item does not 328 # inherit previous one's style. 329 text.append(value + term.normal) 330 331 def cell( 332 value: Any, 333 column: Column, 334 ) -> None: 335 color = getattr(term, colors.FIELD_BY_MODE[column.color(value)][color_type]) 336 text_append(f"{color}{column.render(value)}") 337 338 focused, pinned = processes.focused, processes.pinned 339 340 for process in processes: 341 if process.pid == focused: 342 color_type = "cursor" 343 elif process.pid in pinned: 344 color_type = "yellow" 345 else: 346 color_type = "default" 347 text: List[str] = [] 348 for column in ui.columns(): 349 field = column.key 350 if field != "query": 351 cell(getattr(process, field), column) 352 353 indent = get_indent(ui) + " " 354 dif = width - len(indent) 355 356 query_display_mode = ui.query_display_mode 357 if dif < 0: 358 # Switch to wrap_noindent mode if terminal is too narrow. 359 query_display_mode = QueryDisplayMode.wrap_noindent 360 361 if process.query is not None: 362 query = format_query(process.query, process.is_parallel_worker) 363 364 if query_display_mode == QueryDisplayMode.truncate: 365 query_value = query[:dif] 366 else: 367 if query_display_mode == QueryDisplayMode.wrap_noindent: 368 if term.length(query.split(" ", 1)[0]) >= dif: 369 # Query too long to even start on the first line, wrap all 370 # lines. 371 query_lines = TextWrapper(width).wrap(query) 372 else: 373 # Only wrap subsequent lines. 374 wrapped_lines = TextWrapper(dif, drop_whitespace=False).wrap( 375 query 376 ) 377 if wrapped_lines: 378 query_lines = [wrapped_lines[0]] + TextWrapper(width).wrap( 379 "".join(wrapped_lines[1:]).lstrip() 380 ) 381 else: 382 query_lines = [] 383 query_value = "\n".join(query_lines) 384 else: 385 assert ( 386 query_display_mode == QueryDisplayMode.wrap 387 ), f"unexpected mode {query_display_mode}" 388 wrapped_lines = TextWrapper(dif).wrap(query) 389 query_value = f"\n{indent}".join(wrapped_lines) 390 391 cell(query_value, ui.column("query")) 392 393 for line in ("".join(text) + term.normal).splitlines(): 394 yield line 395 396 397def footer_message(term: Terminal, message: str, width: Optional[int] = None) -> None: 398 if width is None: 399 width = term.width 400 print(term.center(message[:width]) + term.normal, end="") 401 402 403def footer_help(term: Terminal, width: Optional[int] = None) -> None: 404 """Footer line with help keys.""" 405 query_modes_help = [ 406 ("/".join(keys[:-1]), qm.value) for qm, keys in KEYS_BY_QUERYMODE.items() 407 ] 408 assert PAUSE_KEY.name is not None 409 footer_values = query_modes_help + [ 410 (PAUSE_KEY.name, PAUSE_KEY.description), 411 (EXIT_KEY.value, EXIT_KEY.description), 412 (HELP_KEY, "help"), 413 ] 414 render_footer(term, footer_values, width) 415 416 417def render_footer( 418 term: Terminal, footer_values: List[Tuple[str, str]], width: Optional[int] 419) -> None: 420 if width is None: 421 width = term.width 422 ncols = len(footer_values) 423 column_width = (width - ncols - 1) // ncols 424 425 def render_column(key: str, desc: str) -> str: 426 col_width = column_width - term.length(key) - 1 427 if col_width <= 0: 428 return "" 429 desc = term.ljust(desc[:col_width], width=col_width, fillchar=" ") 430 return f"{key} {term.cyan_reverse(desc)}" 431 432 row = " ".join( 433 [render_column(key, desc.capitalize()) for key, desc in footer_values] 434 ) 435 assert term.length(row) <= width, (term.length(row), width, ncols) 436 print(term.ljust(row, width=width, fillchar=term.cyan_reverse(" ")), end="") 437 438 439def footer_interative_help(term: Terminal, width: Optional[int] = None) -> None: 440 """Footer line with help keys for interactive mode.""" 441 assert PROCESS_PIN.name is not None 442 footer_values = [ 443 (PROCESS_CANCEL, "cancel current query"), 444 (PROCESS_KILL, "terminate current query"), 445 (PROCESS_PIN.name, PROCESS_PIN.description), 446 ("Other", "back to activities"), 447 (EXIT_KEY.value, EXIT_KEY.description), 448 ] 449 return render_footer(term, footer_values, width) 450 451 452def screen( 453 term: Terminal, 454 ui: UI, 455 *, 456 host: Host, 457 dbinfo: DBInfo, 458 pg_version: str, 459 tps: int, 460 active_connections: int, 461 activity_stats: ActivityStats, 462 message: Optional[str], 463 render_header: bool = True, 464 render_footer: bool = True, 465 width: Optional[int] = None, 466) -> None: 467 """Display the screen.""" 468 469 system_info: Optional[SystemInfo] 470 if isinstance(activity_stats, tuple): 471 processes, system_info = activity_stats 472 else: 473 processes, system_info = activity_stats, None 474 processes.set_items(sorted_processes(processes, key=ui.sort_key, reverse=True)) 475 476 print(term.home, end="") 477 top_height = term.height - (1 if render_footer else 0) 478 lines_counter = line_counter(top_height) 479 480 if render_header: 481 header( 482 term, 483 ui, 484 host=host, 485 dbinfo=dbinfo, 486 pg_version=pg_version, 487 tps=tps, 488 active_connections=active_connections, 489 system_info=system_info, 490 lines_counter=lines_counter, 491 width=width, 492 ) 493 494 query_mode(term, ui, lines_counter=lines_counter, width=width) 495 columns_header(term, ui, lines_counter=lines_counter, width=width) 496 processes_rows( 497 term, 498 ui, 499 processes, 500 lines_counter=lines_counter, 501 width=width, 502 ) 503 504 # Clear remaining lines in screen until footer (or EOS) 505 print(f"{term.clear_eol}\n" * lines_counter.value, end="") 506 507 if render_footer: 508 with term.location(x=0, y=top_height): 509 if message is not None: 510 footer_message(term, message, width) 511 elif ui.interactive(): 512 footer_interative_help(term, width) 513 else: 514 footer_help(term, width) 515