1#!/usr/local/bin/python3.8 2# pylint: disable=not-callable, no-member, unsubscriptable-object 3# indent = tab 4# tab-size = 4 5 6# Copyright 2021 Aristocratos (jakob@qvantnet.com) 7 8# Licensed under the Apache License, Version 2.0 (the "License"); 9# you may not use this file except in compliance with the License. 10# You may obtain a copy of the License at 11 12# http://www.apache.org/licenses/LICENSE-2.0 13 14# Unless required by applicable law or agreed to in writing, software 15# distributed under the License is distributed on an "AS IS" BASIS, 16# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 17# See the License for the specific language governing permissions and 18# limitations under the License. 19 20import os, sys, io, threading, signal, re, subprocess, logging, logging.handlers, argparse 21import urllib.request 22from time import time, sleep, strftime, tzset 23from datetime import timedelta 24from _thread import interrupt_main 25from collections import defaultdict 26from select import select 27from distutils.util import strtobool 28from string import Template 29from math import ceil, floor 30from random import randint 31from shutil import which 32from typing import List, Dict, Tuple, Union, Any, Iterable 33 34errors: List[str] = [] 35try: import fcntl, termios, tty, pwd 36except Exception as e: errors.append(f'{e}') 37 38try: import psutil # type: ignore 39except Exception as e: errors.append(f'{e}') 40 41SELF_START = time() 42 43SYSTEM: str 44if "linux" in sys.platform: SYSTEM = "Linux" 45elif "bsd" in sys.platform: SYSTEM = "BSD" 46elif "darwin" in sys.platform: SYSTEM = "MacOS" 47else: SYSTEM = "Other" 48 49if errors: 50 print("ERROR!") 51 print("\n".join(errors)) 52 if SYSTEM == "Other": 53 print("\nUnsupported platform!\n") 54 else: 55 print("\nInstall required modules!\n") 56 raise SystemExit(1) 57 58VERSION: str = "1.0.67" 59 60#? Argument parser -------------------------------------------------------------------------------> 61args = argparse.ArgumentParser() 62args.add_argument("-b", "--boxes", action="store", dest="boxes", help = "which boxes to show at start, example: -b \"cpu mem net proc\"") 63args.add_argument("-lc", "--low-color", action="store_true", help = "disable truecolor, converts 24-bit colors to 256-color") 64args.add_argument("-v", "--version", action="store_true", help = "show version info and exit") 65args.add_argument("--debug", action="store_true", help = "start with loglevel set to DEBUG overriding value set in config") 66stdargs = args.parse_args() 67 68if stdargs.version: 69 print(f'bpytop version: {VERSION}\n' 70 f'psutil version: {".".join(str(x) for x in psutil.version_info)}') 71 raise SystemExit(0) 72 73ARG_BOXES: str = stdargs.boxes 74LOW_COLOR: bool = stdargs.low_color 75DEBUG: bool = stdargs.debug 76 77#? Variables -------------------------------------------------------------------------------------> 78 79BANNER_SRC: List[Tuple[str, str, str]] = [ 80 ("#ffa50a", "#0fd7ff", "██████╗ ██████╗ ██╗ ██╗████████╗ ██████╗ ██████╗"), 81 ("#f09800", "#00bfe6", "██╔══██╗██╔══██╗╚██╗ ██╔╝╚══██╔══╝██╔═══██╗██╔══██╗"), 82 ("#db8b00", "#00a6c7", "██████╔╝██████╔╝ ╚████╔╝ ██║ ██║ ██║██████╔╝"), 83 ("#c27b00", "#008ca8", "██╔══██╗██╔═══╝ ╚██╔╝ ██║ ██║ ██║██╔═══╝ "), 84 ("#a86b00", "#006e85", "██████╔╝██║ ██║ ██║ ╚██████╔╝██║"), 85 ("#000000", "#000000", "╚═════╝ ╚═╝ ╚═╝ ╚═╝ ╚═════╝ ╚═╝"), 86] 87 88#*?This is the template used to create the config file 89DEFAULT_CONF: Template = Template(f'#? Config file for bpytop v. {VERSION}' + ''' 90 91#* Color theme, looks for a .theme file in "/usr/[local/]share/bpytop/themes" and "~/.config/bpytop/themes", "Default" for builtin default theme. 92#* Prefix name by a plus sign (+) for a theme located in user themes folder, i.e. color_theme="+monokai" 93color_theme="$color_theme" 94 95#* If the theme set background should be shown, set to False if you want terminal background transparency 96theme_background=$theme_background 97 98#* Sets if 24-bit truecolor should be used, will convert 24-bit colors to 256 color (6x6x6 color cube) if false. 99truecolor=$truecolor 100 101#* Manually set which boxes to show. Available values are "cpu mem net proc", separate values with whitespace. 102shown_boxes="$shown_boxes" 103 104#* Update time in milliseconds, increases automatically if set below internal loops processing time, recommended 2000 ms or above for better sample times for graphs. 105update_ms=$update_ms 106 107#* Processes update multiplier, sets how often the process list is updated as a multiplier of "update_ms". 108#* Set to 2 or higher to greatly decrease bpytop cpu usage. (Only integers) 109proc_update_mult=$proc_update_mult 110 111#* Processes sorting, "pid" "program" "arguments" "threads" "user" "memory" "cpu lazy" "cpu responsive", 112#* "cpu lazy" updates top process over time, "cpu responsive" updates top process directly. 113proc_sorting="$proc_sorting" 114 115#* Reverse sorting order, True or False. 116proc_reversed=$proc_reversed 117 118#* Show processes as a tree 119proc_tree=$proc_tree 120 121#* Which depth the tree view should auto collapse processes at 122tree_depth=$tree_depth 123 124#* Use the cpu graph colors in the process list. 125proc_colors=$proc_colors 126 127#* Use a darkening gradient in the process list. 128proc_gradient=$proc_gradient 129 130#* If process cpu usage should be of the core it's running on or usage of the total available cpu power. 131proc_per_core=$proc_per_core 132 133#* Show process memory as bytes instead of percent 134proc_mem_bytes=$proc_mem_bytes 135 136#* Sets the CPU stat shown in upper half of the CPU graph, "total" is always available, see: 137#* https://psutil.readthedocs.io/en/latest/#psutil.cpu_times for attributes available on specific platforms. 138#* Select from a list of detected attributes from the options menu 139cpu_graph_upper="$cpu_graph_upper" 140 141#* Sets the CPU stat shown in lower half of the CPU graph, "total" is always available, see: 142#* https://psutil.readthedocs.io/en/latest/#psutil.cpu_times for attributes available on specific platforms. 143#* Select from a list of detected attributes from the options menu 144cpu_graph_lower="$cpu_graph_lower" 145 146#* Toggles if the lower CPU graph should be inverted. 147cpu_invert_lower=$cpu_invert_lower 148 149#* Set to True to completely disable the lower CPU graph. 150cpu_single_graph=$cpu_single_graph 151 152#* Shows the system uptime in the CPU box. 153show_uptime=$show_uptime 154 155#* Check cpu temperature, needs "osx-cpu-temp" on MacOS X. 156check_temp=$check_temp 157 158#* Which sensor to use for cpu temperature, use options menu to select from list of available sensors. 159cpu_sensor=$cpu_sensor 160 161#* Show temperatures for cpu cores also if check_temp is True and sensors has been found 162show_coretemp=$show_coretemp 163 164#* Which temperature scale to use, available values: "celsius", "fahrenheit", "kelvin" and "rankine" 165temp_scale="$temp_scale" 166 167#* Show CPU frequency, can cause slowdowns on certain systems with some versions of psutil 168show_cpu_freq=$show_cpu_freq 169 170#* Draw a clock at top of screen, formatting according to strftime, empty string to disable. 171draw_clock="$draw_clock" 172 173#* Update main ui in background when menus are showing, set this to false if the menus is flickering too much for comfort. 174background_update=$background_update 175 176#* Custom cpu model name, empty string to disable. 177custom_cpu_name="$custom_cpu_name" 178 179#* Optional filter for shown disks, should be full path of a mountpoint, separate multiple values with a comma ",". 180#* Begin line with "exclude=" to change to exclude filter, otherwise defaults to "most include" filter. Example: disks_filter="exclude=/boot, /home/user" 181disks_filter="$disks_filter" 182 183#* Show graphs instead of meters for memory values. 184mem_graphs=$mem_graphs 185 186#* If swap memory should be shown in memory box. 187show_swap=$show_swap 188 189#* Show swap as a disk, ignores show_swap value above, inserts itself after first disk. 190swap_disk=$swap_disk 191 192#* If mem box should be split to also show disks info. 193show_disks=$show_disks 194 195#* Filter out non physical disks. Set this to False to include network disks, RAM disks and similar. 196only_physical=$only_physical 197 198#* Read disks list from /etc/fstab. This also disables only_physical. 199use_fstab=$use_fstab 200 201#* Toggles if io stats should be shown in regular disk usage view 202show_io_stat=$show_io_stat 203 204#* Toggles io mode for disks, showing only big graphs for disk read/write speeds. 205io_mode=$io_mode 206 207#* Set to True to show combined read/write io graphs in io mode. 208io_graph_combined=$io_graph_combined 209 210#* Set the top speed for the io graphs in MiB/s (10 by default), use format "device:speed" separate disks with a comma ",". 211#* Example: "/dev/sda:100, /dev/sdb:20" 212io_graph_speeds="$io_graph_speeds" 213 214#* Set fixed values for network graphs, default "10M" = 10 Mibibytes, possible units "K", "M", "G", append with "bit" for bits instead of bytes, i.e "100mbit" 215net_download="$net_download" 216net_upload="$net_upload" 217 218#* Start in network graphs auto rescaling mode, ignores any values set above and rescales down to 10 Kibibytes at the lowest. 219net_auto=$net_auto 220 221#* Sync the scaling for download and upload to whichever currently has the highest scale 222net_sync=$net_sync 223 224#* If the network graphs color gradient should scale to bandwidth usage or auto scale, bandwidth usage is based on "net_download" and "net_upload" values 225net_color_fixed=$net_color_fixed 226 227#* Starts with the Network Interface specified here. 228net_iface="$net_iface" 229 230#* Show battery stats in top right if battery is present 231show_battery=$show_battery 232 233#* Show init screen at startup, the init screen is purely cosmetical 234show_init=$show_init 235 236#* Enable check for new version from github.com/aristocratos/bpytop at start. 237update_check=$update_check 238 239#* Set loglevel for "~/.config/bpytop/error.log" levels are: "ERROR" "WARNING" "INFO" "DEBUG". 240#* The level set includes all lower levels, i.e. "DEBUG" will show all logging info. 241log_level=$log_level 242''') 243 244CONFIG_DIR: str = f'{os.path.expanduser("~")}/.config/bpytop' 245if not os.path.isdir(CONFIG_DIR): 246 try: 247 os.makedirs(CONFIG_DIR) 248 os.mkdir(f'{CONFIG_DIR}/themes') 249 except PermissionError: 250 print(f'ERROR!\nNo permission to write to "{CONFIG_DIR}" directory!') 251 raise SystemExit(1) 252CONFIG_FILE: str = f'{CONFIG_DIR}/bpytop.conf' 253THEME_DIR: str = "" 254 255if os.path.isdir(f'{os.path.dirname(__file__)}/bpytop-themes'): 256 THEME_DIR = f'{os.path.dirname(__file__)}/bpytop-themes' 257elif os.path.isdir(f'{os.path.dirname(__file__)}/themes'): 258 THEME_DIR = f'{os.path.dirname(__file__)}/themes' 259else: 260 for td in ["/usr/local/", "/usr/", "/snap/bpytop/current/usr/"]: 261 if os.path.isdir(f'{td}share/bpytop/themes'): 262 THEME_DIR = f'{td}share/bpytop/themes' 263 break 264USER_THEME_DIR: str = f'{CONFIG_DIR}/themes' 265 266CORES: int = psutil.cpu_count(logical=False) or 1 267THREADS: int = psutil.cpu_count(logical=True) or 1 268 269THREAD_ERROR: int = 0 270 271DEFAULT_THEME: Dict[str, str] = { 272 "main_bg" : "#00", 273 "main_fg" : "#cc", 274 "title" : "#ee", 275 "hi_fg" : "#969696", 276 "selected_bg" : "#7e2626", 277 "selected_fg" : "#ee", 278 "inactive_fg" : "#40", 279 "graph_text" : "#60", 280 "meter_bg" : "#40", 281 "proc_misc" : "#0de756", 282 "cpu_box" : "#3d7b46", 283 "mem_box" : "#8a882e", 284 "net_box" : "#423ba5", 285 "proc_box" : "#923535", 286 "div_line" : "#30", 287 "temp_start" : "#4897d4", 288 "temp_mid" : "#5474e8", 289 "temp_end" : "#ff40b6", 290 "cpu_start" : "#50f095", 291 "cpu_mid" : "#f2e266", 292 "cpu_end" : "#fa1e1e", 293 "free_start" : "#223014", 294 "free_mid" : "#b5e685", 295 "free_end" : "#dcff85", 296 "cached_start" : "#0b1a29", 297 "cached_mid" : "#74e6fc", 298 "cached_end" : "#26c5ff", 299 "available_start" : "#292107", 300 "available_mid" : "#ffd77a", 301 "available_end" : "#ffb814", 302 "used_start" : "#3b1f1c", 303 "used_mid" : "#d9626d", 304 "used_end" : "#ff4769", 305 "download_start" : "#231a63", 306 "download_mid" : "#4f43a3", 307 "download_end" : "#b0a9de", 308 "upload_start" : "#510554", 309 "upload_mid" : "#7d4180", 310 "upload_end" : "#dcafde", 311 "process_start" : "#80d0a3", 312 "process_mid" : "#dcd179", 313 "process_end" : "#d45454", 314} 315 316MENUS: Dict[str, Dict[str, Tuple[str, ...]]] = { 317 "options" : { 318 "normal" : ( 319 "┌─┐┌─┐┌┬┐┬┌─┐┌┐┌┌─┐", 320 "│ │├─┘ │ ││ ││││└─┐", 321 "└─┘┴ ┴ ┴└─┘┘└┘└─┘"), 322 "selected" : ( 323 "╔═╗╔═╗╔╦╗╦╔═╗╔╗╔╔═╗", 324 "║ ║╠═╝ ║ ║║ ║║║║╚═╗", 325 "╚═╝╩ ╩ ╩╚═╝╝╚╝╚═╝") }, 326 "help" : { 327 "normal" : ( 328 "┬ ┬┌─┐┬ ┌─┐", 329 "├─┤├┤ │ ├─┘", 330 "┴ ┴└─┘┴─┘┴ "), 331 "selected" : ( 332 "╦ ╦╔═╗╦ ╔═╗", 333 "╠═╣║╣ ║ ╠═╝", 334 "╩ ╩╚═╝╩═╝╩ ") }, 335 "quit" : { 336 "normal" : ( 337 "┌─┐ ┬ ┬ ┬┌┬┐", 338 "│─┼┐│ │ │ │ ", 339 "└─┘└└─┘ ┴ ┴ "), 340 "selected" : ( 341 "╔═╗ ╦ ╦ ╦╔╦╗ ", 342 "║═╬╗║ ║ ║ ║ ", 343 "╚═╝╚╚═╝ ╩ ╩ ") } 344} 345 346MENU_COLORS: Dict[str, Tuple[str, ...]] = { 347 "normal" : ("#0fd7ff", "#00bfe6", "#00a6c7", "#008ca8"), 348 "selected" : ("#ffa50a", "#f09800", "#db8b00", "#c27b00") 349} 350 351#? Units for floating_humanizer function 352UNITS: Dict[str, Tuple[str, ...]] = { 353 "bit" : ("bit", "Kib", "Mib", "Gib", "Tib", "Pib", "Eib", "Zib", "Yib", "Bib", "GEb"), 354 "byte" : ("Byte", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB", "BiB", "GEB") 355} 356 357SUBSCRIPT: Tuple[str, ...] = ("₀", "₁", "₂", "₃", "₄", "₅", "₆", "₇", "₈", "₉") 358SUPERSCRIPT: Tuple[str, ...] = ("⁰", "¹", "²", "³", "⁴", "⁵", "⁶", "⁷", "⁸", "⁹") 359 360#? Setup error logger ----------------------------------------------------------------> 361 362try: 363 errlog = logging.getLogger("ErrorLogger") 364 errlog.setLevel(logging.DEBUG) 365 eh = logging.handlers.RotatingFileHandler(f'{CONFIG_DIR}/error.log', maxBytes=1048576, backupCount=4) 366 eh.setLevel(logging.DEBUG) 367 eh.setFormatter(logging.Formatter("%(asctime)s | %(levelname)s: %(message)s", datefmt="%d/%m/%y (%X)")) 368 errlog.addHandler(eh) 369except PermissionError: 370 print(f'ERROR!\nNo permission to write to "{CONFIG_DIR}" directory!') 371 raise SystemExit(1) 372 373#? Timers for testing and debugging --------------------------------------------------------------> 374 375class TimeIt: 376 timers: Dict[str, float] = {} 377 paused: Dict[str, float] = {} 378 379 @classmethod 380 def start(cls, name): 381 cls.timers[name] = time() 382 383 @classmethod 384 def pause(cls, name): 385 if name in cls.timers: 386 cls.paused[name] = time() - cls.timers[name] 387 del cls.timers[name] 388 389 @classmethod 390 def stop(cls, name): 391 if name in cls.timers: 392 total: float = time() - cls.timers[name] 393 del cls.timers[name] 394 if name in cls.paused: 395 total += cls.paused[name] 396 del cls.paused[name] 397 errlog.debug(f'{name} completed in {total:.6f} seconds') 398 399def timeit_decorator(func): 400 def timed(*args, **kw): 401 ts = time() 402 out = func(*args, **kw) 403 errlog.debug(f'{func.__name__} completed in {time() - ts:.6f} seconds') 404 return out 405 return timed 406 407#? Set up config class and load config -----------------------------------------------------------> 408 409class Config: 410 '''Holds all config variables and functions for loading from and saving to disk''' 411 keys: List[str] = ["color_theme", "update_ms", "proc_sorting", "proc_reversed", "proc_tree", "check_temp", "draw_clock", "background_update", "custom_cpu_name", 412 "proc_colors", "proc_gradient", "proc_per_core", "proc_mem_bytes", "disks_filter", "update_check", "log_level", "mem_graphs", "show_swap", 413 "swap_disk", "show_disks", "use_fstab", "net_download", "net_upload", "net_auto", "net_color_fixed", "show_init", "theme_background", 414 "net_sync", "show_battery", "tree_depth", "cpu_sensor", "show_coretemp", "proc_update_mult", "shown_boxes", "net_iface", "only_physical", 415 "truecolor", "io_mode", "io_graph_combined", "io_graph_speeds", "show_io_stat", "cpu_graph_upper", "cpu_graph_lower", "cpu_invert_lower", 416 "cpu_single_graph", "show_uptime", "temp_scale", "show_cpu_freq"] 417 conf_dict: Dict[str, Union[str, int, bool]] = {} 418 color_theme: str = "Default" 419 theme_background: bool = True 420 truecolor: bool = True 421 shown_boxes: str = "cpu mem net proc" 422 update_ms: int = 2000 423 proc_update_mult: int = 2 424 proc_sorting: str = "cpu lazy" 425 proc_reversed: bool = False 426 proc_tree: bool = False 427 tree_depth: int = 3 428 proc_colors: bool = True 429 proc_gradient: bool = True 430 proc_per_core: bool = False 431 proc_mem_bytes: bool = True 432 cpu_graph_upper: str = "total" 433 cpu_graph_lower: str = "total" 434 cpu_invert_lower: bool = True 435 cpu_single_graph: bool = False 436 show_uptime: bool = True 437 check_temp: bool = True 438 cpu_sensor: str = "Auto" 439 show_coretemp: bool = True 440 temp_scale: str = "celsius" 441 show_cpu_freq: bool = True 442 draw_clock: str = "%X" 443 background_update: bool = True 444 custom_cpu_name: str = "" 445 disks_filter: str = "" 446 update_check: bool = True 447 mem_graphs: bool = True 448 show_swap: bool = True 449 swap_disk: bool = True 450 show_disks: bool = True 451 only_physical: bool = True 452 use_fstab: bool = False 453 show_io_stat: bool = True 454 io_mode: bool = False 455 io_graph_combined: bool = False 456 io_graph_speeds: str = "" 457 net_download: str = "10M" 458 net_upload: str = "10M" 459 net_color_fixed: bool = False 460 net_auto: bool = True 461 net_sync: bool = False 462 net_iface: str = "" 463 show_battery: bool = True 464 show_init: bool = False 465 log_level: str = "WARNING" 466 467 warnings: List[str] = [] 468 info: List[str] = [] 469 470 sorting_options: List[str] = ["pid", "program", "arguments", "threads", "user", "memory", "cpu lazy", "cpu responsive"] 471 log_levels: List[str] = ["ERROR", "WARNING", "INFO", "DEBUG"] 472 cpu_percent_fields: List = ["total"] 473 cpu_percent_fields.extend(getattr(psutil.cpu_times_percent(), "_fields", [])) 474 temp_scales: List[str] = ["celsius", "fahrenheit", "kelvin", "rankine"] 475 476 cpu_sensors: List[str] = [ "Auto" ] 477 478 if hasattr(psutil, "sensors_temperatures"): 479 try: 480 _temps = psutil.sensors_temperatures() 481 if _temps: 482 for _name, _entries in _temps.items(): 483 for _num, _entry in enumerate(_entries, 1): 484 if hasattr(_entry, "current"): 485 cpu_sensors.append(f'{_name}:{_num if _entry.label == "" else _entry.label}') 486 except: 487 pass 488 489 changed: bool = False 490 recreate: bool = False 491 config_file: str = "" 492 493 _initialized: bool = False 494 495 def __init__(self, path: str): 496 self.config_file = path 497 conf: Dict[str, Union[str, int, bool]] = self.load_config() 498 if not "version" in conf.keys(): 499 self.recreate = True 500 self.info.append(f'Config file malformatted or missing, will be recreated on exit!') 501 elif conf["version"] != VERSION: 502 self.recreate = True 503 self.info.append(f'Config file version and bpytop version missmatch, will be recreated on exit!') 504 for key in self.keys: 505 if key in conf.keys() and conf[key] != "_error_": 506 setattr(self, key, conf[key]) 507 else: 508 self.recreate = True 509 self.conf_dict[key] = getattr(self, key) 510 self._initialized = True 511 512 def __setattr__(self, name, value): 513 if self._initialized: 514 object.__setattr__(self, "changed", True) 515 object.__setattr__(self, name, value) 516 if name not in ["_initialized", "recreate", "changed"]: 517 self.conf_dict[name] = value 518 519 def load_config(self) -> Dict[str, Union[str, int, bool]]: 520 '''Load config from file, set correct types for values and return a dict''' 521 new_config: Dict[str,Union[str, int, bool]] = {} 522 conf_file: str = "" 523 if os.path.isfile(self.config_file): 524 conf_file = self.config_file 525 elif SYSTEM == "BSD" and os.path.isfile("/usr/local/etc/bpytop.conf"): 526 conf_file = "/usr/local/etc/bpytop.conf" 527 elif SYSTEM != "BSD" and os.path.isfile("/etc/bpytop.conf"): 528 conf_file = "/etc/bpytop.conf" 529 else: 530 return new_config 531 try: 532 with open(conf_file, "r") as f: 533 for line in f: 534 line = line.strip() 535 if line.startswith("#? Config"): 536 new_config["version"] = line[line.find("v. ") + 3:] 537 continue 538 if not '=' in line: 539 continue 540 key, line = line.split('=', maxsplit=1) 541 if not key in self.keys: 542 continue 543 line = line.strip('"') 544 if type(getattr(self, key)) == int: 545 try: 546 new_config[key] = int(line) 547 except ValueError: 548 self.warnings.append(f'Config key "{key}" should be an integer!') 549 if type(getattr(self, key)) == bool: 550 try: 551 new_config[key] = bool(strtobool(line)) 552 except ValueError: 553 self.warnings.append(f'Config key "{key}" can only be True or False!') 554 if type(getattr(self, key)) == str: 555 new_config[key] = str(line) 556 except Exception as e: 557 errlog.exception(str(e)) 558 if "proc_sorting" in new_config and not new_config["proc_sorting"] in self.sorting_options: 559 new_config["proc_sorting"] = "_error_" 560 self.warnings.append(f'Config key "proc_sorted" didn\'t get an acceptable value!') 561 if "log_level" in new_config and not new_config["log_level"] in self.log_levels: 562 new_config["log_level"] = "_error_" 563 self.warnings.append(f'Config key "log_level" didn\'t get an acceptable value!') 564 if "update_ms" in new_config and int(new_config["update_ms"]) < 100: 565 new_config["update_ms"] = 100 566 self.warnings.append(f'Config key "update_ms" can\'t be lower than 100!') 567 for net_name in ["net_download", "net_upload"]: 568 if net_name in new_config and not new_config[net_name][0].isdigit(): # type: ignore 569 new_config[net_name] = "_error_" 570 if "cpu_sensor" in new_config and not new_config["cpu_sensor"] in self.cpu_sensors: 571 new_config["cpu_sensor"] = "_error_" 572 self.warnings.append(f'Config key "cpu_sensor" does not contain an available sensor!') 573 if "shown_boxes" in new_config and not new_config["shown_boxes"] == "": 574 for box in new_config["shown_boxes"].split(): #type: ignore 575 if not box in ["cpu", "mem", "net", "proc"]: 576 new_config["shown_boxes"] = "_error_" 577 self.warnings.append(f'Config key "shown_boxes" contains invalid box names!') 578 break 579 for cpu_graph in ["cpu_graph_upper", "cpu_graph_lower"]: 580 if cpu_graph in new_config and not new_config[cpu_graph] in self.cpu_percent_fields: 581 new_config[cpu_graph] = "_error_" 582 self.warnings.append(f'Config key "{cpu_graph}" does not contain an available cpu stat attribute!') 583 if "temp_scale" in new_config and not new_config["temp_scale"] in self.temp_scales: 584 new_config["temp_scale"] = "_error_" 585 self.warnings.append(f'Config key "temp_scale" does not contain a recognized temperature scale!') 586 return new_config 587 588 def save_config(self): 589 '''Save current config to config file if difference in values or version, creates a new file if not found''' 590 if not self.changed and not self.recreate: return 591 try: 592 with open(self.config_file, "w" if os.path.isfile(self.config_file) else "x") as f: 593 f.write(DEFAULT_CONF.substitute(self.conf_dict)) 594 except Exception as e: 595 errlog.exception(str(e)) 596 597try: 598 CONFIG: Config = Config(CONFIG_FILE) 599 if DEBUG: 600 errlog.setLevel(logging.DEBUG) 601 else: 602 errlog.setLevel(getattr(logging, CONFIG.log_level)) 603 DEBUG = CONFIG.log_level == "DEBUG" 604 errlog.info(f'New instance of bpytop version {VERSION} started with pid {os.getpid()}') 605 errlog.info(f'Loglevel set to {"DEBUG" if DEBUG else CONFIG.log_level}') 606 errlog.debug(f'Using psutil version {".".join(str(x) for x in psutil.version_info)}') 607 errlog.debug(f'CMD: {" ".join(sys.argv)}') 608 if CONFIG.info: 609 for info in CONFIG.info: 610 errlog.info(info) 611 CONFIG.info = [] 612 if CONFIG.warnings: 613 for warning in CONFIG.warnings: 614 errlog.warning(warning) 615 CONFIG.warnings = [] 616except Exception as e: 617 errlog.exception(f'{e}') 618 raise SystemExit(1) 619 620if ARG_BOXES: 621 _new_boxes: List = [] 622 for _box in ARG_BOXES.split(): 623 if _box in ["cpu", "mem", "net", "proc"]: 624 _new_boxes.append(_box) 625 CONFIG.shown_boxes = " ".join(_new_boxes) 626 del _box, _new_boxes 627 628if SYSTEM == "Linux" and not os.path.isdir("/sys/class/power_supply"): 629 CONFIG.show_battery = False 630 631if psutil.version_info[0] < 5 or (psutil.version_info[0] == 5 and psutil.version_info[1] < 7): 632 warn = f'psutil version {".".join(str(x) for x in psutil.version_info)} detected, version 5.7.0 or later required for full functionality!' 633 print("WARNING!", warn) 634 errlog.warning(warn) 635 636 637#? Classes ---------------------------------------------------------------------------------------> 638 639class Term: 640 """Terminal info and commands""" 641 width: int = 0 642 height: int = 0 643 resized: bool = False 644 _w : int = 0 645 _h : int = 0 646 fg: str = "" #* Default foreground color 647 bg: str = "" #* Default background color 648 hide_cursor = "\033[?25l" #* Hide terminal cursor 649 show_cursor = "\033[?25h" #* Show terminal cursor 650 alt_screen = "\033[?1049h" #* Switch to alternate screen 651 normal_screen = "\033[?1049l" #* Switch to normal screen 652 clear = "\033[2J\033[0;0f" #* Clear screen and set cursor to position 0,0 653 mouse_on = "\033[?1002h\033[?1015h\033[?1006h" #* Enable reporting of mouse position on click and release 654 mouse_off = "\033[?1002l" #* Disable mouse reporting 655 mouse_direct_on = "\033[?1003h" #* Enable reporting of mouse position at any movement 656 mouse_direct_off = "\033[?1003l" #* Disable direct mouse reporting 657 winch = threading.Event() 658 old_boxes: List = [] 659 min_width: int = 0 660 min_height: int = 0 661 662 @classmethod 663 def refresh(cls, *args, force: bool = False): 664 """Update width, height and set resized flag if terminal has been resized""" 665 if Init.running: cls.resized = False; return 666 if cls.resized: cls.winch.set(); return 667 cls._w, cls._h = os.get_terminal_size() 668 if (cls._w, cls._h) == (cls.width, cls.height) and cls.old_boxes == Box.boxes and not force: return 669 if force: Collector.collect_interrupt = True 670 if cls.old_boxes != Box.boxes: 671 w_p = h_p = 0 672 cls.min_width = cls.min_height = 0 673 cls.old_boxes = Box.boxes.copy() 674 for box_class in Box.__subclasses__(): 675 for box_name in Box.boxes: 676 if box_name in str(box_class).capitalize(): 677 if not (box_name == "cpu" and "proc" in Box.boxes) and not (box_name == "net" and "mem" in Box.boxes) and w_p + box_class.width_p <= 100: 678 w_p += box_class.width_p 679 cls.min_width += getattr(box_class, "min_w", 0) 680 if not (box_name in ["mem", "net"] and "proc" in Box.boxes) and h_p + box_class.height_p <= 100: 681 h_p += box_class.height_p 682 cls.min_height += getattr(box_class, "min_h", 0) 683 while (cls._w, cls._h) != (cls.width, cls.height) or (cls._w < cls.min_width or cls._h < cls.min_height): 684 if Init.running: Init.resized = True 685 CpuBox.clock_block = True 686 cls.resized = True 687 Collector.collect_interrupt = True 688 cls.width, cls.height = cls._w, cls._h 689 Draw.now(Term.clear) 690 box_width = min(50, cls._w - 2) 691 Draw.now(f'{create_box(cls._w // 2 - box_width // 2, cls._h // 2 - 2, 50, 3, "resizing", line_color=Colors.green, title_color=Colors.white)}', 692 f'{Mv.r(box_width // 4)}{Colors.default}{Colors.black_bg}{Fx.b}Width : {cls._w} Height: {cls._h}{Fx.ub}{Term.bg}{Term.fg}') 693 if cls._w < 80 or cls._h < 24: 694 while cls._w < cls.min_width or cls._h < cls.min_height: 695 Draw.now(Term.clear) 696 box_width = min(50, cls._w - 2) 697 Draw.now(f'{create_box(cls._w // 2 - box_width // 2, cls._h // 2 - 2, box_width, 4, "warning", line_color=Colors.red, title_color=Colors.white)}', 698 f'{Mv.r(box_width // 4)}{Colors.default}{Colors.black_bg}{Fx.b}Width: {Colors.red if cls._w < cls.min_width else Colors.green}{cls._w} ', 699 f'{Colors.default}Height: {Colors.red if cls._h < cls.min_height else Colors.green}{cls._h}{Term.bg}{Term.fg}', 700 f'{Mv.d(1)}{Mv.l(25)}{Colors.default}{Colors.black_bg}Current config need: {cls.min_width} x {cls.min_height}{Fx.ub}{Term.bg}{Term.fg}') 701 cls.winch.wait(0.3) 702 while Key.has_key(): 703 if Key.last() == "q": clean_quit() 704 cls.winch.clear() 705 cls._w, cls._h = os.get_terminal_size() 706 else: 707 cls.winch.wait(0.3) 708 cls.winch.clear() 709 cls._w, cls._h = os.get_terminal_size() 710 711 Key.mouse = {} 712 Box.calc_sizes() 713 Collector.proc_counter = 1 714 if Menu.active: Menu.resized = True 715 Box.draw_bg(now=False) 716 cls.resized = False 717 Timer.finish() 718 719 @staticmethod 720 def echo(on: bool): 721 """Toggle input echo""" 722 (iflag, oflag, cflag, lflag, ispeed, ospeed, cc) = termios.tcgetattr(sys.stdin.fileno()) 723 if on: 724 lflag |= termios.ECHO # type: ignore 725 else: 726 lflag &= ~termios.ECHO # type: ignore 727 new_attr = [iflag, oflag, cflag, lflag, ispeed, ospeed, cc] 728 termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, new_attr) 729 730 @staticmethod 731 def title(text: str = "") -> str: 732 out: str = f'{os.environ.get("TERMINAL_TITLE", "")}' 733 if out and text: out += " " 734 if text: out += f'{text}' 735 return f'\033]0;{out}\a' 736 737class Fx: 738 """Text effects 739 * trans(string: str): Replace whitespace with escape move right to not overwrite background behind whitespace. 740 * uncolor(string: str) : Removes all 24-bit color and returns string .""" 741 start = "\033[" #* Escape sequence start 742 sep = ";" #* Escape sequence separator 743 end = "m" #* Escape sequence end 744 reset = rs = "\033[0m" #* Reset foreground/background color and text effects 745 bold = b = "\033[1m" #* Bold on 746 unbold = ub = "\033[22m" #* Bold off 747 dark = d = "\033[2m" #* Dark on 748 undark = ud = "\033[22m" #* Dark off 749 italic = i = "\033[3m" #* Italic on 750 unitalic = ui = "\033[23m" #* Italic off 751 underline = u = "\033[4m" #* Underline on 752 ununderline = uu = "\033[24m" #* Underline off 753 blink = bl = "\033[5m" #* Blink on 754 unblink = ubl = "\033[25m" #* Blink off 755 strike = s = "\033[9m" #* Strike / crossed-out on 756 unstrike = us = "\033[29m" #* Strike / crossed-out off 757 758 #* Precompiled regex for finding a 24-bit color escape sequence in a string 759 color_re = re.compile(r"\033\[\d+;\d?;?\d*;?\d*;?\d*m") 760 761 @staticmethod 762 def trans(string: str): 763 return string.replace(" ", "\033[1C") 764 765 @classmethod 766 def uncolor(cls, string: str) -> str: 767 return f'{cls.color_re.sub("", string)}' 768 769class Raw(object): 770 """Set raw input mode for device""" 771 def __init__(self, stream): 772 self.stream = stream 773 self.fd = self.stream.fileno() 774 def __enter__(self): 775 self.original_stty = termios.tcgetattr(self.stream) 776 tty.setcbreak(self.stream) 777 def __exit__(self, type, value, traceback): 778 termios.tcsetattr(self.stream, termios.TCSANOW, self.original_stty) 779 780class Nonblocking(object): 781 """Set nonblocking mode for device""" 782 def __init__(self, stream): 783 self.stream = stream 784 self.fd = self.stream.fileno() 785 def __enter__(self): 786 self.orig_fl = fcntl.fcntl(self.fd, fcntl.F_GETFL) 787 fcntl.fcntl(self.fd, fcntl.F_SETFL, self.orig_fl | os.O_NONBLOCK) 788 def __exit__(self, *args): 789 fcntl.fcntl(self.fd, fcntl.F_SETFL, self.orig_fl) 790 791class Mv: 792 """Class with collection of cursor movement functions: .t[o](line, column) | .r[ight](columns) | .l[eft](columns) | .u[p](lines) | .d[own](lines) | .save() | .restore()""" 793 @staticmethod 794 def to(line: int, col: int) -> str: 795 return f'\033[{line};{col}f' #* Move cursor to line, column 796 @staticmethod 797 def right(x: int) -> str: #* Move cursor right x columns 798 return f'\033[{x}C' 799 @staticmethod 800 def left(x: int) -> str: #* Move cursor left x columns 801 return f'\033[{x}D' 802 @staticmethod 803 def up(x: int) -> str: #* Move cursor up x lines 804 return f'\033[{x}A' 805 @staticmethod 806 def down(x: int) -> str: #* Move cursor down x lines 807 return f'\033[{x}B' 808 809 save: str = "\033[s" #* Save cursor position 810 restore: str = "\033[u" #* Restore saved cursor postion 811 t = to 812 r = right 813 l = left 814 u = up 815 d = down 816 817class Key: 818 """Handles the threaded input reader for keypresses and mouse events""" 819 list: List[str] = [] 820 mouse: Dict[str, List[List[int]]] = {} 821 mouse_pos: Tuple[int, int] = (0, 0) 822 escape: Dict[Union[str, Tuple[str, str]], str] = { 823 "\n" : "enter", 824 ("\x7f", "\x08") : "backspace", 825 ("[A", "OA") : "up", 826 ("[B", "OB") : "down", 827 ("[D", "OD") : "left", 828 ("[C", "OC") : "right", 829 "[2~" : "insert", 830 "[3~" : "delete", 831 "[H" : "home", 832 "[F" : "end", 833 "[5~" : "page_up", 834 "[6~" : "page_down", 835 "\t" : "tab", 836 "[Z" : "shift_tab", 837 "OP" : "f1", 838 "OQ" : "f2", 839 "OR" : "f3", 840 "OS" : "f4", 841 "[15" : "f5", 842 "[17" : "f6", 843 "[18" : "f7", 844 "[19" : "f8", 845 "[20" : "f9", 846 "[21" : "f10", 847 "[23" : "f11", 848 "[24" : "f12" 849 } 850 new = threading.Event() 851 idle = threading.Event() 852 mouse_move = threading.Event() 853 mouse_report: bool = False 854 idle.set() 855 stopping: bool = False 856 started: bool = False 857 reader: threading.Thread 858 @classmethod 859 def start(cls): 860 cls.stopping = False 861 cls.reader = threading.Thread(target=cls._get_key) 862 cls.reader.start() 863 cls.started = True 864 865 @classmethod 866 def stop(cls): 867 if cls.started and cls.reader.is_alive(): 868 cls.stopping = True 869 try: 870 cls.reader.join() 871 except: 872 pass 873 874 @classmethod 875 def last(cls) -> str: 876 if cls.list: return cls.list.pop() 877 else: return "" 878 879 @classmethod 880 def get(cls) -> str: 881 if cls.list: return cls.list.pop(0) 882 else: return "" 883 884 @classmethod 885 def get_mouse(cls) -> Tuple[int, int]: 886 if cls.new.is_set(): 887 cls.new.clear() 888 return cls.mouse_pos 889 890 @classmethod 891 def mouse_moved(cls) -> bool: 892 if cls.mouse_move.is_set(): 893 cls.mouse_move.clear() 894 return True 895 else: 896 return False 897 898 @classmethod 899 def has_key(cls) -> bool: 900 return bool(cls.list) 901 902 @classmethod 903 def clear(cls): 904 cls.list = [] 905 906 @classmethod 907 def input_wait(cls, sec: float = 0.0, mouse: bool = False) -> bool: 908 '''Returns True if key is detected else waits out timer and returns False''' 909 if cls.list: return True 910 if mouse: Draw.now(Term.mouse_direct_on) 911 cls.new.wait(sec if sec > 0 else 0.0) 912 if mouse: Draw.now(Term.mouse_direct_off, Term.mouse_on) 913 914 if cls.new.is_set(): 915 cls.new.clear() 916 return True 917 else: 918 return False 919 920 @classmethod 921 def break_wait(cls): 922 cls.list.append("_null") 923 cls.new.set() 924 sleep(0.01) 925 cls.new.clear() 926 927 @classmethod 928 def _get_key(cls): 929 """Get a key or escape sequence from stdin, convert to readable format and save to keys list. Meant to be run in it's own thread.""" 930 input_key: str = "" 931 clean_key: str = "" 932 try: 933 while not cls.stopping: 934 with Raw(sys.stdin): 935 if not select([sys.stdin], [], [], 0.1)[0]: #* Wait 100ms for input on stdin then restart loop to check for stop flag 936 continue 937 input_key += sys.stdin.read(1) #* Read 1 key safely with blocking on 938 if input_key == "\033": #* If first character is a escape sequence keep reading 939 cls.idle.clear() #* Report IO block in progress to prevent Draw functions from getting a IO Block error 940 Draw.idle.wait() #* Wait for Draw function to finish if busy 941 with Nonblocking(sys.stdin): #* Set non blocking to prevent read stall 942 input_key += sys.stdin.read(20) 943 if input_key.startswith("\033[<"): 944 _ = sys.stdin.read(1000) 945 cls.idle.set() #* Report IO blocking done 946 #errlog.debug(f'{repr(input_key)}') 947 if input_key == "\033": clean_key = "escape" #* Key is "escape" key if only containing \033 948 elif input_key.startswith(("\033[<0;", "\033[<35;", "\033[<64;", "\033[<65;")): #* Detected mouse event 949 try: 950 cls.mouse_pos = (int(input_key.split(";")[1]), int(input_key.split(";")[2].rstrip("mM"))) 951 except: 952 pass 953 else: 954 if input_key.startswith("\033[<35;"): #* Detected mouse move in mouse direct mode 955 cls.mouse_move.set() 956 cls.new.set() 957 elif input_key.startswith("\033[<64;"): #* Detected mouse scroll up 958 clean_key = "mouse_scroll_up" 959 elif input_key.startswith("\033[<65;"): #* Detected mouse scroll down 960 clean_key = "mouse_scroll_down" 961 elif input_key.startswith("\033[<0;") and input_key.endswith("m"): #* Detected mouse click release 962 if Menu.active: 963 clean_key = "mouse_click" 964 else: 965 for key_name, positions in cls.mouse.items(): #* Check if mouse position is clickable 966 if list(cls.mouse_pos) in positions: 967 clean_key = key_name 968 break 969 else: 970 clean_key = "mouse_click" 971 elif input_key == "\\": clean_key = "\\" #* Clean up "\" to not return escaped 972 else: 973 for code in cls.escape.keys(): #* Go trough dict of escape codes to get the cleaned key name 974 if input_key.lstrip("\033").startswith(code): 975 clean_key = cls.escape[code] 976 break 977 else: #* If not found in escape dict and length of key is 1, assume regular character 978 if len(input_key) == 1: 979 clean_key = input_key 980 if clean_key: 981 cls.list.append(clean_key) #* Store up to 10 keys in input queue for later processing 982 if len(cls.list) > 10: del cls.list[0] 983 clean_key = "" 984 cls.new.set() #* Set threading event to interrupt main thread sleep 985 input_key = "" 986 987 988 except Exception as e: 989 errlog.exception(f'Input thread failed with exception: {e}') 990 cls.idle.set() 991 cls.list.clear() 992 clean_quit(1, thread=True) 993 994class Draw: 995 '''Holds the draw buffer and manages IO blocking queue 996 * .buffer([+]name[!], *args, append=False, now=False, z=100) : Add *args to buffer 997 * - Adding "+" prefix to name sets append to True and appends to name's current string 998 * - Adding "!" suffix to name sets now to True and print name's current string 999 * .out(clear=False) : Print all strings in buffer, clear=True clear all buffers after 1000 * .now(*args) : Prints all arguments as a string 1001 * .clear(*names) : Clear named buffers, all if no argument 1002 * .last_screen() : Prints all saved buffers 1003 ''' 1004 strings: Dict[str, str] = {} 1005 z_order: Dict[str, int] = {} 1006 saved: Dict[str, str] = {} 1007 save: Dict[str, bool] = {} 1008 once: Dict[str, bool] = {} 1009 idle = threading.Event() 1010 idle.set() 1011 1012 @classmethod 1013 def now(cls, *args): 1014 '''Wait for input reader and self to be idle then print to screen''' 1015 Key.idle.wait() 1016 cls.idle.wait() 1017 cls.idle.clear() 1018 try: 1019 print(*args, sep="", end="", flush=True) 1020 except BlockingIOError: 1021 pass 1022 Key.idle.wait() 1023 print(*args, sep="", end="", flush=True) 1024 cls.idle.set() 1025 1026 @classmethod 1027 def buffer(cls, name: str, *args: str, append: bool = False, now: bool = False, z: int = 100, only_save: bool = False, no_save: bool = False, once: bool = False): 1028 string: str = "" 1029 if name.startswith("+"): 1030 name = name.lstrip("+") 1031 append = True 1032 if name.endswith("!"): 1033 name = name.rstrip("!") 1034 now = True 1035 cls.save[name] = not no_save 1036 cls.once[name] = once 1037 if not name in cls.z_order or z != 100: cls.z_order[name] = z 1038 if args: string = "".join(args) 1039 if only_save: 1040 if name not in cls.saved or not append: cls.saved[name] = "" 1041 cls.saved[name] += string 1042 else: 1043 if name not in cls.strings or not append: cls.strings[name] = "" 1044 cls.strings[name] += string 1045 if now: 1046 cls.out(name) 1047 1048 @classmethod 1049 def out(cls, *names: str, clear = False): 1050 out: str = "" 1051 if not cls.strings: return 1052 if names: 1053 for name in sorted(cls.z_order, key=cls.z_order.get, reverse=True): #type: ignore 1054 if name in names and name in cls.strings: 1055 out += cls.strings[name] 1056 if cls.save[name]: 1057 cls.saved[name] = cls.strings[name] 1058 if clear or cls.once[name]: 1059 cls.clear(name) 1060 cls.now(out) 1061 else: 1062 for name in sorted(cls.z_order, key=cls.z_order.get, reverse=True): #type: ignore 1063 if name in cls.strings: 1064 out += cls.strings[name] 1065 if cls.save[name]: 1066 cls.saved[name] = cls.strings[name] 1067 if cls.once[name] and not clear: 1068 cls.clear(name) 1069 if clear: 1070 cls.clear() 1071 cls.now(out) 1072 1073 @classmethod 1074 def saved_buffer(cls) -> str: 1075 out: str = "" 1076 for name in sorted(cls.z_order, key=cls.z_order.get, reverse=True): #type: ignore 1077 if name in cls.saved: 1078 out += cls.saved[name] 1079 return out 1080 1081 1082 @classmethod 1083 def clear(cls, *names, saved: bool = False): 1084 if names: 1085 for name in names: 1086 if name in cls.strings: 1087 del cls.strings[name] 1088 if name in cls.save: 1089 del cls.save[name] 1090 if name in cls.once: 1091 del cls.once[name] 1092 if saved: 1093 if name in cls.saved: 1094 del cls.saved[name] 1095 if name in cls.z_order: 1096 del cls.z_order[name] 1097 else: 1098 cls.strings = {} 1099 cls.save = {} 1100 cls.once = {} 1101 if saved: 1102 cls.saved = {} 1103 cls.z_order = {} 1104 1105class Color: 1106 '''Holds representations for a 24-bit color value 1107 __init__(color, depth="fg", default=False) 1108 -- color accepts 6 digit hexadecimal: string "#RRGGBB", 2 digit hexadecimal: string "#FF" or decimal RGB "255 255 255" as a string. 1109 -- depth accepts "fg" or "bg" 1110 __call__(*args) joins str arguments to a string and apply color 1111 __str__ returns escape sequence to set color 1112 __iter__ returns iteration over red, green and blue in integer values of 0-255. 1113 * Values: .hexa: str | .dec: Tuple[int, int, int] | .red: int | .green: int | .blue: int | .depth: str | .escape: str 1114 ''' 1115 hexa: str; dec: Tuple[int, int, int]; red: int; green: int; blue: int; depth: str; escape: str; default: bool 1116 1117 def __init__(self, color: str, depth: str = "fg", default: bool = False): 1118 self.depth = depth 1119 self.default = default 1120 try: 1121 if not color: 1122 self.dec = (-1, -1, -1) 1123 self.hexa = "" 1124 self.red = self.green = self.blue = -1 1125 self.escape = "\033[49m" if depth == "bg" and default else "" 1126 return 1127 1128 elif color.startswith("#"): 1129 self.hexa = color 1130 if len(self.hexa) == 3: 1131 self.hexa += self.hexa[1:3] + self.hexa[1:3] 1132 c = int(self.hexa[1:3], base=16) 1133 self.dec = (c, c, c) 1134 elif len(self.hexa) == 7: 1135 self.dec = (int(self.hexa[1:3], base=16), int(self.hexa[3:5], base=16), int(self.hexa[5:7], base=16)) 1136 else: 1137 raise ValueError(f'Incorrectly formatted hexadecimal rgb string: {self.hexa}') 1138 1139 else: 1140 c_t = tuple(map(int, color.split(" "))) 1141 if len(c_t) == 3: 1142 self.dec = c_t #type: ignore 1143 else: 1144 raise ValueError(f'RGB dec should be "0-255 0-255 0-255"') 1145 1146 if not all(0 <= c <= 255 for c in self.dec): 1147 raise ValueError(f'One or more RGB values are out of range: {color}') 1148 1149 except Exception as e: 1150 errlog.exception(str(e)) 1151 self.escape = "" 1152 return 1153 1154 if self.dec and not self.hexa: self.hexa = f'{hex(self.dec[0]).lstrip("0x").zfill(2)}{hex(self.dec[1]).lstrip("0x").zfill(2)}{hex(self.dec[2]).lstrip("0x").zfill(2)}' 1155 1156 if self.dec and self.hexa: 1157 self.red, self.green, self.blue = self.dec 1158 self.escape = f'\033[{38 if self.depth == "fg" else 48};2;{";".join(str(c) for c in self.dec)}m' 1159 1160 if not CONFIG.truecolor or LOW_COLOR: 1161 self.escape = f'{self.truecolor_to_256(rgb=self.dec, depth=self.depth)}' 1162 1163 def __str__(self) -> str: 1164 return self.escape 1165 1166 def __repr__(self) -> str: 1167 return repr(self.escape) 1168 1169 def __iter__(self) -> Iterable: 1170 for c in self.dec: yield c 1171 1172 def __call__(self, *args: str) -> str: 1173 if len(args) < 1: return "" 1174 return f'{self.escape}{"".join(args)}{getattr(Term, self.depth)}' 1175 1176 @staticmethod 1177 def truecolor_to_256(rgb: Tuple[int, int, int], depth: str="fg") -> str: 1178 out: str = "" 1179 pre: str = f'\033[{"38" if depth == "fg" else "48"};5;' 1180 1181 greyscale: Tuple[int, int, int] = ( rgb[0] // 11, rgb[1] // 11, rgb[2] // 11 ) 1182 if greyscale[0] == greyscale[1] == greyscale[2]: 1183 out = f'{pre}{232 + greyscale[0]}m' 1184 else: 1185 out = f'{pre}{round(rgb[0] / 51) * 36 + round(rgb[1] / 51) * 6 + round(rgb[2] / 51) + 16}m' 1186 1187 return out 1188 1189 @staticmethod 1190 def escape_color(hexa: str = "", r: int = 0, g: int = 0, b: int = 0, depth: str = "fg") -> str: 1191 """Returns escape sequence to set color 1192 * accepts either 6 digit hexadecimal hexa="#RRGGBB", 2 digit hexadecimal: hexa="#FF" 1193 * or decimal RGB: r=0-255, g=0-255, b=0-255 1194 * depth="fg" or "bg" 1195 """ 1196 dint: int = 38 if depth == "fg" else 48 1197 color: str = "" 1198 if hexa: 1199 try: 1200 if len(hexa) == 3: 1201 c = int(hexa[1:], base=16) 1202 if CONFIG.truecolor and not LOW_COLOR: 1203 color = f'\033[{dint};2;{c};{c};{c}m' 1204 else: 1205 color = f'{Color.truecolor_to_256(rgb=(c, c, c), depth=depth)}' 1206 elif len(hexa) == 7: 1207 if CONFIG.truecolor and not LOW_COLOR: 1208 color = f'\033[{dint};2;{int(hexa[1:3], base=16)};{int(hexa[3:5], base=16)};{int(hexa[5:7], base=16)}m' 1209 else: 1210 color = f'{Color.truecolor_to_256(rgb=(int(hexa[1:3], base=16), int(hexa[3:5], base=16), int(hexa[5:7], base=16)), depth=depth)}' 1211 except ValueError as e: 1212 errlog.exception(f'{e}') 1213 else: 1214 if CONFIG.truecolor and not LOW_COLOR: 1215 color = f'\033[{dint};2;{r};{g};{b}m' 1216 else: 1217 color = f'{Color.truecolor_to_256(rgb=(r, g, b), depth=depth)}' 1218 return color 1219 1220 @classmethod 1221 def fg(cls, *args) -> str: 1222 if len(args) > 2: return cls.escape_color(r=args[0], g=args[1], b=args[2], depth="fg") 1223 else: return cls.escape_color(hexa=args[0], depth="fg") 1224 1225 @classmethod 1226 def bg(cls, *args) -> str: 1227 if len(args) > 2: return cls.escape_color(r=args[0], g=args[1], b=args[2], depth="bg") 1228 else: return cls.escape_color(hexa=args[0], depth="bg") 1229 1230class Colors: 1231 '''Standard colors for menus and dialogs''' 1232 default = Color("#cc") 1233 white = Color("#ff") 1234 red = Color("#bf3636") 1235 green = Color("#68bf36") 1236 blue = Color("#0fd7ff") 1237 yellow = Color("#db8b00") 1238 black_bg = Color("#00", depth="bg") 1239 null = Color("") 1240 1241class Theme: 1242 '''__init__ accepts a dict containing { "color_element" : "color" }''' 1243 1244 themes: Dict[str, str] = {} 1245 cached: Dict[str, Dict[str, str]] = { "Default" : DEFAULT_THEME } 1246 current: str = "" 1247 1248 main_bg = main_fg = title = hi_fg = selected_bg = selected_fg = inactive_fg = proc_misc = cpu_box = mem_box = net_box = proc_box = div_line = temp_start = temp_mid = temp_end = cpu_start = cpu_mid = cpu_end = free_start = free_mid = free_end = cached_start = cached_mid = cached_end = available_start = available_mid = available_end = used_start = used_mid = used_end = download_start = download_mid = download_end = upload_start = upload_mid = upload_end = graph_text = meter_bg = process_start = process_mid = process_end = Colors.default 1249 1250 gradient: Dict[str, List[str]] = { 1251 "temp" : [], 1252 "cpu" : [], 1253 "free" : [], 1254 "cached" : [], 1255 "available" : [], 1256 "used" : [], 1257 "download" : [], 1258 "upload" : [], 1259 "proc" : [], 1260 "proc_color" : [], 1261 "process" : [], 1262 } 1263 def __init__(self, theme: str): 1264 self.refresh() 1265 self._load_theme(theme) 1266 1267 def __call__(self, theme: str): 1268 for k in self.gradient.keys(): self.gradient[k] = [] 1269 self._load_theme(theme) 1270 1271 def _load_theme(self, theme: str): 1272 tdict: Dict[str, str] 1273 if theme in self.cached: 1274 tdict = self.cached[theme] 1275 elif theme in self.themes: 1276 tdict = self._load_file(self.themes[theme]) 1277 self.cached[theme] = tdict 1278 else: 1279 errlog.warning(f'No theme named "{theme}" found!') 1280 theme = "Default" 1281 CONFIG.color_theme = theme 1282 tdict = DEFAULT_THEME 1283 self.current = theme 1284 #if CONFIG.color_theme != theme: CONFIG.color_theme = theme 1285 if not "graph_text" in tdict and "inactive_fg" in tdict: 1286 tdict["graph_text"] = tdict["inactive_fg"] 1287 if not "meter_bg" in tdict and "inactive_fg" in tdict: 1288 tdict["meter_bg"] = tdict["inactive_fg"] 1289 if not "process_start" in tdict and "cpu_start" in tdict: 1290 tdict["process_start"] = tdict["cpu_start"] 1291 tdict["process_mid"] = tdict.get("cpu_mid", "") 1292 tdict["process_end"] = tdict.get("cpu_end", "") 1293 1294 1295 #* Get key names from DEFAULT_THEME dict to not leave any color unset if missing from theme dict 1296 for item, value in DEFAULT_THEME.items(): 1297 default = item in ["main_fg", "main_bg"] 1298 depth = "bg" if item in ["main_bg", "selected_bg"] else "fg" 1299 if item in tdict: 1300 setattr(self, item, Color(tdict[item], depth=depth, default=default)) 1301 else: 1302 setattr(self, item, Color(value, depth=depth, default=default)) 1303 1304 #* Create color gradients from one, two or three colors, 101 values indexed 0-100 1305 self.proc_start, self.proc_mid, self.proc_end = self.main_fg, Colors.null, self.inactive_fg 1306 self.proc_color_start, self.proc_color_mid, self.proc_color_end = self.inactive_fg, Colors.null, self.process_start 1307 1308 rgb: Dict[str, Tuple[int, int, int]] 1309 colors: List[List[int]] = [] 1310 for name in self.gradient: 1311 rgb = { "start" : getattr(self, f'{name}_start').dec, "mid" : getattr(self, f'{name}_mid').dec, "end" : getattr(self, f'{name}_end').dec } 1312 colors = [ list(getattr(self, f'{name}_start')) ] 1313 if rgb["end"][0] >= 0: 1314 r = 50 if rgb["mid"][0] >= 0 else 100 1315 for first, second in ["start", "mid" if r == 50 else "end"], ["mid", "end"]: 1316 for i in range(r): 1317 colors += [[rgb[first][n] + i * (rgb[second][n] - rgb[first][n]) // r for n in range(3)]] 1318 if r == 100: 1319 break 1320 self.gradient[name] += [ Color.fg(*color) for color in colors ] 1321 1322 else: 1323 c = Color.fg(*rgb["start"]) 1324 self.gradient[name] += [c] * 101 1325 #* Set terminal colors 1326 Term.fg = f'{self.main_fg}' 1327 Term.bg = f'{self.main_bg}' if CONFIG.theme_background else "\033[49m" 1328 Draw.now(self.main_fg, self.main_bg) 1329 1330 @classmethod 1331 def refresh(cls): 1332 '''Sets themes dict with names and paths to all found themes''' 1333 cls.themes = { "Default" : "Default" } 1334 try: 1335 for d in (THEME_DIR, USER_THEME_DIR): 1336 if not d: continue 1337 for f in os.listdir(d): 1338 if f.endswith(".theme"): 1339 cls.themes[f'{"" if d == THEME_DIR else "+"}{f[:-6]}'] = f'{d}/{f}' 1340 except Exception as e: 1341 errlog.exception(str(e)) 1342 1343 @staticmethod 1344 def _load_file(path: str) -> Dict[str, str]: 1345 '''Load a bashtop formatted theme file and return a dict''' 1346 new_theme: Dict[str, str] = {} 1347 try: 1348 with open(path, "r") as f: 1349 for line in f: 1350 if not line.startswith("theme["): continue 1351 key = line[6:line.find("]")] 1352 s = line.find('"') 1353 value = line[s + 1:line.find('"', s + 1)] 1354 new_theme[key] = value 1355 except Exception as e: 1356 errlog.exception(str(e)) 1357 1358 return new_theme 1359 1360class Banner: 1361 '''Holds the bpytop banner, .draw(line, [col=0], [center=False], [now=False])''' 1362 out: List[str] = [] 1363 c_color: str = "" 1364 length: int = 0 1365 if not out: 1366 for num, (color, color2, line) in enumerate(BANNER_SRC): 1367 if len(line) > length: length = len(line) 1368 out_var = "" 1369 line_color = Color.fg(color) 1370 line_color2 = Color.fg(color2) 1371 line_dark = Color.fg(f'#{80 - num * 6}') 1372 for n, letter in enumerate(line): 1373 if letter == "█" and c_color != line_color: 1374 if 5 < n < 25: c_color = line_color2 1375 else: c_color = line_color 1376 out_var += c_color 1377 elif letter == " ": 1378 letter = f'{Mv.r(1)}' 1379 c_color = "" 1380 elif letter != "█" and c_color != line_dark: 1381 c_color = line_dark 1382 out_var += line_dark 1383 out_var += letter 1384 out.append(out_var) 1385 1386 @classmethod 1387 def draw(cls, line: int, col: int = 0, center: bool = False, now: bool = False): 1388 out: str = "" 1389 if center: col = Term.width // 2 - cls.length // 2 1390 for n, o in enumerate(cls.out): 1391 out += f'{Mv.to(line + n, col)}{o}' 1392 out += f'{Term.fg}' 1393 if now: Draw.out(out) 1394 else: return out 1395 1396class Symbol: 1397 h_line: str = "─" 1398 v_line: str = "│" 1399 left_up: str = "┌" 1400 right_up: str = "┐" 1401 left_down: str = "└" 1402 right_down: str = "┘" 1403 title_left: str = "┤" 1404 title_right: str = "├" 1405 div_up: str = "┬" 1406 div_down: str = "┴" 1407 graph_up: Dict[float, str] = { 1408 0.0 : " ", 0.1 : "⢀", 0.2 : "⢠", 0.3 : "⢰", 0.4 : "⢸", 1409 1.0 : "⡀", 1.1 : "⣀", 1.2 : "⣠", 1.3 : "⣰", 1.4 : "⣸", 1410 2.0 : "⡄", 2.1 : "⣄", 2.2 : "⣤", 2.3 : "⣴", 2.4 : "⣼", 1411 3.0 : "⡆", 3.1 : "⣆", 3.2 : "⣦", 3.3 : "⣶", 3.4 : "⣾", 1412 4.0 : "⡇", 4.1 : "⣇", 4.2 : "⣧", 4.3 : "⣷", 4.4 : "⣿" 1413 } 1414 graph_up_small = graph_up.copy() 1415 graph_up_small[0.0] = "\033[1C" 1416 1417 graph_down: Dict[float, str] = { 1418 0.0 : " ", 0.1 : "⠈", 0.2 : "⠘", 0.3 : "⠸", 0.4 : "⢸", 1419 1.0 : "⠁", 1.1 : "⠉", 1.2 : "⠙", 1.3 : "⠹", 1.4 : "⢹", 1420 2.0 : "⠃", 2.1 : "⠋", 2.2 : "⠛", 2.3 : "⠻", 2.4 : "⢻", 1421 3.0 : "⠇", 3.1 : "⠏", 3.2 : "⠟", 3.3 : "⠿", 3.4 : "⢿", 1422 4.0 : "⡇", 4.1 : "⡏", 4.2 : "⡟", 4.3 : "⡿", 4.4 : "⣿" 1423 } 1424 graph_down_small = graph_down.copy() 1425 graph_down_small[0.0] = "\033[1C" 1426 meter: str = "■" 1427 up: str = "↑" 1428 down: str = "↓" 1429 left: str = "←" 1430 right: str = "→" 1431 enter: str = "↲" 1432 ok: str = f'{Color.fg("#30ff50")}√{Color.fg("#cc")}' 1433 fail: str = f'{Color.fg("#ff3050")}!{Color.fg("#cc")}' 1434 1435class Graph: 1436 '''Class for creating and adding to graphs 1437 * __str__ : returns graph as a string 1438 * add(value: int) : adds a value to graph and returns it as a string 1439 * __call__ : same as add 1440 ''' 1441 out: str 1442 width: int 1443 height: int 1444 graphs: Dict[bool, List[str]] 1445 colors: List[str] 1446 invert: bool 1447 max_value: int 1448 color_max_value: int 1449 offset: int 1450 no_zero: bool 1451 round_up_low: bool 1452 current: bool 1453 last: int 1454 lowest: int = 0 1455 symbol: Dict[float, str] 1456 1457 def __init__(self, width: int, height: int, color: Union[List[str], Color, None], data: List[int], invert: bool = False, max_value: int = 0, offset: int = 0, color_max_value: Union[int, None] = None, no_zero: bool = False, round_up_low: bool = False): 1458 self.graphs: Dict[bool, List[str]] = {False : [], True : []} 1459 self.current: bool = True 1460 self.width = width 1461 self.height = height 1462 self.invert = invert 1463 self.offset = offset 1464 self.round_up_low = round_up_low 1465 self.no_zero = no_zero or round_up_low 1466 if not data: data = [0] 1467 if max_value: 1468 self.lowest = 1 if self.round_up_low else 0 1469 self.max_value = max_value 1470 data = [ min_max((v + offset) * 100 // (max_value + offset), min_max(v + offset, 0, self.lowest), 100) for v in data ] #* Convert values to percentage values of max_value with max_value as ceiling 1471 else: 1472 self.max_value = 0 1473 if color_max_value: 1474 self.color_max_value = color_max_value 1475 else: 1476 self.color_max_value = self.max_value 1477 if self.color_max_value and self.max_value: 1478 color_scale = int(100.0 * self.max_value / self.color_max_value) 1479 else: 1480 color_scale = 100 1481 self.colors: List[str] = [] 1482 if isinstance(color, list) and height > 1: 1483 for i in range(1, height + 1): self.colors.insert(0, color[min(100, i * color_scale // height)]) #* Calculate colors of graph 1484 if invert: self.colors.reverse() 1485 elif isinstance(color, Color) and height > 1: 1486 self.colors = [ f'{color}' for _ in range(height) ] 1487 else: 1488 if isinstance(color, list): self.colors = color 1489 elif isinstance(color, Color): self.colors = [ f'{color}' for _ in range(101) ] 1490 if self.height == 1: 1491 self.symbol = Symbol.graph_down_small if invert else Symbol.graph_up_small 1492 else: 1493 self.symbol = Symbol.graph_down if invert else Symbol.graph_up 1494 value_width: int = ceil(len(data) / 2) 1495 filler: str = "" 1496 if value_width > width: #* If the size of given data set is bigger then width of graph, shrink data set 1497 data = data[-(width*2):] 1498 value_width = ceil(len(data) / 2) 1499 elif value_width < width: #* If the size of given data set is smaller then width of graph, fill graph with whitespace 1500 filler = self.symbol[0.0] * (width - value_width) 1501 if len(data) % 2: data.insert(0, 0) 1502 for _ in range(height): 1503 for b in [True, False]: 1504 self.graphs[b].append(filler) 1505 self._create(data, new=True) 1506 1507 def _create(self, data: List[int], new: bool = False): 1508 h_high: int 1509 h_low: int 1510 value: Dict[str, int] = { "left" : 0, "right" : 0 } 1511 val: int 1512 side: str 1513 1514 #* Create the graph 1515 for h in range(self.height): 1516 h_high = round(100 * (self.height - h) / self.height) if self.height > 1 else 100 1517 h_low = round(100 * (self.height - (h + 1)) / self.height) if self.height > 1 else 0 1518 for v in range(len(data)): 1519 if new: self.current = bool(v % 2) #* Switch between True and False graphs 1520 if new and v == 0: self.last = 0 1521 for val, side in [self.last, "left"], [data[v], "right"]: # type: ignore 1522 if val >= h_high: 1523 value[side] = 4 1524 elif val <= h_low: 1525 value[side] = 0 1526 else: 1527 if self.height == 1: value[side] = round(val * 4 / 100 + 0.5) 1528 else: value[side] = round((val - h_low) * 4 / (h_high - h_low) + 0.1) 1529 if self.no_zero and not (new and v == 0 and side == "left") and h == self.height - 1 and value[side] < 1 and not (self.round_up_low and val == 0): value[side] = 1 1530 if new: self.last = data[v] 1531 self.graphs[self.current][h] += self.symbol[float(value["left"] + value["right"] / 10)] 1532 if data: self.last = data[-1] 1533 self.out = "" 1534 1535 if self.height == 1: 1536 self.out += f'{"" if not self.colors else (THEME.inactive_fg if self.last < 5 else self.colors[self.last])}{self.graphs[self.current][0]}' 1537 elif self.height > 1: 1538 for h in range(self.height): 1539 if h > 0: self.out += f'{Mv.d(1)}{Mv.l(self.width)}' 1540 self.out += f'{"" if not self.colors else self.colors[h]}{self.graphs[self.current][h if not self.invert else (self.height - 1) - h]}' 1541 if self.colors: self.out += f'{Term.fg}' 1542 1543 def __call__(self, value: Union[int, None] = None) -> str: 1544 if not isinstance(value, int): return self.out 1545 self.current = not self.current 1546 if self.height == 1: 1547 if self.graphs[self.current][0].startswith(self.symbol[0.0]): 1548 self.graphs[self.current][0] = self.graphs[self.current][0].replace(self.symbol[0.0], "", 1) 1549 else: 1550 self.graphs[self.current][0] = self.graphs[self.current][0][1:] 1551 else: 1552 for n in range(self.height): 1553 self.graphs[self.current][n] = self.graphs[self.current][n][1:] 1554 if self.max_value: value = min_max((value + self.offset) * 100 // (self.max_value + self.offset), min_max(value + self.offset, 0, self.lowest), 100) 1555 self._create([value]) 1556 return self.out 1557 1558 def add(self, value: Union[int, None] = None) -> str: 1559 return self.__call__(value) 1560 1561 def __str__(self): 1562 return self.out 1563 1564 def __repr__(self): 1565 return repr(self.out) 1566 1567 1568class Graphs: 1569 '''Holds all graphs and lists of graphs for dynamically created graphs''' 1570 cpu: Dict[str, Graph] = {} 1571 cores: List[Graph] = [NotImplemented] * THREADS 1572 temps: List[Graph] = [NotImplemented] * (THREADS + 1) 1573 net: Dict[str, Graph] = {} 1574 detailed_cpu: Graph = NotImplemented 1575 detailed_mem: Graph = NotImplemented 1576 pid_cpu: Dict[int, Graph] = {} 1577 disk_io: Dict[str, Dict[str, Graph]] = {} 1578 1579class Meter: 1580 '''Creates a percentage meter 1581 __init__(value, width, theme, gradient_name) to create new meter 1582 __call__(value) to set value and return meter as a string 1583 __str__ returns last set meter as a string 1584 ''' 1585 out: str 1586 color_gradient: List[str] 1587 color_inactive: Color 1588 gradient_name: str 1589 width: int 1590 invert: bool 1591 saved: Dict[int, str] 1592 1593 def __init__(self, value: int, width: int, gradient_name: str, invert: bool = False): 1594 self.gradient_name = gradient_name 1595 self.color_gradient = THEME.gradient[gradient_name] 1596 self.color_inactive = THEME.meter_bg 1597 self.width = width 1598 self.saved = {} 1599 self.invert = invert 1600 self.out = self._create(value) 1601 1602 def __call__(self, value: Union[int, None]) -> str: 1603 if not isinstance(value, int): return self.out 1604 if value > 100: value = 100 1605 elif value < 0: value = 100 1606 if value in self.saved: 1607 self.out = self.saved[value] 1608 else: 1609 self.out = self._create(value) 1610 return self.out 1611 1612 def __str__(self) -> str: 1613 return self.out 1614 1615 def __repr__(self): 1616 return repr(self.out) 1617 1618 def _create(self, value: int) -> str: 1619 if value > 100: value = 100 1620 elif value < 0: value = 100 1621 out: str = "" 1622 for i in range(1, self.width + 1): 1623 if value >= round(i * 100 / self.width): 1624 out += f'{self.color_gradient[round(i * 100 / self.width) if not self.invert else round(100 - (i * 100 / self.width))]}{Symbol.meter}' 1625 else: 1626 out += self.color_inactive(Symbol.meter * (self.width + 1 - i)) 1627 break 1628 else: 1629 out += f'{Term.fg}' 1630 if not value in self.saved: 1631 self.saved[value] = out 1632 return out 1633 1634class Meters: 1635 cpu: Meter 1636 battery: Meter 1637 mem: Dict[str, Union[Meter, Graph]] = {} 1638 swap: Dict[str, Union[Meter, Graph]] = {} 1639 disks_used: Dict[str, Meter] = {} 1640 disks_free: Dict[str, Meter] = {} 1641 1642class Box: 1643 '''Box class with all needed attributes for create_box() function''' 1644 name: str 1645 num: int = 0 1646 boxes: List = [] 1647 view_modes: Dict[str, List] = {"full" : ["cpu", "mem", "net", "proc"], "stat" : ["cpu", "mem", "net"], "proc" : ["cpu", "proc"]} 1648 view_mode: str 1649 for view_mode in view_modes: 1650 if sorted(CONFIG.shown_boxes.split(), key=str.lower) == view_modes[view_mode]: 1651 break 1652 else: 1653 view_mode = "user" 1654 view_modes["user"] = CONFIG.shown_boxes.split() 1655 height_p: int 1656 width_p: int 1657 x: int 1658 y: int 1659 width: int 1660 height: int 1661 out: str 1662 bg: str 1663 _b_cpu_h: int 1664 _b_mem_h: int 1665 redraw_all: bool 1666 buffers: List[str] = [] 1667 c_counter: int = 0 1668 clock_on: bool = False 1669 clock: str = "" 1670 clock_len: int = 0 1671 resized: bool = False 1672 clock_custom_format: Dict[str, Any] = { 1673 "/host" : os.uname()[1], 1674 "/user" : os.environ.get("USER") or pwd.getpwuid(os.getuid())[0], 1675 "/uptime" : "", 1676 } 1677 if clock_custom_format["/host"].endswith(".local"): 1678 clock_custom_format["/host"] = clock_custom_format["/host"].replace(".local", "") 1679 1680 @classmethod 1681 def calc_sizes(cls): 1682 '''Calculate sizes of boxes''' 1683 cls.boxes = CONFIG.shown_boxes.split() 1684 for sub in cls.__subclasses__(): 1685 sub._calc_size() # type: ignore 1686 sub.resized = True # type: ignore 1687 1688 @classmethod 1689 def draw_update_ms(cls, now: bool = True): 1690 if not "cpu" in cls.boxes: return 1691 update_string: str = f'{CONFIG.update_ms}ms' 1692 xpos: int = CpuBox.x + CpuBox.width - len(update_string) - 15 1693 if not "+" in Key.mouse: 1694 Key.mouse["+"] = [[xpos + 7 + i, CpuBox.y] for i in range(3)] 1695 Key.mouse["-"] = [[CpuBox.x + CpuBox.width - 4 + i, CpuBox.y] for i in range(3)] 1696 Draw.buffer("update_ms!" if now and not Menu.active else "update_ms", 1697 f'{Mv.to(CpuBox.y, xpos)}{THEME.cpu_box(Symbol.h_line * 7, Symbol.title_left)}{Fx.b}{THEME.hi_fg("+")} ', 1698 f'{THEME.title(update_string)} {THEME.hi_fg("-")}{Fx.ub}{THEME.cpu_box(Symbol.title_right)}', only_save=Menu.active, once=True) 1699 if now and not Menu.active: 1700 Draw.clear("update_ms") 1701 if CONFIG.show_battery and hasattr(psutil, "sensors_battery") and psutil.sensors_battery(): 1702 Draw.out("battery") 1703 1704 @classmethod 1705 def draw_clock(cls, force: bool = False): 1706 if not "cpu" in cls.boxes or not cls.clock_on: return 1707 cls.c_counter += 1 1708 if cls.c_counter > 3600 / (Config.update_ms / 1000): 1709 tzset() 1710 cls.c_counter = 0 1711 out: str = "" 1712 if force: pass 1713 elif Term.resized or strftime(CONFIG.draw_clock) == cls.clock: return 1714 clock_string = cls.clock = strftime(CONFIG.draw_clock) 1715 for custom in cls.clock_custom_format: 1716 if custom in clock_string: 1717 if custom == "/uptime": cls.clock_custom_format["/uptime"] = CpuCollector.uptime 1718 clock_string = clock_string.replace(custom, cls.clock_custom_format[custom]) 1719 clock_len = len(clock_string[:(CpuBox.width-56)]) 1720 if cls.clock_len != clock_len and not CpuBox.resized: 1721 out = f'{Mv.to(CpuBox.y, ((CpuBox.width)//2)-(cls.clock_len//2))}{Fx.ub}{THEME.cpu_box}{Symbol.h_line * cls.clock_len}' 1722 cls.clock_len = clock_len 1723 now: bool = False if Menu.active else not force 1724 out += (f'{Mv.to(CpuBox.y, ((CpuBox.width)//2)-(clock_len//2))}{Fx.ub}{THEME.cpu_box}' 1725 f'{Symbol.title_left}{Fx.b}{THEME.title(clock_string[:clock_len])}{Fx.ub}{THEME.cpu_box}{Symbol.title_right}{Term.fg}') 1726 Draw.buffer("clock", out, z=1, now=now, once=not force, only_save=Menu.active) 1727 if now and not Menu.active: 1728 if CONFIG.show_battery and hasattr(psutil, "sensors_battery") and psutil.sensors_battery(): 1729 Draw.out("battery") 1730 1731 @classmethod 1732 def empty_bg(cls) -> str: 1733 return (f'{Term.clear}' + 1734 (f'{Banner.draw(Term.height // 2 - 10, center=True)}' 1735 f'{Mv.d(1)}{Mv.l(46)}{Colors.black_bg}{Colors.default}{Fx.b}[esc] Menu' 1736 f'{Mv.r(25)}{Fx.i}Version: {VERSION}{Fx.ui}' if Term.height > 22 else "") + 1737 f'{Mv.d(1)}{Mv.l(34)}{Fx.b}All boxes hidden!' 1738 f'{Mv.d(1)}{Mv.l(17)}{Fx.b}[1] {Fx.ub}Toggle CPU box' 1739 f'{Mv.d(1)}{Mv.l(18)}{Fx.b}[2] {Fx.ub}Toggle MEM box' 1740 f'{Mv.d(1)}{Mv.l(18)}{Fx.b}[3] {Fx.ub}Toggle NET box' 1741 f'{Mv.d(1)}{Mv.l(18)}{Fx.b}[4] {Fx.ub}Toggle PROC box' 1742 f'{Mv.d(1)}{Mv.l(19)}{Fx.b}[m] {Fx.ub}Cycle presets' 1743 f'{Mv.d(1)}{Mv.l(17)}{Fx.b}[q] Quit {Fx.ub}{Term.bg}{Term.fg}') 1744 1745 @classmethod 1746 def draw_bg(cls, now: bool = True): 1747 '''Draw all boxes outlines and titles''' 1748 out: str = "" 1749 if not cls.boxes: 1750 out = cls.empty_bg() 1751 else: 1752 out = "".join(sub._draw_bg() for sub in cls.__subclasses__()) # type: ignore 1753 Draw.buffer("bg", out, now=now, z=1000, only_save=Menu.active, once=True) 1754 cls.draw_update_ms(now=now) 1755 if CONFIG.draw_clock: cls.draw_clock(force=True) 1756 1757class SubBox: 1758 box_x: int = 0 1759 box_y: int = 0 1760 box_width: int = 0 1761 box_height: int = 0 1762 box_columns: int = 0 1763 column_size: int = 0 1764 1765class CpuBox(Box, SubBox): 1766 name = "cpu" 1767 num = 1 1768 x = 1 1769 y = 1 1770 height_p = 32 1771 width_p = 100 1772 min_w: int = 60 1773 min_h: int = 8 1774 resized: bool = True 1775 redraw: bool = False 1776 buffer: str = "cpu" 1777 battery_percent: int = 1000 1778 battery_secs: int = 0 1779 battery_status: str = "Unknown" 1780 old_battery_pos = 0 1781 old_battery_len = 0 1782 battery_path: Union[str, None] = "" 1783 battery_clear: bool = False 1784 battery_symbols: Dict[str, str] = {"Charging": "▲", 1785 "Discharging": "▼", 1786 "Full": "■", 1787 "Not charging": "■"} 1788 clock_block: bool = True 1789 Box.buffers.append(buffer) 1790 1791 @classmethod 1792 def _calc_size(cls): 1793 if not "cpu" in cls.boxes: 1794 Box._b_cpu_h = 0 1795 cls.width = Term.width 1796 return 1797 cpu = CpuCollector 1798 height_p: int 1799 if cls.boxes == ["cpu"]: 1800 height_p = 100 1801 else: 1802 height_p = cls.height_p 1803 cls.width = round(Term.width * cls.width_p / 100) 1804 cls.height = round(Term.height * height_p / 100) 1805 if cls.height < 8: cls.height = 8 1806 Box._b_cpu_h = cls.height 1807 #THREADS = 64 1808 cls.box_columns = ceil((THREADS + 1) / (cls.height - 5)) 1809 if cls.box_columns * (20 + 13 if cpu.got_sensors else 21) < cls.width - (cls.width // 3): 1810 cls.column_size = 2 1811 cls.box_width = (20 + 13 if cpu.got_sensors else 21) * cls.box_columns - ((cls.box_columns - 1) * 1) 1812 elif cls.box_columns * (15 + 6 if cpu.got_sensors else 15) < cls.width - (cls.width // 3): 1813 cls.column_size = 1 1814 cls.box_width = (15 + 6 if cpu.got_sensors else 15) * cls.box_columns - ((cls.box_columns - 1) * 1) 1815 elif cls.box_columns * (8 + 6 if cpu.got_sensors else 8) < cls.width - (cls.width // 3): 1816 cls.column_size = 0 1817 else: 1818 cls.box_columns = (cls.width - cls.width // 3) // (8 + 6 if cpu.got_sensors else 8); cls.column_size = 0 1819 1820 if cls.column_size == 0: cls.box_width = (8 + 6 if cpu.got_sensors else 8) * cls.box_columns + 1 1821 1822 cls.box_height = ceil(THREADS / cls.box_columns) + 4 1823 1824 if cls.box_height > cls.height - 2: cls.box_height = cls.height - 2 1825 cls.box_x = (cls.width - 1) - cls.box_width 1826 cls.box_y = cls.y + ceil((cls.height - 2) / 2) - ceil(cls.box_height / 2) + 1 1827 1828 @classmethod 1829 def _draw_bg(cls) -> str: 1830 if not "cpu" in cls.boxes: return "" 1831 if not "M" in Key.mouse: 1832 Key.mouse["M"] = [[cls.x + 10 + i, cls.y] for i in range(6)] 1833 return (f'{create_box(box=cls, line_color=THEME.cpu_box)}' 1834 f'{Mv.to(cls.y, cls.x + 10)}{THEME.cpu_box(Symbol.title_left)}{Fx.b}{THEME.hi_fg("M")}{THEME.title("enu")}{Fx.ub}{THEME.cpu_box(Symbol.title_right)}' 1835 f'{create_box(x=cls.box_x, y=cls.box_y, width=cls.box_width, height=cls.box_height, line_color=THEME.div_line, fill=False, title=CPU_NAME[:cls.box_width - 14] if not CONFIG.custom_cpu_name else CONFIG.custom_cpu_name[:cls.box_width - 14])}') 1836 1837 @classmethod 1838 def battery_activity(cls) -> bool: 1839 if not hasattr(psutil, "sensors_battery") or psutil.sensors_battery() == None: 1840 if cls.battery_percent != 1000: 1841 cls.battery_clear = True 1842 return False 1843 1844 if cls.battery_path == "": 1845 cls.battery_path = None 1846 if os.path.isdir("/sys/class/power_supply"): 1847 for directory in sorted(os.listdir("/sys/class/power_supply")): 1848 if directory.startswith('BAT') or 'battery' in directory.lower(): 1849 cls.battery_path = f'/sys/class/power_supply/{directory}/' 1850 break 1851 1852 return_true: bool = False 1853 percent: int = ceil(getattr(psutil.sensors_battery(), "percent", 0)) 1854 if percent != cls.battery_percent: 1855 cls.battery_percent = percent 1856 return_true = True 1857 1858 seconds: int = getattr(psutil.sensors_battery(), "secsleft", 0) 1859 if seconds != cls.battery_secs: 1860 cls.battery_secs = seconds 1861 return_true = True 1862 1863 status: str = "not_set" 1864 if cls.battery_path: 1865 status = readfile(cls.battery_path + "status", default="not_set") 1866 if status == "not_set" and getattr(psutil.sensors_battery(), "power_plugged", None) == True: 1867 status = "Charging" if cls.battery_percent < 100 else "Full" 1868 elif status == "not_set" and getattr(psutil.sensors_battery(), "power_plugged", None) == False: 1869 status = "Discharging" 1870 elif status == "not_set": 1871 status = "Unknown" 1872 if status != cls.battery_status: 1873 cls.battery_status = status 1874 return_true = True 1875 1876 return return_true or cls.resized or cls.redraw or Menu.active 1877 1878 @classmethod 1879 def _draw_fg(cls): 1880 if not "cpu" in cls.boxes: return 1881 cpu = CpuCollector 1882 if cpu.redraw: cls.redraw = True 1883 out: str = "" 1884 out_misc: str = "" 1885 lavg: str = "" 1886 x, y, w, h = cls.x + 1, cls.y + 1, cls.width - 2, cls.height - 2 1887 bx, by, bw, bh = cls.box_x + 1, cls.box_y + 1, cls.box_width - 2, cls.box_height - 2 1888 hh: int = ceil(h / 2) 1889 hh2: int = h - hh 1890 mid_line: bool = False 1891 temp: int = 0 1892 unit: str = "" 1893 if not CONFIG.cpu_single_graph and CONFIG.cpu_graph_upper != CONFIG.cpu_graph_lower: 1894 mid_line = True 1895 if h % 2: hh = floor(h / 2) 1896 else: hh2 -= 1 1897 1898 hide_cores: bool = (cpu.cpu_temp_only or not CONFIG.show_coretemp) and cpu.got_sensors 1899 ct_width: int = (max(6, 6 * cls.column_size)) * hide_cores 1900 1901 if cls.resized or cls.redraw: 1902 if not "m" in Key.mouse: 1903 Key.mouse["m"] = [[cls.x + 16 + i, cls.y] for i in range(12)] 1904 out_misc += f'{Mv.to(cls.y, cls.x + 16)}{THEME.cpu_box(Symbol.title_left)}{Fx.b}{THEME.hi_fg("m")}{THEME.title}ode:{Box.view_mode}{Fx.ub}{THEME.cpu_box(Symbol.title_right)}' 1905 Graphs.cpu["up"] = Graph(w - bw - 3, (h if CONFIG.cpu_single_graph else hh), THEME.gradient["cpu"], cpu.cpu_upper, round_up_low=True) 1906 if not CONFIG.cpu_single_graph: 1907 Graphs.cpu["down"] = Graph(w - bw - 3, hh2, THEME.gradient["cpu"], cpu.cpu_lower, invert=CONFIG.cpu_invert_lower, round_up_low=True) 1908 Meters.cpu = Meter(cpu.cpu_usage[0][-1], bw - (21 if cpu.got_sensors else 9), "cpu") 1909 if cls.column_size > 0 or ct_width > 0: 1910 for n in range(THREADS): 1911 Graphs.cores[n] = Graph(5 * cls.column_size + ct_width, 1, None, cpu.cpu_usage[n + 1]) 1912 if cpu.got_sensors: 1913 Graphs.temps[0] = Graph(5, 1, None, cpu.cpu_temp[0], max_value=cpu.cpu_temp_crit, offset=-23) 1914 if cls.column_size > 1: 1915 for n in range(1, THREADS + 1): 1916 if not cpu.cpu_temp[n]: 1917 continue 1918 Graphs.temps[n] = Graph(5, 1, None, cpu.cpu_temp[n], max_value=cpu.cpu_temp_crit, offset=-23) 1919 Draw.buffer("cpu_misc", out_misc, only_save=True) 1920 1921 if CONFIG.show_battery and cls.battery_activity(): 1922 bat_out: str = "" 1923 if cls.battery_secs > 0: 1924 battery_time: str = f' {cls.battery_secs // 3600:02}:{(cls.battery_secs % 3600) // 60:02}' 1925 else: 1926 battery_time = "" 1927 if not hasattr(Meters, "battery") or cls.resized: 1928 Meters.battery = Meter(cls.battery_percent, 10, "cpu", invert=True) 1929 battery_symbol: str = cls.battery_symbols.get(cls.battery_status, "○") 1930 battery_len: int = len(f'{CONFIG.update_ms}') + (11 if cls.width >= 100 else 0) + len(battery_time) + len(f'{cls.battery_percent}') 1931 battery_pos = cls.width - battery_len - 17 1932 if (battery_pos != cls.old_battery_pos or battery_len != cls.old_battery_len) and cls.old_battery_pos > 0 and not cls.resized: 1933 bat_out += f'{Mv.to(y-1, cls.old_battery_pos)}{THEME.cpu_box(Symbol.h_line*(cls.old_battery_len+4))}' 1934 cls.old_battery_pos, cls.old_battery_len = battery_pos, battery_len 1935 bat_out += (f'{Mv.to(y-1, battery_pos)}{THEME.cpu_box(Symbol.title_left)}{Fx.b}{THEME.title}BAT{battery_symbol} {cls.battery_percent}%'+ 1936 ("" if cls.width < 100 else f' {Fx.ub}{Meters.battery(cls.battery_percent)}{Fx.b}') + 1937 f'{THEME.title}{battery_time}{Fx.ub}{THEME.cpu_box(Symbol.title_right)}') 1938 Draw.buffer("battery", f'{bat_out}{Term.fg}', only_save=Menu.active) 1939 elif cls.battery_clear: 1940 out += f'{Mv.to(y-1, cls.old_battery_pos)}{THEME.cpu_box(Symbol.h_line*(cls.old_battery_len+4))}' 1941 cls.battery_clear = False 1942 cls.battery_percent = 1000 1943 cls.battery_secs = 0 1944 cls.battery_status = "Unknown" 1945 cls.old_battery_pos = 0 1946 cls.old_battery_len = 0 1947 cls.battery_path = "" 1948 Draw.clear("battery", saved=True) 1949 1950 cx = cy = cc = 0 1951 ccw = (bw + 1) // cls.box_columns 1952 if cpu.cpu_freq: 1953 freq: str = f'{cpu.cpu_freq} Mhz' if cpu.cpu_freq < 1000 else f'{float(cpu.cpu_freq / 1000):.1f} GHz' 1954 out += f'{Mv.to(by - 1, bx + bw - 9)}{THEME.div_line(Symbol.title_left)}{Fx.b}{THEME.title(freq)}{Fx.ub}{THEME.div_line(Symbol.title_right)}' 1955 out += f'{Mv.to(y, x)}{Graphs.cpu["up"](None if cls.resized else cpu.cpu_upper[-1])}' 1956 if mid_line: 1957 out += (f'{Mv.to(y+hh, x-1)}{THEME.cpu_box(Symbol.title_right)}{THEME.div_line}{Symbol.h_line * (w - bw - 3)}{THEME.div_line(Symbol.title_left)}' 1958 f'{Mv.to(y+hh, x+((w-bw)//2)-((len(CONFIG.cpu_graph_upper)+len(CONFIG.cpu_graph_lower))//2)-4)}{THEME.main_fg}{CONFIG.cpu_graph_upper}{Mv.r(1)}▲▼{Mv.r(1)}{CONFIG.cpu_graph_lower}') 1959 if not CONFIG.cpu_single_graph and Graphs.cpu.get("down"): 1960 out += f'{Mv.to(y + hh + (1 * mid_line), x)}{Graphs.cpu["down"](None if cls.resized else cpu.cpu_lower[-1])}' 1961 out += (f'{THEME.main_fg}{Mv.to(by + cy, bx + cx)}{Fx.b}{"CPU "}{Fx.ub}{Meters.cpu(cpu.cpu_usage[0][-1])}' 1962 f'{THEME.gradient["cpu"][cpu.cpu_usage[0][-1]]}{cpu.cpu_usage[0][-1]:>4}{THEME.main_fg}%') 1963 if cpu.got_sensors: 1964 try: 1965 temp, unit = temperature(cpu.cpu_temp[0][-1], CONFIG.temp_scale) 1966 out += (f'{THEME.inactive_fg} ⡀⡀⡀⡀⡀{Mv.l(5)}{THEME.gradient["temp"][min_max(cpu.cpu_temp[0][-1], 0, cpu.cpu_temp_crit) * 100 // cpu.cpu_temp_crit]}{Graphs.temps[0](None if cls.resized else cpu.cpu_temp[0][-1])}' 1967 f'{temp:>4}{THEME.main_fg}{unit}') 1968 except: 1969 cpu.got_sensors = False 1970 1971 cy += 1 1972 for n in range(1, THREADS + 1): 1973 out += f'{THEME.main_fg}{Mv.to(by + cy, bx + cx)}{Fx.b + "C" + Fx.ub if THREADS < 100 else ""}{str(n):<{2 if cls.column_size == 0 else 3}}' 1974 if cls.column_size > 0 or ct_width > 0: 1975 out += f'{THEME.inactive_fg}{"⡀" * (5 * cls.column_size + ct_width)}{Mv.l(5 * cls.column_size + ct_width)}{THEME.gradient["cpu"][cpu.cpu_usage[n][-1]]}{Graphs.cores[n-1](None if cls.resized else cpu.cpu_usage[n][-1])}' 1976 else: 1977 out += f'{THEME.gradient["cpu"][cpu.cpu_usage[n][-1]]}' 1978 out += f'{cpu.cpu_usage[n][-1]:>{3 if cls.column_size < 2 else 4}}{THEME.main_fg}%' 1979 if cpu.got_sensors and cpu.cpu_temp[n] and not hide_cores: 1980 try: 1981 temp, unit = temperature(cpu.cpu_temp[n][-1], CONFIG.temp_scale) 1982 if cls.column_size > 1: 1983 out += f'{THEME.inactive_fg} ⡀⡀⡀⡀⡀{Mv.l(5)}{THEME.gradient["temp"][min_max(cpu.cpu_temp[n][-1], 0, cpu.cpu_temp_crit) * 100 // cpu.cpu_temp_crit]}{Graphs.temps[n](None if cls.resized else cpu.cpu_temp[n][-1])}' 1984 else: 1985 out += f'{THEME.gradient["temp"][min_max(temp, 0, cpu.cpu_temp_crit) * 100 // cpu.cpu_temp_crit]}' 1986 out += f'{temp:>4}{THEME.main_fg}{unit}' 1987 except: 1988 cpu.got_sensors = False 1989 elif cpu.got_sensors and not hide_cores: 1990 out += f'{Mv.r(max(6, 6 * cls.column_size))}' 1991 out += f'{THEME.div_line(Symbol.v_line)}' 1992 cy += 1 1993 if cy > ceil(THREADS/cls.box_columns) and n != THREADS: 1994 cc += 1; cy = 1; cx = ccw * cc 1995 if cc == cls.box_columns: break 1996 1997 if cy < bh - 1: cy = bh - 1 1998 1999 if cy < bh and cc < cls.box_columns: 2000 if cls.column_size == 2 and cpu.got_sensors: 2001 lavg = f' Load AVG: {" ".join(str(l) for l in cpu.load_avg):^19.19}' 2002 elif cls.column_size == 2 or (cls.column_size == 1 and cpu.got_sensors): 2003 lavg = f'LAV: {" ".join(str(l) for l in cpu.load_avg):^14.14}' 2004 elif cls.column_size == 1 or (cls.column_size == 0 and cpu.got_sensors): 2005 lavg = f'L {" ".join(str(round(l, 1)) for l in cpu.load_avg):^11.11}' 2006 else: 2007 lavg = f'{" ".join(str(round(l, 1)) for l in cpu.load_avg[:2]):^7.7}' 2008 out += f'{Mv.to(by + cy, bx + cx)}{THEME.main_fg}{lavg}{THEME.div_line(Symbol.v_line)}' 2009 2010 if CONFIG.show_uptime: 2011 out += f'{Mv.to(y + (0 if not CONFIG.cpu_invert_lower or CONFIG.cpu_single_graph else h - 1), x + 1)}{THEME.graph_text}{Fx.trans("up " + cpu.uptime)}' 2012 2013 2014 Draw.buffer(cls.buffer, f'{out_misc}{out}{Term.fg}', only_save=Menu.active) 2015 cls.resized = cls.redraw = cls.clock_block = False 2016 2017class MemBox(Box): 2018 name = "mem" 2019 num = 2 2020 height_p = 38 2021 width_p = 45 2022 min_w: int = 36 2023 min_h: int = 10 2024 x = 1 2025 y = 1 2026 mem_meter: int = 0 2027 mem_size: int = 0 2028 disk_meter: int = 0 2029 divider: int = 0 2030 mem_width: int = 0 2031 disks_width: int = 0 2032 disks_io_h: int = 0 2033 disks_io_order: List[str] = [] 2034 graph_speeds: Dict[str, int] = {} 2035 graph_height: int 2036 resized: bool = True 2037 redraw: bool = False 2038 buffer: str = "mem" 2039 swap_on: bool = CONFIG.show_swap 2040 Box.buffers.append(buffer) 2041 mem_names: List[str] = ["used", "available", "cached", "free"] 2042 swap_names: List[str] = ["used", "free"] 2043 2044 @classmethod 2045 def _calc_size(cls): 2046 if not "mem" in cls.boxes: 2047 Box._b_mem_h = 0 2048 cls.width = Term.width 2049 return 2050 width_p: int; height_p: int 2051 if not "proc" in cls.boxes: 2052 width_p = 100 2053 else: 2054 width_p = cls.width_p 2055 2056 if not "cpu" in cls.boxes: 2057 height_p = 60 if "net" in cls.boxes else 98 2058 elif not "net" in cls.boxes: 2059 height_p = 98 - CpuBox.height_p 2060 else: 2061 height_p = cls.height_p 2062 2063 cls.width = round(Term.width * width_p / 100) 2064 cls.height = round(Term.height * height_p / 100) + 1 2065 if cls.height + Box._b_cpu_h > Term.height: cls.height = Term.height - Box._b_cpu_h 2066 Box._b_mem_h = cls.height 2067 cls.y = Box._b_cpu_h + 1 2068 if CONFIG.show_disks: 2069 cls.mem_width = ceil((cls.width - 3) / 2) 2070 cls.disks_width = cls.width - cls.mem_width - 3 2071 if cls.mem_width + cls.disks_width < cls.width - 2: cls.mem_width += 1 2072 cls.divider = cls.x + cls.mem_width 2073 else: 2074 cls.mem_width = cls.width - 1 2075 2076 item_height: int = 6 if cls.swap_on and not CONFIG.swap_disk else 4 2077 if cls.height - (3 if cls.swap_on and not CONFIG.swap_disk else 2) > 2 * item_height: cls.mem_size = 3 2078 elif cls.mem_width > 25: cls.mem_size = 2 2079 else: cls.mem_size = 1 2080 2081 cls.mem_meter = cls.width - (cls.disks_width if CONFIG.show_disks else 0) - (9 if cls.mem_size > 2 else 20) 2082 if cls.mem_size == 1: cls.mem_meter += 6 2083 if cls.mem_meter < 1: cls.mem_meter = 0 2084 2085 if CONFIG.mem_graphs: 2086 cls.graph_height = round(((cls.height - (2 if cls.swap_on and not CONFIG.swap_disk else 1)) - (2 if cls.mem_size == 3 else 1) * item_height) / item_height) 2087 if cls.graph_height == 0: cls.graph_height = 1 2088 if cls.graph_height > 1: cls.mem_meter += 6 2089 else: 2090 cls.graph_height = 0 2091 2092 if CONFIG.show_disks: 2093 cls.disk_meter = cls.width - cls.mem_width - 23 2094 if cls.disks_width < 25: 2095 cls.disk_meter += 10 2096 if cls.disk_meter < 1: cls.disk_meter = 0 2097 2098 @classmethod 2099 def _draw_bg(cls) -> str: 2100 if not "mem" in cls.boxes: return "" 2101 out: str = "" 2102 out += f'{create_box(box=cls, line_color=THEME.mem_box)}' 2103 if CONFIG.show_disks: 2104 out += (f'{Mv.to(cls.y, cls.divider + 2)}{THEME.mem_box(Symbol.title_left)}{Fx.b}{THEME.hi_fg("d")}{THEME.title("isks")}{Fx.ub}{THEME.mem_box(Symbol.title_right)}' 2105 f'{Mv.to(cls.y, cls.divider)}{THEME.mem_box(Symbol.div_up)}' 2106 f'{Mv.to(cls.y + cls.height - 1, cls.divider)}{THEME.mem_box(Symbol.div_down)}{THEME.div_line}' 2107 f'{"".join(f"{Mv.to(cls.y + i, cls.divider)}{Symbol.v_line}" for i in range(1, cls.height - 1))}') 2108 Key.mouse["d"] = [[cls.divider + 3 + i, cls.y] for i in range(5)] 2109 else: 2110 out += f'{Mv.to(cls.y, cls.x + cls.width - 9)}{THEME.mem_box(Symbol.title_left)}{THEME.hi_fg("d")}{THEME.title("isks")}{THEME.mem_box(Symbol.title_right)}' 2111 Key.mouse["d"] = [[cls.x + cls.width - 8 + i, cls.y] for i in range(5)] 2112 return out 2113 2114 @classmethod 2115 def _draw_fg(cls): 2116 if not "mem" in cls.boxes: return 2117 mem = MemCollector 2118 if mem.redraw: cls.redraw = True 2119 out: str = "" 2120 out_misc: str = "" 2121 gbg: str = "" 2122 gmv: str = "" 2123 gli: str = "" 2124 x, y, w, h = cls.x + 1, cls.y + 1, cls.width - 2, cls.height - 2 2125 if cls.resized or cls.redraw: 2126 cls.redraw = True 2127 cls._calc_size() 2128 out_misc += cls._draw_bg() 2129 Meters.mem = {} 2130 Meters.swap = {} 2131 Meters.disks_used = {} 2132 Meters.disks_free = {} 2133 if cls.mem_meter > 0: 2134 for name in cls.mem_names: 2135 if CONFIG.mem_graphs: 2136 Meters.mem[name] = Graph(cls.mem_meter, cls.graph_height, THEME.gradient[name], mem.vlist[name]) 2137 else: 2138 Meters.mem[name] = Meter(mem.percent[name], cls.mem_meter, name) 2139 if cls.swap_on: 2140 for name in cls.swap_names: 2141 if CONFIG.swap_disk and CONFIG.show_disks: 2142 break 2143 elif CONFIG.mem_graphs and not CONFIG.swap_disk: 2144 Meters.swap[name] = Graph(cls.mem_meter, cls.graph_height, THEME.gradient[name], mem.swap_vlist[name]) 2145 else: 2146 Meters.swap[name] = Meter(mem.swap_percent[name], cls.mem_meter, name) 2147 2148 if CONFIG.show_disks and mem.disks: 2149 if CONFIG.show_io_stat or CONFIG.io_mode: 2150 d_graph: List[str] = [] 2151 d_no_graph: List[str] = [] 2152 l_vals: List[Tuple[str, int, str, bool]] = [] 2153 if CONFIG.io_mode: 2154 cls.disks_io_h = (cls.height - 2 - len(mem.disks)) // max(1, len(mem.disks_io_dict)) 2155 if cls.disks_io_h < 2: cls.disks_io_h = 1 if CONFIG.io_graph_combined else 2 2156 else: 2157 cls.disks_io_h = 1 2158 2159 if CONFIG.io_graph_speeds and not cls.graph_speeds: 2160 try: 2161 cls.graph_speeds = { spds.split(":")[0] : int(spds.split(":")[1]) for spds in list(i.strip() for i in CONFIG.io_graph_speeds.split(","))} 2162 except (KeyError, ValueError): 2163 errlog.error("Wrong formatting in io_graph_speeds variable. Using defaults.") 2164 for name in mem.disks.keys(): 2165 if name in mem.disks_io_dict: 2166 d_graph.append(name) 2167 else: 2168 d_no_graph.append(name) 2169 continue 2170 if CONFIG.io_graph_combined or not CONFIG.io_mode: 2171 l_vals = [("rw", cls.disks_io_h, "available", False)] 2172 else: 2173 l_vals = [("read", cls.disks_io_h // 2, "free", False), ("write", cls.disks_io_h // 2, "used", True)] 2174 2175 Graphs.disk_io[name] = {_name : Graph(width=cls.disks_width - (6 if not CONFIG.io_mode else 0), height=_height, color=THEME.gradient[_gradient], 2176 data=mem.disks_io_dict[name][_name], invert=_invert, max_value=cls.graph_speeds.get(name, 10), no_zero=True) 2177 for _name, _height, _gradient, _invert in l_vals} 2178 cls.disks_io_order = d_graph + d_no_graph 2179 2180 if cls.disk_meter > 0: 2181 for n, name in enumerate(mem.disks.keys()): 2182 if n * 2 > h: break 2183 Meters.disks_used[name] = Meter(mem.disks[name]["used_percent"], cls.disk_meter, "used") 2184 if len(mem.disks) * 3 <= h + 1: 2185 Meters.disks_free[name] = Meter(mem.disks[name]["free_percent"], cls.disk_meter, "free") 2186 if not "g" in Key.mouse: 2187 Key.mouse["g"] = [[x + 8 + i, y-1] for i in range(5)] 2188 out_misc += (f'{Mv.to(y-1, x + 7)}{THEME.mem_box(Symbol.title_left)}{Fx.b if CONFIG.mem_graphs else ""}' 2189 f'{THEME.hi_fg("g")}{THEME.title("raph")}{Fx.ub}{THEME.mem_box(Symbol.title_right)}') 2190 if CONFIG.show_disks: 2191 if not "s" in Key.mouse: 2192 Key.mouse["s"] = [[x + w - 6 + i, y-1] for i in range(4)] 2193 out_misc += (f'{Mv.to(y-1, x + w - 7)}{THEME.mem_box(Symbol.title_left)}{Fx.b if CONFIG.swap_disk else ""}' 2194 f'{THEME.hi_fg("s")}{THEME.title("wap")}{Fx.ub}{THEME.mem_box(Symbol.title_right)}') 2195 if not "i" in Key.mouse: 2196 Key.mouse["i"] = [[x + w - 10 + i, y-1] for i in range(2)] 2197 out_misc += (f'{Mv.to(y-1, x + w - 11)}{THEME.mem_box(Symbol.title_left)}{Fx.b if CONFIG.io_mode else ""}' 2198 f'{THEME.hi_fg("i")}{THEME.title("o")}{Fx.ub}{THEME.mem_box(Symbol.title_right)}') 2199 2200 if Collector.collect_interrupt: return 2201 Draw.buffer("mem_misc", out_misc, only_save=True) 2202 try: 2203 #* Mem 2204 cx = 1; cy = 1 2205 2206 out += f'{Mv.to(y, x+1)}{THEME.title}{Fx.b}Total:{mem.string["total"]:>{cls.mem_width - 9}}{Fx.ub}{THEME.main_fg}' 2207 if cls.graph_height > 0: 2208 gli = f'{Mv.l(2)}{THEME.mem_box(Symbol.title_right)}{THEME.div_line}{Symbol.h_line * (cls.mem_width - 1)}{"" if CONFIG.show_disks else THEME.mem_box}{Symbol.title_left}{Mv.l(cls.mem_width - 1)}{THEME.title}' 2209 if cls.graph_height >= 2: 2210 gbg = f'{Mv.l(1)}' 2211 gmv = f'{Mv.l(cls.mem_width - 2)}{Mv.u(cls.graph_height - 1)}' 2212 2213 big_mem: bool = cls.mem_width > 21 2214 for name in cls.mem_names: 2215 if cy > h - 1: break 2216 if Collector.collect_interrupt: return 2217 if cls.mem_size > 2: 2218 out += (f'{Mv.to(y+cy, x+cx)}{gli}{name.capitalize()[:None if big_mem else 5]+":":<{1 if big_mem else 6.6}}{Mv.to(y+cy, x+cx + cls.mem_width - 3 - (len(mem.string[name])))}{Fx.trans(mem.string[name])}' 2219 f'{Mv.to(y+cy+1, x+cx)}{gbg}{Meters.mem[name](None if cls.resized else mem.percent[name])}{gmv}{str(mem.percent[name])+"%":>4}') 2220 cy += 2 if not cls.graph_height else cls.graph_height + 1 2221 else: 2222 out += f'{Mv.to(y+cy, x+cx)}{name.capitalize():{5.5 if cls.mem_size > 1 else 1.1}} {gbg}{Meters.mem[name](None if cls.resized else mem.percent[name])}{mem.string[name][:None if cls.mem_size > 1 else -2]:>{9 if cls.mem_size > 1 else 7}}' 2223 cy += 1 if not cls.graph_height else cls.graph_height 2224 #* Swap 2225 if cls.swap_on and CONFIG.show_swap and not CONFIG.swap_disk and mem.swap_string: 2226 if h - cy > 5: 2227 if cls.graph_height > 0: out += f'{Mv.to(y+cy, x+cx)}{gli}' 2228 cy += 1 2229 2230 out += f'{Mv.to(y+cy, x+cx)}{THEME.title}{Fx.b}Swap:{mem.swap_string["total"]:>{cls.mem_width - 8}}{Fx.ub}{THEME.main_fg}' 2231 cy += 1 2232 for name in cls.swap_names: 2233 if cy > h - 1: break 2234 if Collector.collect_interrupt: return 2235 if cls.mem_size > 2: 2236 out += (f'{Mv.to(y+cy, x+cx)}{gli}{name.capitalize()[:None if big_mem else 5]+":":<{1 if big_mem else 6.6}}{Mv.to(y+cy, x+cx + cls.mem_width - 3 - (len(mem.swap_string[name])))}{Fx.trans(mem.swap_string[name])}' 2237 f'{Mv.to(y+cy+1, x+cx)}{gbg}{Meters.swap[name](None if cls.resized else mem.swap_percent[name])}{gmv}{str(mem.swap_percent[name])+"%":>4}') 2238 cy += 2 if not cls.graph_height else cls.graph_height + 1 2239 else: 2240 out += f'{Mv.to(y+cy, x+cx)}{name.capitalize():{5.5 if cls.mem_size > 1 else 1.1}} {gbg}{Meters.swap[name](None if cls.resized else mem.swap_percent[name])}{mem.swap_string[name][:None if cls.mem_size > 1 else -2]:>{9 if cls.mem_size > 1 else 7}}'; cy += 1 if not cls.graph_height else cls.graph_height 2241 2242 if cls.graph_height > 0 and not cy == h: out += f'{Mv.to(y+cy, x+cx)}{gli}' 2243 2244 #* Disks 2245 if CONFIG.show_disks and mem.disks: 2246 cx = x + cls.mem_width - 1; cy = 0 2247 big_disk: bool = cls.disks_width >= 25 2248 gli = f'{Mv.l(2)}{THEME.div_line}{Symbol.title_right}{Symbol.h_line * cls.disks_width}{THEME.mem_box}{Symbol.title_left}{Mv.l(cls.disks_width - 1)}' 2249 if CONFIG.io_mode: 2250 for name in cls.disks_io_order: 2251 item = mem.disks[name] 2252 io_item = mem.disks_io_dict.get(name, {}) 2253 if Collector.collect_interrupt: return 2254 if cy > h - 1: break 2255 out += Fx.trans(f'{Mv.to(y+cy, x+cx)}{gli}{THEME.title}{Fx.b}{item["name"]:{cls.disks_width - 2}.12}{Mv.to(y+cy, x + cx + cls.disks_width - 11)}{item["total"][:None if big_disk else -2]:>9}') 2256 if big_disk: 2257 out += Fx.trans(f'{Mv.to(y+cy, x + cx + (cls.disks_width // 2) - (len(str(item["used_percent"])) // 2) - 2)}{Fx.ub}{THEME.main_fg}{item["used_percent"]}%') 2258 cy += 1 2259 2260 if io_item: 2261 if cy > h - 1: break 2262 if CONFIG.io_graph_combined: 2263 if cls.disks_io_h <= 1: 2264 out += f'{Mv.to(y+cy, x+cx-1)}{" " * 5}' 2265 out += (f'{Mv.to(y+cy, x+cx-1)}{Fx.ub}{Graphs.disk_io[name]["rw"](None if cls.redraw else mem.disks_io_dict[name]["rw"][-1])}' 2266 f'{Mv.to(y+cy, x+cx-1)}{THEME.main_fg}{item["io"] or "RW"}') 2267 cy += cls.disks_io_h 2268 else: 2269 if cls.disks_io_h <= 3: 2270 out += f'{Mv.to(y+cy, x+cx-1)}{" " * 5}{Mv.to(y+cy+1, x+cx-1)}{" " * 5}' 2271 out += (f'{Mv.to(y+cy, x+cx-1)}{Fx.ub}{Graphs.disk_io[name]["read"](None if cls.redraw else mem.disks_io_dict[name]["read"][-1])}' 2272 f'{Mv.to(y+cy, x+cx-1)}{THEME.main_fg}{item["io_r"] or "R"}') 2273 cy += cls.disks_io_h // 2 2274 out += f'{Mv.to(y+cy, x+cx-1)}{Graphs.disk_io[name]["write"](None if cls.redraw else mem.disks_io_dict[name]["write"][-1])}' 2275 cy += cls.disks_io_h // 2 2276 out += f'{Mv.to(y+cy-1, x+cx-1)}{THEME.main_fg}{item["io_w"] or "W"}' 2277 else: 2278 for name, item in mem.disks.items(): 2279 if Collector.collect_interrupt: return 2280 if not name in Meters.disks_used: 2281 continue 2282 if cy > h - 1: break 2283 out += Fx.trans(f'{Mv.to(y+cy, x+cx)}{gli}{THEME.title}{Fx.b}{item["name"]:{cls.disks_width - 2}.12}{Mv.to(y+cy, x + cx + cls.disks_width - 11)}{item["total"][:None if big_disk else -2]:>9}') 2284 if big_disk: 2285 out += f'{Mv.to(y+cy, x + cx + (cls.disks_width // 2) - (len(item["io"]) // 2) - 2)}{Fx.ub}{THEME.main_fg}{Fx.trans(item["io"])}' 2286 cy += 1 2287 if cy > h - 1: break 2288 if CONFIG.show_io_stat and name in Graphs.disk_io: 2289 out += f'{Mv.to(y+cy, x+cx-1)}{THEME.main_fg}{Fx.ub}{" IO: " if big_disk else " IO " + Mv.l(2)}{Fx.ub}{Graphs.disk_io[name]["rw"](None if cls.redraw else mem.disks_io_dict[name]["rw"][-1])}' 2290 if not big_disk and item["io"]: 2291 out += f'{Mv.to(y+cy, x+cx-1)}{Fx.ub}{THEME.main_fg}{item["io"]}' 2292 cy += 1 2293 if cy > h - 1: break 2294 out += Mv.to(y+cy, x+cx) + (f'Used:{str(item["used_percent"]) + "%":>4} ' if big_disk else "U ") 2295 out += f'{Meters.disks_used[name](None if cls.resized else mem.disks[name]["used_percent"])}{item["used"][:None if big_disk else -2]:>{9 if big_disk else 7}}' 2296 cy += 1 2297 2298 if len(mem.disks) * 3 + (len(mem.disks_io_dict) if CONFIG.show_io_stat else 0) <= h + 1: 2299 if cy > h - 1: break 2300 out += Mv.to(y+cy, x+cx) 2301 out += f'Free:{str(item["free_percent"]) + "%":>4} ' if big_disk else f'{"F "}' 2302 out += f'{Meters.disks_free[name](None if cls.resized else mem.disks[name]["free_percent"])}{item["free"][:None if big_disk else -2]:>{9 if big_disk else 7}}' 2303 cy += 1 2304 if len(mem.disks) * 4 + (len(mem.disks_io_dict) if CONFIG.show_io_stat else 0) <= h + 1: cy += 1 2305 except (KeyError, TypeError): 2306 return 2307 Draw.buffer(cls.buffer, f'{out_misc}{out}{Term.fg}', only_save=Menu.active) 2308 cls.resized = cls.redraw = False 2309 2310class NetBox(Box, SubBox): 2311 name = "net" 2312 num = 3 2313 height_p = 30 2314 width_p = 45 2315 min_w: int = 36 2316 min_h: int = 6 2317 x = 1 2318 y = 1 2319 resized: bool = True 2320 redraw: bool = True 2321 graph_height: Dict[str, int] = {} 2322 symbols: Dict[str, str] = {"download" : "▼", "upload" : "▲"} 2323 buffer: str = "net" 2324 2325 Box.buffers.append(buffer) 2326 2327 @classmethod 2328 def _calc_size(cls): 2329 if not "net" in cls.boxes: 2330 cls.width = Term.width 2331 return 2332 if not "proc" in cls.boxes: 2333 width_p = 100 2334 else: 2335 width_p = cls.width_p 2336 2337 cls.width = round(Term.width * width_p / 100) 2338 cls.height = Term.height - Box._b_cpu_h - Box._b_mem_h 2339 cls.y = Term.height - cls.height + 1 2340 cls.box_width = 27 if cls.width > 45 else 19 2341 cls.box_height = 9 if cls.height > 10 else cls.height - 2 2342 cls.box_x = cls.width - cls.box_width - 1 2343 cls.box_y = cls.y + ((cls.height - 2) // 2) - cls.box_height // 2 + 1 2344 cls.graph_height["download"] = round((cls.height - 2) / 2) 2345 cls.graph_height["upload"] = cls.height - 2 - cls.graph_height["download"] 2346 cls.redraw = True 2347 2348 @classmethod 2349 def _draw_bg(cls) -> str: 2350 if not "net" in cls.boxes: return "" 2351 return f'{create_box(box=cls, line_color=THEME.net_box)}\ 2352 {create_box(x=cls.box_x, y=cls.box_y, width=cls.box_width, height=cls.box_height, line_color=THEME.div_line, fill=False, title="Download", title2="Upload")}' 2353 2354 @classmethod 2355 def _draw_fg(cls): 2356 if not "net" in cls.boxes: return 2357 net = NetCollector 2358 if net.redraw: cls.redraw = True 2359 if not net.nic: return 2360 out: str = "" 2361 out_misc: str = "" 2362 x, y, w, h = cls.x + 1, cls.y + 1, cls.width - 2, cls.height - 2 2363 bx, by, bw, bh = cls.box_x + 1, cls.box_y + 1, cls.box_width - 2, cls.box_height - 2 2364 reset: bool = bool(net.stats[net.nic]["download"]["offset"]) 2365 2366 if cls.resized or cls.redraw: 2367 out_misc += cls._draw_bg() 2368 Key.mouse["b"] = [[x+w - len(net.nic[:10]) - 9 + i, y-1] for i in range(4)] 2369 Key.mouse["n"] = [[x+w - 5 + i, y-1] for i in range(4)] 2370 Key.mouse["z"] = [[x+w - len(net.nic[:10]) - 14 + i, y-1] for i in range(4)] 2371 2372 2373 out_misc += (f'{Mv.to(y-1, x+w - 25)}{THEME.net_box}{Symbol.h_line * (10 - len(net.nic[:10]))}{Symbol.title_left}{Fx.b if reset else ""}{THEME.hi_fg("z")}{THEME.title("ero")}' 2374 f'{Fx.ub}{THEME.net_box(Symbol.title_right)}{Term.fg}' 2375 f'{THEME.net_box}{Symbol.title_left}{Fx.b}{THEME.hi_fg("<b")} {THEME.title(net.nic[:10])} {THEME.hi_fg("n>")}{Fx.ub}{THEME.net_box(Symbol.title_right)}{Term.fg}') 2376 if w - len(net.nic[:10]) - 20 > 6: 2377 Key.mouse["a"] = [[x+w - 20 - len(net.nic[:10]) + i, y-1] for i in range(4)] 2378 out_misc += (f'{Mv.to(y-1, x+w - 21 - len(net.nic[:10]))}{THEME.net_box(Symbol.title_left)}{Fx.b if net.auto_min else ""}{THEME.hi_fg("a")}{THEME.title("uto")}' 2379 f'{Fx.ub}{THEME.net_box(Symbol.title_right)}{Term.fg}') 2380 if w - len(net.nic[:10]) - 20 > 13: 2381 Key.mouse["y"] = [[x+w - 26 - len(net.nic[:10]) + i, y-1] for i in range(4)] 2382 out_misc += (f'{Mv.to(y-1, x+w - 27 - len(net.nic[:10]))}{THEME.net_box(Symbol.title_left)}{Fx.b if CONFIG.net_sync else ""}{THEME.title("s")}{THEME.hi_fg("y")}{THEME.title("nc")}' 2383 f'{Fx.ub}{THEME.net_box(Symbol.title_right)}{Term.fg}') 2384 if net.address and w - len(net.nic[:10]) - len(net.address) - 20 > 15: 2385 out_misc += (f'{Mv.to(y-1, x+7)}{THEME.net_box(Symbol.title_left)}{Fx.b}{THEME.title(net.address)}{Fx.ub}{THEME.net_box(Symbol.title_right)}{Term.fg}') 2386 Draw.buffer("net_misc", out_misc, only_save=True) 2387 2388 cy = 0 2389 for direction in ["download", "upload"]: 2390 strings = net.strings[net.nic][direction] 2391 stats = net.stats[net.nic][direction] 2392 if cls.redraw: stats["redraw"] = True 2393 if stats["redraw"] or cls.resized: 2394 Graphs.net[direction] = Graph(w - bw - 3, cls.graph_height[direction], THEME.gradient[direction], stats["speed"], max_value=net.sync_top if CONFIG.net_sync else stats["graph_top"], 2395 invert=direction != "download", color_max_value=net.net_min.get(direction) if CONFIG.net_color_fixed else None, round_up_low=True) 2396 out += f'{Mv.to(y if direction == "download" else y + cls.graph_height["download"], x)}{Graphs.net[direction](None if stats["redraw"] else stats["speed"][-1])}' 2397 2398 out += (f'{Mv.to(by+cy, bx)}{THEME.main_fg}{cls.symbols[direction]} {strings["byte_ps"]:<10.10}' + 2399 ("" if bw < 20 else f'{Mv.to(by+cy, bx+bw - 12)}{"(" + strings["bit_ps"] + ")":>12.12}')) 2400 cy += 1 if bh != 3 else 2 2401 if bh >= 6: 2402 out += f'{Mv.to(by+cy, bx)}{cls.symbols[direction]} {"Top:"}{Mv.to(by+cy, bx+bw - 12)}{"(" + strings["top"] + ")":>12.12}' 2403 cy += 1 2404 if bh >= 4: 2405 out += f'{Mv.to(by+cy, bx)}{cls.symbols[direction]} {"Total:"}{Mv.to(by+cy, bx+bw - 10)}{strings["total"]:>10.10}' 2406 if bh > 2 and bh % 2: cy += 2 2407 else: cy += 1 2408 stats["redraw"] = False 2409 2410 out += (f'{Mv.to(y, x)}{THEME.graph_text(net.sync_string if CONFIG.net_sync else net.strings[net.nic]["download"]["graph_top"])}' 2411 f'{Mv.to(y+h-1, x)}{THEME.graph_text(net.sync_string if CONFIG.net_sync else net.strings[net.nic]["upload"]["graph_top"])}') 2412 2413 Draw.buffer(cls.buffer, f'{out_misc}{out}{Term.fg}', only_save=Menu.active) 2414 cls.redraw = cls.resized = False 2415 2416class ProcBox(Box): 2417 name = "proc" 2418 num = 4 2419 height_p = 68 2420 width_p = 55 2421 min_w: int = 44 2422 min_h: int = 16 2423 x = 1 2424 y = 1 2425 current_y: int = 0 2426 current_h: int = 0 2427 select_max: int = 0 2428 selected: int = 0 2429 selected_pid: int = 0 2430 last_selection: int = 0 2431 filtering: bool = False 2432 moved: bool = False 2433 start: int = 1 2434 count: int = 0 2435 s_len: int = 0 2436 detailed: bool = False 2437 detailed_x: int = 0 2438 detailed_y: int = 0 2439 detailed_width: int = 0 2440 detailed_height: int = 8 2441 resized: bool = True 2442 redraw: bool = True 2443 buffer: str = "proc" 2444 pid_counter: Dict[int, int] = {} 2445 Box.buffers.append(buffer) 2446 2447 @classmethod 2448 def _calc_size(cls): 2449 if not "proc" in cls.boxes: 2450 cls.width = Term.width 2451 return 2452 width_p: int; height_p: int 2453 if not "net" in cls.boxes and not "mem" in cls.boxes: 2454 width_p = 100 2455 else: 2456 width_p = cls.width_p 2457 2458 if not "cpu" in cls.boxes: 2459 height_p = 100 2460 else: 2461 height_p = cls.height_p 2462 2463 cls.width = round(Term.width * width_p / 100) 2464 cls.height = round(Term.height * height_p / 100) 2465 if cls.height + Box._b_cpu_h > Term.height: cls.height = Term.height - Box._b_cpu_h 2466 cls.x = Term.width - cls.width + 1 2467 cls.y = Box._b_cpu_h + 1 2468 cls.current_y = cls.y 2469 cls.current_h = cls.height 2470 cls.select_max = cls.height - 3 2471 cls.redraw = True 2472 cls.resized = True 2473 2474 @classmethod 2475 def _draw_bg(cls) -> str: 2476 if not "proc" in cls.boxes: return "" 2477 return create_box(box=cls, line_color=THEME.proc_box) 2478 2479 @classmethod 2480 def selector(cls, key: str, mouse_pos: Tuple[int, int] = (0, 0)): 2481 old: Tuple[int, int] = (cls.start, cls.selected) 2482 new_sel: int 2483 if key in ["up", "k"]: 2484 if cls.selected == 1 and cls.start > 1: 2485 cls.start -= 1 2486 elif cls.selected == 1: 2487 cls.selected = 0 2488 elif cls.selected > 1: 2489 cls.selected -= 1 2490 elif key in ["down", "j"]: 2491 if cls.selected == 0 and ProcCollector.detailed and cls.last_selection: 2492 cls.selected = cls.last_selection 2493 cls.last_selection = 0 2494 if cls.selected == cls.select_max and cls.start < ProcCollector.num_procs - cls.select_max + 1: 2495 cls.start += 1 2496 elif cls.selected < cls.select_max: 2497 cls.selected += 1 2498 elif key == "mouse_scroll_up" and cls.start > 1: 2499 cls.start -= 5 2500 elif key == "mouse_scroll_down" and cls.start < ProcCollector.num_procs - cls.select_max + 1: 2501 cls.start += 5 2502 elif key == "page_up" and cls.start > 1: 2503 cls.start -= cls.select_max 2504 elif key == "page_down" and cls.start < ProcCollector.num_procs - cls.select_max + 1: 2505 cls.start += cls.select_max 2506 elif key == "home": 2507 if cls.start > 1: cls.start = 1 2508 elif cls.selected > 0: cls.selected = 0 2509 elif key == "end": 2510 if cls.start < ProcCollector.num_procs - cls.select_max + 1: cls.start = ProcCollector.num_procs - cls.select_max + 1 2511 elif cls.selected < cls.select_max: cls.selected = cls.select_max 2512 elif key == "mouse_click": 2513 if mouse_pos[0] > cls.x + cls.width - 4 and cls.current_y + 1 < mouse_pos[1] < cls.current_y + 1 + cls.select_max + 1: 2514 if mouse_pos[1] == cls.current_y + 2: 2515 cls.start = 1 2516 elif mouse_pos[1] == cls.current_y + 1 + cls.select_max: 2517 cls.start = ProcCollector.num_procs - cls.select_max + 1 2518 else: 2519 cls.start = round((mouse_pos[1] - cls.current_y) * ((ProcCollector.num_procs - cls.select_max - 2) / (cls.select_max - 2))) 2520 else: 2521 new_sel = mouse_pos[1] - cls.current_y - 1 if mouse_pos[1] >= cls.current_y - 1 else 0 2522 if new_sel > 0 and new_sel == cls.selected: 2523 Key.list.insert(0, "enter") 2524 return 2525 elif new_sel > 0 and new_sel != cls.selected: 2526 if cls.last_selection: cls.last_selection = 0 2527 cls.selected = new_sel 2528 elif key == "mouse_unselect": 2529 cls.selected = 0 2530 2531 if cls.start > ProcCollector.num_procs - cls.select_max + 1 and ProcCollector.num_procs > cls.select_max: cls.start = ProcCollector.num_procs - cls.select_max + 1 2532 elif cls.start > ProcCollector.num_procs: cls.start = ProcCollector.num_procs 2533 if cls.start < 1: cls.start = 1 2534 if cls.selected > ProcCollector.num_procs and ProcCollector.num_procs < cls.select_max: cls.selected = ProcCollector.num_procs 2535 elif cls.selected > cls.select_max: cls.selected = cls.select_max 2536 if cls.selected < 0: cls.selected = 0 2537 2538 if old != (cls.start, cls.selected): 2539 cls.moved = True 2540 Collector.collect(ProcCollector, proc_interrupt=True, redraw=True, only_draw=True) 2541 2542 2543 @classmethod 2544 def _draw_fg(cls): 2545 if not "proc" in cls.boxes: return 2546 proc = ProcCollector 2547 if proc.proc_interrupt: return 2548 if proc.redraw: cls.redraw = True 2549 out: str = "" 2550 out_misc: str = "" 2551 n: int = 0 2552 x, y, w, h = cls.x + 1, cls.current_y + 1, cls.width - 2, cls.current_h - 2 2553 prog_len: int; arg_len: int; val: int; c_color: str; m_color: str; t_color: str; sort_pos: int; tree_len: int; is_selected: bool; calc: int 2554 dgx: int; dgw: int; dx: int; dw: int; dy: int 2555 l_count: int = 0 2556 scroll_pos: int = 0 2557 killed: bool = True 2558 indent: str = "" 2559 offset: int = 0 2560 tr_show: bool = True 2561 usr_show: bool = True 2562 vals: List[str] 2563 g_color: str = "" 2564 s_len: int = 0 2565 if proc.search_filter: s_len = len(proc.search_filter[:10]) 2566 loc_string: str = f'{cls.start + cls.selected - 1}/{proc.num_procs}' 2567 end: str = "" 2568 2569 if proc.detailed: 2570 dgx, dgw = x, w // 3 2571 dw = w - dgw - 1 2572 if dw > 120: 2573 dw = 120 2574 dgw = w - 121 2575 dx = x + dgw + 2 2576 dy = cls.y + 1 2577 2578 if w > 67: 2579 arg_len = w - 53 - (1 if proc.num_procs > cls.select_max else 0) 2580 prog_len = 15 2581 else: 2582 arg_len = 0 2583 prog_len = w - 38 - (1 if proc.num_procs > cls.select_max else 0) 2584 if prog_len < 15: 2585 tr_show = False 2586 prog_len += 5 2587 if prog_len < 12: 2588 usr_show = False 2589 prog_len += 9 2590 2591 if CONFIG.proc_tree: 2592 tree_len = arg_len + prog_len + 6 2593 arg_len = 0 2594 2595 #* Buttons and titles only redrawn if needed 2596 if cls.resized or cls.redraw: 2597 s_len += len(CONFIG.proc_sorting) 2598 if cls.resized or s_len != cls.s_len or proc.detailed: 2599 cls.s_len = s_len 2600 for k in ["e", "r", "c", "T", "K", "I", "enter", "left", " ", "f", "delete"]: 2601 if k in Key.mouse: del Key.mouse[k] 2602 if proc.detailed: 2603 killed = proc.details.get("killed", False) 2604 main = THEME.main_fg if cls.selected == 0 and not killed else THEME.inactive_fg 2605 hi = THEME.hi_fg if cls.selected == 0 and not killed else THEME.inactive_fg 2606 title = THEME.title if cls.selected == 0 and not killed else THEME.inactive_fg 2607 if cls.current_y != cls.y + 8 or cls.resized or Graphs.detailed_cpu is NotImplemented: 2608 cls.current_y = cls.y + 8 2609 cls.current_h = cls.height - 8 2610 for i in range(7): out_misc += f'{Mv.to(dy+i, x)}{" " * w}' 2611 out_misc += (f'{Mv.to(dy+7, x-1)}{THEME.proc_box}{Symbol.title_right}{Symbol.h_line*w}{Symbol.title_left}' 2612 f'{Mv.to(dy+7, x+1)}{THEME.proc_box(Symbol.title_left)}{Fx.b}{THEME.hi_fg(SUPERSCRIPT[cls.num])}{THEME.title(cls.name)}{Fx.ub}{THEME.proc_box(Symbol.title_right)}{THEME.div_line}') 2613 for i in range(7): 2614 out_misc += f'{Mv.to(dy + i, dgx + dgw + 1)}{Symbol.v_line}' 2615 2616 out_misc += (f'{Mv.to(dy-1, x-1)}{THEME.proc_box}{Symbol.left_up}{Symbol.h_line*w}{Symbol.right_up}' 2617 f'{Mv.to(dy-1, dgx + dgw + 1)}{Symbol.div_up}' 2618 f'{Mv.to(dy-1, x+1)}{THEME.proc_box(Symbol.title_left)}{Fx.b}{THEME.title(str(proc.details["pid"]))}{Fx.ub}{THEME.proc_box(Symbol.title_right)}' 2619 f'{THEME.proc_box(Symbol.title_left)}{Fx.b}{THEME.title(proc.details["name"][:(dgw - 11)])}{Fx.ub}{THEME.proc_box(Symbol.title_right)}') 2620 2621 if cls.selected == 0: 2622 Key.mouse["enter"] = [[dx+dw-10 + i, dy-1] for i in range(7)] 2623 if cls.selected == 0 and not killed: 2624 Key.mouse["T"] = [[dx+2 + i, dy-1] for i in range(9)] 2625 2626 out_misc += (f'{Mv.to(dy-1, dx+dw - 11)}{THEME.proc_box(Symbol.title_left)}{Fx.b}{title if cls.selected > 0 else THEME.title}close{Fx.ub} {main if cls.selected > 0 else THEME.main_fg}{Symbol.enter}{THEME.proc_box(Symbol.title_right)}' 2627 f'{Mv.to(dy-1, dx+1)}{THEME.proc_box(Symbol.title_left)}{Fx.b}{hi}T{title}erminate{Fx.ub}{THEME.proc_box(Symbol.title_right)}') 2628 if dw > 28: 2629 if cls.selected == 0 and not killed and not "K" in Key.mouse: Key.mouse["K"] = [[dx + 13 + i, dy-1] for i in range(4)] 2630 out_misc += f'{THEME.proc_box(Symbol.title_left)}{Fx.b}{hi}K{title}ill{Fx.ub}{THEME.proc_box(Symbol.title_right)}' 2631 if dw > 39: 2632 if cls.selected == 0 and not killed and not "I" in Key.mouse: Key.mouse["I"] = [[dx + 19 + i, dy-1] for i in range(9)] 2633 out_misc += f'{THEME.proc_box(Symbol.title_left)}{Fx.b}{hi}I{title}nterrupt{Fx.ub}{THEME.proc_box(Symbol.title_right)}' 2634 2635 if Graphs.detailed_cpu is NotImplemented or cls.resized: 2636 Graphs.detailed_cpu = Graph(dgw+1, 7, THEME.gradient["cpu"], proc.details_cpu) 2637 Graphs.detailed_mem = Graph(dw // 3, 1, None, proc.details_mem) 2638 2639 cls.select_max = cls.height - 11 2640 y = cls.y + 9 2641 h = cls.height - 10 2642 2643 else: 2644 if cls.current_y != cls.y or cls.resized: 2645 cls.current_y = cls.y 2646 cls.current_h = cls.height 2647 y, h = cls.y + 1, cls.height - 2 2648 out_misc += (f'{Mv.to(y-1, x-1)}{THEME.proc_box}{Symbol.left_up}{Symbol.h_line*w}{Symbol.right_up}' 2649 f'{Mv.to(y-1, x+1)}{THEME.proc_box(Symbol.title_left)}{Fx.b}{THEME.hi_fg(SUPERSCRIPT[cls.num])}{THEME.title(cls.name)}{Fx.ub}{THEME.proc_box(Symbol.title_right)}' 2650 f'{Mv.to(y+7, x-1)}{THEME.proc_box(Symbol.v_line)}{Mv.r(w)}{THEME.proc_box(Symbol.v_line)}') 2651 cls.select_max = cls.height - 3 2652 2653 2654 sort_pos = x + w - len(CONFIG.proc_sorting) - 7 2655 if not "left" in Key.mouse: 2656 Key.mouse["left"] = [[sort_pos + i, y-1] for i in range(3)] 2657 Key.mouse["right"] = [[sort_pos + len(CONFIG.proc_sorting) + 3 + i, y-1] for i in range(3)] 2658 2659 2660 out_misc += (f'{Mv.to(y-1, x + 8)}{THEME.proc_box(Symbol.h_line * (w - 9))}' + 2661 ("" if not proc.detailed else f"{Mv.to(dy+7, dgx + dgw + 1)}{THEME.proc_box(Symbol.div_down)}") + 2662 f'{Mv.to(y-1, sort_pos)}{THEME.proc_box(Symbol.title_left)}{Fx.b}{THEME.hi_fg("<")} {THEME.title(CONFIG.proc_sorting)} ' 2663 f'{THEME.hi_fg(">")}{Fx.ub}{THEME.proc_box(Symbol.title_right)}') 2664 2665 2666 if w > 29 + s_len: 2667 if not "e" in Key.mouse: Key.mouse["e"] = [[sort_pos - 5 + i, y-1] for i in range(4)] 2668 out_misc += (f'{Mv.to(y-1, sort_pos - 6)}{THEME.proc_box(Symbol.title_left)}{Fx.b if CONFIG.proc_tree else ""}' 2669 f'{THEME.title("tre")}{THEME.hi_fg("e")}{Fx.ub}{THEME.proc_box(Symbol.title_right)}') 2670 if w > 37 + s_len: 2671 if not "r" in Key.mouse: Key.mouse["r"] = [[sort_pos - 14 + i, y-1] for i in range(7)] 2672 out_misc += (f'{Mv.to(y-1, sort_pos - 15)}{THEME.proc_box(Symbol.title_left)}{Fx.b if CONFIG.proc_reversed else ""}' 2673 f'{THEME.hi_fg("r")}{THEME.title("everse")}{Fx.ub}{THEME.proc_box(Symbol.title_right)}') 2674 if w > 47 + s_len: 2675 if not "c" in Key.mouse: Key.mouse["c"] = [[sort_pos - 24 + i, y-1] for i in range(8)] 2676 out_misc += (f'{Mv.to(y-1, sort_pos - 25)}{THEME.proc_box(Symbol.title_left)}{Fx.b if CONFIG.proc_per_core else ""}' 2677 f'{THEME.title("per-")}{THEME.hi_fg("c")}{THEME.title("ore")}{Fx.ub}{THEME.proc_box(Symbol.title_right)}') 2678 2679 if not "f" in Key.mouse or cls.resized: Key.mouse["f"] = [[x+6 + i, y-1] for i in range(6 if not proc.search_filter else 2 + len(proc.search_filter[-10:]))] 2680 if proc.search_filter: 2681 if not "delete" in Key.mouse: Key.mouse["delete"] = [[x+12 + len(proc.search_filter[-10:]) + i, y-1] for i in range(3)] 2682 elif "delete" in Key.mouse: 2683 del Key.mouse["delete"] 2684 out_misc += (f'{Mv.to(y-1, x + 8)}{THEME.proc_box(Symbol.title_left)}{Fx.b if cls.filtering or proc.search_filter else ""}{THEME.hi_fg("F" if cls.filtering and proc.case_sensitive else "f")}{THEME.title}' + 2685 ("ilter" if not proc.search_filter and not cls.filtering else f' {proc.search_filter[-(10 if w < 83 else w - 74):]}{(Fx.bl + "█" + Fx.ubl) if cls.filtering else THEME.hi_fg(" del")}') + 2686 f'{THEME.proc_box(Symbol.title_right)}') 2687 2688 main = THEME.inactive_fg if cls.selected == 0 else THEME.main_fg 2689 hi = THEME.inactive_fg if cls.selected == 0 else THEME.hi_fg 2690 title = THEME.inactive_fg if cls.selected == 0 else THEME.title 2691 out_misc += (f'{Mv.to(y+h, x + 1)}{THEME.proc_box}{Symbol.h_line*(w-4)}' 2692 f'{Mv.to(y+h, x+1)}{THEME.proc_box(Symbol.title_left)}{main}{Symbol.up} {Fx.b}{THEME.main_fg("select")} {Fx.ub}' 2693 f'{THEME.inactive_fg if cls.selected == cls.select_max else THEME.main_fg}{Symbol.down}{THEME.proc_box(Symbol.title_right)}' 2694 f'{THEME.proc_box(Symbol.title_left)}{title}{Fx.b}info {Fx.ub}{main}{Symbol.enter}{THEME.proc_box(Symbol.title_right)}') 2695 if not "enter" in Key.mouse: Key.mouse["enter"] = [[x + 14 + i, y+h] for i in range(6)] 2696 if w - len(loc_string) > 34: 2697 if not "T" in Key.mouse: Key.mouse["T"] = [[x + 22 + i, y+h] for i in range(9)] 2698 out_misc += f'{THEME.proc_box(Symbol.title_left)}{Fx.b}{hi}T{title}erminate{Fx.ub}{THEME.proc_box(Symbol.title_right)}' 2699 if w - len(loc_string) > 40: 2700 if not "K" in Key.mouse: Key.mouse["K"] = [[x + 33 + i, y+h] for i in range(4)] 2701 out_misc += f'{THEME.proc_box(Symbol.title_left)}{Fx.b}{hi}K{title}ill{Fx.ub}{THEME.proc_box(Symbol.title_right)}' 2702 if w - len(loc_string) > 51: 2703 if not "I" in Key.mouse: Key.mouse["I"] = [[x + 39 + i, y+h] for i in range(9)] 2704 out_misc += f'{THEME.proc_box(Symbol.title_left)}{Fx.b}{hi}I{title}nterrupt{Fx.ub}{THEME.proc_box(Symbol.title_right)}' 2705 if CONFIG.proc_tree and w - len(loc_string) > 65: 2706 if not " " in Key.mouse: Key.mouse[" "] = [[x + 50 + i, y+h] for i in range(12)] 2707 out_misc += f'{THEME.proc_box(Symbol.title_left)}{Fx.b}{hi}spc {title}collapse{Fx.ub}{THEME.proc_box(Symbol.title_right)}' 2708 2709 #* Processes labels 2710 selected: str = CONFIG.proc_sorting 2711 label: str 2712 if selected == "memory": selected = "mem" 2713 if selected == "threads" and not CONFIG.proc_tree and not arg_len: selected = "tr" 2714 if CONFIG.proc_tree: 2715 label = (f'{THEME.title}{Fx.b}{Mv.to(y, x)}{" Tree:":<{tree_len-2}}' + (f'{"Threads: ":<9}' if tr_show else " "*4) + (f'{"User:":<9}' if usr_show else "") + f'Mem%{"Cpu%":>11}{Fx.ub}{THEME.main_fg} ' + 2716 (" " if proc.num_procs > cls.select_max else "")) 2717 if selected in ["pid", "program", "arguments"]: selected = "tree" 2718 else: 2719 label = (f'{THEME.title}{Fx.b}{Mv.to(y, x)}{"Pid:":>7} {"Program:" if prog_len > 8 else "Prg:":<{prog_len}}' + (f'{"Arguments:":<{arg_len-4}}' if arg_len else "") + 2720 ((f'{"Threads:":<9}' if arg_len else f'{"Tr:":^5}') if tr_show else "") + (f'{"User:":<9}' if usr_show else "") + f'Mem%{"Cpu%":>11}{Fx.ub}{THEME.main_fg} ' + 2721 (" " if proc.num_procs > cls.select_max else "")) 2722 if selected == "program" and prog_len <= 8: selected = "prg" 2723 selected = selected.split(" ")[0].capitalize() 2724 if CONFIG.proc_mem_bytes: label = label.replace("Mem%", "MemB") 2725 label = label.replace(selected, f'{Fx.u}{selected}{Fx.uu}') 2726 out_misc += label 2727 2728 Draw.buffer("proc_misc", out_misc, only_save=True) 2729 2730 #* Detailed box draw 2731 if proc.detailed: 2732 if proc.details["status"] == psutil.STATUS_RUNNING: stat_color = Fx.b 2733 elif proc.details["status"] in [psutil.STATUS_DEAD, psutil.STATUS_STOPPED, psutil.STATUS_ZOMBIE]: stat_color = f'{THEME.inactive_fg}' 2734 else: stat_color = "" 2735 expand = proc.expand 2736 iw = (dw - 3) // (4 + expand) 2737 iw2 = iw - 1 2738 out += (f'{Mv.to(dy, dgx)}{Graphs.detailed_cpu(None if cls.moved or proc.details["killed"] else proc.details_cpu[-1])}' 2739 f'{Mv.to(dy, dgx)}{THEME.title}{Fx.b}{0 if proc.details["killed"] else proc.details["cpu_percent"]}%{Mv.r(1)}{"" if SYSTEM == "MacOS" else (("C" if dgw < 20 else "Core") + str(proc.details["cpu_num"]))}') 2740 for i, l in enumerate(["C", "P", "U"]): 2741 out += f'{Mv.to(dy+2+i, dgx)}{l}' 2742 for i, l in enumerate(["C", "M", "D"]): 2743 out += f'{Mv.to(dy+4+i, dx+1)}{l}' 2744 out += (f'{Mv.to(dy, dx+1)} {"Status:":^{iw}.{iw2}}{"Elapsed:":^{iw}.{iw2}}' + 2745 (f'{"Parent:":^{iw}.{iw2}}' if dw > 28 else "") + (f'{"User:":^{iw}.{iw2}}' if dw > 38 else "") + 2746 (f'{"Threads:":^{iw}.{iw2}}' if expand > 0 else "") + (f'{"Nice:":^{iw}.{iw2}}' if expand > 1 else "") + 2747 (f'{"IO Read:":^{iw}.{iw2}}' if expand > 2 else "") + (f'{"IO Write:":^{iw}.{iw2}}' if expand > 3 else "") + 2748 (f'{"TTY:":^{iw}.{iw2}}' if expand > 4 else "") + 2749 f'{Mv.to(dy+1, dx+1)}{Fx.ub}{THEME.main_fg}{stat_color}{proc.details["status"]:^{iw}.{iw2}}{Fx.ub}{THEME.main_fg}{proc.details["uptime"]:^{iw}.{iw2}} ' + 2750 (f'{proc.details["parent_name"]:^{iw}.{iw2}}' if dw > 28 else "") + (f'{proc.details["username"]:^{iw}.{iw2}}' if dw > 38 else "") + 2751 (f'{proc.details["threads"]:^{iw}.{iw2}}' if expand > 0 else "") + (f'{proc.details["nice"]:^{iw}.{iw2}}' if expand > 1 else "") + 2752 (f'{proc.details["io_read"]:^{iw}.{iw2}}' if expand > 2 else "") + (f'{proc.details["io_write"]:^{iw}.{iw2}}' if expand > 3 else "") + 2753 (f'{proc.details["terminal"][-(iw2):]:^{iw}.{iw2}}' if expand > 4 else "") + 2754 f'{Mv.to(dy+3, dx)}{THEME.title}{Fx.b}{("Memory: " if dw > 42 else "M:") + str(round(proc.details["memory_percent"], 1)) + "%":>{dw//3-1}}{Fx.ub} {THEME.inactive_fg}{"⡀"*(dw//3)}' 2755 f'{Mv.l(dw//3)}{THEME.proc_misc}{Graphs.detailed_mem(None if cls.moved else proc.details_mem[-1])} ' 2756 f'{THEME.title}{Fx.b}{proc.details["memory_bytes"]:.{dw//3 - 2}}{THEME.main_fg}{Fx.ub}') 2757 cy = dy + (4 if len(proc.details["cmdline"]) > dw - 5 else 5) 2758 for i in range(ceil(len(proc.details["cmdline"]) / (dw - 5))): 2759 out += f'{Mv.to(cy+i, dx + 3)}{proc.details["cmdline"][((dw-5)*i):][:(dw-5)]:{"^" if i == 0 else "<"}{dw-5}}' 2760 if i == 2: break 2761 2762 #* Checking for selection out of bounds 2763 if cls.start > proc.num_procs - cls.select_max + 1 and proc.num_procs > cls.select_max: cls.start = proc.num_procs - cls.select_max + 1 2764 elif cls.start > proc.num_procs: cls.start = proc.num_procs 2765 if cls.start < 1: cls.start = 1 2766 if cls.selected > proc.num_procs and proc.num_procs < cls.select_max: cls.selected = proc.num_procs 2767 elif cls.selected > cls.select_max: cls.selected = cls.select_max 2768 if cls.selected < 0: cls.selected = 0 2769 2770 #* Start iteration over all processes and info 2771 cy = 1 2772 for n, (pid, items) in enumerate(proc.processes.items(), start=1): 2773 if n < cls.start: continue 2774 l_count += 1 2775 if l_count == cls.selected: 2776 is_selected = True 2777 cls.selected_pid = pid 2778 else: is_selected = False 2779 2780 indent, name, cmd, threads, username, mem, mem_b, cpu = [items.get(v, d) for v, d in [("indent", ""), ("name", ""), ("cmd", ""), ("threads", 0), ("username", "?"), ("mem", 0.0), ("mem_b", 0), ("cpu", 0.0)]] 2781 2782 if CONFIG.proc_tree: 2783 arg_len = 0 2784 offset = tree_len - len(f'{indent}{pid}') 2785 if offset < 1: offset = 0 2786 indent = f'{indent:.{tree_len - len(str(pid))}}' 2787 if offset - len(name) > 12: 2788 cmd = cmd.split(" ")[0].split("/")[-1] 2789 if not cmd.startswith(name): 2790 offset = len(name) 2791 arg_len = tree_len - len(f'{indent}{pid} {name} ') + 2 2792 cmd = f'({cmd[:(arg_len-4)]})' 2793 else: 2794 offset = prog_len - 1 2795 if cpu > 1.0 or pid in Graphs.pid_cpu: 2796 if pid not in Graphs.pid_cpu: 2797 Graphs.pid_cpu[pid] = Graph(5, 1, None, [0]) 2798 cls.pid_counter[pid] = 0 2799 elif cpu < 1.0: 2800 cls.pid_counter[pid] += 1 2801 if cls.pid_counter[pid] > 10: 2802 del cls.pid_counter[pid], Graphs.pid_cpu[pid] 2803 else: 2804 cls.pid_counter[pid] = 0 2805 2806 end = f'{THEME.main_fg}{Fx.ub}' if CONFIG.proc_colors else Fx.ub 2807 if cls.selected > cy: calc = cls.selected - cy 2808 elif 0 < cls.selected <= cy: calc = cy - cls.selected 2809 else: calc = cy 2810 if CONFIG.proc_colors and not is_selected: 2811 vals = [] 2812 for v in [int(cpu), int(mem), int(threads // 3)]: 2813 if CONFIG.proc_gradient: 2814 val = ((v if v <= 100 else 100) + 100) - calc * 100 // cls.select_max 2815 vals += [f'{THEME.gradient["proc_color" if val < 100 else "process"][val if val < 100 else val - 100]}'] 2816 else: 2817 vals += [f'{THEME.gradient["process"][v if v <= 100 else 100]}'] 2818 c_color, m_color, t_color = vals 2819 else: 2820 c_color = m_color = t_color = Fx.b 2821 if CONFIG.proc_gradient and not is_selected: 2822 g_color = f'{THEME.gradient["proc"][calc * 100 // cls.select_max]}' 2823 if is_selected: 2824 c_color = m_color = t_color = g_color = end = "" 2825 out += f'{THEME.selected_bg}{THEME.selected_fg}{Fx.b}' 2826 2827 #* Creates one line for a process with all gathered information 2828 out += (f'{Mv.to(y+cy, x)}{g_color}{indent}{pid:>{(1 if CONFIG.proc_tree else 7)}} ' + 2829 f'{c_color}{name:<{offset}.{offset}} {end}' + 2830 (f'{g_color}{cmd:<{arg_len}.{arg_len-1}}' if arg_len else "") + 2831 (t_color + (f'{threads:>4} ' if threads < 1000 else "999> ") + end if tr_show else "") + 2832 (g_color + (f'{username:<9.9}' if len(username) < 10 else f'{username[:8]:<8}+') if usr_show else "") + 2833 m_color + ((f'{mem:>4.1f}' if mem < 100 else f'{mem:>4.0f} ') if not CONFIG.proc_mem_bytes else f'{floating_humanizer(mem_b, short=True):>4.4}') + end + 2834 f' {THEME.inactive_fg}{"⡀"*5}{THEME.main_fg}{g_color}{c_color}' + (f' {cpu:>4.1f} ' if cpu < 100 else f'{cpu:>5.0f} ') + end + 2835 (" " if proc.num_procs > cls.select_max else "")) 2836 2837 #* Draw small cpu graph for process if cpu usage was above 1% in the last 10 updates 2838 if pid in Graphs.pid_cpu: 2839 out += f'{Mv.to(y+cy, x + w - (12 if proc.num_procs > cls.select_max else 11))}{c_color if CONFIG.proc_colors else THEME.proc_misc}{Graphs.pid_cpu[pid](None if cls.moved else round(cpu))}{THEME.main_fg}' 2840 2841 if is_selected: out += f'{Fx.ub}{Term.fg}{Term.bg}{Mv.to(y+cy, x + w - 1)}{" " if proc.num_procs > cls.select_max else ""}' 2842 2843 cy += 1 2844 if cy == h: break 2845 if cy < h: 2846 for i in range(h-cy): 2847 out += f'{Mv.to(y+cy+i, x)}{" " * w}' 2848 2849 #* Draw scrollbar if needed 2850 if proc.num_procs > cls.select_max: 2851 if cls.resized: 2852 Key.mouse["mouse_scroll_up"] = [[x+w-2+i, y] for i in range(3)] 2853 Key.mouse["mouse_scroll_down"] = [[x+w-2+i, y+h-1] for i in range(3)] 2854 scroll_pos = round(cls.start * (cls.select_max - 2) / (proc.num_procs - (cls.select_max - 2))) 2855 if scroll_pos < 0 or cls.start == 1: scroll_pos = 0 2856 elif scroll_pos > h - 3 or cls.start >= proc.num_procs - cls.select_max: scroll_pos = h - 3 2857 out += (f'{Mv.to(y, x+w-1)}{Fx.b}{THEME.main_fg}↑{Mv.to(y+h-1, x+w-1)}↓{Fx.ub}' 2858 f'{Mv.to(y+1+scroll_pos, x+w-1)}█') 2859 elif "scroll_up" in Key.mouse: 2860 del Key.mouse["scroll_up"], Key.mouse["scroll_down"] 2861 2862 #* Draw current selection and number of processes 2863 out += (f'{Mv.to(y+h, x + w - 3 - len(loc_string))}{THEME.proc_box}{Symbol.title_left}{THEME.title}' 2864 f'{Fx.b}{loc_string}{Fx.ub}{THEME.proc_box(Symbol.title_right)}') 2865 2866 #* Clean up dead processes graphs and counters 2867 cls.count += 1 2868 if cls.count == 100: 2869 cls.count = 0 2870 for p in list(cls.pid_counter): 2871 if not psutil.pid_exists(p): 2872 del cls.pid_counter[p], Graphs.pid_cpu[p] 2873 2874 Draw.buffer(cls.buffer, f'{out_misc}{out}{Term.fg}', only_save=Menu.active) 2875 cls.redraw = cls.resized = cls.moved = False 2876 2877class Collector: 2878 '''Data collector master class 2879 * .start(): Starts collector thread 2880 * .stop(): Stops collector thread 2881 * .collect(*collectors: Collector, draw_now: bool = True, interrupt: bool = False): queues up collectors to run''' 2882 stopping: bool = False 2883 started: bool = False 2884 draw_now: bool = False 2885 redraw: bool = False 2886 only_draw: bool = False 2887 thread: threading.Thread 2888 collect_run = threading.Event() 2889 collect_idle = threading.Event() 2890 collect_idle.set() 2891 collect_done = threading.Event() 2892 collect_queue: List = [] 2893 collect_interrupt: bool = False 2894 proc_interrupt: bool = False 2895 use_draw_list: bool = False 2896 proc_counter: int = 1 2897 2898 @classmethod 2899 def start(cls): 2900 cls.stopping = False 2901 cls.thread = threading.Thread(target=cls._runner, args=()) 2902 cls.thread.start() 2903 cls.started = True 2904 2905 @classmethod 2906 def stop(cls): 2907 if cls.started and cls.thread.is_alive(): 2908 cls.stopping = True 2909 cls.started = False 2910 cls.collect_queue = [] 2911 cls.collect_idle.set() 2912 cls.collect_done.set() 2913 try: 2914 cls.thread.join() 2915 except: 2916 pass 2917 2918 @classmethod 2919 def _runner(cls): 2920 '''This is meant to run in it's own thread, collecting and drawing when collect_run is set''' 2921 draw_buffers: List[str] = [] 2922 debugged: bool = False 2923 try: 2924 while not cls.stopping: 2925 if CONFIG.draw_clock and CONFIG.update_ms != 1000: Box.draw_clock() 2926 cls.collect_run.wait(0.1) 2927 if not cls.collect_run.is_set(): 2928 continue 2929 draw_buffers = [] 2930 cls.collect_interrupt = False 2931 cls.collect_run.clear() 2932 cls.collect_idle.clear() 2933 cls.collect_done.clear() 2934 if DEBUG and not debugged: TimeIt.start("Collect and draw") 2935 while cls.collect_queue: 2936 collector = cls.collect_queue.pop() 2937 if not cls.only_draw: 2938 collector._collect() 2939 collector._draw() 2940 if cls.use_draw_list: draw_buffers.append(collector.buffer) 2941 if cls.collect_interrupt: break 2942 if DEBUG and not debugged: TimeIt.stop("Collect and draw"); debugged = True 2943 if cls.draw_now and not Menu.active and not cls.collect_interrupt: 2944 if cls.use_draw_list: Draw.out(*draw_buffers) 2945 else: Draw.out() 2946 if CONFIG.draw_clock and CONFIG.update_ms == 1000: Box.draw_clock() 2947 cls.collect_idle.set() 2948 cls.collect_done.set() 2949 except Exception as e: 2950 errlog.exception(f'Data collection thread failed with exception: {e}') 2951 cls.collect_idle.set() 2952 cls.collect_done.set() 2953 clean_quit(1, thread=True) 2954 2955 @classmethod 2956 def collect(cls, *collectors, draw_now: bool = True, interrupt: bool = False, proc_interrupt: bool = False, redraw: bool = False, only_draw: bool = False): 2957 '''Setup collect queue for _runner''' 2958 cls.collect_interrupt = interrupt 2959 cls.proc_interrupt = proc_interrupt 2960 cls.collect_idle.wait() 2961 cls.collect_interrupt = False 2962 cls.proc_interrupt = False 2963 cls.use_draw_list = False 2964 cls.draw_now = draw_now 2965 cls.redraw = redraw 2966 cls.only_draw = only_draw 2967 2968 if collectors: 2969 cls.collect_queue = [*collectors] 2970 cls.use_draw_list = True 2971 if ProcCollector in cls.collect_queue: 2972 cls.proc_counter = 1 2973 2974 else: 2975 cls.collect_queue = list(cls.__subclasses__()) 2976 if CONFIG.proc_update_mult > 1: 2977 if cls.proc_counter > 1: 2978 cls.collect_queue.remove(ProcCollector) 2979 if cls.proc_counter == CONFIG.proc_update_mult: 2980 cls.proc_counter = 0 2981 cls.proc_counter += 1 2982 2983 cls.collect_run.set() 2984 2985 2986class CpuCollector(Collector): 2987 '''Collects cpu usage for cpu and cores, cpu frequency, load_avg, uptime and cpu temps''' 2988 cpu_usage: List[List[int]] = [] 2989 cpu_upper: List[int] = [] 2990 cpu_lower: List[int] = [] 2991 cpu_temp: List[List[int]] = [] 2992 cpu_temp_high: int = 0 2993 cpu_temp_crit: int = 0 2994 for _ in range(THREADS + 1): 2995 cpu_usage.append([]) 2996 cpu_temp.append([]) 2997 freq_error: bool = False 2998 cpu_freq: int = 0 2999 load_avg: List[float] = [] 3000 uptime: str = "" 3001 buffer: str = CpuBox.buffer 3002 sensor_method: str = "" 3003 got_sensors: bool = False 3004 sensor_swap: bool = False 3005 cpu_temp_only: bool = False 3006 3007 @classmethod 3008 def get_sensors(cls): 3009 '''Check if we can get cpu temps and return method of getting temps''' 3010 cls.sensor_method = "" 3011 if SYSTEM == "MacOS": 3012 try: 3013 if which("coretemp") and subprocess.check_output(["coretemp", "-p"], universal_newlines=True).strip().replace("-", "").isdigit(): 3014 cls.sensor_method = "coretemp" 3015 elif which("osx-cpu-temp") and subprocess.check_output("osx-cpu-temp", universal_newlines=True).rstrip().endswith("°C"): 3016 cls.sensor_method = "osx-cpu-temp" 3017 except: pass 3018 elif CONFIG.cpu_sensor != "Auto" and CONFIG.cpu_sensor in CONFIG.cpu_sensors: 3019 cls.sensor_method = "psutil" 3020 elif hasattr(psutil, "sensors_temperatures"): 3021 try: 3022 temps = psutil.sensors_temperatures() 3023 if temps: 3024 for name, entries in temps.items(): 3025 if name.lower().startswith("cpu"): 3026 cls.sensor_method = "psutil" 3027 break 3028 for entry in entries: 3029 if entry.label.startswith(("Package", "Core 0", "Tdie", "CPU")): 3030 cls.sensor_method = "psutil" 3031 break 3032 except: pass 3033 if not cls.sensor_method and SYSTEM == "Linux": 3034 try: 3035 if which("vcgencmd") and subprocess.check_output(["vcgencmd", "measure_temp"], universal_newlines=True).strip().endswith("'C"): 3036 cls.sensor_method = "vcgencmd" 3037 except: pass 3038 cls.got_sensors = bool(cls.sensor_method) 3039 3040 @classmethod 3041 def _collect(cls): 3042 cls.cpu_usage[0].append(ceil(psutil.cpu_percent(percpu=False))) 3043 if len(cls.cpu_usage[0]) > Term.width * 4: 3044 del cls.cpu_usage[0][0] 3045 3046 cpu_times_percent = psutil.cpu_times_percent() 3047 for x in ["upper", "lower"]: 3048 if getattr(CONFIG, "cpu_graph_" + x) == "total": 3049 setattr(cls, "cpu_" + x, cls.cpu_usage[0]) 3050 else: 3051 getattr(cls, "cpu_" + x).append(ceil(getattr(cpu_times_percent, getattr(CONFIG, "cpu_graph_" + x)))) 3052 if len(getattr(cls, "cpu_" + x)) > Term.width * 4: 3053 del getattr(cls, "cpu_" + x)[0] 3054 3055 for n, thread in enumerate(psutil.cpu_percent(percpu=True), start=1): 3056 cls.cpu_usage[n].append(ceil(thread)) 3057 if len(cls.cpu_usage[n]) > Term.width * 2: 3058 del cls.cpu_usage[n][0] 3059 try: 3060 if CONFIG.show_cpu_freq and hasattr(psutil.cpu_freq(), "current"): 3061 freq: float = psutil.cpu_freq().current 3062 cls.cpu_freq = round(freq * (1 if freq > 10 else 1000)) 3063 elif cls.cpu_freq > 0: 3064 cls.cpu_freq = 0 3065 except Exception as e: 3066 if not cls.freq_error: 3067 cls.freq_error = True 3068 errlog.error("Exception while getting cpu frequency!") 3069 errlog.exception(f'{e}') 3070 else: 3071 pass 3072 cls.load_avg = [round(lavg, 2) for lavg in psutil.getloadavg()] 3073 cls.uptime = str(timedelta(seconds=round(time()-psutil.boot_time(),0)))[:-3].replace(" days,", "d").replace(" day,", "d") 3074 3075 if CONFIG.check_temp and cls.got_sensors: 3076 cls._collect_temps() 3077 3078 @classmethod 3079 def _collect_temps(cls): 3080 temp: int = 1000 3081 cores: List[int] = [] 3082 core_dict: Dict[int, int] = {} 3083 entry_int: int = 0 3084 cpu_type: str = "" 3085 c_max: int = 0 3086 s_name: str = "_-_" 3087 s_label: str = "_-_" 3088 if cls.sensor_method == "psutil": 3089 try: 3090 if CONFIG.cpu_sensor != "Auto": 3091 s_name, s_label = CONFIG.cpu_sensor.split(":", 1) 3092 for name, entries in psutil.sensors_temperatures().items(): 3093 for num, entry in enumerate(entries, 1): 3094 if name == s_name and (entry.label == s_label or str(num) == s_label): 3095 if entry.label.startswith("Package"): 3096 cpu_type = "intel" 3097 elif entry.label.startswith("Tdie"): 3098 cpu_type = "ryzen" 3099 else: 3100 cpu_type = "other" 3101 if getattr(entry, "high", None) != None and entry.high > 1: cls.cpu_temp_high = round(entry.high) 3102 else: cls.cpu_temp_high = 80 3103 if getattr(entry, "critical", None) != None and entry.critical > 1: cls.cpu_temp_crit = round(entry.critical) 3104 else: cls.cpu_temp_crit = 95 3105 temp = round(entry.current) 3106 elif entry.label.startswith(("Package", "Tdie")) and cpu_type in ["", "other"] and s_name == "_-_" and hasattr(entry, "current"): 3107 if not cls.cpu_temp_high or cls.sensor_swap or cpu_type == "other": 3108 cls.sensor_swap = False 3109 if getattr(entry, "high", None) != None and entry.high > 1: cls.cpu_temp_high = round(entry.high) 3110 else: cls.cpu_temp_high = 80 3111 if getattr(entry, "critical", None) != None and entry.critical > 1: cls.cpu_temp_crit = round(entry.critical) 3112 else: cls.cpu_temp_crit = 95 3113 cpu_type = "intel" if entry.label.startswith("Package") else "ryzen" 3114 temp = round(entry.current) 3115 elif (entry.label.startswith(("Core", "Tccd", "CPU")) or (name.lower().startswith("cpu") and not entry.label)) and hasattr(entry, "current"): 3116 if entry.label.startswith(("Core", "Tccd")): 3117 entry_int = int(entry.label.replace("Core", "").replace("Tccd", "")) 3118 if entry_int in core_dict and cpu_type != "ryzen": 3119 if c_max == 0: 3120 c_max = max(core_dict) + 1 3121 if c_max < THREADS // 2 and (entry_int + c_max) not in core_dict: 3122 core_dict[(entry_int + c_max)] = round(entry.current) 3123 continue 3124 elif entry_int in core_dict: 3125 continue 3126 core_dict[entry_int] = round(entry.current) 3127 continue 3128 elif cpu_type in ["intel", "ryzen"]: 3129 continue 3130 if not cpu_type: 3131 cpu_type = "other" 3132 if not cls.cpu_temp_high or cls.sensor_swap: 3133 cls.sensor_swap = False 3134 if getattr(entry, "high", None) != None and entry.high > 1: cls.cpu_temp_high = round(entry.high) 3135 else: cls.cpu_temp_high = 60 if name == "cpu_thermal" else 80 3136 if getattr(entry, "critical", None) != None and entry.critical > 1: cls.cpu_temp_crit = round(entry.critical) 3137 else: cls.cpu_temp_crit = 80 if name == "cpu_thermal" else 95 3138 temp = round(entry.current) 3139 cores.append(round(entry.current)) 3140 if core_dict: 3141 if not temp or temp == 1000: 3142 temp = sum(core_dict.values()) // len(core_dict) 3143 if not cls.cpu_temp_high or not cls.cpu_temp_crit: 3144 cls.cpu_temp_high, cls.cpu_temp_crit = 80, 95 3145 cls.cpu_temp[0].append(temp) 3146 if cpu_type == "ryzen": 3147 ccds: int = len(core_dict) 3148 cores_per_ccd: int = CORES // ccds 3149 z: int = 1 3150 for x in range(THREADS): 3151 if x == CORES: 3152 z = 1 3153 if CORE_MAP[x] + 1 > cores_per_ccd * z: 3154 z += 1 3155 if z in core_dict: 3156 cls.cpu_temp[x+1].append(core_dict[z]) 3157 else: 3158 for x in range(THREADS): 3159 if CORE_MAP[x] in core_dict: 3160 cls.cpu_temp[x+1].append(core_dict[CORE_MAP[x]]) 3161 3162 elif len(cores) == THREADS / 2: 3163 cls.cpu_temp[0].append(temp) 3164 for n, t in enumerate(cores, start=1): 3165 try: 3166 cls.cpu_temp[n].append(t) 3167 cls.cpu_temp[THREADS // 2 + n].append(t) 3168 except IndexError: 3169 break 3170 3171 else: 3172 cls.cpu_temp[0].append(temp) 3173 if len(cores) > 1: 3174 for n, t in enumerate(cores, start=1): 3175 try: 3176 cls.cpu_temp[n].append(t) 3177 except IndexError: 3178 break 3179 except Exception as e: 3180 errlog.exception(f'{e}') 3181 cls.got_sensors = False 3182 CpuBox._calc_size() 3183 3184 else: 3185 try: 3186 if cls.sensor_method == "coretemp": 3187 temp = max(0, int(subprocess.check_output(["coretemp", "-p"], universal_newlines=True).strip())) 3188 cores = [max(0, int(x)) for x in subprocess.check_output("coretemp", universal_newlines=True).split()] 3189 if len(cores) == THREADS / 2: 3190 cls.cpu_temp[0].append(temp) 3191 for n, t in enumerate(cores, start=1): 3192 try: 3193 cls.cpu_temp[n].append(t) 3194 cls.cpu_temp[THREADS // 2 + n].append(t) 3195 except IndexError: 3196 break 3197 else: 3198 cores.insert(0, temp) 3199 for n, t in enumerate(cores): 3200 try: 3201 cls.cpu_temp[n].append(t) 3202 except IndexError: 3203 break 3204 if not cls.cpu_temp_high: 3205 cls.cpu_temp_high = 85 3206 cls.cpu_temp_crit = 100 3207 elif cls.sensor_method == "osx-cpu-temp": 3208 temp = max(0, round(float(subprocess.check_output("osx-cpu-temp", universal_newlines=True).strip()[:-2]))) 3209 if not cls.cpu_temp_high: 3210 cls.cpu_temp_high = 85 3211 cls.cpu_temp_crit = 100 3212 elif cls.sensor_method == "vcgencmd": 3213 temp = max(0, round(float(subprocess.check_output(["vcgencmd", "measure_temp"], universal_newlines=True).strip()[5:-2]))) 3214 if not cls.cpu_temp_high: 3215 cls.cpu_temp_high = 60 3216 cls.cpu_temp_crit = 80 3217 except Exception as e: 3218 errlog.exception(f'{e}') 3219 cls.got_sensors = False 3220 CpuBox._calc_size() 3221 else: 3222 if not cores: 3223 cls.cpu_temp[0].append(temp) 3224 3225 if not core_dict and len(cores) <= 1: 3226 cls.cpu_temp_only = True 3227 if len(cls.cpu_temp[0]) > 5: 3228 for n in range(len(cls.cpu_temp)): 3229 if cls.cpu_temp[n]: 3230 del cls.cpu_temp[n][0] 3231 3232 @classmethod 3233 def _draw(cls): 3234 CpuBox._draw_fg() 3235 3236class MemCollector(Collector): 3237 '''Collects memory and disks information''' 3238 values: Dict[str, int] = {} 3239 vlist: Dict[str, List[int]] = {} 3240 percent: Dict[str, int] = {} 3241 string: Dict[str, str] = {} 3242 3243 swap_values: Dict[str, int] = {} 3244 swap_vlist: Dict[str, List[int]] = {} 3245 swap_percent: Dict[str, int] = {} 3246 swap_string: Dict[str, str] = {} 3247 3248 disks: Dict[str, Dict] 3249 disk_hist: Dict[str, Tuple] = {} 3250 timestamp: float = time() 3251 disks_io_dict: Dict[str, Dict[str, List[int]]] = {} 3252 recheck_diskutil: bool = True 3253 diskutil_map: Dict[str, str] = {} 3254 3255 io_error: bool = False 3256 3257 old_disks: List[str] = [] 3258 old_io_disks: List[str] = [] 3259 3260 fstab_filter: List[str] = [] 3261 3262 excludes: List[str] = ["squashfs", "nullfs"] 3263 if SYSTEM == "BSD": excludes += ["devfs", "tmpfs", "procfs", "linprocfs", "gvfs", "fusefs"] 3264 3265 buffer: str = MemBox.buffer 3266 3267 @classmethod 3268 def _collect(cls): 3269 #* Collect memory 3270 mem = psutil.virtual_memory() 3271 if hasattr(mem, "cached"): 3272 cls.values["cached"] = mem.cached 3273 else: 3274 cls.values["cached"] = mem.active 3275 cls.values["total"], cls.values["free"], cls.values["available"] = mem.total, mem.free, mem.available 3276 cls.values["used"] = cls.values["total"] - cls.values["available"] 3277 3278 for key, value in cls.values.items(): 3279 cls.string[key] = floating_humanizer(value) 3280 if key == "total": continue 3281 cls.percent[key] = round(value * 100 / cls.values["total"]) 3282 if CONFIG.mem_graphs: 3283 if not key in cls.vlist: cls.vlist[key] = [] 3284 cls.vlist[key].append(cls.percent[key]) 3285 if len(cls.vlist[key]) > MemBox.width: del cls.vlist[key][0] 3286 3287 #* Collect swap 3288 if CONFIG.show_swap or CONFIG.swap_disk: 3289 swap = psutil.swap_memory() 3290 cls.swap_values["total"], cls.swap_values["free"] = swap.total, swap.free 3291 cls.swap_values["used"] = cls.swap_values["total"] - cls.swap_values["free"] 3292 3293 if swap.total: 3294 if not MemBox.swap_on: 3295 MemBox.redraw = True 3296 MemBox.swap_on = True 3297 for key, value in cls.swap_values.items(): 3298 cls.swap_string[key] = floating_humanizer(value) 3299 if key == "total": continue 3300 cls.swap_percent[key] = round(value * 100 / cls.swap_values["total"]) 3301 if CONFIG.mem_graphs: 3302 if not key in cls.swap_vlist: cls.swap_vlist[key] = [] 3303 cls.swap_vlist[key].append(cls.swap_percent[key]) 3304 if len(cls.swap_vlist[key]) > MemBox.width: del cls.swap_vlist[key][0] 3305 else: 3306 if MemBox.swap_on: 3307 MemBox.redraw = True 3308 MemBox.swap_on = False 3309 else: 3310 if MemBox.swap_on: 3311 MemBox.redraw = True 3312 MemBox.swap_on = False 3313 3314 3315 if not CONFIG.show_disks: return 3316 #* Collect disks usage 3317 disk_read: int = 0 3318 disk_write: int = 0 3319 dev_name: str 3320 disk_name: str 3321 filtering: Tuple = () 3322 filter_exclude: bool = False 3323 io_string_r: str 3324 io_string_w: str 3325 u_percent: int 3326 cls.disks = {} 3327 3328 if CONFIG.disks_filter: 3329 if CONFIG.disks_filter.startswith("exclude="): 3330 filter_exclude = True 3331 filtering = tuple(v.strip() for v in CONFIG.disks_filter.replace("exclude=", "").strip().split(",")) 3332 else: 3333 filtering = tuple(v.strip() for v in CONFIG.disks_filter.strip().split(",")) 3334 try: 3335 io_counters = psutil.disk_io_counters(perdisk=SYSTEM != "BSD", nowrap=True) 3336 except ValueError as e: 3337 if not cls.io_error: 3338 cls.io_error = True 3339 errlog.error(f'Non fatal error during disk io collection!') 3340 if psutil.version_info[0] < 5 or (psutil.version_info[0] == 5 and psutil.version_info[1] < 7): 3341 errlog.error(f'Caused by outdated psutil version.') 3342 errlog.exception(f'{e}') 3343 io_counters = None 3344 3345 if SYSTEM == "MacOS" and cls.recheck_diskutil: 3346 cls.recheck_diskutil = False 3347 try: 3348 dutil_out = subprocess.check_output(["diskutil", "list", "physical"], universal_newlines=True) 3349 for line in dutil_out.split("\n"): 3350 line = line.replace("\u2068", "").replace("\u2069", "") 3351 if line.startswith("/dev/"): 3352 xdisk = line.split()[0].replace("/dev/", "") 3353 elif "Container" in line: 3354 ydisk = line.split()[3] 3355 if xdisk and ydisk: 3356 cls.diskutil_map[xdisk] = ydisk 3357 xdisk = ydisk = "" 3358 except: 3359 pass 3360 3361 if CONFIG.use_fstab and SYSTEM != "MacOS" and not cls.fstab_filter: 3362 try: 3363 with open('/etc/fstab','r') as fstab: 3364 for line in fstab: 3365 line = line.strip() 3366 if line and not line.startswith('#'): 3367 mount_data = (line.split()) 3368 if mount_data[2].lower() != "swap": 3369 cls.fstab_filter += [mount_data[1]] 3370 errlog.debug(f'new fstab_filter set : {cls.fstab_filter}') 3371 except IOError: 3372 CONFIG.use_fstab = False 3373 errlog.warning(f'Error reading fstab, use_fstab flag reset to {CONFIG.use_fstab}') 3374 if not CONFIG.use_fstab and cls.fstab_filter: 3375 cls.fstab_filter = [] 3376 errlog.debug(f'use_fstab flag has been turned to {CONFIG.use_fstab}, fstab_filter cleared') 3377 3378 for disk in psutil.disk_partitions(all=CONFIG.use_fstab or not CONFIG.only_physical): 3379 disk_io = None 3380 io_string_r = io_string_w = "" 3381 if CONFIG.use_fstab and disk.mountpoint not in cls.fstab_filter: 3382 continue 3383 disk_name = disk.mountpoint.rsplit('/', 1)[-1] if not disk.mountpoint == "/" else "root" 3384 if cls.excludes and disk.fstype in cls.excludes: 3385 continue 3386 if filtering and ((not filter_exclude and not disk.mountpoint in filtering) or (filter_exclude and disk.mountpoint in filtering)): 3387 continue 3388 if SYSTEM == "MacOS" and disk.mountpoint == "/private/var/vm": 3389 continue 3390 try: 3391 disk_u = psutil.disk_usage(disk.mountpoint) 3392 except: 3393 pass 3394 3395 u_percent = round(getattr(disk_u, "percent", 0)) 3396 cls.disks[disk.device] = { "name" : disk_name, "used_percent" : u_percent, "free_percent" : 100 - u_percent } 3397 for name in ["total", "used", "free"]: 3398 cls.disks[disk.device][name] = floating_humanizer(getattr(disk_u, name, 0)) 3399 3400 #* Collect disk io 3401 if io_counters: 3402 try: 3403 if SYSTEM != "BSD": 3404 dev_name = os.path.realpath(disk.device).rsplit('/', 1)[-1] 3405 if not dev_name in io_counters: 3406 for names in io_counters: 3407 if names in dev_name: 3408 disk_io = io_counters[names] 3409 break 3410 else: 3411 if cls.diskutil_map: 3412 for names, items in cls.diskutil_map.items(): 3413 if items in dev_name and names in io_counters: 3414 disk_io = io_counters[names] 3415 else: 3416 disk_io = io_counters[dev_name] 3417 elif disk.mountpoint == "/": 3418 disk_io = io_counters 3419 else: 3420 raise Exception 3421 disk_read = round((disk_io.read_bytes - cls.disk_hist[disk.device][0]) / (time() - cls.timestamp)) #type: ignore 3422 disk_write = round((disk_io.write_bytes - cls.disk_hist[disk.device][1]) / (time() - cls.timestamp)) #type: ignore 3423 if not disk.device in cls.disks_io_dict: 3424 cls.disks_io_dict[disk.device] = {"read" : [], "write" : [], "rw" : []} 3425 cls.disks_io_dict[disk.device]["read"].append(disk_read >> 20) 3426 cls.disks_io_dict[disk.device]["write"].append(disk_write >> 20) 3427 cls.disks_io_dict[disk.device]["rw"].append((disk_read + disk_write) >> 20) 3428 3429 if len(cls.disks_io_dict[disk.device]["read"]) > MemBox.width: 3430 del cls.disks_io_dict[disk.device]["read"][0], cls.disks_io_dict[disk.device]["write"][0], cls.disks_io_dict[disk.device]["rw"][0] 3431 3432 except: 3433 disk_read = disk_write = 0 3434 else: 3435 disk_read = disk_write = 0 3436 3437 if disk_io: 3438 cls.disk_hist[disk.device] = (disk_io.read_bytes, disk_io.write_bytes) 3439 if CONFIG.io_mode or MemBox.disks_width > 30: 3440 if disk_read > 0: 3441 io_string_r = f'▲{floating_humanizer(disk_read, short=True)}' 3442 if disk_write > 0: 3443 io_string_w = f'▼{floating_humanizer(disk_write, short=True)}' 3444 if CONFIG.io_mode: 3445 cls.disks[disk.device]["io_r"] = io_string_r 3446 cls.disks[disk.device]["io_w"] = io_string_w 3447 elif disk_read + disk_write > 0: 3448 io_string_r += f'▼▲{floating_humanizer(disk_read + disk_write, short=True)}' 3449 3450 cls.disks[disk.device]["io"] = io_string_r + (" " if io_string_w and io_string_r else "") + io_string_w 3451 3452 if CONFIG.swap_disk and MemBox.swap_on: 3453 cls.disks["__swap"] = { "name" : "swap", "used_percent" : cls.swap_percent["used"], "free_percent" : cls.swap_percent["free"], "io" : "" } 3454 for name in ["total", "used", "free"]: 3455 cls.disks["__swap"][name] = cls.swap_string[name] 3456 if len(cls.disks) > 2: 3457 try: 3458 new = { list(cls.disks)[0] : cls.disks.pop(list(cls.disks)[0])} 3459 new["__swap"] = cls.disks.pop("__swap") 3460 new.update(cls.disks) 3461 cls.disks = new 3462 except: 3463 pass 3464 3465 if cls.old_disks != list(cls.disks) or cls.old_io_disks != list(cls.disks_io_dict): 3466 MemBox.redraw = True 3467 cls.recheck_diskutil = True 3468 cls.old_disks = list(cls.disks) 3469 cls.old_io_disks = list(cls.disks_io_dict) 3470 3471 cls.timestamp = time() 3472 3473 @classmethod 3474 def _draw(cls): 3475 MemBox._draw_fg() 3476 3477class NetCollector(Collector): 3478 '''Collects network stats''' 3479 buffer: str = NetBox.buffer 3480 nics: List[str] = [] 3481 nic_i: int = 0 3482 nic: str = "" 3483 new_nic: str = "" 3484 nic_error: bool = False 3485 reset: bool = False 3486 graph_raise: Dict[str, int] = {"download" : 5, "upload" : 5} 3487 graph_lower: Dict[str, int] = {"download" : 5, "upload" : 5} 3488 #min_top: int = 10<<10 3489 #* Stats structure = stats[netword device][download, upload][total, last, top, graph_top, offset, speed, redraw, graph_raise, graph_low] = int, List[int], bool 3490 stats: Dict[str, Dict[str, Dict[str, Any]]] = {} 3491 #* Strings structure strings[network device][download, upload][total, byte_ps, bit_ps, top, graph_top] = str 3492 strings: Dict[str, Dict[str, Dict[str, str]]] = {} 3493 switched: bool = False 3494 timestamp: float = time() 3495 net_min: Dict[str, int] = {"download" : -1, "upload" : -1} 3496 auto_min: bool = CONFIG.net_auto 3497 net_iface: str = CONFIG.net_iface 3498 sync_top: int = 0 3499 sync_string: str = "" 3500 address: str = "" 3501 3502 @classmethod 3503 def _get_nics(cls): 3504 '''Get a list of all network devices sorted by highest throughput''' 3505 cls.nic_i = 0 3506 cls.nics = [] 3507 cls.nic = "" 3508 try: 3509 io_all = psutil.net_io_counters(pernic=True) 3510 except Exception as e: 3511 if not cls.nic_error: 3512 cls.nic_error = True 3513 errlog.exception(f'{e}') 3514 if not io_all: return 3515 up_stat = psutil.net_if_stats() 3516 for nic in sorted(io_all.keys(), key=lambda nic: (getattr(io_all[nic], "bytes_recv", 0) + getattr(io_all[nic], "bytes_sent", 0)), reverse=True): 3517 if nic not in up_stat or not up_stat[nic].isup: 3518 continue 3519 cls.nics.append(nic) 3520 if not cls.nics: cls.nics = [""] 3521 cls.nic = cls.nics[cls.nic_i] 3522 if cls.net_iface and cls.net_iface in cls.nics: 3523 cls.nic = cls.net_iface 3524 cls.nic_i = cls.nics.index(cls.nic) 3525 3526 3527 @classmethod 3528 def switch(cls, key: str): 3529 if cls.net_iface: cls.net_iface = "" 3530 if len(cls.nics) < 2 and cls.nic in cls.nics: 3531 return 3532 3533 if cls.nic_i == -1: 3534 cls.nic_i = 0 if key == "n" else -1 3535 else: 3536 cls.nic_i += +1 if key == "n" else -1 3537 3538 cls.nic_i %= len(cls.nics) 3539 cls.new_nic = cls.nics[cls.nic_i] 3540 cls.switched = True 3541 Collector.collect(NetCollector, redraw=True) 3542 3543 @classmethod 3544 def _collect(cls): 3545 speed: int 3546 stat: Dict 3547 up_stat = psutil.net_if_stats() 3548 3549 if sorted(cls.nics) != sorted(nic for nic in up_stat if up_stat[nic].isup): 3550 old_nic = cls.nic 3551 cls._get_nics() 3552 cls.nic = old_nic 3553 if cls.nic not in cls.nics: 3554 cls.nic_i = -1 3555 else: 3556 cls.nic_i = cls.nics.index(cls.nic) 3557 3558 if cls.switched: 3559 cls.nic = cls.new_nic 3560 cls.switched = False 3561 3562 if not cls.nic or cls.nic not in up_stat: 3563 cls._get_nics() 3564 if not cls.nic: return 3565 NetBox.redraw = True 3566 try: 3567 io_all = psutil.net_io_counters(pernic=True)[cls.nic] 3568 except KeyError: 3569 pass 3570 return 3571 if not cls.nic in cls.stats: 3572 cls.stats[cls.nic] = {} 3573 cls.strings[cls.nic] = { "download" : {}, "upload" : {}} 3574 for direction, value in ["download", io_all.bytes_recv], ["upload", io_all.bytes_sent]: 3575 cls.stats[cls.nic][direction] = { "total" : value, "last" : value, "top" : 0, "graph_top" : 0, "offset" : 0, "speed" : [], "redraw" : True, "graph_raise" : 0, "graph_lower" : 7 } 3576 for v in ["total", "byte_ps", "bit_ps", "top", "graph_top"]: 3577 cls.strings[cls.nic][direction][v] = "" 3578 3579 cls.stats[cls.nic]["download"]["total"] = io_all.bytes_recv 3580 cls.stats[cls.nic]["upload"]["total"] = io_all.bytes_sent 3581 if cls.nic in psutil.net_if_addrs(): 3582 cls.address = getattr(psutil.net_if_addrs()[cls.nic][0], "address", "") 3583 3584 for direction in ["download", "upload"]: 3585 stat = cls.stats[cls.nic][direction] 3586 strings = cls.strings[cls.nic][direction] 3587 #* Calculate current speed 3588 stat["speed"].append(round((stat["total"] - stat["last"]) / (time() - cls.timestamp))) 3589 stat["last"] = stat["total"] 3590 speed = stat["speed"][-1] 3591 3592 if cls.net_min[direction] == -1: 3593 cls.net_min[direction] = units_to_bytes(getattr(CONFIG, "net_" + direction)) 3594 stat["graph_top"] = cls.net_min[direction] 3595 stat["graph_lower"] = 7 3596 if not cls.auto_min: 3597 stat["redraw"] = True 3598 strings["graph_top"] = floating_humanizer(stat["graph_top"], short=True) 3599 3600 if stat["offset"] and stat["offset"] > stat["total"]: 3601 cls.reset = True 3602 3603 if cls.reset: 3604 if not stat["offset"]: 3605 stat["offset"] = stat["total"] 3606 else: 3607 stat["offset"] = 0 3608 if direction == "upload": 3609 cls.reset = False 3610 NetBox.redraw = True 3611 3612 if len(stat["speed"]) > NetBox.width * 2: 3613 del stat["speed"][0] 3614 3615 strings["total"] = floating_humanizer(stat["total"] - stat["offset"]) 3616 strings["byte_ps"] = floating_humanizer(stat["speed"][-1], per_second=True) 3617 strings["bit_ps"] = floating_humanizer(stat["speed"][-1], bit=True, per_second=True) 3618 3619 if speed > stat["top"] or not stat["top"]: 3620 stat["top"] = speed 3621 strings["top"] = floating_humanizer(stat["top"], bit=True, per_second=True) 3622 3623 if cls.auto_min: 3624 if speed > stat["graph_top"]: 3625 stat["graph_raise"] += 1 3626 if stat["graph_lower"] > 0: stat["graph_lower"] -= 1 3627 elif speed < stat["graph_top"] // 10: 3628 stat["graph_lower"] += 1 3629 if stat["graph_raise"] > 0: stat["graph_raise"] -= 1 3630 3631 if stat["graph_raise"] >= 5 or stat["graph_lower"] >= 5: 3632 if stat["graph_raise"] >= 5: 3633 stat["graph_top"] = round(max(stat["speed"][-5:]) / 0.8) 3634 elif stat["graph_lower"] >= 5: 3635 stat["graph_top"] = max(10 << 10, max(stat["speed"][-5:]) * 3) 3636 stat["graph_raise"] = 0 3637 stat["graph_lower"] = 0 3638 stat["redraw"] = True 3639 strings["graph_top"] = floating_humanizer(stat["graph_top"], short=True) 3640 3641 cls.timestamp = time() 3642 3643 if CONFIG.net_sync: 3644 c_max: int = max(cls.stats[cls.nic]["download"]["graph_top"], cls.stats[cls.nic]["upload"]["graph_top"]) 3645 if c_max != cls.sync_top: 3646 cls.sync_top = c_max 3647 cls.sync_string = floating_humanizer(cls.sync_top, short=True) 3648 NetBox.redraw = True 3649 3650 @classmethod 3651 def _draw(cls): 3652 NetBox._draw_fg() 3653 3654 3655class ProcCollector(Collector): 3656 '''Collects process stats''' 3657 buffer: str = ProcBox.buffer 3658 search_filter: str = "" 3659 case_sensitive: bool = False 3660 processes: Dict = {} 3661 num_procs: int = 0 3662 det_cpu: float = 0.0 3663 detailed: bool = False 3664 detailed_pid: Union[int, None] = None 3665 details: Dict[str, Any] = {} 3666 details_cpu: List[int] = [] 3667 details_mem: List[int] = [] 3668 expand: int = 0 3669 collapsed: Dict = {} 3670 tree_counter: int = 0 3671 p_values: List[str] = ["pid", "name", "cmdline", "num_threads", "username", "memory_percent", "cpu_percent", "cpu_times", "create_time"] 3672 sort_expr: Dict = {} 3673 sort_expr["pid"] = compile("p.info['pid']", "str", "eval") 3674 sort_expr["program"] = compile("'' if p.info['name'] == 0.0 else p.info['name']", "str", "eval") 3675 sort_expr["arguments"] = compile("' '.join(str(p.info['cmdline'])) or ('' if p.info['name'] == 0.0 else p.info['name'])", "str", "eval") 3676 sort_expr["threads"] = compile("0 if p.info['num_threads'] == 0.0 else p.info['num_threads']", "str", "eval") 3677 sort_expr["user"] = compile("'' if p.info['username'] == 0.0 else p.info['username']", "str", "eval") 3678 sort_expr["memory"] = compile("p.info['memory_percent']", "str", "eval") 3679 sort_expr["cpu lazy"] = compile("(sum(p.info['cpu_times'][:2] if not p.info['cpu_times'] == 0.0 else [0.0, 0.0]) * 1000 / (time() - p.info['create_time']))", "str", "eval") 3680 sort_expr["cpu responsive"] = compile("(p.info['cpu_percent'] if CONFIG.proc_per_core else (p.info['cpu_percent'] / THREADS))", "str", "eval") 3681 3682 @classmethod 3683 def _collect(cls): 3684 '''List all processess with pid, name, arguments, threads, username, memory percent and cpu percent''' 3685 if not "proc" in Box.boxes: return 3686 out: Dict = {} 3687 cls.det_cpu = 0.0 3688 sorting: str = CONFIG.proc_sorting 3689 reverse: bool = not CONFIG.proc_reversed 3690 proc_per_cpu: bool = CONFIG.proc_per_core 3691 search: List[str] = [] 3692 if cls.search_filter: 3693 if cls.case_sensitive: 3694 search = [i.strip() for i in cls.search_filter.split(",")] 3695 else: 3696 search = [i.strip() for i in cls.search_filter.lower().split(",")] 3697 err: float = 0.0 3698 n: int = 0 3699 3700 if CONFIG.proc_tree and sorting == "arguments": 3701 sorting = "program" 3702 3703 sort_cmd = cls.sort_expr[sorting] 3704 3705 if CONFIG.proc_tree: 3706 cls._tree(sort_cmd=sort_cmd, reverse=reverse, proc_per_cpu=proc_per_cpu, search=search) 3707 else: 3708 for p in sorted(psutil.process_iter(cls.p_values + (["memory_info"] if CONFIG.proc_mem_bytes else []), err), key=lambda p: eval(sort_cmd), reverse=reverse): 3709 if cls.collect_interrupt or cls.proc_interrupt: 3710 return 3711 if p.info["name"] == "idle" or p.info["name"] == err or p.info["pid"] == err: 3712 continue 3713 if p.info["cmdline"] == err: 3714 p.info["cmdline"] = "" 3715 if p.info["username"] == err: 3716 p.info["username"] = "" 3717 if p.info["num_threads"] == err: 3718 p.info["num_threads"] = 0 3719 if search: 3720 if cls.detailed and p.info["pid"] == cls.detailed_pid: 3721 cls.det_cpu = p.info["cpu_percent"] 3722 for value in [ p.info["name"], " ".join(p.info["cmdline"]), str(p.info["pid"]), p.info["username"] ]: 3723 if not cls.case_sensitive: 3724 value = value.lower() 3725 for s in search: 3726 if s in value: 3727 break 3728 else: continue 3729 break 3730 else: continue 3731 3732 cpu = p.info["cpu_percent"] if proc_per_cpu else round(p.info["cpu_percent"] / THREADS, 2) 3733 mem = p.info["memory_percent"] 3734 if CONFIG.proc_mem_bytes and hasattr(p.info["memory_info"], "rss"): 3735 mem_b = p.info["memory_info"].rss 3736 else: 3737 mem_b = 0 3738 3739 cmd = " ".join(p.info["cmdline"]) or "[" + p.info["name"] + "]" 3740 3741 out[p.info["pid"]] = { 3742 "name" : p.info["name"], 3743 "cmd" : cmd, 3744 "threads" : p.info["num_threads"], 3745 "username" : p.info["username"], 3746 "mem" : mem, 3747 "mem_b" : mem_b, 3748 "cpu" : cpu } 3749 3750 n += 1 3751 3752 cls.num_procs = n 3753 cls.processes = out.copy() 3754 3755 if cls.detailed: 3756 cls.expand = ((ProcBox.width - 2) - ((ProcBox.width - 2) // 3) - 40) // 10 3757 if cls.expand > 5: cls.expand = 5 3758 if cls.detailed and not cls.details.get("killed", False): 3759 try: 3760 c_pid = cls.detailed_pid 3761 det = psutil.Process(c_pid) 3762 except (psutil.NoSuchProcess, psutil.ZombieProcess): 3763 cls.details["killed"] = True 3764 cls.details["status"] = psutil.STATUS_DEAD 3765 ProcBox.redraw = True 3766 else: 3767 attrs: List[str] = ["status", "memory_info", "create_time"] 3768 if not SYSTEM == "MacOS": attrs.extend(["cpu_num"]) 3769 if cls.expand: 3770 attrs.extend(["nice", "terminal"]) 3771 if not SYSTEM == "MacOS": attrs.extend(["io_counters"]) 3772 3773 if not c_pid in cls.processes: attrs.extend(["pid", "name", "cmdline", "num_threads", "username", "memory_percent"]) 3774 3775 cls.details = det.as_dict(attrs=attrs, ad_value="") 3776 if det.parent() != None: cls.details["parent_name"] = det.parent().name() 3777 else: cls.details["parent_name"] = "" 3778 3779 cls.details["pid"] = c_pid 3780 if c_pid in cls.processes: 3781 cls.details["name"] = cls.processes[c_pid]["name"] 3782 cls.details["cmdline"] = cls.processes[c_pid]["cmd"] 3783 cls.details["threads"] = f'{cls.processes[c_pid]["threads"]}' 3784 cls.details["username"] = cls.processes[c_pid]["username"] 3785 cls.details["memory_percent"] = cls.processes[c_pid]["mem"] 3786 cls.details["cpu_percent"] = round(cls.processes[c_pid]["cpu"] * (1 if CONFIG.proc_per_core else THREADS)) 3787 else: 3788 cls.details["cmdline"] = " ".join(cls.details["cmdline"]) or "[" + cls.details["name"] + "]" 3789 cls.details["threads"] = f'{cls.details["num_threads"]}' 3790 cls.details["cpu_percent"] = round(cls.det_cpu) 3791 3792 cls.details["killed"] = False 3793 if SYSTEM == "MacOS": 3794 cls.details["cpu_num"] = -1 3795 cls.details["io_counters"] = "" 3796 3797 3798 if hasattr(cls.details["memory_info"], "rss"): cls.details["memory_bytes"] = floating_humanizer(cls.details["memory_info"].rss) # type: ignore 3799 else: cls.details["memory_bytes"] = "? Bytes" 3800 3801 if isinstance(cls.details["create_time"], float): 3802 uptime = timedelta(seconds=round(time()-cls.details["create_time"],0)) 3803 if uptime.days > 0: cls.details["uptime"] = f'{uptime.days}d {str(uptime).split(",")[1][:-3].strip()}' 3804 else: cls.details["uptime"] = f'{uptime}' 3805 else: cls.details["uptime"] = "??:??:??" 3806 3807 if cls.expand: 3808 if cls.expand > 1 : cls.details["nice"] = f'{cls.details["nice"]}' 3809 if SYSTEM == "BSD": 3810 if cls.expand > 2: 3811 if hasattr(cls.details["io_counters"], "read_count"): cls.details["io_read"] = f'{cls.details["io_counters"].read_count}' 3812 else: cls.details["io_read"] = "?" 3813 if cls.expand > 3: 3814 if hasattr(cls.details["io_counters"], "write_count"): cls.details["io_write"] = f'{cls.details["io_counters"].write_count}' 3815 else: cls.details["io_write"] = "?" 3816 else: 3817 if cls.expand > 2: 3818 if hasattr(cls.details["io_counters"], "read_bytes"): cls.details["io_read"] = floating_humanizer(cls.details["io_counters"].read_bytes) 3819 else: cls.details["io_read"] = "?" 3820 if cls.expand > 3: 3821 if hasattr(cls.details["io_counters"], "write_bytes"): cls.details["io_write"] = floating_humanizer(cls.details["io_counters"].write_bytes) 3822 else: cls.details["io_write"] = "?" 3823 if cls.expand > 4 : cls.details["terminal"] = f'{cls.details["terminal"]}'.replace("/dev/", "") 3824 3825 cls.details_cpu.append(cls.details["cpu_percent"]) 3826 mem = cls.details["memory_percent"] 3827 if mem > 80: mem = round(mem) 3828 elif mem > 60: mem = round(mem * 1.2) 3829 elif mem > 30: mem = round(mem * 1.5) 3830 elif mem > 10: mem = round(mem * 2) 3831 elif mem > 5: mem = round(mem * 10) 3832 else: mem = round(mem * 20) 3833 cls.details_mem.append(mem) 3834 if len(cls.details_cpu) > ProcBox.width: del cls.details_cpu[0] 3835 if len(cls.details_mem) > ProcBox.width: del cls.details_mem[0] 3836 3837 @classmethod 3838 def _tree(cls, sort_cmd, reverse: bool, proc_per_cpu: bool, search: List[str]): 3839 '''List all processess in a tree view with pid, name, threads, username, memory percent and cpu percent''' 3840 out: Dict = {} 3841 err: float = 0.0 3842 det_cpu: float = 0.0 3843 infolist: Dict = {} 3844 cls.tree_counter += 1 3845 tree = defaultdict(list) 3846 n: int = 0 3847 for p in sorted(psutil.process_iter(cls.p_values + (["memory_info"] if CONFIG.proc_mem_bytes else []), err), key=lambda p: eval(sort_cmd), reverse=reverse): 3848 if cls.collect_interrupt: return 3849 try: 3850 tree[p.ppid()].append(p.pid) 3851 except (psutil.NoSuchProcess, psutil.ZombieProcess): 3852 pass 3853 else: 3854 infolist[p.pid] = p.info 3855 n += 1 3856 if 0 in tree and 0 in tree[0]: 3857 tree[0].remove(0) 3858 3859 def create_tree(pid: int, tree: defaultdict, indent: str = "", inindent: str = " ", found: bool = False, depth: int = 0, collapse_to: Union[None, int] = None): 3860 nonlocal infolist, proc_per_cpu, search, out, det_cpu 3861 name: str; threads: int; username: str; mem: float; cpu: float; collapse: bool = False 3862 cont: bool = True 3863 getinfo: Dict = {} 3864 if cls.collect_interrupt: return 3865 try: 3866 name = psutil.Process(pid).name() 3867 if name == "idle": return 3868 except psutil.Error: 3869 pass 3870 cont = False 3871 name = "" 3872 if pid in infolist: 3873 getinfo = infolist[pid] 3874 3875 if search and not found: 3876 if cls.detailed and pid == cls.detailed_pid: 3877 det_cpu = getinfo["cpu_percent"] 3878 if "username" in getinfo and isinstance(getinfo["username"], float): getinfo["username"] = "" 3879 if "cmdline" in getinfo and isinstance(getinfo["cmdline"], float): getinfo["cmdline"] = "" 3880 for value in [ name, str(pid), getinfo.get("username", ""), " ".join(getinfo.get("cmdline", "")) ]: 3881 if not cls.case_sensitive: 3882 value = value.lower() 3883 for s in search: 3884 if s in value: 3885 found = True 3886 break 3887 else: continue 3888 break 3889 else: cont = False 3890 if cont: 3891 if getinfo: 3892 if getinfo["num_threads"] == err: threads = 0 3893 else: threads = getinfo["num_threads"] 3894 if getinfo["username"] == err: username = "" 3895 else: username = getinfo["username"] 3896 cpu = getinfo["cpu_percent"] if proc_per_cpu else round(getinfo["cpu_percent"] / THREADS, 2) 3897 mem = getinfo["memory_percent"] 3898 if getinfo["cmdline"] == err: cmd = "" 3899 else: cmd = " ".join(getinfo["cmdline"]) or "[" + getinfo["name"] + "]" 3900 if CONFIG.proc_mem_bytes and hasattr(getinfo["memory_info"], "rss"): 3901 mem_b = getinfo["memory_info"].rss 3902 else: 3903 mem_b = 0 3904 else: 3905 threads = mem_b = 0 3906 username = "" 3907 mem = cpu = 0.0 3908 3909 if pid in cls.collapsed: 3910 collapse = cls.collapsed[pid] 3911 else: 3912 collapse = depth > CONFIG.tree_depth 3913 cls.collapsed[pid] = collapse 3914 3915 if collapse_to and not search: 3916 out[collapse_to]["threads"] += threads 3917 out[collapse_to]["mem"] += mem 3918 out[collapse_to]["mem_b"] += mem_b 3919 out[collapse_to]["cpu"] += cpu 3920 else: 3921 if pid in tree and len(tree[pid]) > 0: 3922 sign: str = "+" if collapse else "-" 3923 inindent = inindent.replace(" ├─ ", "[" + sign + "]─").replace(" └─ ", "[" + sign + "]─") 3924 out[pid] = { 3925 "indent" : inindent, 3926 "name": name, 3927 "cmd" : cmd, 3928 "threads" : threads, 3929 "username" : username, 3930 "mem" : mem, 3931 "mem_b" : mem_b, 3932 "cpu" : cpu, 3933 "depth" : depth, 3934 } 3935 3936 if search: collapse = False 3937 elif collapse and not collapse_to: 3938 collapse_to = pid 3939 3940 if pid not in tree: 3941 return 3942 children = tree[pid][:-1] 3943 3944 for child in children: 3945 create_tree(child, tree, indent + " │ ", indent + " ├─ ", found=found, depth=depth+1, collapse_to=collapse_to) 3946 create_tree(tree[pid][-1], tree, indent + " ", indent + " └─ ", depth=depth+1, collapse_to=collapse_to) 3947 3948 create_tree(min(tree), tree) 3949 cls.det_cpu = det_cpu 3950 3951 if cls.collect_interrupt: return 3952 if cls.tree_counter >= 100: 3953 cls.tree_counter = 0 3954 for pid in list(cls.collapsed): 3955 if not psutil.pid_exists(pid): 3956 del cls.collapsed[pid] 3957 cls.num_procs = len(out) 3958 cls.processes = out.copy() 3959 3960 @classmethod 3961 def sorting(cls, key: str): 3962 index: int = CONFIG.sorting_options.index(CONFIG.proc_sorting) + (1 if key in ["right", "l"] else -1) 3963 if index >= len(CONFIG.sorting_options): index = 0 3964 elif index < 0: index = len(CONFIG.sorting_options) - 1 3965 CONFIG.proc_sorting = CONFIG.sorting_options[index] 3966 if "left" in Key.mouse: del Key.mouse["left"] 3967 Collector.collect(ProcCollector, interrupt=True, redraw=True) 3968 3969 @classmethod 3970 def _draw(cls): 3971 ProcBox._draw_fg() 3972 3973class Menu: 3974 '''Holds all menus''' 3975 active: bool = False 3976 close: bool = False 3977 resized: bool = True 3978 menus: Dict[str, Dict[str, str]] = {} 3979 menu_length: Dict[str, int] = {} 3980 background: str = "" 3981 for name, menu in MENUS.items(): 3982 menu_length[name] = len(menu["normal"][0]) 3983 menus[name] = {} 3984 for sel in ["normal", "selected"]: 3985 menus[name][sel] = "" 3986 for i in range(len(menu[sel])): 3987 menus[name][sel] += Fx.trans(f'{Color.fg(MENU_COLORS[sel][i])}{menu[sel][i]}') 3988 if i < len(menu[sel]) - 1: menus[name][sel] += f'{Mv.d(1)}{Mv.l(len(menu[sel][i]))}' 3989 3990 @classmethod 3991 def main(cls): 3992 if Term.width < 80 or Term.height < 24: 3993 errlog.warning(f'The menu system only works on a terminal size of 80x24 or above!') 3994 return 3995 out: str = "" 3996 banner: str = "" 3997 redraw: bool = True 3998 key: str = "" 3999 mx: int = 0 4000 my: int = 0 4001 skip: bool = False 4002 mouse_over: bool = False 4003 mouse_items: Dict[str, Dict[str, int]] = {} 4004 cls.active = True 4005 cls.resized = True 4006 menu_names: List[str] = list(cls.menus.keys()) 4007 menu_index: int = 0 4008 menu_current: str = menu_names[0] 4009 cls.background = f'{THEME.inactive_fg}' + Fx.uncolor(f'{Draw.saved_buffer()}') + f'{Term.fg}' 4010 4011 while not cls.close: 4012 key = "" 4013 if cls.resized: 4014 banner = (f'{Banner.draw(Term.height // 2 - 10, center=True)}{Mv.d(1)}{Mv.l(46)}{Colors.black_bg}{Colors.default}{Fx.b}← esc' 4015 f'{Mv.r(30)}{Fx.i}Version: {VERSION}{Fx.ui}{Fx.ub}{Term.bg}{Term.fg}') 4016 if UpdateChecker.version != VERSION: 4017 banner += f'{Mv.to(Term.height, 1)}{Fx.b}{THEME.title}New release {UpdateChecker.version} available at https://github.com/aristocratos/bpytop{Fx.ub}{Term.fg}' 4018 cy = 0 4019 for name, menu in cls.menus.items(): 4020 ypos = Term.height // 2 - 2 + cy 4021 xpos = Term.width // 2 - (cls.menu_length[name] // 2) 4022 mouse_items[name] = { "x1" : xpos, "x2" : xpos + cls.menu_length[name] - 1, "y1" : ypos, "y2" : ypos + 2 } 4023 cy += 3 4024 redraw = True 4025 cls.resized = False 4026 4027 if redraw: 4028 out = "" 4029 for name, menu in cls.menus.items(): 4030 out += f'{Mv.to(mouse_items[name]["y1"], mouse_items[name]["x1"])}{menu["selected" if name == menu_current else "normal"]}' 4031 4032 if skip and redraw: 4033 Draw.now(out) 4034 elif not skip: 4035 Draw.now(f'{cls.background}{banner}{out}') 4036 skip = redraw = False 4037 4038 if Key.input_wait(Timer.left(), mouse=True): 4039 if Key.mouse_moved(): 4040 mx, my = Key.get_mouse() 4041 for name, pos in mouse_items.items(): 4042 if pos["x1"] <= mx <= pos["x2"] and pos["y1"] <= my <= pos["y2"]: 4043 mouse_over = True 4044 if name != menu_current: 4045 menu_current = name 4046 menu_index = menu_names.index(name) 4047 redraw = True 4048 break 4049 else: 4050 mouse_over = False 4051 else: 4052 key = Key.get() 4053 4054 if key == "mouse_click" and not mouse_over: 4055 key = "M" 4056 4057 if key == "q": 4058 clean_quit() 4059 elif key in ["escape", "M"]: 4060 cls.close = True 4061 break 4062 elif key in ["up", "mouse_scroll_up", "shift_tab"]: 4063 menu_index -= 1 4064 if menu_index < 0: menu_index = len(menu_names) - 1 4065 menu_current = menu_names[menu_index] 4066 redraw = True 4067 elif key in ["down", "mouse_scroll_down", "tab"]: 4068 menu_index += 1 4069 if menu_index > len(menu_names) - 1: menu_index = 0 4070 menu_current = menu_names[menu_index] 4071 redraw = True 4072 elif key == "enter" or (key == "mouse_click" and mouse_over): 4073 if menu_current == "quit": 4074 clean_quit() 4075 elif menu_current == "options": 4076 cls.options() 4077 cls.resized = True 4078 elif menu_current == "help": 4079 cls.help() 4080 cls.resized = True 4081 4082 if Timer.not_zero() and not cls.resized: 4083 skip = True 4084 else: 4085 Collector.collect() 4086 Collector.collect_done.wait(2) 4087 if CONFIG.background_update: cls.background = f'{THEME.inactive_fg}' + Fx.uncolor(f'{Draw.saved_buffer()}') + f'{Term.fg}' 4088 Timer.stamp() 4089 4090 4091 Draw.now(f'{Draw.saved_buffer()}') 4092 cls.background = "" 4093 cls.active = False 4094 cls.close = False 4095 4096 @classmethod 4097 def help(cls): 4098 if Term.width < 80 or Term.height < 24: 4099 errlog.warning(f'The menu system only works on a terminal size of 80x24 or above!') 4100 return 4101 out: str = "" 4102 out_misc : str = "" 4103 redraw: bool = True 4104 key: str = "" 4105 skip: bool = False 4106 main_active: bool = cls.active 4107 cls.active = True 4108 cls.resized = True 4109 if not cls.background: 4110 cls.background = f'{THEME.inactive_fg}' + Fx.uncolor(f'{Draw.saved_buffer()}') + f'{Term.fg}' 4111 help_items: Dict[str, str] = { 4112 "(Mouse 1)" : "Clicks buttons and selects in process list.", 4113 "Selected (Mouse 1)" : "Show detailed information for selected process.", 4114 "(Mouse scroll)" : "Scrolls any scrollable list/text under cursor.", 4115 "(Esc, shift+m)" : "Toggles main menu.", 4116 "(m)" : "Cycle view presets, order: full->proc->stat->user.", 4117 "(1)" : "Toggle CPU box.", 4118 "(2)" : "Toggle MEM box.", 4119 "(3)" : "Toggle NET box.", 4120 "(4)" : "Toggle PROC box.", 4121 "(d)" : "Toggle disks view in MEM box.", 4122 "(F2, o)" : "Shows options.", 4123 "(F1, shift+h)" : "Shows this window.", 4124 "(ctrl+z)" : "Sleep program and put in background.", 4125 "(ctrl+c, q)" : "Quits program.", 4126 "(+) / (-)" : "Add/Subtract 100ms to/from update timer.", 4127 "(Up, k) (Down, j)" : "Select in process list.", 4128 "(Enter)" : "Show detailed information for selected process.", 4129 "(Spacebar)" : "Expand/collapse the selected process in tree view.", 4130 "(Pg Up) (Pg Down)" : "Jump 1 page in process list.", 4131 "(Home) (End)" : "Jump to first or last page in process list.", 4132 "(Left, h) (Right, l)" : "Select previous/next sorting column.", 4133 "(b) (n)" : "Select previous/next network device.", 4134 "(s)" : "Toggle showing swap as a disk.", 4135 "(i)" : "Toggle disks io mode with big graphs.", 4136 "(z)" : "Toggle totals reset for current network device", 4137 "(a)" : "Toggle auto scaling for the network graphs.", 4138 "(y)" : "Toggle synced scaling mode for network graphs.", 4139 "(f)" : "Input a NON case-sensitive process filter.", 4140 "(shift+f)" : "Input a case-sensitive process filter.", 4141 "(c)" : "Toggle per-core cpu usage of processes.", 4142 "(r)" : "Reverse sorting order in processes box.", 4143 "(e)" : "Toggle processes tree view.", 4144 "(delete)" : "Clear any entered filter.", 4145 "Selected (shift+t)" : "Terminate selected process with SIGTERM - 15.", 4146 "Selected (shift+k)" : "Kill selected process with SIGKILL - 9.", 4147 "Selected (shift+i)" : "Interrupt selected process with SIGINT - 2.", 4148 "_1" : " ", 4149 "_2" : "For bug reporting and project updates, visit:", 4150 "_3" : "https://github.com/aristocratos/bpytop", 4151 } 4152 4153 while not cls.close: 4154 key = "" 4155 if cls.resized: 4156 y = 8 if Term.height < len(help_items) + 10 else Term.height // 2 - len(help_items) // 2 + 4 4157 out_misc = (f'{Banner.draw(y-7, center=True)}{Mv.d(1)}{Mv.l(46)}{Colors.black_bg}{Colors.default}{Fx.b}← esc' 4158 f'{Mv.r(30)}{Fx.i}Version: {VERSION}{Fx.ui}{Fx.ub}{Term.bg}{Term.fg}') 4159 x = Term.width//2-36 4160 h, w = Term.height-2-y, 72 4161 if len(help_items) > h: 4162 pages = ceil(len(help_items) / h) 4163 else: 4164 h = len(help_items) 4165 pages = 0 4166 page = 1 4167 out_misc += create_box(x, y, w, h+3, "help", line_color=THEME.div_line) 4168 redraw = True 4169 cls.resized = False 4170 4171 if redraw: 4172 out = "" 4173 cy = 0 4174 if pages: 4175 out += (f'{Mv.to(y, x+56)}{THEME.div_line(Symbol.title_left)}{Fx.b}{THEME.title("pg")}{Fx.ub}{THEME.main_fg(Symbol.up)} {Fx.b}{THEME.title}{page}/{pages} ' 4176 f'pg{Fx.ub}{THEME.main_fg(Symbol.down)}{THEME.div_line(Symbol.title_right)}') 4177 out += f'{Mv.to(y+1, x+1)}{THEME.title}{Fx.b}{"Keys:":^20}Description:{THEME.main_fg}' 4178 for n, (keys, desc) in enumerate(help_items.items()): 4179 if pages and n < (page - 1) * h: continue 4180 out += f'{Mv.to(y+2+cy, x+1)}{Fx.b}{("" if keys.startswith("_") else keys):^20.20}{Fx.ub}{desc:50.50}' 4181 cy += 1 4182 if cy == h: break 4183 if cy < h: 4184 for i in range(h-cy): 4185 out += f'{Mv.to(y+2+cy+i, x+1)}{" " * (w-2)}' 4186 4187 if skip and redraw: 4188 Draw.now(out) 4189 elif not skip: 4190 Draw.now(f'{cls.background}{out_misc}{out}') 4191 skip = redraw = False 4192 4193 if Key.input_wait(Timer.left()): 4194 key = Key.get() 4195 4196 if key == "mouse_click": 4197 mx, my = Key.get_mouse() 4198 if x <= mx < x + w and y <= my < y + h + 3: 4199 if pages and my == y and x + 56 < mx < x + 61: 4200 key = "up" 4201 elif pages and my == y and x + 63 < mx < x + 68: 4202 key = "down" 4203 else: 4204 key = "escape" 4205 4206 if key == "q": 4207 clean_quit() 4208 elif key in ["escape", "M", "enter", "backspace", "H", "f1"]: 4209 cls.close = True 4210 break 4211 elif key in ["up", "mouse_scroll_up", "page_up"] and pages: 4212 page -= 1 4213 if page < 1: page = pages 4214 redraw = True 4215 elif key in ["down", "mouse_scroll_down", "page_down"] and pages: 4216 page += 1 4217 if page > pages: page = 1 4218 redraw = True 4219 4220 if Timer.not_zero() and not cls.resized: 4221 skip = True 4222 else: 4223 Collector.collect() 4224 Collector.collect_done.wait(2) 4225 if CONFIG.background_update: cls.background = f'{THEME.inactive_fg}' + Fx.uncolor(f'{Draw.saved_buffer()}') + f'{Term.fg}' 4226 Timer.stamp() 4227 4228 if main_active: 4229 cls.close = False 4230 return 4231 Draw.now(f'{Draw.saved_buffer()}') 4232 cls.background = "" 4233 cls.active = False 4234 cls.close = False 4235 4236 @classmethod 4237 def options(cls): 4238 if Term.width < 80 or Term.height < 24: 4239 errlog.warning(f'The menu system only works on a terminal size of 80x24 or above!') 4240 return 4241 out: str = "" 4242 out_misc : str = "" 4243 redraw: bool = True 4244 selected_cat: str = "" 4245 selected_int: int = 0 4246 option_items: Dict[str, List[str]] = {} 4247 cat_list: List[str] = [] 4248 cat_int: int = 0 4249 change_cat: bool = False 4250 key: str = "" 4251 skip: bool = False 4252 main_active: bool = cls.active 4253 cls.active = True 4254 cls.resized = True 4255 d_quote: str 4256 inputting: bool = False 4257 input_val: str = "" 4258 Theme.refresh() 4259 if not cls.background: 4260 cls.background = f'{THEME.inactive_fg}' + Fx.uncolor(f'{Draw.saved_buffer()}') + f'{Term.fg}' 4261 categories: Dict[str, Dict[str, List[str]]] = { 4262 "system" : { 4263 "color_theme" : [ 4264 'Set color theme.', 4265 '', 4266 'Choose from all theme files in', 4267 '"/usr/[local/]share/bpytop/themes" and', 4268 '"~/.config/bpytop/themes".', 4269 '', 4270 '"Default" for builtin default theme.', 4271 'User themes are prefixed by a plus sign "+".', 4272 '', 4273 'For theme updates see:', 4274 'https://github.com/aristocratos/bpytop'], 4275 "theme_background" : [ 4276 'If the theme set background should be shown.', 4277 '', 4278 'Set to False if you want terminal background', 4279 'transparency.'], 4280 "truecolor" : [ 4281 'Sets if 24-bit truecolor should be used.', 4282 '(Requires restart to take effect!)', 4283 '', 4284 'Will convert 24-bit colors to 256 color', 4285 '(6x6x6 color cube) if False.', 4286 '', 4287 'Set to False if your terminal doesn\'t have', 4288 'truecolor support and can\'t convert to', 4289 '256-color.'], 4290 "shown_boxes" : [ 4291 'Manually set which boxes to show.', 4292 '', 4293 'Available values are "cpu mem net proc".', 4294 'Seperate values with whitespace.', 4295 '', 4296 'Toggle between presets with mode key "m".'], 4297 "update_ms" : [ 4298 'Update time in milliseconds.', 4299 '', 4300 'Recommended 2000 ms or above for better sample', 4301 'times for graphs.', 4302 '', 4303 'Min value: 100 ms', 4304 'Max value: 86400000 ms = 24 hours.'], 4305 "draw_clock" : [ 4306 'Draw a clock at top of screen.', 4307 '(Only visible if cpu box is enabled!)', 4308 '', 4309 'Formatting according to strftime, empty', 4310 'string to disable.', 4311 '', 4312 'Custom formatting options:', 4313 '"/host" = hostname', 4314 '"/user" = username', 4315 '"/uptime" = system uptime', 4316 '', 4317 'Examples of strftime formats:', 4318 '"%X" = locale HH:MM:SS', 4319 '"%H" = 24h hour, "%I" = 12h hour', 4320 '"%M" = minute, "%S" = second', 4321 '"%d" = day, "%m" = month, "%y" = year'], 4322 "background_update" : [ 4323 'Update main ui when menus are showing.', 4324 '', 4325 'True or False.', 4326 '', 4327 'Set this to false if the menus is flickering', 4328 'too much for a comfortable experience.'], 4329 "show_battery" : [ 4330 'Show battery stats.', 4331 '(Only visible if cpu box is enabled!)', 4332 '', 4333 'Show battery stats in the top right corner', 4334 'if a battery is present.'], 4335 "show_init" : [ 4336 'Show init screen at startup.', 4337 '', 4338 'The init screen is purely cosmetical and', 4339 'slows down start to show status messages.'], 4340 "update_check" : [ 4341 'Check for updates at start.', 4342 '', 4343 'Checks for latest version from:', 4344 'https://github.com/aristocratos/bpytop'], 4345 "log_level" : [ 4346 'Set loglevel for error.log', 4347 '', 4348 'Levels are: "ERROR" "WARNING" "INFO" "DEBUG".', 4349 'The level set includes all lower levels,', 4350 'i.e. "DEBUG" will show all logging info.'] 4351 }, 4352 "cpu" : { 4353 "cpu_graph_upper" : [ 4354 'Sets the CPU stat shown in upper half of', 4355 'the CPU graph.', 4356 '', 4357 '"total" = Total cpu usage.', 4358 '"user" = User mode cpu usage.', 4359 '"system" = Kernel mode cpu usage.', 4360 'See:', 4361 'https://psutil.readthedocs.io/en/latest/', 4362 '#psutil.cpu_times', 4363 'for attributes available on specific platforms.'], 4364 "cpu_graph_lower" : [ 4365 'Sets the CPU stat shown in lower half of', 4366 'the CPU graph.', 4367 '', 4368 '"total" = Total cpu usage.', 4369 '"user" = User mode cpu usage.', 4370 '"system" = Kernel mode cpu usage.', 4371 'See:', 4372 'https://psutil.readthedocs.io/en/latest/', 4373 '#psutil.cpu_times', 4374 'for attributes available on specific platforms.'], 4375 "cpu_invert_lower" : [ 4376 'Toggles orientation of the lower CPU graph.', 4377 '', 4378 'True or False.'], 4379 "cpu_single_graph" : [ 4380 'Completely disable the lower CPU graph.', 4381 '', 4382 'Shows only upper CPU graph and resizes it', 4383 'to fit to box height.', 4384 '', 4385 'True or False.'], 4386 "check_temp" : [ 4387 'Enable cpu temperature reporting.', 4388 '', 4389 'True or False.'], 4390 "cpu_sensor" : [ 4391 'Cpu temperature sensor', 4392 '', 4393 'Select the sensor that corresponds to', 4394 'your cpu temperature.', 4395 'Set to "Auto" for auto detection.'], 4396 "show_coretemp" : [ 4397 'Show temperatures for cpu cores.', 4398 '', 4399 'Only works if check_temp is True and', 4400 'the system is reporting core temps.'], 4401 "temp_scale" : [ 4402 'Which temperature scale to use.', 4403 '', 4404 'Celsius, default scale.', 4405 '', 4406 'Fahrenheit, the american one.', 4407 '', 4408 'Kelvin, 0 = absolute zero, 1 degree change', 4409 'equals 1 degree change in Celsius.', 4410 '', 4411 'Rankine, 0 = abosulte zero, 1 degree change', 4412 'equals 1 degree change in Fahrenheit.'], 4413 "show_cpu_freq" : [ 4414 'Show CPU frequency', 4415 '', 4416 'Can cause slowdowns on systems with many', 4417 'cores and psutil versions below 5.8.1'], 4418 "custom_cpu_name" : [ 4419 'Custom cpu model name in cpu percentage box.', 4420 '', 4421 'Empty string to disable.'], 4422 "show_uptime" : [ 4423 'Shows the system uptime in the CPU box.', 4424 '', 4425 'Can also be shown in the clock by using', 4426 '"/uptime" in the formatting.', 4427 '', 4428 'True or False.'], 4429 }, 4430 "mem" : { 4431 "mem_graphs" : [ 4432 'Show graphs for memory values.', 4433 '', 4434 'True or False.'], 4435 "show_disks" : [ 4436 'Split memory box to also show disks.', 4437 '', 4438 'True or False.'], 4439 "show_io_stat" : [ 4440 'Toggle small IO stat graphs.', 4441 '', 4442 'Toggles the small IO graphs for the regular', 4443 'disk usage view.', 4444 '', 4445 'True or False.'], 4446 "io_mode" : [ 4447 'Toggles io mode for disks.', 4448 '', 4449 'Shows big graphs for disk read/write speeds', 4450 'instead of used/free percentage meters.', 4451 '', 4452 'True or False.'], 4453 "io_graph_combined" : [ 4454 'Toggle combined read and write graphs.', 4455 '', 4456 'Only has effect if "io mode" is True.', 4457 '', 4458 'True or False.'], 4459 "io_graph_speeds" : [ 4460 'Set top speeds for the io graphs.', 4461 '', 4462 'Manually set which speed in MiB/s that equals', 4463 '100 percent in the io graphs.', 4464 '(10 MiB/s by default).', 4465 '', 4466 'Format: "device:speed" seperate disks with a', 4467 'comma ",".', 4468 '', 4469 'Example: "/dev/sda:100, /dev/sdb:20".'], 4470 "show_swap" : [ 4471 'If swap memory should be shown in memory box.', 4472 '', 4473 'True or False.'], 4474 "swap_disk" : [ 4475 'Show swap as a disk.', 4476 '', 4477 'Ignores show_swap value above.', 4478 'Inserts itself after first disk.'], 4479 "only_physical" : [ 4480 'Filter out non physical disks.', 4481 '', 4482 'Set this to False to include network disks,', 4483 'RAM disks and similar.', 4484 '', 4485 'True or False.'], 4486 "use_fstab" : [ 4487 'Read disks list from /etc/fstab.', 4488 '(Has no effect on macOS X)', 4489 '', 4490 'This also disables only_physical.', 4491 '', 4492 'True or False.'], 4493 "disks_filter" : [ 4494 'Optional filter for shown disks.', 4495 '', 4496 'Should be full path of a mountpoint,', 4497 '"root" replaces "/", separate multiple values', 4498 'with a comma ",".', 4499 'Begin line with "exclude=" to change to exclude', 4500 'filter.', 4501 'Oterwise defaults to "most include" filter.', 4502 '', 4503 'Example: disks_filter="exclude=/boot, /home/user"'], 4504 }, 4505 "net" : { 4506 "net_download" : [ 4507 'Fixed network graph download value.', 4508 '', 4509 'Default "10M" = 10 MibiBytes.', 4510 'Possible units:', 4511 '"K" (KiB), "M" (MiB), "G" (GiB).', 4512 '', 4513 'Append "bit" for bits instead of bytes,', 4514 'i.e "100Mbit"', 4515 '', 4516 'Can be toggled with auto button.'], 4517 "net_upload" : [ 4518 'Fixed network graph upload value.', 4519 '', 4520 'Default "10M" = 10 MibiBytes.', 4521 'Possible units:', 4522 '"K" (KiB), "M" (MiB), "G" (GiB).', 4523 '', 4524 'Append "bit" for bits instead of bytes,', 4525 'i.e "100Mbit"', 4526 '', 4527 'Can be toggled with auto button.'], 4528 "net_auto" : [ 4529 'Start in network graphs auto rescaling mode.', 4530 '', 4531 'Ignores any values set above at start and', 4532 'rescales down to 10KibiBytes at the lowest.', 4533 '', 4534 'True or False.'], 4535 "net_sync" : [ 4536 'Network scale sync.', 4537 '', 4538 'Syncs the scaling for download and upload to', 4539 'whichever currently has the highest scale.', 4540 '', 4541 'True or False.'], 4542 "net_color_fixed" : [ 4543 'Set network graphs color gradient to fixed.', 4544 '', 4545 'If True the network graphs color is based', 4546 'on the total bandwidth usage instead of', 4547 'the current autoscaling.', 4548 '', 4549 'The bandwidth usage is based on the', 4550 '"net_download" and "net_upload" values set', 4551 'above.'], 4552 "net_iface" : [ 4553 'Network Interface.', 4554 '', 4555 'Manually set the starting Network Interface.', 4556 'Will otherwise automatically choose the NIC', 4557 'with the highest total download since boot.'], 4558 }, 4559 "proc" : { 4560 "proc_update_mult" : [ 4561 'Processes update multiplier.', 4562 'Sets how often the process list is updated as', 4563 'a multiplier of "update_ms".', 4564 '', 4565 'Set to 2 or higher to greatly decrease bpytop', 4566 'cpu usage. (Only integers)'], 4567 "proc_sorting" : [ 4568 'Processes sorting option.', 4569 '', 4570 'Possible values: "pid", "program", "arguments",', 4571 '"threads", "user", "memory", "cpu lazy" and', 4572 '"cpu responsive".', 4573 '', 4574 '"cpu lazy" updates top process over time,', 4575 '"cpu responsive" updates top process directly.'], 4576 "proc_reversed" : [ 4577 'Reverse processes sorting order.', 4578 '', 4579 'True or False.'], 4580 "proc_tree" : [ 4581 'Processes tree view.', 4582 '', 4583 'Set true to show processes grouped by parents,', 4584 'with lines drawn between parent and child', 4585 'process.'], 4586 "tree_depth" : [ 4587 'Process tree auto collapse depth.', 4588 '', 4589 'Sets the depth where the tree view will auto', 4590 'collapse processes at.'], 4591 "proc_colors" : [ 4592 'Enable colors in process view.', 4593 '', 4594 'Uses the cpu graph gradient colors.'], 4595 "proc_gradient" : [ 4596 'Enable process view gradient fade.', 4597 '', 4598 'Fades from top or current selection.', 4599 'Max fade value is equal to current themes', 4600 '"inactive_fg" color value.'], 4601 "proc_per_core" : [ 4602 'Process usage per core.', 4603 '', 4604 'If process cpu usage should be of the core', 4605 'it\'s running on or usage of the total', 4606 'available cpu power.', 4607 '', 4608 'If true and process is multithreaded', 4609 'cpu usage can reach over 100%.'], 4610 "proc_mem_bytes" : [ 4611 'Show memory as bytes in process list.', 4612 ' ', 4613 'True or False.'], 4614 } 4615 } 4616 4617 loglevel_i: int = CONFIG.log_levels.index(CONFIG.log_level) 4618 cpu_sensor_i: int = CONFIG.cpu_sensors.index(CONFIG.cpu_sensor) 4619 cpu_graph_i: Dict[str, int] = { "cpu_graph_upper" : CONFIG.cpu_percent_fields.index(CONFIG.cpu_graph_upper), 4620 "cpu_graph_lower" : CONFIG.cpu_percent_fields.index(CONFIG.cpu_graph_lower)} 4621 temp_scale_i: int = CONFIG.temp_scales.index(CONFIG.temp_scale) 4622 color_i: int 4623 max_opt_len: int = max([len(categories[x]) for x in categories]) * 2 4624 cat_list = list(categories) 4625 while not cls.close: 4626 key = "" 4627 if cls.resized or change_cat: 4628 cls.resized = change_cat = False 4629 selected_cat = list(categories)[cat_int] 4630 option_items = categories[cat_list[cat_int]] 4631 option_len: int = len(option_items) * 2 4632 y = 12 if Term.height < max_opt_len + 13 else Term.height // 2 - max_opt_len // 2 + 7 4633 out_misc = (f'{Banner.draw(y-10, center=True)}{Mv.d(1)}{Mv.l(46)}{Colors.black_bg}{Colors.default}{Fx.b}← esc' 4634 f'{Mv.r(30)}{Fx.i}Version: {VERSION}{Fx.ui}{Fx.ub}{Term.bg}{Term.fg}') 4635 x = Term.width//2-38 4636 x2 = x + 27 4637 h, w, w2 = min(Term.height-1-y, option_len), 26, 50 4638 h -= h % 2 4639 color_i = list(Theme.themes).index(THEME.current) 4640 out_misc += create_box(x, y - 3, w+w2+1, 3, f'tab{Symbol.right}', line_color=THEME.div_line) 4641 out_misc += create_box(x, y, w, h+2, "options", line_color=THEME.div_line) 4642 redraw = True 4643 4644 cat_width = floor((w+w2) / len(categories)) 4645 out_misc += f'{Fx.b}' 4646 for cx, cat in enumerate(categories): 4647 out_misc += f'{Mv.to(y-2, x + 1 + (cat_width * cx) + round(cat_width / 2 - len(cat) / 2 ))}' 4648 if cat == selected_cat: 4649 out_misc += f'{THEME.hi_fg}[{THEME.title}{Fx.u}{cat}{Fx.uu}{THEME.hi_fg}]' 4650 else: 4651 out_misc += f'{THEME.hi_fg}{SUPERSCRIPT[cx+1]}{THEME.title}{cat}' 4652 out_misc += f'{Fx.ub}' 4653 if option_len > h: 4654 pages = ceil(option_len / h) 4655 else: 4656 h = option_len 4657 pages = 0 4658 page = pages if selected_int == -1 and pages > 0 else 1 4659 selected_int = 0 if selected_int >= 0 else len(option_items) - 1 4660 if redraw: 4661 out = "" 4662 cy = 0 4663 4664 selected = list(option_items)[selected_int] 4665 if pages: 4666 out += (f'{Mv.to(y+h+1, x+11)}{THEME.div_line(Symbol.title_left)}{Fx.b}{THEME.title("pg")}{Fx.ub}{THEME.main_fg(Symbol.up)} {Fx.b}{THEME.title}{page}/{pages} ' 4667 f'pg{Fx.ub}{THEME.main_fg(Symbol.down)}{THEME.div_line(Symbol.title_right)}') 4668 #out += f'{Mv.to(y+1, x+1)}{THEME.title}{Fx.b}{"Keys:":^20}Description:{THEME.main_fg}' 4669 for n, opt in enumerate(option_items): 4670 if pages and n < (page - 1) * ceil(h / 2): continue 4671 value = getattr(CONFIG, opt) 4672 t_color = f'{THEME.selected_bg}{THEME.selected_fg}' if opt == selected else f'{THEME.title}' 4673 v_color = "" if opt == selected else f'{THEME.title}' 4674 d_quote = '"' if isinstance(value, str) else "" 4675 if opt == "color_theme": 4676 counter = f' {color_i + 1}/{len(Theme.themes)}' 4677 elif opt == "proc_sorting": 4678 counter = f' {CONFIG.sorting_options.index(CONFIG.proc_sorting) + 1}/{len(CONFIG.sorting_options)}' 4679 elif opt == "log_level": 4680 counter = f' {loglevel_i + 1}/{len(CONFIG.log_levels)}' 4681 elif opt == "cpu_sensor": 4682 counter = f' {cpu_sensor_i + 1}/{len(CONFIG.cpu_sensors)}' 4683 elif opt in ["cpu_graph_upper", "cpu_graph_lower"]: 4684 counter = f' {cpu_graph_i[opt] + 1}/{len(CONFIG.cpu_percent_fields)}' 4685 elif opt == "temp_scale": 4686 counter = f' {temp_scale_i + 1}/{len(CONFIG.temp_scales)}' 4687 else: 4688 counter = "" 4689 out += f'{Mv.to(y+1+cy, x+1)}{t_color}{Fx.b}{opt.replace("_", " ").capitalize() + counter:^24.24}{Fx.ub}{Mv.to(y+2+cy, x+1)}{v_color}' 4690 if opt == selected: 4691 if isinstance(value, bool) or opt in ["color_theme", "proc_sorting", "log_level", "cpu_sensor", "cpu_graph_upper", "cpu_graph_lower", "temp_scale"]: 4692 out += f'{t_color} {Symbol.left}{v_color}{d_quote + str(value) + d_quote:^20.20}{t_color}{Symbol.right} ' 4693 elif inputting: 4694 out += f'{str(input_val)[-17:] + Fx.bl + "█" + Fx.ubl + "" + Symbol.enter:^33.33}' 4695 else: 4696 out += ((f'{t_color} {Symbol.left}{v_color}' if type(value) is int else " ") + 4697 f'{str(value) + " " + Symbol.enter:^20.20}' + (f'{t_color}{Symbol.right} ' if type(value) is int else " ")) 4698 else: 4699 out += f'{d_quote + str(value) + d_quote:^24.24}' 4700 out += f'{Term.bg}' 4701 if opt == selected: 4702 h2 = len(option_items[opt]) + 2 4703 y2 = y + (selected_int * 2) - ((page-1) * h) 4704 if y2 + h2 > Term.height: y2 = Term.height - h2 4705 out += f'{create_box(x2, y2, w2, h2, "description", line_color=THEME.div_line)}{THEME.main_fg}' 4706 for n, desc in enumerate(option_items[opt]): 4707 out += f'{Mv.to(y2+1+n, x2+2)}{desc:.48}' 4708 cy += 2 4709 if cy >= h: break 4710 if cy < h: 4711 for i in range(h-cy): 4712 out += f'{Mv.to(y+1+cy+i, x+1)}{" " * (w-2)}' 4713 4714 4715 if not skip or redraw: 4716 Draw.now(f'{cls.background}{out_misc}{out}') 4717 skip = redraw = False 4718 4719 if Key.input_wait(Timer.left()): 4720 key = Key.get() 4721 redraw = True 4722 has_sel = False 4723 if key == "mouse_click" and not inputting: 4724 mx, my = Key.get_mouse() 4725 if x < mx < x + w + w2 and y - 4 < my < y: 4726 # if my == y - 2: 4727 for cx, cat in enumerate(categories): 4728 ccx = x + (cat_width * cx) + round(cat_width / 2 - len(cat) / 2 ) 4729 if ccx - 2 < mx < ccx + 2 + len(cat): 4730 key = str(cx+1) 4731 break 4732 elif x < mx < x + w and y < my < y + h + 2: 4733 mouse_sel = ceil((my - y) / 2) - 1 + ceil((page-1) * (h / 2)) 4734 if pages and my == y+h+1 and x+11 < mx < x+16: 4735 key = "page_up" 4736 elif pages and my == y+h+1 and x+19 < mx < x+24: 4737 key = "page_down" 4738 elif my == y+h+1: 4739 pass 4740 elif mouse_sel == selected_int: 4741 if mx < x + 6: 4742 key = "left" 4743 elif mx > x + 19: 4744 key = "right" 4745 else: 4746 key = "enter" 4747 elif mouse_sel < len(option_items): 4748 selected_int = mouse_sel 4749 has_sel = True 4750 else: 4751 key = "escape" 4752 if inputting: 4753 if key in ["escape", "mouse_click"]: 4754 inputting = False 4755 elif key == "enter": 4756 inputting = False 4757 if str(getattr(CONFIG, selected)) != input_val: 4758 if selected == "update_ms": 4759 if not input_val or int(input_val) < 100: 4760 CONFIG.update_ms = 100 4761 elif int(input_val) > 86399900: 4762 CONFIG.update_ms = 86399900 4763 else: 4764 CONFIG.update_ms = int(input_val) 4765 elif selected == "proc_update_mult": 4766 if not input_val or int(input_val) < 1: 4767 CONFIG.proc_update_mult = 1 4768 else: 4769 CONFIG.proc_update_mult = int(input_val) 4770 Collector.proc_counter = 1 4771 elif selected == "tree_depth": 4772 if not input_val or int(input_val) < 0: 4773 CONFIG.tree_depth = 0 4774 else: 4775 CONFIG.tree_depth = int(input_val) 4776 ProcCollector.collapsed = {} 4777 elif selected == "shown_boxes": 4778 new_boxes: List = [] 4779 for box in input_val.split(): 4780 if box in ["cpu", "mem", "net", "proc"]: 4781 new_boxes.append(box) 4782 CONFIG.shown_boxes = " ".join(new_boxes) 4783 Box.view_mode = "user" 4784 Box.view_modes["user"] = CONFIG.shown_boxes.split() 4785 Draw.clear(saved=True) 4786 elif isinstance(getattr(CONFIG, selected), str): 4787 setattr(CONFIG, selected, input_val) 4788 if selected.startswith("net_"): 4789 NetCollector.net_min = {"download" : -1, "upload" : -1} 4790 elif selected == "draw_clock": 4791 Box.clock_on = len(CONFIG.draw_clock) > 0 4792 if not Box.clock_on: Draw.clear("clock", saved=True) 4793 elif selected == "io_graph_speeds": 4794 MemBox.graph_speeds = {} 4795 Term.refresh(force=True) 4796 cls.resized = False 4797 elif key == "backspace" and len(input_val): 4798 input_val = input_val[:-1] 4799 elif key == "delete": 4800 input_val = "" 4801 elif isinstance(getattr(CONFIG, selected), str) and len(key) == 1: 4802 input_val += key 4803 elif isinstance(getattr(CONFIG, selected), int) and key.isdigit(): 4804 input_val += key 4805 elif key == "q": 4806 clean_quit() 4807 elif key in ["escape", "o", "M", "f2"]: 4808 cls.close = True 4809 break 4810 elif key == "tab" or (key == "down" and selected_int == len(option_items) - 1 and (page == pages or pages == 0)): 4811 if cat_int == len(categories) - 1: 4812 cat_int = 0 4813 else: 4814 cat_int += 1 4815 change_cat = True 4816 elif key == "shift_tab" or (key == "up" and selected_int == 0 and page == 1): 4817 if cat_int == 0: 4818 cat_int = len(categories) - 1 4819 else: 4820 cat_int -= 1 4821 change_cat = True 4822 selected_int = -1 if key != "shift_tab" else 0 4823 elif key in list(map(str, range(1, len(cat_list)+1))) and key != str(cat_int + 1): 4824 cat_int = int(key) - 1 4825 change_cat = True 4826 elif key == "enter" and selected in ["update_ms", "disks_filter", "custom_cpu_name", "net_download", 4827 "net_upload", "draw_clock", "tree_depth", "proc_update_mult", "shown_boxes", "net_iface", "io_graph_speeds"]: 4828 inputting = True 4829 input_val = str(getattr(CONFIG, selected)) 4830 elif key == "left" and selected == "update_ms" and CONFIG.update_ms - 100 >= 100: 4831 CONFIG.update_ms -= 100 4832 Box.draw_update_ms() 4833 elif key == "right" and selected == "update_ms" and CONFIG.update_ms + 100 <= 86399900: 4834 CONFIG.update_ms += 100 4835 Box.draw_update_ms() 4836 elif key == "left" and selected == "proc_update_mult" and CONFIG.proc_update_mult > 1: 4837 CONFIG.proc_update_mult -= 1 4838 Collector.proc_counter = 1 4839 elif key == "right" and selected == "proc_update_mult": 4840 CONFIG.proc_update_mult += 1 4841 Collector.proc_counter = 1 4842 elif key == "left" and selected == "tree_depth" and CONFIG.tree_depth > 0: 4843 CONFIG.tree_depth -= 1 4844 ProcCollector.collapsed = {} 4845 elif key == "right" and selected == "tree_depth": 4846 CONFIG.tree_depth += 1 4847 ProcCollector.collapsed = {} 4848 elif key in ["left", "right"] and isinstance(getattr(CONFIG, selected), bool): 4849 setattr(CONFIG, selected, not getattr(CONFIG, selected)) 4850 if selected == "check_temp": 4851 if CONFIG.check_temp: 4852 CpuCollector.get_sensors() 4853 else: 4854 CpuCollector.sensor_method = "" 4855 CpuCollector.got_sensors = False 4856 if selected in ["net_auto", "net_color_fixed", "net_sync"]: 4857 if selected == "net_auto": NetCollector.auto_min = CONFIG.net_auto 4858 NetBox.redraw = True 4859 if selected == "theme_background": 4860 Term.bg = f'{THEME.main_bg}' if CONFIG.theme_background else "\033[49m" 4861 Draw.now(Term.bg) 4862 if selected == "show_battery": 4863 Draw.clear("battery", saved=True) 4864 Term.refresh(force=True) 4865 cls.resized = False 4866 elif key in ["left", "right"] and selected == "color_theme" and len(Theme.themes) > 1: 4867 if key == "left": 4868 color_i -= 1 4869 if color_i < 0: color_i = len(Theme.themes) - 1 4870 elif key == "right": 4871 color_i += 1 4872 if color_i > len(Theme.themes) - 1: color_i = 0 4873 Collector.collect_idle.wait() 4874 CONFIG.color_theme = list(Theme.themes)[color_i] 4875 THEME(CONFIG.color_theme) 4876 Term.refresh(force=True) 4877 Timer.finish() 4878 elif key in ["left", "right"] and selected == "proc_sorting": 4879 ProcCollector.sorting(key) 4880 elif key in ["left", "right"] and selected == "log_level": 4881 if key == "left": 4882 loglevel_i -= 1 4883 if loglevel_i < 0: loglevel_i = len(CONFIG.log_levels) - 1 4884 elif key == "right": 4885 loglevel_i += 1 4886 if loglevel_i > len(CONFIG.log_levels) - 1: loglevel_i = 0 4887 CONFIG.log_level = CONFIG.log_levels[loglevel_i] 4888 errlog.setLevel(getattr(logging, CONFIG.log_level)) 4889 errlog.info(f'Loglevel set to {CONFIG.log_level}') 4890 elif key in ["left", "right"] and selected in ["cpu_graph_upper", "cpu_graph_lower"]: 4891 if key == "left": 4892 cpu_graph_i[selected] -= 1 4893 if cpu_graph_i[selected] < 0: cpu_graph_i[selected] = len(CONFIG.cpu_percent_fields) - 1 4894 if key == "right": 4895 cpu_graph_i[selected] += 1 4896 if cpu_graph_i[selected] > len(CONFIG.cpu_percent_fields) - 1: cpu_graph_i[selected] = 0 4897 setattr(CONFIG, selected, CONFIG.cpu_percent_fields[cpu_graph_i[selected]]) 4898 setattr(CpuCollector, selected.replace("_graph", ""), []) 4899 Term.refresh(force=True) 4900 cls.resized = False 4901 elif key in ["left", "right"] and selected == "temp_scale": 4902 if key == "left": 4903 temp_scale_i -= 1 4904 if temp_scale_i < 0: temp_scale_i = len(CONFIG.temp_scales) - 1 4905 if key == "right": 4906 temp_scale_i += 1 4907 if temp_scale_i > len(CONFIG.temp_scales) - 1: temp_scale_i = 0 4908 CONFIG.temp_scale = CONFIG.temp_scales[temp_scale_i] 4909 Term.refresh(force=True) 4910 cls.resized = False 4911 elif key in ["left", "right"] and selected == "cpu_sensor" and len(CONFIG.cpu_sensors) > 1: 4912 if key == "left": 4913 cpu_sensor_i -= 1 4914 if cpu_sensor_i < 0: cpu_sensor_i = len(CONFIG.cpu_sensors) - 1 4915 elif key == "right": 4916 cpu_sensor_i += 1 4917 if cpu_sensor_i > len(CONFIG.cpu_sensors) - 1: cpu_sensor_i = 0 4918 Collector.collect_idle.wait() 4919 CpuCollector.sensor_swap = True 4920 CONFIG.cpu_sensor = CONFIG.cpu_sensors[cpu_sensor_i] 4921 if CONFIG.check_temp and (CpuCollector.sensor_method != "psutil" or CONFIG.cpu_sensor == "Auto"): 4922 CpuCollector.get_sensors() 4923 Term.refresh(force=True) 4924 cls.resized = False 4925 elif key in ["up", "mouse_scroll_up"]: 4926 selected_int -= 1 4927 if selected_int < 0: selected_int = len(option_items) - 1 4928 page = floor(selected_int * 2 / h) + 1 4929 elif key in ["down", "mouse_scroll_down"]: 4930 selected_int += 1 4931 if selected_int > len(option_items) - 1: selected_int = 0 4932 page = floor(selected_int * 2 / h) + 1 4933 elif key == "page_up": 4934 if not pages or page == 1: 4935 selected_int = 0 4936 else: 4937 page -= 1 4938 if page < 1: page = pages 4939 selected_int = (page-1) * ceil(h / 2) 4940 elif key == "page_down": 4941 if not pages or page == pages: 4942 selected_int = len(option_items) - 1 4943 else: 4944 page += 1 4945 if page > pages: page = 1 4946 selected_int = (page-1) * ceil(h / 2) 4947 elif has_sel: 4948 pass 4949 else: 4950 redraw = False 4951 4952 if Timer.not_zero() and not cls.resized: 4953 skip = True 4954 else: 4955 Collector.collect() 4956 Collector.collect_done.wait(2) 4957 if CONFIG.background_update: cls.background = f'{THEME.inactive_fg}' + Fx.uncolor(f'{Draw.saved_buffer()}') + f'{Term.fg}' 4958 Timer.stamp() 4959 4960 if main_active: 4961 cls.close = False 4962 return 4963 Draw.now(f'{Draw.saved_buffer()}') 4964 cls.background = "" 4965 cls.active = False 4966 cls.close = False 4967 4968class Timer: 4969 timestamp: float 4970 return_zero = False 4971 4972 @classmethod 4973 def stamp(cls): 4974 cls.timestamp = time() 4975 4976 @classmethod 4977 def not_zero(cls) -> bool: 4978 if cls.return_zero: 4979 cls.return_zero = False 4980 return False 4981 return cls.timestamp + (CONFIG.update_ms / 1000) > time() 4982 4983 @classmethod 4984 def left(cls) -> float: 4985 t_left: float = cls.timestamp + (CONFIG.update_ms / 1000) - time() 4986 if t_left > CONFIG.update_ms / 1000: 4987 cls.stamp() 4988 return CONFIG.update_ms / 1000 4989 return t_left 4990 4991 @classmethod 4992 def finish(cls): 4993 cls.return_zero = True 4994 cls.timestamp = time() - (CONFIG.update_ms / 1000) 4995 Key.break_wait() 4996 4997class UpdateChecker: 4998 version: str = VERSION 4999 thread: threading.Thread 5000 5001 @classmethod 5002 def run(cls): 5003 cls.thread = threading.Thread(target=cls._checker) 5004 cls.thread.start() 5005 5006 @classmethod 5007 def _checker(cls): 5008 try: 5009 with urllib.request.urlopen("https://github.com/aristocratos/bpytop/raw/master/bpytop.py", timeout=5) as source: # type: ignore 5010 for line in source: 5011 line = line.decode("utf-8") 5012 if line.startswith("VERSION: str ="): 5013 cls.version = line[(line.index("=")+1):].strip('" \n') 5014 break 5015 except Exception as e: 5016 errlog.exception(f'{e}') 5017 else: 5018 if cls.version != VERSION and which("notify-send"): 5019 try: 5020 subprocess.run(["notify-send", "-u", "normal", "BpyTop Update!", 5021 f'New version of BpyTop available!\nCurrent version: {VERSION}\nNew version: {cls.version}\nDownload at github.com/aristocratos/bpytop', 5022 "-i", "update-notifier", "-t", "10000"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) 5023 except Exception as e: 5024 errlog.exception(f'{e}') 5025 5026class Init: 5027 running: bool = True 5028 initbg_colors: List[str] = [] 5029 initbg_data: List[int] 5030 initbg_up: Graph 5031 initbg_down: Graph 5032 resized = False 5033 5034 @classmethod 5035 def start(cls): 5036 Draw.buffer("init", z=1) 5037 Draw.buffer("initbg", z=10) 5038 for i in range(51): 5039 for _ in range(2): cls.initbg_colors.append(Color.fg(i, i, i)) 5040 Draw.buffer("banner", (f'{Banner.draw(Term.height // 2 - 10, center=True)}{Mv.d(1)}{Mv.l(11)}{Colors.black_bg}{Colors.default}' 5041 f'{Fx.b}{Fx.i}Version: {VERSION}{Fx.ui}{Fx.ub}{Term.bg}{Term.fg}{Color.fg("#50")}'), z=2) 5042 for _i in range(7): 5043 perc = f'{str(round((_i + 1) * 14 + 2)) + "%":>5}' 5044 Draw.buffer("+banner", f'{Mv.to(Term.height // 2 - 2 + _i, Term.width // 2 - 28)}{Fx.trans(perc)}{Symbol.v_line}') 5045 5046 Draw.out("banner") 5047 Draw.buffer("+init!", f'{Color.fg("#cc")}{Fx.b}{Mv.to(Term.height // 2 - 2, Term.width // 2 - 21)}{Mv.save}') 5048 5049 cls.initbg_data = [randint(0, 100) for _ in range(Term.width * 2)] 5050 cls.initbg_up = Graph(Term.width, Term.height // 2, cls.initbg_colors, cls.initbg_data, invert=True) 5051 cls.initbg_down = Graph(Term.width, Term.height // 2, cls.initbg_colors, cls.initbg_data, invert=False) 5052 5053 @classmethod 5054 def success(cls): 5055 if not CONFIG.show_init or cls.resized: return 5056 cls.draw_bg(5) 5057 Draw.buffer("+init!", f'{Mv.restore}{Symbol.ok}\n{Mv.r(Term.width // 2 - 22)}{Mv.save}') 5058 5059 @staticmethod 5060 def fail(err): 5061 if CONFIG.show_init: 5062 Draw.buffer("+init!", f'{Mv.restore}{Symbol.fail}') 5063 sleep(2) 5064 errlog.exception(f'{err}') 5065 clean_quit(1, errmsg=f'Error during init! See {CONFIG_DIR}/error.log for more information.') 5066 5067 @classmethod 5068 def draw_bg(cls, times: int = 5): 5069 for _ in range(times): 5070 sleep(0.05) 5071 x = randint(0, 100) 5072 Draw.buffer("initbg", f'{Fx.ub}{Mv.to(0, 0)}{cls.initbg_up(x)}{Mv.to(Term.height // 2, 0)}{cls.initbg_down(x)}') 5073 Draw.out("initbg", "banner", "init") 5074 5075 @classmethod 5076 def done(cls): 5077 cls.running = False 5078 if not CONFIG.show_init: return 5079 if cls.resized: 5080 Draw.now(Term.clear) 5081 else: 5082 cls.draw_bg(10) 5083 Draw.clear("initbg", "banner", "init", saved=True) 5084 if cls.resized: return 5085 del cls.initbg_up, cls.initbg_down, cls.initbg_data, cls.initbg_colors 5086 5087 5088#? Functions -------------------------------------------------------------------------------------> 5089 5090def get_cpu_name() -> str: 5091 '''Fetch a suitable CPU identifier from the CPU model name string''' 5092 name: str = "" 5093 nlist: List = [] 5094 command: str = "" 5095 cmd_out: str = "" 5096 rem_line: str = "" 5097 if SYSTEM == "Linux": 5098 command = "cat /proc/cpuinfo" 5099 rem_line = "model name" 5100 elif SYSTEM == "MacOS": 5101 command ="sysctl -n machdep.cpu.brand_string" 5102 elif SYSTEM == "BSD": 5103 command ="sysctl hw.model" 5104 rem_line = "hw.model" 5105 5106 try: 5107 cmd_out = subprocess.check_output("LANG=C " + command, shell=True, universal_newlines=True) 5108 except: 5109 pass 5110 if rem_line: 5111 for line in cmd_out.split("\n"): 5112 if rem_line in line: 5113 name = re.sub( ".*" + rem_line + ".*:", "", line,1).lstrip() 5114 else: 5115 name = cmd_out 5116 nlist = name.split(" ") 5117 try: 5118 if "Xeon" in name and "CPU" in name: 5119 name = nlist[nlist.index("CPU")+(-1 if name.endswith(("CPU", "z")) else 1)] 5120 elif "Ryzen" in name: 5121 name = " ".join(nlist[nlist.index("Ryzen"):nlist.index("Ryzen")+3]) 5122 elif "Duo" in name and "@" in name: 5123 name = " ".join(nlist[:nlist.index("@")]) 5124 elif "CPU" in name and not nlist[0] == "CPU" and not nlist[nlist.index("CPU")-1].isdigit(): 5125 name = nlist[nlist.index("CPU")-1] 5126 except: 5127 pass 5128 5129 name = name.replace("Processor", "").replace("CPU", "").replace("(R)", "").replace("(TM)", "").replace("Intel", "") 5130 name = re.sub(r"\d?\.?\d+[mMgG][hH][zZ]", "", name) 5131 name = " ".join(name.split()) 5132 5133 return name 5134 5135def get_cpu_core_mapping() -> List[int]: 5136 mapping: List[int] = [] 5137 core_ids: List[int] = [] 5138 5139 if SYSTEM == "Linux" and os.path.isfile("/proc/cpuinfo"): 5140 try: 5141 mapping = [0] * THREADS 5142 num = 0 5143 with open("/proc/cpuinfo", "r") as f: 5144 for line in f: 5145 if line.startswith("processor"): 5146 num = int(line.strip()[(line.index(": ")+2):]) 5147 if num > THREADS - 1: 5148 break 5149 elif line.startswith("core id"): 5150 core_id = int(line.strip()[(line.index(": ")+2):]) 5151 if core_id not in core_ids: 5152 core_ids.append(core_id) 5153 mapping[num] = core_ids.index(core_id) 5154 if num < THREADS - 1: 5155 raise Exception 5156 except: 5157 mapping = [] 5158 5159 if not mapping: 5160 mapping = [] 5161 for _ in range(THREADS // CORES): 5162 mapping.extend([x for x in range(CORES)]) 5163 5164 return mapping 5165 5166def create_box(x: int = 0, y: int = 0, width: int = 0, height: int = 0, title: str = "", title2: str = "", line_color: Color = None, title_color: Color = None, fill: bool = True, box = None) -> str: 5167 '''Create a box from a box object or by given arguments''' 5168 out: str = f'{Term.fg}{Term.bg}' 5169 num: int = 0 5170 if not line_color: line_color = THEME.div_line 5171 if not title_color: title_color = THEME.title 5172 5173 #* Get values from box class if given 5174 if box: 5175 x = box.x 5176 y = box.y 5177 width = box.width 5178 height = box.height 5179 title = box.name 5180 num = box.num 5181 hlines: Tuple[int, int] = (y, y + height - 1) 5182 5183 out += f'{line_color}' 5184 5185 #* Draw all horizontal lines 5186 for hpos in hlines: 5187 out += f'{Mv.to(hpos, x)}{Symbol.h_line * (width - 1)}' 5188 5189 #* Draw all vertical lines and fill if enabled 5190 for hpos in range(hlines[0]+1, hlines[1]): 5191 out += f'{Mv.to(hpos, x)}{Symbol.v_line}{" " * (width-2) if fill else Mv.r(width-2)}{Symbol.v_line}' 5192 5193 #* Draw corners 5194 out += f'{Mv.to(y, x)}{Symbol.left_up}\ 5195 {Mv.to(y, x + width - 1)}{Symbol.right_up}\ 5196 {Mv.to(y + height - 1, x)}{Symbol.left_down}\ 5197 {Mv.to(y + height - 1, x + width - 1)}{Symbol.right_down}' 5198 5199 #* Draw titles if enabled 5200 if title: 5201 numbered: str = "" if not num else f'{THEME.hi_fg(SUPERSCRIPT[num])}' 5202 out += f'{Mv.to(y, x + 2)}{Symbol.title_left}{Fx.b}{numbered}{title_color}{title}{Fx.ub}{line_color}{Symbol.title_right}' 5203 if title2: 5204 out += f'{Mv.to(hlines[1], x + 2)}{Symbol.title_left}{title_color}{Fx.b}{title2}{Fx.ub}{line_color}{Symbol.title_right}' 5205 5206 return f'{out}{Term.fg}{Mv.to(y + 1, x + 1)}' 5207 5208def now_sleeping(signum, frame): 5209 """Reset terminal settings and stop background input read before putting to sleep""" 5210 Key.stop() 5211 Collector.stop() 5212 Draw.now(Term.clear, Term.normal_screen, Term.show_cursor, Term.mouse_off, Term.mouse_direct_off, Term.title()) 5213 Term.echo(True) 5214 os.kill(os.getpid(), signal.SIGSTOP) 5215 5216def now_awake(signum, frame): 5217 """Set terminal settings and restart background input read""" 5218 Draw.now(Term.alt_screen, Term.clear, Term.hide_cursor, Term.mouse_on, Term.title("BpyTOP")) 5219 Term.echo(False) 5220 Key.start() 5221 Term.refresh() 5222 Box.calc_sizes() 5223 Box.draw_bg() 5224 Collector.start() 5225 5226def quit_sigint(signum, frame): 5227 """SIGINT redirection to clean_quit()""" 5228 clean_quit() 5229 5230def clean_quit(errcode: int = 0, errmsg: str = "", thread: bool = False): 5231 """Stop background input read, save current config and reset terminal settings before quitting""" 5232 global THREAD_ERROR 5233 if thread: 5234 THREAD_ERROR = errcode 5235 interrupt_main() 5236 return 5237 if THREAD_ERROR: errcode = THREAD_ERROR 5238 Key.stop() 5239 Collector.stop() 5240 if not errcode: CONFIG.save_config() 5241 Draw.now(Term.clear, Term.normal_screen, Term.show_cursor, Term.mouse_off, Term.mouse_direct_off, Term.title()) 5242 Term.echo(True) 5243 if errcode == 0: 5244 errlog.info(f'Exiting. Runtime {timedelta(seconds=round(time() - SELF_START, 0))} \n') 5245 else: 5246 errlog.warning(f'Exiting with errorcode ({errcode}). Runtime {timedelta(seconds=round(time() - SELF_START, 0))} \n') 5247 if not errmsg: errmsg = f'Bpytop exited with errorcode ({errcode}). See {CONFIG_DIR}/error.log for more information!' 5248 if errmsg: print(errmsg) 5249 5250 raise SystemExit(errcode) 5251 5252def floating_humanizer(value: Union[float, int], bit: bool = False, per_second: bool = False, start: int = 0, short: bool = False) -> str: 5253 '''Scales up in steps of 1024 to highest possible unit and returns string with unit suffixed 5254 * bit=True or defaults to bytes 5255 * start=int to set 1024 multiplier starting unit 5256 * short=True always returns 0 decimals and shortens unit to 1 character 5257 ''' 5258 out: str = "" 5259 mult: int = 8 if bit else 1 5260 selector: int = start 5261 unit: Tuple[str, ...] = UNITS["bit"] if bit else UNITS["byte"] 5262 5263 if isinstance(value, float): value = round(value * 100 * mult) 5264 elif value > 0: value *= 100 * mult 5265 else: value = 0 5266 5267 while len(f'{value}') > 5 and value >= 102400: 5268 value >>= 10 5269 if value < 100: 5270 out = f'{value}' 5271 break 5272 selector += 1 5273 else: 5274 if len(f'{value}') == 4 and selector > 0: 5275 out = f'{value}'[:-2] + "." + f'{value}'[-2] 5276 elif len(f'{value}') == 3 and selector > 0: 5277 out = f'{value}'[:-2] + "." + f'{value}'[-2:] 5278 elif len(f'{value}') >= 2: 5279 out = f'{value}'[:-2] 5280 else: 5281 out = f'{value}' 5282 5283 5284 if short: 5285 if "." in out: 5286 out = f'{round(float(out))}' 5287 if len(out) > 3: 5288 out = f'{int(out[0]) + 1}' 5289 selector += 1 5290 out += f'{"" if short else " "}{unit[selector][0] if short else unit[selector]}' 5291 if per_second: out += "ps" if bit else "/s" 5292 5293 return out 5294 5295def units_to_bytes(value: str) -> int: 5296 if not value: return 0 5297 out: int = 0 5298 mult: int = 0 5299 bit: bool = False 5300 value_i: int = 0 5301 units: Dict[str, int] = {"k" : 1, "m" : 2, "g" : 3} 5302 try: 5303 if value.lower().endswith("s"): 5304 value = value[:-1] 5305 if value.lower().endswith("bit"): 5306 bit = True 5307 value = value[:-3] 5308 elif value.lower().endswith("byte"): 5309 value = value[:-4] 5310 5311 if value[-1].lower() in units: 5312 mult = units[value[-1].lower()] 5313 value = value[:-1] 5314 5315 if "." in value and value.replace(".", "").isdigit(): 5316 if mult > 0: 5317 value_i = round(float(value) * 1024) 5318 mult -= 1 5319 else: 5320 value_i = round(float(value)) 5321 elif value.isdigit(): 5322 value_i = int(value) 5323 5324 out = int(value_i) << (10 * mult) 5325 if bit: out = round(out / 8) 5326 except ValueError: 5327 out = 0 5328 return out 5329 5330def min_max(value: int, min_value: int=0, max_value: int=100) -> int: 5331 return max(min_value, min(value, max_value)) 5332 5333def readfile(file: str, default: str = "") -> str: 5334 out: Union[str, None] = None 5335 if os.path.isfile(file): 5336 try: 5337 with open(file, "r") as f: 5338 out = f.read().strip() 5339 except: 5340 pass 5341 return default if out is None else out 5342 5343def temperature(value: int, scale: str = "celsius") -> Tuple[int, str]: 5344 """Returns a tuple with integer value and string unit converted from an integer in celsius to: celsius, fahrenheit, kelvin or rankine.""" 5345 if scale == "celsius": 5346 return (value, "°C") 5347 elif scale == "fahrenheit": 5348 return (round(value * 1.8 + 32), "°F") 5349 elif scale == "kelvin": 5350 return (round(value + 273.15), "K ") 5351 elif scale == "rankine": 5352 return (round(value * 1.8 + 491.67), "°R") 5353 else: 5354 return (0, "") 5355 5356def process_keys(): 5357 mouse_pos: Tuple[int, int] = (0, 0) 5358 filtered: bool = False 5359 box_keys = {"1" : "cpu", "2" : "mem", "3" : "net", "4" : "proc"} 5360 while Key.has_key(): 5361 key = Key.get() 5362 found: bool = True 5363 if key in ["mouse_scroll_up", "mouse_scroll_down", "mouse_click"]: 5364 mouse_pos = Key.get_mouse() 5365 if mouse_pos[0] >= ProcBox.x and ProcBox.current_y + 1 <= mouse_pos[1] < ProcBox.current_y + ProcBox.current_h - 1: 5366 pass 5367 elif key == "mouse_click": 5368 key = "mouse_unselect" 5369 else: 5370 key = "_null" 5371 5372 if ProcBox.filtering: 5373 if key in ["enter", "mouse_click", "mouse_unselect"]: 5374 ProcBox.filtering = False 5375 Collector.collect(ProcCollector, redraw=True, only_draw=True) 5376 continue 5377 elif key in ["escape", "delete"]: 5378 ProcCollector.search_filter = "" 5379 ProcBox.filtering = False 5380 elif len(key) == 1: 5381 ProcCollector.search_filter += key 5382 elif key == "backspace" and len(ProcCollector.search_filter) > 0: 5383 ProcCollector.search_filter = ProcCollector.search_filter[:-1] 5384 else: 5385 continue 5386 Collector.collect(ProcCollector, proc_interrupt=True, redraw=True) 5387 if filtered: Collector.collect_done.wait(0.1) 5388 filtered = True 5389 continue 5390 5391 if key == "_null": 5392 continue 5393 elif key == "q": 5394 clean_quit() 5395 elif key == "+" and CONFIG.update_ms + 100 <= 86399900: 5396 CONFIG.update_ms += 100 5397 Box.draw_update_ms() 5398 elif key == "-" and CONFIG.update_ms - 100 >= 100: 5399 CONFIG.update_ms -= 100 5400 Box.draw_update_ms() 5401 elif key in ["M", "escape"]: 5402 Menu.main() 5403 elif key in ["o", "f2"]: 5404 Menu.options() 5405 elif key in ["H", "f1"]: 5406 Menu.help() 5407 elif key == "m": 5408 if list(Box.view_modes).index(Box.view_mode) + 1 > len(list(Box.view_modes)) - 1: 5409 Box.view_mode = list(Box.view_modes)[0] 5410 else: 5411 Box.view_mode = list(Box.view_modes)[(list(Box.view_modes).index(Box.view_mode) + 1)] 5412 CONFIG.shown_boxes = " ".join(Box.view_modes[Box.view_mode]) 5413 Draw.clear(saved=True) 5414 Term.refresh(force=True) 5415 elif key in box_keys: 5416 boxes = CONFIG.shown_boxes.split() 5417 if box_keys[key] in boxes: 5418 boxes.remove(box_keys[key]) 5419 else: 5420 boxes.append(box_keys[key]) 5421 CONFIG.shown_boxes = " ".join(boxes) 5422 Box.view_mode = "user" 5423 Box.view_modes["user"] = CONFIG.shown_boxes.split() 5424 Draw.clear(saved=True) 5425 Term.refresh(force=True) 5426 else: 5427 found = False 5428 5429 if found: continue 5430 5431 if "proc" in Box.boxes: 5432 if key in ["left", "right", "h", "l"]: 5433 ProcCollector.sorting(key) 5434 elif key == " " and CONFIG.proc_tree and ProcBox.selected > 0: 5435 if ProcBox.selected_pid in ProcCollector.collapsed: 5436 ProcCollector.collapsed[ProcBox.selected_pid] = not ProcCollector.collapsed[ProcBox.selected_pid] 5437 Collector.collect(ProcCollector, interrupt=True, redraw=True) 5438 elif key == "e": 5439 CONFIG.proc_tree = not CONFIG.proc_tree 5440 Collector.collect(ProcCollector, interrupt=True, redraw=True) 5441 elif key == "r": 5442 CONFIG.proc_reversed = not CONFIG.proc_reversed 5443 Collector.collect(ProcCollector, interrupt=True, redraw=True) 5444 elif key == "c": 5445 CONFIG.proc_per_core = not CONFIG.proc_per_core 5446 Collector.collect(ProcCollector, interrupt=True, redraw=True) 5447 elif key in ["f", "F"]: 5448 ProcBox.filtering = True 5449 ProcCollector.case_sensitive = key == "F" 5450 if not ProcCollector.search_filter: ProcBox.start = 0 5451 Collector.collect(ProcCollector, redraw=True, only_draw=True) 5452 elif key in ["T", "K", "I"] and (ProcBox.selected > 0 or ProcCollector.detailed): 5453 pid: int = ProcBox.selected_pid if ProcBox.selected > 0 else ProcCollector.detailed_pid # type: ignore 5454 if psutil.pid_exists(pid): 5455 if key == "T": sig = signal.SIGTERM 5456 elif key == "K": sig = signal.SIGKILL 5457 elif key == "I": sig = signal.SIGINT 5458 try: 5459 os.kill(pid, sig) 5460 except Exception as e: 5461 errlog.error(f'Exception when sending signal {sig} to pid {pid}') 5462 errlog.exception(f'{e}') 5463 elif key == "delete" and ProcCollector.search_filter: 5464 ProcCollector.search_filter = "" 5465 Collector.collect(ProcCollector, proc_interrupt=True, redraw=True) 5466 elif key == "enter": 5467 if ProcBox.selected > 0 and ProcCollector.detailed_pid != ProcBox.selected_pid and psutil.pid_exists(ProcBox.selected_pid): 5468 ProcCollector.detailed = True 5469 ProcBox.last_selection = ProcBox.selected 5470 ProcBox.selected = 0 5471 ProcCollector.detailed_pid = ProcBox.selected_pid 5472 ProcBox.resized = True 5473 Collector.proc_counter = 1 5474 elif ProcCollector.detailed: 5475 ProcBox.selected = ProcBox.last_selection 5476 ProcBox.last_selection = 0 5477 ProcCollector.detailed = False 5478 ProcCollector.detailed_pid = None 5479 ProcBox.resized = True 5480 Collector.proc_counter = 1 5481 else: 5482 continue 5483 ProcCollector.details = {} 5484 ProcCollector.details_cpu = [] 5485 ProcCollector.details_mem = [] 5486 Graphs.detailed_cpu = NotImplemented 5487 Graphs.detailed_mem = NotImplemented 5488 Collector.collect(ProcCollector, proc_interrupt=True, redraw=True) 5489 elif key in ["up", "down", "mouse_scroll_up", "mouse_scroll_down", "page_up", "page_down", "home", "end", "mouse_click", "mouse_unselect", "j", "k"]: 5490 ProcBox.selector(key, mouse_pos) 5491 5492 if "net" in Box.boxes: 5493 if key in ["b", "n"]: 5494 NetCollector.switch(key) 5495 elif key == "z": 5496 NetCollector.reset = not NetCollector.reset 5497 Collector.collect(NetCollector, redraw=True) 5498 elif key == "y": 5499 CONFIG.net_sync = not CONFIG.net_sync 5500 Collector.collect(NetCollector, redraw=True) 5501 elif key == "a": 5502 NetCollector.auto_min = not NetCollector.auto_min 5503 NetCollector.net_min = {"download" : -1, "upload" : -1} 5504 Collector.collect(NetCollector, redraw=True) 5505 5506 if "mem" in Box.boxes: 5507 if key == "g": 5508 CONFIG.mem_graphs = not CONFIG.mem_graphs 5509 Collector.collect(MemCollector, interrupt=True, redraw=True) 5510 elif key == "s": 5511 Collector.collect_idle.wait() 5512 CONFIG.swap_disk = not CONFIG.swap_disk 5513 Collector.collect(MemCollector, interrupt=True, redraw=True) 5514 elif key == "d": 5515 Collector.collect_idle.wait() 5516 CONFIG.show_disks = not CONFIG.show_disks 5517 Collector.collect(MemCollector, interrupt=True, redraw=True) 5518 elif key == "i": 5519 Collector.collect_idle.wait() 5520 CONFIG.io_mode = not CONFIG.io_mode 5521 Collector.collect(MemCollector, interrupt=True, redraw=True) 5522 5523 5524 5525 5526 5527#? Pre main --------------------------------------------------------------------------------------> 5528 5529 5530CPU_NAME: str = get_cpu_name() 5531 5532CORE_MAP: List[int] = get_cpu_core_mapping() 5533 5534THEME: Theme 5535 5536def main(): 5537 global THEME 5538 5539 Term.width = os.get_terminal_size().columns 5540 Term.height = os.get_terminal_size().lines 5541 5542 #? Init --------------------------------------------------------------------------------------> 5543 if DEBUG: TimeIt.start("Init") 5544 5545 #? Switch to alternate screen, clear screen, hide cursor, enable mouse reporting and disable input echo 5546 Draw.now(Term.alt_screen, Term.clear, Term.hide_cursor, Term.mouse_on, Term.title("BpyTOP")) 5547 Term.echo(False) 5548 #Term.refresh(force=True) 5549 5550 #? Start a thread checking for updates while running init 5551 if CONFIG.update_check: UpdateChecker.run() 5552 5553 #? Draw banner and init status 5554 if CONFIG.show_init and not Init.resized: 5555 Init.start() 5556 5557 #? Load theme 5558 if CONFIG.show_init: 5559 Draw.buffer("+init!", f'{Mv.restore}{Fx.trans("Loading theme and creating colors... ")}{Mv.save}') 5560 try: 5561 THEME = Theme(CONFIG.color_theme) 5562 except Exception as e: 5563 Init.fail(e) 5564 else: 5565 Init.success() 5566 5567 #? Setup boxes 5568 if CONFIG.show_init: 5569 Draw.buffer("+init!", f'{Mv.restore}{Fx.trans("Doing some maths and drawing... ")}{Mv.save}') 5570 try: 5571 if CONFIG.check_temp: CpuCollector.get_sensors() 5572 Box.calc_sizes() 5573 Box.draw_bg(now=False) 5574 except Exception as e: 5575 Init.fail(e) 5576 else: 5577 Init.success() 5578 5579 #? Setup signal handlers for SIGSTP, SIGCONT, SIGINT and SIGWINCH 5580 if CONFIG.show_init: 5581 Draw.buffer("+init!", f'{Mv.restore}{Fx.trans("Setting up signal handlers... ")}{Mv.save}') 5582 try: 5583 signal.signal(signal.SIGTSTP, now_sleeping) #* Ctrl-Z 5584 signal.signal(signal.SIGCONT, now_awake) #* Resume 5585 signal.signal(signal.SIGINT, quit_sigint) #* Ctrl-C 5586 signal.signal(signal.SIGWINCH, Term.refresh) #* Terminal resized 5587 except Exception as e: 5588 Init.fail(e) 5589 else: 5590 Init.success() 5591 5592 #? Start a separate thread for reading keyboard input 5593 if CONFIG.show_init: 5594 Draw.buffer("+init!", f'{Mv.restore}{Fx.trans("Starting input reader thread... ")}{Mv.save}') 5595 try: 5596 if isinstance(sys.stdin, io.TextIOWrapper) and sys.version_info >= (3, 7): 5597 sys.stdin.reconfigure(errors="ignore") # type: ignore 5598 Key.start() 5599 except Exception as e: 5600 Init.fail(e) 5601 else: 5602 Init.success() 5603 5604 #? Start a separate thread for data collection and drawing 5605 if CONFIG.show_init: 5606 Draw.buffer("+init!", f'{Mv.restore}{Fx.trans("Starting data collection and drawer thread... ")}{Mv.save}') 5607 try: 5608 Collector.start() 5609 except Exception as e: 5610 Init.fail(e) 5611 else: 5612 Init.success() 5613 5614 #? Collect data and draw to buffer 5615 if CONFIG.show_init: 5616 Draw.buffer("+init!", f'{Mv.restore}{Fx.trans("Collecting data and drawing... ")}{Mv.save}') 5617 try: 5618 Collector.collect(draw_now=False) 5619 pass 5620 except Exception as e: 5621 Init.fail(e) 5622 else: 5623 Init.success() 5624 5625 #? Draw to screen 5626 if CONFIG.show_init: 5627 Draw.buffer("+init!", f'{Mv.restore}{Fx.trans("Finishing up... ")}{Mv.save}') 5628 try: 5629 Collector.collect_done.wait() 5630 except Exception as e: 5631 Init.fail(e) 5632 else: 5633 Init.success() 5634 5635 Init.done() 5636 Term.refresh() 5637 Draw.out(clear=True) 5638 if CONFIG.draw_clock: 5639 Box.clock_on = True 5640 if DEBUG: TimeIt.stop("Init") 5641 5642 #? Main loop -------------------------------------------------------------------------------------> 5643 5644 def run(): 5645 while not False: 5646 Term.refresh() 5647 Timer.stamp() 5648 5649 while Timer.not_zero(): 5650 if Key.input_wait(Timer.left()): 5651 process_keys() 5652 5653 Collector.collect() 5654 5655 #? Start main loop 5656 try: 5657 run() 5658 except Exception as e: 5659 errlog.exception(f'{e}') 5660 clean_quit(1) 5661 else: 5662 #? Quit cleanly even if false starts being true... 5663 clean_quit() 5664 5665 5666if __name__ == "__main__": 5667 main() 5668