#!/usr/local/bin/python3.8
# vim:fileencoding=utf-8
# License: GPL v3 Copyright: 2016, Kovid Goyal <kovid at kovidgoyal.net>

import atexit
import errno
import fcntl
import math
import os
import re
import string
import sys
from contextlib import suppress
from functools import lru_cache
from time import monotonic
from typing import (
    TYPE_CHECKING, Any, Callable, Dict, Generator, Iterable, List, Mapping,
    Match, NamedTuple, Optional, Tuple, Union, cast
)

from .constants import (
    appname, is_macos, is_wayland, read_kitty_resource, shell_path,
    supports_primary_selection
)
from .rgb import Color, to_color
from .types import run_once
from .typing import AddressFamily, PopenType, Socket, StartupCtx

if TYPE_CHECKING:
    from .options.types import Options
    from .fast_data_types import OSWindowSize
else:
    Options = object


def expandvars(val: str, env: Mapping[str, str] = {}, fallback_to_os_env: bool = True) -> str:
    """Expand ``$NAME`` and ``${NAME}`` references in *val*.

    Values are looked up in *env* first and, when *fallback_to_os_env* is
    true, in ``os.environ``. References that cannot be resolved are left
    in the string unchanged.
    """
    # NOTE: the ``{}`` default is shared between calls but is only ever read
    # (via .get), never mutated, so it is safe here.

    def sub(m: Match) -> str:
        # group(1) is the bare $NAME form, group(2) the ${NAME} form
        key = m.group(1) or m.group(2)
        result = env.get(key)
        if result is None and fallback_to_os_env:
            result = os.environ.get(key)
        if result is None:
            result = m.group()
        return result

    if '$' not in val:
        return val

    return re.sub(r'\$(?:(\w+)|\{([^}]+)\})', sub, val)


def platform_window_id(os_window_id: int) -> Optional[int]:
    """Return the native window id for *os_window_id*, or None.

    Uses ``cocoa_window_id`` on macOS and ``x11_window_id`` when not running
    under Wayland. Any exception raised by the lookup is suppressed.
    """
    if is_macos:
        from .fast_data_types import cocoa_window_id
        with suppress(Exception):
            return cocoa_window_id(os_window_id)
    if not is_wayland():
        from .fast_data_types import x11_window_id
        with suppress(Exception):
            return x11_window_id(os_window_id)
    return None


def load_shaders(name: str) -> Tuple[str, str]:
    """Return the (vertex, fragment) shader sources for *name*.

    The sources are read from the ``{name}_vertex.glsl`` and
    ``{name}_fragment.glsl`` resources; the first occurrence of the literal
    text ``GLSL_VERSION`` in each is replaced by the actual version.
    """
    from .fast_data_types import GLSL_VERSION

    def load(which: str) -> str:
        return read_kitty_resource(f'{name}_{which}.glsl').decode('utf-8').replace('GLSL_VERSION', str(GLSL_VERSION), 1)

    return load('vertex'), load('fragment')


def safe_print(*a: Any, **k: Any) -> None:
    """Call ``print(*a, **k)``, silently ignoring any exception it raises."""
    with suppress(Exception):
        print(*a, **k)


def log_error(*a: Any, **k: str) -> None:
    """Log a message built like ``print`` would build it.

    The positional arguments are stringified and joined with the ``sep``
    keyword (default ``' '``), followed by the ``end`` keyword (default
    ``''``). The result is sent to ``log_error.redirect`` if that attribute
    has been set, otherwise to ``log_error_string``. Failures are ignored.
    """
    from .fast_data_types import log_error_string
    output = getattr(log_error, 'redirect', log_error_string)
    with suppress(Exception):
        msg = k.get('sep', ' ').join(map(str, a)) + k.get('end', '')
        # Strip NULs from the whole message, not only from ``end``: an
        # embedded NUL anywhere would otherwise reach the output function
        # (presumably the C level logger cannot accept them -- confirm).
        output(msg.replace('\0', ''))


def ceil_int(x: float) -> int:
    """Return the ceiling of *x* as an int."""
    return int(math.ceil(x))


def sanitize_title(x: str) -> str:
    """Remove control characters from *x* and collapse whitespace runs.

    All C0 controls, DEL and the C1 controls are deleted, then every run of
    whitespace is replaced by a single space.
    """
    # The range previously stopped at \x19, which let SUB (\x1a), ESC (\x1b)
    # and DEL (\x7f) through into titles.
    return re.sub(r'\s+', ' ', re.sub(r'[\0-\x1f\x7f-\x9f]', '', x))


def color_as_int(val: Tuple[int, int, int]) -> int:
    """Pack an (r, g, b) triple into a single ``0xRRGGBB`` integer."""
    return val[0] << 16 | val[1] << 8 | val[2]


def color_from_int(val: int) -> Color:
    """Unpack a ``0xRRGGBB`` integer into a Color."""
    return Color((val >> 16) & 0xFF, (val >> 8) & 0xFF, val & 0xFF)


def parse_color_set(raw: str) -> Generator[Tuple[int, Optional[int]], None, None]:
    """Parse a ``index;spec;index;spec...`` string.

    Yields ``(index, None)`` for a spec of ``?`` and ``(index, 0xRRGGBB)``
    for a spec that ``to_color`` can parse. Nothing is yielded if the number
    of ``;`` separated fields is odd. Entries whose index is not an integer
    in 0..255, or whose spec cannot be parsed, are skipped.
    """
    parts = raw.split(';')
    if len(parts) % 2 != 0:
        return
    for c_, spec in zip(parts[::2], parts[1::2]):
        try:
            c = int(c_)
            if c < 0 or c > 255:
                continue
            if spec == '?':
                yield c, None
            else:
                q = to_color(spec)
                if q is not None:
                    r, g, b = q
                    yield c, r << 16 | g << 8 | b
        except Exception:
            continue


class ScreenSize(NamedTuple):
    # Fields are filled from the TIOCGWINSZ result by ScreenSizeGetter;
    # cell_width/cell_height are width // cols and height // rows.
    rows: int
    cols: int
    width: int
    height: int
    cell_width: int
    cell_height: int


class ScreenSizeGetter:
    """Callable returning the ScreenSize of a terminal file descriptor.

    The size is queried with the TIOCGWINSZ ioctl and cached; it is only
    re-queried on a call made while ``changed`` is true.
    """

    # Set to True (by external code) to force a re-query on the next call
    changed = True
    Size = ScreenSize
    # The most recently computed size
    ans: Optional[ScreenSize] = None

    def __init__(self, fd: Optional[int]):
        # Default to the file descriptor of sys.stdout
        if fd is None:
            fd = sys.stdout.fileno()
        self.fd = fd

    def __call__(self) -> ScreenSize:
        if self.changed:
            import array
            import fcntl
            import termios
            # Four unsigned shorts, unpacked below as rows, cols, width, height
            buf = array.array('H', [0, 0, 0, 0])
            fcntl.ioctl(self.fd, termios.TIOCGWINSZ, cast(bytearray, buf))
            rows, cols, width, height = tuple(buf)
            # ``or 1`` avoids dividing by zero when rows/cols are reported as 0
            cell_width, cell_height = width // (cols or 1), height // (rows or 1)
            self.ans = ScreenSize(rows, cols, width, height, cell_width, cell_height)
            self.changed = False
        return cast(ScreenSize, self.ans)


@lru_cache(maxsize=64)
def screen_size_function(fd: Optional[int] = None) -> ScreenSizeGetter:
    """Return the ScreenSizeGetter for *fd*; getters are cached per fd."""
    return ScreenSizeGetter(fd)


def fit_image(width: int, height: int, pwidth: int, pheight: int) -> Tuple[int, int]:
    """Scale (width, height) down to fit inside (pwidth, pheight).

    The aspect ratio is preserved (up to flooring). Sizes that already fit
    are returned unchanged.
    """
    from math import floor
    if height > pheight:
        corrf = pheight / float(height)
        width, height = floor(corrf * width), pheight
    if width > pwidth:
        corrf = pwidth / float(width)
        width, height = pwidth, floor(corrf * height)
    if height > pheight:
        corrf = pheight / float(height)
        width, height = floor(corrf * width), pheight

    return int(width), int(height)


def set_primary_selection(text: Union[str, bytes]) -> None:
    """Set the primary selection to *text*, where one is supported."""
    if not supports_primary_selection:
        return  # There is no primary selection
    from kitty.fast_data_types import set_primary_selection as s
    s(text)


def get_primary_selection() -> str:
    """Return the primary selection as text, '' if there is none."""
    if not supports_primary_selection:
        return ''  # There is no primary selection
    from kitty.fast_data_types import get_primary_selection as g
    return (g() or b'').decode('utf-8', 'replace')


def base64_encode(
    integer: int,
    chars: str = string.ascii_uppercase + string.ascii_lowercase + string.digits +
    '+/'
) -> str:
    """Encode a non-negative integer as base 64 digits taken from *chars*.

    The most significant digit comes first; zero encodes as ``chars[0]``.

    Raises ValueError for a negative *integer*.
    """
    if integer < 0:
        # divmod() of a negative number by 64 never reaches 0, so the loop
        # below would never terminate.
        raise ValueError('Cannot base64 encode a negative integer')
    ans = ''
    while True:
        integer, remainder = divmod(integer, 64)
        ans = chars[remainder] + ans
        if integer == 0:
            break
    return ans


def command_for_open(program: Union[str, List[str]] = 'default') -> List[str]:
    """Return the command line used to open things with *program*.

    A string is first split with ``to_cmdline``. The special value
    ``default`` maps to ``open`` on macOS and ``xdg-open`` elsewhere.
    """
    if isinstance(program, str):
        from .conf.utils import to_cmdline
        program = to_cmdline(program)
    if program == ['default']:
        cmd = ['open'] if is_macos else ['xdg-open']
    else:
        cmd = program
    return cmd


def open_cmd(cmd: Union[Iterable[str], List[str]], arg: Union[None, Iterable[str], str] = None, cwd: Optional[str] = None) -> PopenType:
    """Start *cmd* with *arg* appended and return the Popen object.

    *arg* may be a single string or an iterable of strings. The standard
    streams of the child are connected to the null device. An empty *cwd*
    is treated as None.
    """
    import subprocess
    if arg is not None:
        # Copy so that the caller's sequence is not modified
        cmd = list(cmd)
        if isinstance(arg, str):
            cmd.append(arg)
        else:
            cmd.extend(arg)
    return subprocess.Popen(tuple(cmd), stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, cwd=cwd or None)


def open_url(url: str, program: Union[str, List[str]] = 'default', cwd: Optional[str] = None) -> PopenType:
    """Open *url* with *program* (see command_for_open)."""
    return open_cmd(command_for_open(program), url, cwd=cwd)


def detach(fork: bool = True, setsid: bool = True, redirect: bool = True) -> None:
    """Detach the current process.

    Depending on the flags: fork (the parent exits via ``SystemExit(0)``),
    start a new session with ``os.setsid`` and redirect the standard streams
    to ``os.devnull``.
    """
    if fork:
        # Detach from the controlling process.
        if os.fork() != 0:
            raise SystemExit(0)
    if setsid:
        os.setsid()
    if redirect:
        from .fast_data_types import redirect_std_streams
        redirect_std_streams(os.devnull)


def adjust_line_height(cell_height: int, val: Union[int, float]) -> int:
    """Return *cell_height* adjusted by *val*.

    An int *val* is added to the height, any other value is used as a
    multiplier (the result is truncated to an int).
    """
    if isinstance(val, int):
        return cell_height + val
    return int(cell_height * val)


def init_startup_notification_x11(window_handle: int, startup_id: Optional[str] = None) -> Optional['StartupCtx']:
    """Begin X11 startup notification for *window_handle*.

    The id is *startup_id* or, failing that, the ``DESKTOP_STARTUP_ID``
    environment variable, which is removed from the environment. Returns
    None when there is no id or no X11 display.
    """
    # https://specifications.freedesktop.org/startup-notification-spec/startup-notification-latest.txt
    from kitty.fast_data_types import init_x11_startup_notification
    sid = startup_id or os.environ.pop('DESKTOP_STARTUP_ID', None)  # ensure child processes don't get this env var
    if not sid:
        return None
    from .fast_data_types import x11_display
    display = x11_display()
    if not display:
        return None
    return init_x11_startup_notification(display, window_handle, sid)


def end_startup_notification_x11(ctx: 'StartupCtx') -> None:
    """End the X11 startup notification identified by *ctx*."""
    from kitty.fast_data_types import end_x11_startup_notification
    end_x11_startup_notification(ctx)


def init_startup_notification(window_handle: Optional[int], startup_id: Optional[str] = None) -> Optional['StartupCtx']:
    """Begin startup notification, returning its context or None.

    Does nothing on macOS and Wayland. A failure to load
    libstartup-notification is logged; any other exception has its traceback
    printed. In both cases None is returned.
    """
    if is_macos or is_wayland():
        return None
    if window_handle is None:
        log_error('Could not perform startup notification as window handle not present')
        return None
    try:
        try:
            return init_startup_notification_x11(window_handle, startup_id)
        except OSError as e:
            if not str(e).startswith("Failed to load libstartup-notification"):
                raise e
            log_error(
                f'{e}. This has two main effects:',
                'There will be no startup feedback and when using --single-instance, kitty windows may start on an incorrect desktop/workspace.')
    except Exception:
        import traceback
        traceback.print_exc()
    return None


def end_startup_notification(ctx: Optional['StartupCtx']) -> None:
    """End startup notification for *ctx*; errors are printed, not raised."""
    if not ctx:
        return
    if is_macos or is_wayland():
        return
    try:
        end_startup_notification_x11(ctx)
    except Exception:
        import traceback
        traceback.print_exc()


class startup_notification_handler:
    """Context manager tying startup notification to a ``with`` block.

    Entering returns a callback taking a window handle. The callback invokes
    *extra_callback* (if given) and, if *do_notify* is true, begins startup
    notification. Leaving the block ends any notification that was begun.
    """

    def __init__(self, do_notify: bool = True, startup_id: Optional[str] = None, extra_callback: Optional[Callable] = None):
        self.do_notify = do_notify
        self.startup_id = startup_id
        self.extra_callback = extra_callback
        self.ctx: Optional['StartupCtx'] = None

    def __enter__(self) -> Callable[[int], None]:

        def pre_show_callback(window_handle: int) -> None:
            if self.extra_callback is not None:
                self.extra_callback(window_handle)
            if self.do_notify:
                self.ctx = init_startup_notification(window_handle, self.startup_id)

        return pre_show_callback

    def __exit__(self, *a: Any) -> None:
        if self.ctx is not None:
            end_startup_notification(self.ctx)


def remove_socket_file(s: 'Socket', path: Optional[str] = None) -> None:
    """Close *s* and unlink *path* if given, ignoring OSError from either."""
    with suppress(OSError):
        s.close()
    if path:
        with suppress(OSError):
            os.unlink(path)


def unix_socket_paths(name: str, ext: str = '.lock') -> Generator[str, None, None]:
    """Yield candidate paths for the file ``name + ext``.

    Candidate directories are the temporary directory and the home directory
    or, on macOS, the user cache directory and ``/Library/Caches``. Only
    directories that are readable, writable and searchable are used. The file
    name is prefixed with a dot when it is placed in the home directory.
    """
    import tempfile
    home = os.path.expanduser('~')
    candidates = [tempfile.gettempdir(), home]
    if is_macos:
        from .fast_data_types import user_cache_dir
        candidates = [user_cache_dir(), '/Library/Caches']
    for loc in candidates:
        if os.access(loc, os.W_OK | os.R_OK | os.X_OK):
            filename = ('.' if loc == home else '') + name + ext
            yield os.path.join(loc, filename)


def single_instance_unix(name: str) -> bool:
    """Single instance handling using a lock file and a socket file.

    Only the first path yielded by unix_socket_paths is tried. If its lock
    file can be locked, this process is the server: a socket is bound next
    to the lock file, stored in ``single_instance.socket`` and True is
    returned. If the lock is held elsewhere, this process is a client: it
    connects to the socket, stores the connection in
    ``single_instance.socket`` and False is returned. False is also returned
    when there are no candidate paths.
    """
    import socket
    for path in unix_socket_paths(name):
        socket_path = path.rpartition('.')[0] + '.sock'
        # In the server case this descriptor is deliberately never closed:
        # closing it would release the lock.
        fd = os.open(path, os.O_CREAT | os.O_WRONLY | os.O_TRUNC | os.O_CLOEXEC)
        try:
            fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except OSError as err:
            # We hold no lock on the file, so do not leak the descriptor
            os.close(fd)
            if err.errno in (errno.EAGAIN, errno.EACCES):
                # Client
                s = socket.socket(family=socket.AF_UNIX)
                s.connect(socket_path)
                single_instance.socket = s
                return False
            raise
        s = socket.socket(family=socket.AF_UNIX)
        try:
            s.bind(socket_path)
        except OSError as err:
            if err.errno in (errno.EADDRINUSE, errno.EEXIST):
                # We hold the lock, so the existing socket file is stale
                os.unlink(socket_path)
                s.bind(socket_path)
            else:
                raise
        single_instance.socket = s  # prevent garbage collection from closing the socket
        atexit.register(remove_socket_file, s, socket_path)
        s.listen()
        s.set_inheritable(False)
        return True
    return False


class SingleInstance:
    """Callable deciding whether this process is the single instance.

    Returns True when this process becomes the server (listening socket in
    ``self.socket``) and False when it is a client (connected socket in
    ``self.socket``).
    """

    # The listening socket (server) or connected socket (client)
    socket: Optional['Socket'] = None

    def __call__(self, group_id: Optional[str] = None) -> bool:
        import socket
        name = '{}-ipc-{}'.format(appname, os.geteuid())
        if group_id:
            name += '-{}'.format(group_id)

        s = socket.socket(family=socket.AF_UNIX)
        # First try with abstract UDS
        addr = '\0' + name
        try:
            s.bind(addr)
        except OSError as err:
            if err.errno == errno.ENOENT:
                # Fall back to socket files; this socket is no longer needed
                s.close()
                return single_instance_unix(name)
            if err.errno == errno.EADDRINUSE:
                s.connect(addr)
                self.socket = s
                return False
            raise
        s.listen()
        self.socket = s  # prevent garbage collection from closing the socket
        s.set_inheritable(False)
        atexit.register(remove_socket_file, s)
        return True


single_instance = SingleInstance()


def parse_address_spec(spec: str) -> Tuple[AddressFamily, Union[Tuple[str, int], str], Optional[str]]:
    """Parse a ``protocol:rest`` address specification.

    Returns ``(family, address, socket_path)``:

    * ``unix:path`` gives AF_UNIX with *address* and *socket_path* both
      set to the path.
    * ``unix:@name`` gives AF_UNIX with the abstract address
      ``'\\0' + name`` and a *socket_path* of None.
    * ``tcp:host:port`` / ``tcp6:host:port`` give AF_INET / AF_INET6 with
      the address ``(host, port)`` and a *socket_path* of None.

    Raises ValueError for any other protocol (and for a malformed spec).
    """
    import socket
    protocol, rest = spec.split(':', 1)
    socket_path = None
    address: Union[str, Tuple[str, int]] = ''
    if protocol == 'unix':
        family = socket.AF_UNIX
        address = rest
        if address.startswith('@') and len(address) > 1:
            address = '\0' + address[1:]
        else:
            socket_path = address
    elif protocol in ('tcp', 'tcp6'):
        family = socket.AF_INET if protocol == 'tcp' else socket.AF_INET6
        # rsplit so that colons inside the host part are preserved
        host, port = rest.rsplit(':', 1)
        address = host, int(port)
    else:
        raise ValueError('Unknown protocol in --listen-on value: {}'.format(spec))
    return family, address, socket_path


def write_all(fd: int, data: Union[str, bytes]) -> None:
    """Write all of *data* to *fd*, encoding str as UTF-8.

    Writing stops early if ``os.write`` reports that nothing was written.
    """
    if isinstance(data, str):
        data = data.encode('utf-8')
    while data:
        n = os.write(fd, data)
        if not n:
            break
        data = data[n:]


class TTYIO:
    """Context manager for talking to the tty opened by ``open_tty``.

    The tty is closed, and given its original termios back, on exit.
    """

    def __enter__(self) -> 'TTYIO':
        from .fast_data_types import open_tty
        self.tty_fd, self.original_termios = open_tty(True)
        return self

    def __exit__(self, *a: Any) -> None:
        from .fast_data_types import close_tty
        close_tty(self.tty_fd, self.original_termios)

    def send(self, data: Union[str, bytes, Iterable[Union[str, bytes]]]) -> None:
        """Write *data*, a single chunk or an iterable of chunks, to the tty."""
        if isinstance(data, (str, bytes)):
            write_all(self.tty_fd, data)
        else:
            for chunk in data:
                write_all(self.tty_fd, chunk)

    def recv(self, more_needed: Callable[[bytes], bool], timeout: float, sz: int = 1) -> None:
        """Read from the tty, up to *sz* bytes at a time.

        Every non-empty read is passed to *more_needed*; reading stops when
        it returns False or once *timeout* seconds have elapsed.
        """
        fd = self.tty_fd
        start_time = monotonic()
        while timeout > monotonic() - start_time:
            # will block for 0.1 secs waiting for data because we have set
            # VMIN=0 VTIME=1 in termios
            data = os.read(fd, sz)
            if data and not more_needed(data):
                break


def natsort_ints(iterable: Iterable[str]) -> List[str]:
    """Return *iterable* sorted with runs of digits compared numerically."""

    def convert(text: str) -> Union[int, str]:
        # isdecimal() rather than isdigit(): isdigit() also accepts
        # characters such as superscript digits, which the \d split below
        # leaves inside text fragments and which int() rejects.
        return int(text) if text.isdecimal() else text

    def alphanum_key(key: str) -> Tuple[Union[int, str], ...]:
        # Splitting on a capturing group alternates text and digit runs, so
        # keys always compare str with str and int with int.
        return tuple(map(convert, re.split(r'(\d+)', key)))

    return sorted(iterable, key=alphanum_key)


def resolve_editor_cmd(editor: str, shell_env: Mapping[str, str]) -> Optional[str]:
    """Resolve the executable of the *editor* command line to a full path.

    An *editor* whose executable is already absolute is returned unchanged.
    Otherwise the executable is looked up with find_exe when *shell_env* is
    ``os.environ``, or on ``shell_env['PATH']`` when that is present. On
    success the command line is returned, re-quoted, with the executable
    replaced by its full path. Returns None if the command is empty or the
    executable cannot be found.
    """
    import shlex
    editor_cmd = shlex.split(editor)
    editor_exe = (editor_cmd or ('',))[0]
    if editor_exe and os.path.isabs(editor_exe):
        return editor
    if not editor_exe:
        return None

    def patched(exe: str) -> str:
        editor_cmd[0] = exe
        return ' '.join(map(shlex.quote, editor_cmd))

    if shell_env is os.environ:
        q = find_exe(editor_exe)
        if q:
            return patched(q)
    elif 'PATH' in shell_env:
        import shutil
        q = shutil.which(editor_exe, path=shell_env['PATH'])
        if q:
            return patched(q)
    return None


def get_editor_from_env(env: Mapping[str, str]) -> Optional[str]:
    """Return the resolved editor command from VISUAL or EDITOR in *env*.

    VISUAL takes precedence. Returns None if neither variable yields an
    editor that can be resolved.
    """
    for var in ('VISUAL', 'EDITOR'):
        editor = env.get(var)
        if editor:
            editor = resolve_editor_cmd(editor, env)
            if editor:
                return editor
    return None


def get_editor_from_env_vars(opts: Optional[Options] = None) -> List[str]:
    """Return the editor command line, based on environment variables.

    ``os.environ`` is consulted first, then the environment of the shell (as
    read by read_shell_environment). If no usable editor is found that way,
    the first of a list of well known editors present on PATH is used, with
    ``vim`` as the last resort.
    """
    import shlex
    import shutil

    editor = get_editor_from_env(os.environ)
    if not editor:
        shell_env = read_shell_environment(opts)
        editor = get_editor_from_env(shell_env)

    for ans in (editor, 'vim', 'nvim', 'vi', 'emacs', 'kak', 'micro', 'nano', 'vis'):
        if ans and shutil.which(shlex.split(ans)[0]):
            break
    else:
        ans = 'vim'
    return shlex.split(ans)


def get_editor(opts: Optional[Options] = None) -> List[str]:
    """Return the command line of the editor to use.

    If *opts* is None the global options are used, falling back to default
    options when ``get_options`` raises RuntimeError. An ``editor`` option of
    ``.`` means: detect the editor from the environment.
    """
    if opts is None:
        from .fast_data_types import get_options
        try:
            opts = get_options()
        except RuntimeError:
            # we are in a kitten
            from .cli import create_default_opts
            opts = create_default_opts()
    if opts.editor == '.':
        # NOTE(review): opts is not forwarded here, so the shell environment
        # is read without reference to these options -- confirm intended.
        return get_editor_from_env_vars()
    import shlex
    return shlex.split(opts.editor)


def is_path_in_temp_dir(path: str) -> bool:
    """Return True if *path* is, or is inside, a temporary directory.

    The directories checked are ``/tmp``, ``/dev/shm``, ``$TMPDIR`` and
    ``tempfile.gettempdir()``. Symlinks are resolved on both sides before
    comparing. An empty *path* gives False.
    """
    if not path:
        return False

    def abspath(x: Optional[str]) -> str:
        if x:
            x = os.path.abspath(os.path.realpath(x))
        return x or ''

    import tempfile
    path = abspath(path)
    candidates = frozenset(map(abspath, ('/tmp', '/dev/shm', os.environ.get('TMPDIR', None), tempfile.gettempdir())))
    for q in candidates:
        if not q:
            continue
        # Compare on a path component boundary: a plain string prefix test
        # would treat /tmpfoo/x as being inside /tmp.
        if path == q or path.startswith(q.rstrip(os.sep) + os.sep):
            return True
    return False


def func_name(f: Any) -> str:
    """Return a readable name for the callable *f*.

    Uses ``f.__name__``, else ``f.func.__name__`` (the shape of a
    ``functools.partial`` object), else ``str(f)``.
    """
    if hasattr(f, '__name__'):
        return str(f.__name__)
    if hasattr(f, 'func') and hasattr(f.func, '__name__'):
        return str(f.func.__name__)
    return str(f)


def resolved_shell(opts: Optional[Options] = None) -> List[str]:
    """Return the shell command line as a list.

    Uses ``opts.shell``, split with shlex, unless it is missing or ``.``,
    in which case ``[shell_path]`` is returned.
    """
    q: str = getattr(opts, 'shell', '.')
    if q == '.':
        ans = [shell_path]
    else:
        import shlex
        ans = shlex.split(q)
    return ans


@run_once
def system_paths_on_macos() -> List[str]:
    """Return the directories listed in /etc/paths.d/* and /etc/paths.

    The files in ``/etc/paths.d`` are read in sorted order, followed by
    ``/etc/paths``. Blank lines, lines starting with ``#``, duplicates and
    entries that are not existing directories are skipped. Missing files are
    ignored.
    """
    entries, seen = [], set()

    def add_from_file(x: str) -> None:
        try:
            f = open(x)
        except FileNotFoundError:
            return
        with f:
            for line in f:
                line = line.strip()
                if line and not line.startswith('#') and line not in seen:
                    if os.path.isdir(line):
                        seen.add(line)
                        entries.append(line)
    try:
        files = os.listdir('/etc/paths.d')
    except FileNotFoundError:
        files = []
    for name in sorted(files):
        add_from_file(os.path.join('/etc/paths.d', name))
    add_from_file('/etc/paths')
    return entries


@lru_cache(maxsize=32)
def find_exe(name: str) -> Optional[str]:
    """Return the full path of the executable *name*, or None.

    If *name* is not found on PATH, a set of standard locations is searched:
    ``~/.local/bin``, then the system paths (on macOS those from
    system_paths_on_macos), then ``os.defpath``.
    """
    import shutil
    ans = shutil.which(name)
    if ans is None:
        # In case PATH is messed up
        if is_macos:
            # Work on a copy: the list returned by system_paths_on_macos()
            # is presumably cached by run_once, in which case inserting into
            # it directly would grow it on every miss.
            paths = list(system_paths_on_macos())
        else:
            paths = ['/usr/local/bin', '/opt/bin', '/usr/bin', '/bin', '/usr/sbin', '/sbin']
        paths.insert(0, os.path.expanduser('~/.local/bin'))
        path = os.pathsep.join(paths) + os.pathsep + os.defpath
        ans = shutil.which(name, path=path)
    return ans


def read_shell_environment(opts: Optional[Options] = None) -> Dict[str, str]:
    """Return the environment of a login, interactive shell.

    The shell (see resolved_shell) is run with ``-c env`` on a pty and its
    output parsed into a dict. The dict is stored on the function itself
    before the shell is run, so the shell is run at most once per process
    and a failed attempt is remembered as an empty dict. Variables with an
    empty name or an empty value are not included.
    """
    ans: Optional[Dict[str, str]] = getattr(read_shell_environment, 'ans', None)
    if ans is None:
        from .child import openpty, remove_blocking
        ans = {}
        setattr(read_shell_environment, 'ans', ans)
        import subprocess
        shell = resolved_shell(opts)
        master, slave = openpty()
        remove_blocking(master)
        if '-l' not in shell and '--login' not in shell:
            shell += ['-l']
        if '-i' not in shell and '--interactive' not in shell:
            shell += ['-i']
        try:
            p = subprocess.Popen(shell + ['-c', 'env'], stdout=slave, stdin=slave, stderr=slave, start_new_session=True, close_fds=True)
        except FileNotFoundError:
            # Nothing else owns the pty at this point, do not leak it
            os.close(master)
            os.close(slave)
            log_error('Could not find shell to read environment')
            return ans
        with os.fdopen(master, 'rb') as stdout, os.fdopen(slave, 'wb'):
            raw = b''
            from subprocess import TimeoutExpired
            from time import monotonic
            start_time = monotonic()
            # Give the shell at most 1.5 seconds, collecting its output as
            # we wait.
            while monotonic() - start_time < 1.5:
                try:
                    ret: Optional[int] = p.wait(0.01)
                except TimeoutExpired:
                    ret = None
                # Errors from the read, including the one caused by it
                # returning no data, are ignored
                with suppress(Exception):
                    raw += stdout.read()
                if ret is not None:
                    break
            if cast(Optional[int], p.returncode) is None:
                log_error('Timed out waiting for shell to quit while reading shell environment')
                p.kill()
            elif p.returncode == 0:
                # Collect whatever output is still pending
                while True:
                    try:
                        x = stdout.read()
                    except Exception:
                        break
                    if not x:
                        break
                    raw += x
                draw = raw.decode('utf-8', 'replace')
                for line in draw.splitlines():
                    k, v = line.partition('=')[::2]
                    if k and v:
                        ans[k] = v
            else:
                log_error('Failed to run shell to read its environment')
    return ans


def parse_uri_list(text: str) -> Generator[str, None, None]:
    """Get paths from file:// URLs.

    Yields one entry per line of *text*. Blank lines and lines starting with
    ``#`` are skipped. Lines that are not ``file://`` URLs, or that cannot be
    parsed, are yielded unchanged. For ``file://`` URLs the unquoted path is
    yielded, if it is not empty.
    """
    from urllib.parse import unquote, urlparse
    for line in text.splitlines():
        if not line or line.startswith('#'):
            continue
        if not line.startswith('file://'):
            yield line
            continue
        try:
            purl = urlparse(line, allow_fragments=False)
        except Exception:
            yield line
            continue
        if purl.path:
            yield unquote(purl.path)


def edit_config_file() -> None:
    """Replace the current process with the editor, editing the config file."""
    from kitty.config import prepare_config_file_for_editing
    p = prepare_config_file_for_editing()
    editor = get_editor()
    os.execvp(editor[0], editor + [p])


class SSHConnectionData(NamedTuple):
    binary: str
    hostname: str
    port: Optional[int] = None


def get_new_os_window_size(
    metrics: 'OSWindowSize', width: int, height: int, unit: str, incremental: bool = False, has_window_scaling: bool = True
) -> Tuple[int, int]:
    """Return the new (width, height) for an OS window.

    When *unit* is ``cells``, *width* and *height* are converted by
    multiplying with the cell size from *metrics*, which is first divided by
    the x/y scale if *has_window_scaling* is true. With *incremental* the
    sizes are added to the current size in *metrics*; otherwise they replace
    it, a size of zero meaning: keep the current value.
    """
    if unit == 'cells':
        cw = metrics['cell_width']
        ch = metrics['cell_height']
        if has_window_scaling:
            cw = int(cw / metrics['xscale'])
            ch = int(ch / metrics['yscale'])
        width *= cw
        height *= ch
    if incremental:
        w = metrics['width'] + width
        h = metrics['height'] + height
    else:
        w = width or metrics['width']
        h = height or metrics['height']
    return w, h


def get_all_processes() -> Iterable[int]:
    """Yield the process ids of running processes.

    On macOS they come from ``fast_data_types.get_all_processes``, elsewhere
    from the all-digit entries in ``/proc``.
    """
    if is_macos:
        from kitty.fast_data_types import get_all_processes as f
        yield from f()
    else:
        for c in os.listdir('/proc'):
            if c.isdigit():
                yield int(c)


def is_kitty_gui_cmdline(*cmd: str) -> bool:
    """Return True if the command line *cmd* is that of a kitty GUI process.

    That is, the executable is named ``kitty`` and the command line does not
    use the ``+`` or ``@`` forms: no argument is exactly ``+`` or ``@`` and
    the first argument does not start with either.
    """
    if not cmd:
        return False
    if os.path.basename(cmd[0]) != 'kitty':
        return False
    if len(cmd) == 1:
        return True
    if '+' in cmd or '@' in cmd or cmd[1].startswith('+') or cmd[1].startswith('@'):
        return False
    return True


def reload_conf_in_all_kitties() -> None:
    """Send SIGUSR1 to every running kitty GUI process.

    Processes whose command line cannot be read are skipped.
    """
    import signal
    from kitty.child import cmdline_of_process  # type: ignore
    for pid in get_all_processes():
        try:
            cmd = cmdline_of_process(pid)
        except Exception:
            continue
        if cmd and is_kitty_gui_cmdline(*cmd):
            # The process may have exited since its command line was read,
            # or may belong to another user. Neither should stop the
            # remaining processes from being signalled.
            with suppress(ProcessLookupError, PermissionError):
                os.kill(pid, signal.SIGUSR1)