1#!/usr/local/bin/python3.8
2# vim:fileencoding=utf-8
3# License: GPL v3 Copyright: 2016, Kovid Goyal <kovid at kovidgoyal.net>
4
5import atexit
6import errno
7import fcntl
8import math
9import os
10import re
11import string
12import sys
13from contextlib import suppress
14from functools import lru_cache
15from time import monotonic
16from typing import (
17    TYPE_CHECKING, Any, Callable, Dict, Generator, Iterable, List, Mapping,
18    Match, NamedTuple, Optional, Tuple, Union, cast
19)
20
21from .constants import (
22    appname, is_macos, is_wayland, read_kitty_resource, shell_path,
23    supports_primary_selection
24)
25from .rgb import Color, to_color
26from .types import run_once
27from .typing import AddressFamily, PopenType, Socket, StartupCtx
28
29if TYPE_CHECKING:
30    from .options.types import Options
31    from .fast_data_types import OSWindowSize
32else:
33    Options = object
34
35
36def expandvars(val: str, env: Mapping[str, str] = {}, fallback_to_os_env: bool = True) -> str:
37
38    def sub(m: Match) -> str:
39        key = m.group(1) or m.group(2)
40        result = env.get(key)
41        if result is None and fallback_to_os_env:
42            result = os.environ.get(key)
43        if result is None:
44            result = m.group()
45        return result
46
47    if '$' not in val:
48        return val
49
50    return re.sub(r'\$(?:(\w+)|\{([^}]+)\})', sub, val)
51
52
53def platform_window_id(os_window_id: int) -> Optional[int]:
54    if is_macos:
55        from .fast_data_types import cocoa_window_id
56        with suppress(Exception):
57            return cocoa_window_id(os_window_id)
58    if not is_wayland():
59        from .fast_data_types import x11_window_id
60        with suppress(Exception):
61            return x11_window_id(os_window_id)
62
63
64def load_shaders(name: str) -> Tuple[str, str]:
65    from .fast_data_types import GLSL_VERSION
66
67    def load(which: str) -> str:
68        return read_kitty_resource(f'{name}_{which}.glsl').decode('utf-8').replace('GLSL_VERSION', str(GLSL_VERSION), 1)
69
70    return load('vertex'), load('fragment')
71
72
73def safe_print(*a: Any, **k: Any) -> None:
74    with suppress(Exception):
75        print(*a, **k)
76
77
78def log_error(*a: Any, **k: str) -> None:
79    from .fast_data_types import log_error_string
80    output = getattr(log_error, 'redirect', log_error_string)
81    with suppress(Exception):
82        msg = k.get('sep', ' ').join(map(str, a)) + k.get('end', '').replace('\0', '')
83        output(msg)
84
85
86def ceil_int(x: float) -> int:
87    return int(math.ceil(x))
88
89
90def sanitize_title(x: str) -> str:
91    return re.sub(r'\s+', ' ', re.sub(r'[\0-\x19\x80-\x9f]', '', x))
92
93
94def color_as_int(val: Tuple[int, int, int]) -> int:
95    return val[0] << 16 | val[1] << 8 | val[2]
96
97
98def color_from_int(val: int) -> Color:
99    return Color((val >> 16) & 0xFF, (val >> 8) & 0xFF, val & 0xFF)
100
101
102def parse_color_set(raw: str) -> Generator[Tuple[int, Optional[int]], None, None]:
103    parts = raw.split(';')
104    lp = len(parts)
105    if lp % 2 != 0:
106        return
107    for c_, spec in [parts[i:i + 2] for i in range(0, len(parts), 2)]:
108        try:
109            c = int(c_)
110            if c < 0 or c > 255:
111                continue
112            if spec == '?':
113                yield c, None
114            else:
115                q = to_color(spec)
116                if q is not None:
117                    r, g, b = q
118                    yield c, r << 16 | g << 8 | b
119        except Exception:
120            continue
121
122
123class ScreenSize(NamedTuple):
124    rows: int
125    cols: int
126    width: int
127    height: int
128    cell_width: int
129    cell_height: int
130
131
132class ScreenSizeGetter:
133    changed = True
134    Size = ScreenSize
135    ans: Optional[ScreenSize] = None
136
137    def __init__(self, fd: Optional[int]):
138        if fd is None:
139            fd = sys.stdout.fileno()
140        self.fd = fd
141
142    def __call__(self) -> ScreenSize:
143        if self.changed:
144            import array
145            import fcntl
146            import termios
147            buf = array.array('H', [0, 0, 0, 0])
148            fcntl.ioctl(self.fd, termios.TIOCGWINSZ, cast(bytearray, buf))
149            rows, cols, width, height = tuple(buf)
150            cell_width, cell_height = width // (cols or 1), height // (rows or 1)
151            self.ans = ScreenSize(rows, cols, width, height, cell_width, cell_height)
152            self.changed = False
153        return cast(ScreenSize, self.ans)
154
155
156@lru_cache(maxsize=64)
157def screen_size_function(fd: Optional[int] = None) -> ScreenSizeGetter:
158    return ScreenSizeGetter(fd)
159
160
161def fit_image(width: int, height: int, pwidth: int, pheight: int) -> Tuple[int, int]:
162    from math import floor
163    if height > pheight:
164        corrf = pheight / float(height)
165        width, height = floor(corrf * width), pheight
166    if width > pwidth:
167        corrf = pwidth / float(width)
168        width, height = pwidth, floor(corrf * height)
169    if height > pheight:
170        corrf = pheight / float(height)
171        width, height = floor(corrf * width), pheight
172
173    return int(width), int(height)
174
175
176def set_primary_selection(text: Union[str, bytes]) -> None:
177    if not supports_primary_selection:
178        return  # There is no primary selection
179    from kitty.fast_data_types import set_primary_selection as s
180    s(text)
181
182
183def get_primary_selection() -> str:
184    if not supports_primary_selection:
185        return ''  # There is no primary selection
186    from kitty.fast_data_types import get_primary_selection as g
187    return (g() or b'').decode('utf-8', 'replace')
188
189
190def base64_encode(
191    integer: int,
192    chars: str = string.ascii_uppercase + string.ascii_lowercase + string.digits +
193    '+/'
194) -> str:
195    ans = ''
196    while True:
197        integer, remainder = divmod(integer, 64)
198        ans = chars[remainder] + ans
199        if integer == 0:
200            break
201    return ans
202
203
204def command_for_open(program: Union[str, List[str]] = 'default') -> List[str]:
205    if isinstance(program, str):
206        from .conf.utils import to_cmdline
207        program = to_cmdline(program)
208    if program == ['default']:
209        cmd = ['open'] if is_macos else ['xdg-open']
210    else:
211        cmd = program
212    return cmd
213
214
215def open_cmd(cmd: Union[Iterable[str], List[str]], arg: Union[None, Iterable[str], str] = None, cwd: Optional[str] = None) -> PopenType:
216    import subprocess
217    if arg is not None:
218        cmd = list(cmd)
219        if isinstance(arg, str):
220            cmd.append(arg)
221        else:
222            cmd.extend(arg)
223    return subprocess.Popen(tuple(cmd), stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, cwd=cwd or None)
224
225
226def open_url(url: str, program: Union[str, List[str]] = 'default', cwd: Optional[str] = None) -> PopenType:
227    return open_cmd(command_for_open(program), url, cwd=cwd)
228
229
230def detach(fork: bool = True, setsid: bool = True, redirect: bool = True) -> None:
231    if fork:
232        # Detach from the controlling process.
233        if os.fork() != 0:
234            raise SystemExit(0)
235    if setsid:
236        os.setsid()
237    if redirect:
238        from .fast_data_types import redirect_std_streams
239        redirect_std_streams(os.devnull)
240
241
242def adjust_line_height(cell_height: int, val: Union[int, float]) -> int:
243    if isinstance(val, int):
244        return cell_height + val
245    return int(cell_height * val)
246
247
248def init_startup_notification_x11(window_handle: int, startup_id: Optional[str] = None) -> Optional['StartupCtx']:
249    # https://specifications.freedesktop.org/startup-notification-spec/startup-notification-latest.txt
250    from kitty.fast_data_types import init_x11_startup_notification
251    sid = startup_id or os.environ.pop('DESKTOP_STARTUP_ID', None)  # ensure child processes don't get this env var
252    if not sid:
253        return None
254    from .fast_data_types import x11_display
255    display = x11_display()
256    if not display:
257        return None
258    return init_x11_startup_notification(display, window_handle, sid)
259
260
261def end_startup_notification_x11(ctx: 'StartupCtx') -> None:
262    from kitty.fast_data_types import end_x11_startup_notification
263    end_x11_startup_notification(ctx)
264
265
266def init_startup_notification(window_handle: Optional[int], startup_id: Optional[str] = None) -> Optional['StartupCtx']:
267    if is_macos or is_wayland():
268        return None
269    if window_handle is None:
270        log_error('Could not perform startup notification as window handle not present')
271        return None
272    try:
273        try:
274            return init_startup_notification_x11(window_handle, startup_id)
275        except OSError as e:
276            if not str(e).startswith("Failed to load libstartup-notification"):
277                raise e
278            log_error(
279                f'{e}. This has two main effects:',
280                'There will be no startup feedback and when using --single-instance, kitty windows may start on an incorrect desktop/workspace.')
281    except Exception:
282        import traceback
283        traceback.print_exc()
284
285
286def end_startup_notification(ctx: Optional['StartupCtx']) -> None:
287    if not ctx:
288        return
289    if is_macos or is_wayland():
290        return
291    try:
292        end_startup_notification_x11(ctx)
293    except Exception:
294        import traceback
295        traceback.print_exc()
296
297
298class startup_notification_handler:
299
300    def __init__(self, do_notify: bool = True, startup_id: Optional[str] = None, extra_callback: Optional[Callable] = None):
301        self.do_notify = do_notify
302        self.startup_id = startup_id
303        self.extra_callback = extra_callback
304        self.ctx: Optional['StartupCtx'] = None
305
306    def __enter__(self) -> Callable[[int], None]:
307
308        def pre_show_callback(window_handle: int) -> None:
309            if self.extra_callback is not None:
310                self.extra_callback(window_handle)
311            if self.do_notify:
312                self.ctx = init_startup_notification(window_handle, self.startup_id)
313
314        return pre_show_callback
315
316    def __exit__(self, *a: Any) -> None:
317        if self.ctx is not None:
318            end_startup_notification(self.ctx)
319
320
321def remove_socket_file(s: 'Socket', path: Optional[str] = None) -> None:
322    with suppress(OSError):
323        s.close()
324    if path:
325        with suppress(OSError):
326            os.unlink(path)
327
328
329def unix_socket_paths(name: str, ext: str = '.lock') -> Generator[str, None, None]:
330    import tempfile
331    home = os.path.expanduser('~')
332    candidates = [tempfile.gettempdir(), home]
333    if is_macos:
334        from .fast_data_types import user_cache_dir
335        candidates = [user_cache_dir(), '/Library/Caches']
336    for loc in candidates:
337        if os.access(loc, os.W_OK | os.R_OK | os.X_OK):
338            filename = ('.' if loc == home else '') + name + ext
339            yield os.path.join(loc, filename)
340
341
342def single_instance_unix(name: str) -> bool:
343    import socket
344    for path in unix_socket_paths(name):
345        socket_path = path.rpartition('.')[0] + '.sock'
346        fd = os.open(path, os.O_CREAT | os.O_WRONLY | os.O_TRUNC | os.O_CLOEXEC)
347        try:
348            fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
349        except OSError as err:
350            if err.errno in (errno.EAGAIN, errno.EACCES):
351                # Client
352                s = socket.socket(family=socket.AF_UNIX)
353                s.connect(socket_path)
354                single_instance.socket = s
355                return False
356            raise
357        s = socket.socket(family=socket.AF_UNIX)
358        try:
359            s.bind(socket_path)
360        except OSError as err:
361            if err.errno in (errno.EADDRINUSE, errno.EEXIST):
362                os.unlink(socket_path)
363                s.bind(socket_path)
364            else:
365                raise
366        single_instance.socket = s  # prevent garbage collection from closing the socket
367        atexit.register(remove_socket_file, s, socket_path)
368        s.listen()
369        s.set_inheritable(False)
370        return True
371    return False
372
373
374class SingleInstance:
375
376    socket: Optional['Socket'] = None
377
378    def __call__(self, group_id: Optional[str] = None) -> bool:
379        import socket
380        name = '{}-ipc-{}'.format(appname, os.geteuid())
381        if group_id:
382            name += '-{}'.format(group_id)
383
384        s = socket.socket(family=socket.AF_UNIX)
385        # First try with abstract UDS
386        addr = '\0' + name
387        try:
388            s.bind(addr)
389        except OSError as err:
390            if err.errno == errno.ENOENT:
391                return single_instance_unix(name)
392            if err.errno == errno.EADDRINUSE:
393                s.connect(addr)
394                self.socket = s
395                return False
396            raise
397        s.listen()
398        self.socket = s  # prevent garbage collection from closing the socket
399        s.set_inheritable(False)
400        atexit.register(remove_socket_file, s)
401        return True
402
403
404single_instance = SingleInstance()
405
406
407def parse_address_spec(spec: str) -> Tuple[AddressFamily, Union[Tuple[str, int], str], Optional[str]]:
408    import socket
409    protocol, rest = spec.split(':', 1)
410    socket_path = None
411    address: Union[str, Tuple[str, int]] = ''
412    if protocol == 'unix':
413        family = socket.AF_UNIX
414        address = rest
415        if address.startswith('@') and len(address) > 1:
416            address = '\0' + address[1:]
417        else:
418            socket_path = address
419    elif protocol in ('tcp', 'tcp6'):
420        family = socket.AF_INET if protocol == 'tcp' else socket.AF_INET6
421        host, port = rest.rsplit(':', 1)
422        address = host, int(port)
423    else:
424        raise ValueError('Unknown protocol in --listen-on value: {}'.format(spec))
425    return family, address, socket_path
426
427
428def write_all(fd: int, data: Union[str, bytes]) -> None:
429    if isinstance(data, str):
430        data = data.encode('utf-8')
431    while data:
432        n = os.write(fd, data)
433        if not n:
434            break
435        data = data[n:]
436
437
438class TTYIO:
439
440    def __enter__(self) -> 'TTYIO':
441        from .fast_data_types import open_tty
442        self.tty_fd, self.original_termios = open_tty(True)
443        return self
444
445    def __exit__(self, *a: Any) -> None:
446        from .fast_data_types import close_tty
447        close_tty(self.tty_fd, self.original_termios)
448
449    def send(self, data: Union[str, bytes, Iterable[Union[str, bytes]]]) -> None:
450        if isinstance(data, (str, bytes)):
451            write_all(self.tty_fd, data)
452        else:
453            for chunk in data:
454                write_all(self.tty_fd, chunk)
455
456    def recv(self, more_needed: Callable[[bytes], bool], timeout: float, sz: int = 1) -> None:
457        fd = self.tty_fd
458        start_time = monotonic()
459        while timeout > monotonic() - start_time:
460            # will block for 0.1 secs waiting for data because we have set
461            # VMIN=0 VTIME=1 in termios
462            data = os.read(fd, sz)
463            if data and not more_needed(data):
464                break
465
466
467def natsort_ints(iterable: Iterable[str]) -> List[str]:
468
469    def convert(text: str) -> Union[int, str]:
470        return int(text) if text.isdigit() else text
471
472    def alphanum_key(key: str) -> Tuple[Union[int, str], ...]:
473        return tuple(map(convert, re.split(r'(\d+)', key)))
474
475    return sorted(iterable, key=alphanum_key)
476
477
478def resolve_editor_cmd(editor: str, shell_env: Mapping[str, str]) -> Optional[str]:
479    import shlex
480    editor_cmd = shlex.split(editor)
481    editor_exe = (editor_cmd or ('',))[0]
482    if editor_exe and os.path.isabs(editor_exe):
483        return editor
484    if not editor_exe:
485        return None
486
487    def patched(exe: str) -> str:
488        editor_cmd[0] = exe
489        return ' '.join(map(shlex.quote, editor_cmd))
490
491    if shell_env is os.environ:
492        q = find_exe(editor_exe)
493        if q:
494            return patched(q)
495    elif 'PATH' in shell_env:
496        import shutil
497        q = shutil.which(editor_exe, path=shell_env['PATH'])
498        if q:
499            return patched(q)
500
501
502def get_editor_from_env(env: Mapping[str, str]) -> Optional[str]:
503    for var in ('VISUAL', 'EDITOR'):
504        editor = env.get(var)
505        if editor:
506            editor = resolve_editor_cmd(editor, env)
507            if editor:
508                return editor
509
510
511def get_editor_from_env_vars(opts: Optional[Options] = None) -> List[str]:
512    import shlex
513    import shutil
514
515    editor = get_editor_from_env(os.environ)
516    if not editor:
517        shell_env = read_shell_environment(opts)
518        editor = get_editor_from_env(shell_env)
519
520    for ans in (editor, 'vim', 'nvim', 'vi', 'emacs', 'kak', 'micro', 'nano', 'vis'):
521        if ans and shutil.which(shlex.split(ans)[0]):
522            break
523    else:
524        ans = 'vim'
525    return shlex.split(ans)
526
527
528def get_editor(opts: Optional[Options] = None) -> List[str]:
529    if opts is None:
530        from .fast_data_types import get_options
531        try:
532            opts = get_options()
533        except RuntimeError:
534            # we are in a kitten
535            from .cli import create_default_opts
536            opts = create_default_opts()
537    if opts.editor == '.':
538        return get_editor_from_env_vars()
539    import shlex
540    return shlex.split(opts.editor)
541
542
543def is_path_in_temp_dir(path: str) -> bool:
544    if not path:
545        return False
546
547    def abspath(x: Optional[str]) -> str:
548        if x:
549            x = os.path.abspath(os.path.realpath(x))
550        return x or ''
551
552    import tempfile
553    path = abspath(path)
554    candidates = frozenset(map(abspath, ('/tmp', '/dev/shm', os.environ.get('TMPDIR', None), tempfile.gettempdir())))
555    for q in candidates:
556        if q and path.startswith(q):
557            return True
558    return False
559
560
561def func_name(f: Any) -> str:
562    if hasattr(f, '__name__'):
563        return str(f.__name__)
564    if hasattr(f, 'func') and hasattr(f.func, '__name__'):
565        return str(f.func.__name__)
566    return str(f)
567
568
569def resolved_shell(opts: Optional[Options] = None) -> List[str]:
570    q: str = getattr(opts, 'shell', '.')
571    if q == '.':
572        ans = [shell_path]
573    else:
574        import shlex
575        ans = shlex.split(q)
576    return ans
577
578
579@run_once
580def system_paths_on_macos() -> List[str]:
581    entries, seen = [], set()
582
583    def add_from_file(x: str) -> None:
584        try:
585            f = open(x)
586        except FileNotFoundError:
587            return
588        with f:
589            for line in f:
590                line = line.strip()
591                if line and not line.startswith('#') and line not in seen:
592                    if os.path.isdir(line):
593                        seen.add(line)
594                        entries.append(line)
595    try:
596        files = os.listdir('/etc/paths.d')
597    except FileNotFoundError:
598        files = []
599    for name in sorted(files):
600        add_from_file(os.path.join('/etc/paths.d', name))
601    add_from_file('/etc/paths')
602    return entries
603
604
605@lru_cache(maxsize=32)
606def find_exe(name: str) -> Optional[str]:
607    import shutil
608    ans = shutil.which(name)
609    if ans is None:
610        # In case PATH is messed up
611        if is_macos:
612            paths = system_paths_on_macos()
613        else:
614            paths = ['/usr/local/bin', '/opt/bin', '/usr/bin', '/bin', '/usr/sbin', '/sbin']
615        paths.insert(0, os.path.expanduser('~/.local/bin'))
616        path = os.pathsep.join(paths) + os.pathsep + os.defpath
617        ans = shutil.which(name, path=path)
618    return ans
619
620
621def read_shell_environment(opts: Optional[Options] = None) -> Dict[str, str]:
622    ans: Optional[Dict[str, str]] = getattr(read_shell_environment, 'ans', None)
623    if ans is None:
624        from .child import openpty, remove_blocking
625        ans = {}
626        setattr(read_shell_environment, 'ans', ans)
627        import subprocess
628        shell = resolved_shell(opts)
629        master, slave = openpty()
630        remove_blocking(master)
631        if '-l' not in shell and '--login' not in shell:
632            shell += ['-l']
633        if '-i' not in shell and '--interactive' not in shell:
634            shell += ['-i']
635        try:
636            p = subprocess.Popen(shell + ['-c', 'env'], stdout=slave, stdin=slave, stderr=slave, start_new_session=True, close_fds=True)
637        except FileNotFoundError:
638            log_error('Could not find shell to read environment')
639            return ans
640        with os.fdopen(master, 'rb') as stdout, os.fdopen(slave, 'wb'):
641            raw = b''
642            from subprocess import TimeoutExpired
643            from time import monotonic
644            start_time = monotonic()
645            while monotonic() - start_time < 1.5:
646                try:
647                    ret: Optional[int] = p.wait(0.01)
648                except TimeoutExpired:
649                    ret = None
650                with suppress(Exception):
651                    raw += stdout.read()
652                if ret is not None:
653                    break
654            if cast(Optional[int], p.returncode) is None:
655                log_error('Timed out waiting for shell to quit while reading shell environment')
656                p.kill()
657            elif p.returncode == 0:
658                while True:
659                    try:
660                        x = stdout.read()
661                    except Exception:
662                        break
663                    if not x:
664                        break
665                    raw += x
666                draw = raw.decode('utf-8', 'replace')
667                for line in draw.splitlines():
668                    k, v = line.partition('=')[::2]
669                    if k and v:
670                        ans[k] = v
671            else:
672                log_error('Failed to run shell to read its environment')
673    return ans
674
675
676def parse_uri_list(text: str) -> Generator[str, None, None]:
677    ' Get paths from file:// URLs '
678    from urllib.parse import unquote, urlparse
679    for line in text.splitlines():
680        if not line or line.startswith('#'):
681            continue
682        if not line.startswith('file://'):
683            yield line
684            continue
685        try:
686            purl = urlparse(line, allow_fragments=False)
687        except Exception:
688            yield line
689            continue
690        if purl.path:
691            yield unquote(purl.path)
692
693
694def edit_config_file() -> None:
695    from kitty.config import prepare_config_file_for_editing
696    p = prepare_config_file_for_editing()
697    editor = get_editor()
698    os.execvp(editor[0], editor + [p])
699
700
701class SSHConnectionData(NamedTuple):
702    binary: str
703    hostname: str
704    port: Optional[int] = None
705
706
707def get_new_os_window_size(
708    metrics: 'OSWindowSize', width: int, height: int, unit: str, incremental: bool = False, has_window_scaling: bool = True
709) -> Tuple[int, int]:
710    if unit == 'cells':
711        cw = metrics['cell_width']
712        ch = metrics['cell_height']
713        if has_window_scaling:
714            cw = int(cw / metrics['xscale'])
715            ch = int(ch / metrics['yscale'])
716        width *= cw
717        height *= ch
718    if incremental:
719        w = metrics['width'] + width
720        h = metrics['height'] + height
721    else:
722        w = width or metrics['width']
723        h = height or metrics['height']
724    return w, h
725
726
727def get_all_processes() -> Iterable[int]:
728    if is_macos:
729        from kitty.fast_data_types import get_all_processes as f
730        yield from f()
731    else:
732        for c in os.listdir('/proc'):
733            if c.isdigit():
734                yield int(c)
735
736
737def is_kitty_gui_cmdline(*cmd: str) -> bool:
738    if not cmd:
739        return False
740    if os.path.basename(cmd[0]) != 'kitty':
741        return False
742    if len(cmd) == 1:
743        return True
744    if '+' in cmd or '@' in cmd or cmd[1].startswith('+') or cmd[1].startswith('@'):
745        return False
746    return True
747
748
749def reload_conf_in_all_kitties() -> None:
750    import signal
751    from kitty.child import cmdline_of_process  # type: ignore
752    for pid in get_all_processes():
753        try:
754            cmd = cmdline_of_process(pid)
755        except Exception:
756            continue
757        if cmd and is_kitty_gui_cmdline(*cmd):
758            os.kill(pid, signal.SIGUSR1)
759