1#!/usr/local/bin/python3.8 2# vim:fileencoding=utf-8 3# License: GPLv3 Copyright: 2018, Kovid Goyal <kovid at kovidgoyal.net> 4 5import os 6import shlex 7import sys 8from typing import ( 9 Any, Callable, Dict, Iterable, Iterator, List, Optional, Sequence, Tuple, 10 Union 11) 12 13from kittens.runner import ( 14 all_kitten_names, get_kitten_cli_docs, get_kitten_completer 15) 16 17from .cli import ( 18 OptionDict, OptionSpecSeq, options_for_completion, parse_option_spec, 19 prettify 20) 21from .fast_data_types import truncate_point_for_length, wcswidth 22from .rc.base import all_command_names, command_for_name 23from .shell import options_for_cmd 24from .types import run_once 25from .utils import screen_size_function 26 27''' 28To add completion for a new shell, you need to: 29 301) Add an entry to completion scripts for your shell, this is 31a simple function that calls kitty's completion code and passes the 32results to the shell's completion system. This can be output by 33`kitty +complete setup shell_name` and its output goes into 34your shell's rc file. 35 362) Add an input_parser function, this takes the input from 37the shell for the text being completed and returns a list of words 38and a boolean indicating if we are on a new word or not. This 39is passed to kitty's completion system. 40 413) An output_serializer function that is responsible for 42taking the results from kitty's completion system and converting 43them into something your shell will understand. 44''' 45 46parsers: Dict[str, Callable] = {} 47serializers: Dict[str, Callable] = {} 48 49 50class MatchGroup: 51 52 def __init__( 53 self, x: Union[Dict[str, str], Iterable[str]], 54 trailing_space: bool = True, 55 is_files: bool = False, 56 word_transforms: Optional[Dict[str, str]] = None, 57 ): 58 self.mdict = x if isinstance(x, dict) else dict.fromkeys(x, '') 59 self.trailing_space = trailing_space 60 self.is_files = is_files 61 self.word_transforms = word_transforms or {} 62 63 def __iter__(self) -> Iterator[str]: 64 return iter(self.mdict) 65 66 def transformed_words(self) -> Iterator[str]: 67 for w in self: 68 yield self.word_transforms.get(w, w) 69 70 def transformed_items(self) -> Iterator[Tuple[str, str]]: 71 for w, desc in self.items(): 72 yield self.word_transforms.get(w, w), desc 73 74 def items(self) -> Iterator[Tuple[str, str]]: 75 return iter(self.mdict.items()) 76 77 def values(self) -> Iterator[str]: 78 return iter(self.mdict.values()) 79 80 81def debug(*a: Any, **kw: Any) -> None: 82 from kittens.tui.loop import debug_write 83 debug_write(*a, **kw) 84 85 86class Delegate: 87 88 def __init__(self, words: Sequence[str] = (), pos: int = -1, new_word: bool = False): 89 self.words: Sequence[str] = words 90 self.pos = pos 91 self.num_of_unknown_args = len(words) - pos 92 self.new_word = new_word 93 94 def __bool__(self) -> bool: 95 return self.pos > -1 and self.num_of_unknown_args > 0 96 97 @property 98 def precommand(self) -> str: 99 try: 100 return self.words[self.pos] 101 except IndexError: 102 return '' 103 104 105class Completions: 106 107 def __init__(self) -> None: 108 self.match_groups: Dict[str, MatchGroup] = {} 109 self.delegate: Delegate = Delegate() 110 111 def add_match_group( 112 self, name: str, x: Union[Dict[str, str], Iterable[str]], 113 trailing_space: bool = True, 114 is_files: bool = False, 115 word_transforms: Optional[Dict[str, str]] = None 116 ) -> MatchGroup: 117 self.match_groups[name] = m = MatchGroup(x, trailing_space, is_files, word_transforms) 118 return m 119 120 121@run_once 122def remote_control_command_names() -> Tuple[str, ...]: 123 return tuple(sorted(x.replace('_', '-') for x in all_command_names())) 124 125 126# Shell specific code {{{ 127 128 129completion_scripts = { 130 'zsh': '''#compdef kitty 131 132_kitty() { 133 local src 134 # Send all words up to the word the cursor is currently on 135 src=$(printf "%s\n" "${(@)words[1,$CURRENT]}" | kitty +complete zsh) 136 if [[ $? == 0 ]]; then 137 eval ${src} 138 fi 139} 140compdef _kitty kitty 141''', 142 'bash': ''' 143_kitty_completions() { 144 local src 145 local limit 146 # Send all words up to the word the cursor is currently on 147 let limit=1+$COMP_CWORD 148 src=$(printf "%s\n" "${COMP_WORDS[@]: 0:$limit}" | kitty +complete bash) 149 if [[ $? == 0 ]]; then 150 eval ${src} 151 fi 152} 153 154complete -o nospace -F _kitty_completions kitty 155''', 156 'fish': ''' 157function __kitty_completions 158 # Send all words up to the one before the cursor 159 commandline -cop | kitty +complete fish 160end 161 162complete -f -c kitty -a "(__kitty_completions)" 163''', 164} 165 166ParseResult = Tuple[List[str], bool] 167ParserFunc = Callable[[str], ParseResult] 168SerializerFunc = Callable[[Completions], str] 169 170 171def input_parser(func: ParserFunc) -> ParserFunc: 172 name = func.__name__.split('_')[0] 173 parsers[name] = func 174 return func 175 176 177def output_serializer(func: SerializerFunc) -> SerializerFunc: 178 name = func.__name__.split('_')[0] 179 serializers[name] = func 180 return func 181 182 183@input_parser 184def zsh_input_parser(data: str) -> ParseResult: 185 new_word = data.endswith('\n\n') 186 words = data.rstrip().splitlines() 187 return words, new_word 188 189 190@input_parser 191def bash_input_parser(data: str) -> ParseResult: 192 new_word = data.endswith('\n\n') 193 words = data.rstrip().splitlines() 194 return words, new_word 195 196 197@input_parser 198def fish_input_parser(data: str) -> ParseResult: 199 return data.rstrip().splitlines(), True 200 201 202@output_serializer 203def zsh_output_serializer(ans: Completions) -> str: 204 lines = [] 205 206 screen = screen_size_function(sys.stderr.fileno())() 207 width = screen.cols 208 209 def fmt_desc(word: str, desc: str, max_word_len: int) -> Iterator[str]: 210 if not desc: 211 yield word 212 return 213 desc = prettify(desc.splitlines()[0]) 214 multiline = False 215 if wcswidth(word) > max_word_len: 216 max_desc_len = width - 2 217 multiline = True 218 else: 219 word = word.ljust(max_word_len) 220 max_desc_len = width - max_word_len - 3 221 if wcswidth(desc) > max_desc_len: 222 desc = desc[:truncate_point_for_length(desc, max_desc_len - 2)] 223 desc += '…' 224 225 if multiline: 226 ans = f'{word}\n {desc}' 227 else: 228 ans = f'{word} {desc}' 229 yield ans 230 231 for description, matches in ans.match_groups.items(): 232 cmd = ['compadd', '-U', '-J', shlex.quote(description), '-X', shlex.quote('%B' + description + '%b')] 233 if not matches.trailing_space: 234 cmd += ['-S', '""'] 235 if matches.is_files: 236 cmd.append('-f') 237 common_prefix = os.path.commonprefix(tuple(matches)) 238 if common_prefix: 239 cmd.extend(('-p', shlex.quote(common_prefix))) 240 matches = MatchGroup({k[len(common_prefix):]: v for k, v in matches.items()}) 241 has_descriptions = any(matches.values()) 242 if has_descriptions or matches.word_transforms: 243 lines.append('compdescriptions=(') 244 sz = max(map(wcswidth, matches.transformed_words())) 245 limit = min(16, sz) 246 for word, desc in matches.transformed_items(): 247 lines.extend(map(shlex.quote, fmt_desc(word, desc, limit))) 248 lines.append(')') 249 if has_descriptions: 250 cmd.append('-l') 251 cmd.append('-d') 252 cmd.append('compdescriptions') 253 cmd.append('--') 254 for word in matches: 255 cmd.append(shlex.quote(word)) 256 lines.append(' '.join(cmd) + ';') 257 258 if ans.delegate: 259 if ans.delegate.num_of_unknown_args == 1 and not ans.delegate.new_word: 260 lines.append('_command_names -e') 261 elif ans.delegate.precommand: 262 for i in range(ans.delegate.pos + 1): 263 lines.append('shift words') 264 lines.append('(( CURRENT-- ))') 265 lines.append(f'_normal -p "{ans.delegate.precommand}"') 266 result = '\n'.join(lines) 267 # debug(result) 268 return result 269 270 271@output_serializer 272def bash_output_serializer(ans: Completions) -> str: 273 lines = [] 274 for description, matches in ans.match_groups.items(): 275 for word in matches: 276 if matches.trailing_space: 277 word += ' ' 278 lines.append('COMPREPLY+=({})'.format(shlex.quote(word))) 279 # debug('\n'.join(lines)) 280 return '\n'.join(lines) 281 282 283@output_serializer 284def fish_output_serializer(ans: Completions) -> str: 285 lines = [] 286 for matches in ans.match_groups.values(): 287 for word in matches: 288 lines.append(shlex.quote(word)) 289 # debug('\n'.join(lines)) 290 return '\n'.join(lines) 291# }}} 292 293 294def completions_for_first_word(ans: Completions, prefix: str, entry_points: Iterable[str], namespaced_entry_points: Iterable[str]) -> None: 295 cmds = ['@' + c for c in remote_control_command_names()] 296 ans.add_match_group('Entry points', { 297 k: '' for k in 298 list(entry_points) + cmds + ['+' + k for k in namespaced_entry_points] 299 if not prefix or k.startswith(prefix) 300 }) 301 if prefix: 302 ans.delegate = Delegate([prefix], 0) 303 304 305def kitty_cli_opts(ans: Completions, prefix: Optional[str] = None) -> None: 306 matches = {} 307 for opt in options_for_completion(): 308 if isinstance(opt, str): 309 continue 310 aliases = frozenset(x for x in opt['aliases'] if x.startswith(prefix)) if prefix else opt['aliases'] 311 for alias in aliases: 312 matches[alias] = opt['help'].strip() 313 ans.add_match_group('Options', matches) 314 315 316def complete_kitty_cli_arg(ans: Completions, opt: Optional[OptionDict], prefix: str, unknown_args: Delegate) -> None: 317 prefix = prefix or '' 318 if not opt: 319 if unknown_args.num_of_unknown_args > 0: 320 ans.delegate = unknown_args 321 return 322 dest = opt['dest'] 323 if dest == 'override': 324 from kitty.config import option_names_for_completion 325 k = 'Config directives' 326 ans.add_match_group(k, {k+'=': '' for k in option_names_for_completion() if k.startswith(prefix)}, trailing_space=False) 327 elif dest == 'config': 328 329 def is_conf_file(x: str) -> bool: 330 if os.path.isdir(x): 331 return True 332 return x.lower().endswith('.conf') 333 334 complete_files_and_dirs(ans, prefix, files_group_name='Config files', predicate=is_conf_file) 335 elif dest == 'session': 336 complete_files_and_dirs(ans, prefix, files_group_name='Session files') 337 elif dest == 'watcher': 338 complete_files_and_dirs(ans, prefix, files_group_name='Watcher files') 339 elif dest == 'directory': 340 complete_files_and_dirs(ans, prefix, files_group_name='Directories', predicate=os.path.isdir) 341 elif dest == 'listen_on': 342 if ':' not in prefix: 343 k = 'Address type' 344 ans.add_match_group(k, {x: x for x in ('unix:', 'tcp:') if x.startswith(prefix)}, trailing_space=False) 345 elif prefix.startswith('unix:') and not prefix.startswith('@'): 346 complete_files_and_dirs(ans, prefix[len('unix:'):], files_group_name='UNIX sockets', add_prefix='unix:') 347 else: 348 complete_basic_option_args(ans, opt, prefix) 349 350 351CompleteArgsFunc = Callable[[Completions, Optional[OptionDict], str, Delegate], None] 352 353 354def complete_alias_map( 355 ans: Completions, 356 words: Sequence[str], 357 new_word: bool, 358 option_map: Dict[str, OptionDict], 359 complete_args: Optional[CompleteArgsFunc] = None 360) -> None: 361 expecting_arg = False 362 opt: Optional[OptionDict] = None 363 last_word = words[-1] if words else '' 364 for i, w in enumerate(words): 365 if expecting_arg: 366 if w is last_word and not new_word: 367 if opt is not None and complete_args is not None: 368 complete_args(ans, opt, w, Delegate()) 369 return 370 expecting_arg = False 371 continue 372 opt = option_map.get(w) 373 if w is last_word and not new_word: 374 if w.startswith('-'): 375 ans.add_match_group('Options', {k: opt['help'] for k, opt in option_map.items() if k.startswith(last_word)}) 376 else: 377 if complete_args is not None: 378 complete_args(ans, None, last_word, Delegate(words, i)) 379 return 380 if opt is None: 381 if complete_args is not None: 382 complete_args(ans, None, '' if new_word else last_word, Delegate(words, i, new_word)) 383 return # some non-option word encountered 384 expecting_arg = not opt.get('type', '').startswith('bool-') 385 if expecting_arg: 386 if opt is not None and complete_args is not None: 387 complete_args(ans, opt, '' if new_word else last_word, Delegate()) 388 else: 389 prefix = '' if new_word else last_word 390 if complete_args is not None: 391 complete_args(ans, None, prefix, Delegate()) 392 ans.add_match_group('Options', {k: opt['help'] for k, opt in option_map.items() if k.startswith(prefix)}) 393 394 395def complete_cli( 396 ans: Completions, 397 words: Sequence[str], 398 new_word: bool, 399 seq: OptionSpecSeq, 400 complete_args: Optional[CompleteArgsFunc] = None 401) -> None: 402 option_map = {} 403 for opt in seq: 404 if not isinstance(opt, str): 405 for alias in opt['aliases']: 406 option_map[alias] = opt 407 complete_alias_map(ans, words, new_word, option_map, complete_args) 408 409 410def complete_remote_command(ans: Completions, cmd_name: str, words: Sequence[str], new_word: bool) -> None: 411 aliases, alias_map = options_for_cmd(cmd_name) 412 if not alias_map: 413 return 414 args_completer: Optional[CompleteArgsFunc] = None 415 args_completion = command_for_name(cmd_name).args_completion 416 if args_completion: 417 if 'files' in args_completion: 418 title, matchers = args_completion['files'] 419 if isinstance(matchers, tuple): 420 args_completer = remote_files_completer(title, matchers) 421 elif 'names' in args_completion: 422 title, q = args_completion['names'] 423 args_completer = remote_args_completer(title, q() if callable(q) else q) 424 complete_alias_map(ans, words, new_word, alias_map, complete_args=args_completer) 425 426 427def path_completion(prefix: str = '') -> Tuple[List[str], List[str]]: 428 prefix = prefix.replace(r'\ ', ' ') 429 dirs, files = [], [] 430 base = '.' 431 if prefix.endswith('/'): 432 base = prefix 433 elif '/' in prefix: 434 base = os.path.dirname(prefix) 435 src = os.path.expandvars(os.path.expanduser(base)) 436 src_prefix = os.path.abspath(os.path.expandvars(os.path.expanduser(prefix))) if prefix else '' 437 try: 438 items: Iterable[os.DirEntry] = os.scandir(src) 439 except FileNotFoundError: 440 items = () 441 for x in items: 442 abspath = os.path.abspath(x.path) 443 if prefix and not abspath.startswith(src_prefix): 444 continue 445 if prefix: 446 q = prefix + abspath[len(src_prefix):].lstrip(os.sep) 447 q = os.path.expandvars(os.path.expanduser(q)) 448 else: 449 q = os.path.relpath(abspath) 450 if x.is_dir(): 451 dirs.append(q.rstrip(os.sep) + os.sep) 452 else: 453 files.append(q) 454 return dirs, files 455 456 457def complete_files_and_dirs( 458 ans: Completions, 459 prefix: str, 460 files_group_name: str = 'Files', 461 predicate: Optional[Callable[[str], bool]] = None, 462 add_prefix: Optional[str] = None 463) -> None: 464 dirs, files_ = path_completion(prefix or '') 465 files: Iterable[str] = filter(predicate, files_) 466 if add_prefix: 467 dirs = list(add_prefix + x for x in dirs) 468 files = (add_prefix + x for x in files) 469 470 if dirs: 471 ans.add_match_group('Directories', dirs, trailing_space=False, is_files=True) 472 if files: 473 ans.add_match_group(files_group_name, files, is_files=True) 474 475 476def complete_basic_option_args(ans: Completions, opt: OptionDict, prefix: str) -> None: 477 if opt['choices']: 478 ans.add_match_group(f'Choices for {opt["dest"]}', tuple(k for k in opt['choices'] if k.startswith(prefix))) 479 480 481def complete_icat_args(ans: Completions, opt: Optional[OptionDict], prefix: str, unknown_args: Delegate) -> None: 482 from .guess_mime_type import guess_type 483 484 def icat_file_predicate(filename: str) -> bool: 485 mt = guess_type(filename, allow_filesystem_access=True) 486 if mt and mt.startswith('image/'): 487 return True 488 return False 489 490 if opt is None: 491 complete_files_and_dirs(ans, prefix, 'Images', icat_file_predicate) 492 else: 493 complete_basic_option_args(ans, opt, prefix) 494 495 496def complete_themes_args(ans: Completions, opt: Optional[OptionDict], prefix: str, unknown_args: Delegate) -> None: 497 if opt is None: 498 from kittens.themes.collection import load_themes 499 themes = load_themes(cache_age=-1, ignore_no_cache=True) 500 names = tuple(t.name for t in themes if t.name.startswith(prefix)) 501 ans.add_match_group('Themes', names) 502 else: 503 complete_basic_option_args(ans, opt, prefix) 504 505 506def remote_files_completer(name: str, matchers: Tuple[str, ...]) -> CompleteArgsFunc: 507 508 def complete_files_map(ans: Completions, opt: Optional[OptionDict], prefix: str, unknown_args: Delegate) -> None: 509 510 def predicate(filename: str) -> bool: 511 for m in matchers: 512 if isinstance(m, str): 513 from fnmatch import fnmatch 514 return fnmatch(filename, m) 515 return False 516 517 if opt is None: 518 complete_files_and_dirs(ans, prefix, name, predicate) 519 else: 520 complete_basic_option_args(ans, opt, prefix) 521 return complete_files_map 522 523 524def remote_args_completer(title: str, words: Iterable[str]) -> CompleteArgsFunc: 525 items = sorted(words) 526 527 def complete_names_for_arg(ans: Completions, opt: Optional[OptionDict], prefix: str, unknown_args: Delegate) -> None: 528 if opt is None: 529 ans.add_match_group(title, {c: '' for c in items if c.startswith(prefix)}) 530 else: 531 complete_basic_option_args(ans, opt, prefix) 532 533 return complete_names_for_arg 534 535 536def config_file_predicate(filename: str) -> bool: 537 return filename.endswith('.conf') 538 539 540def complete_diff_args(ans: Completions, opt: Optional[OptionDict], prefix: str, unknown_args: Delegate) -> None: 541 if opt is None: 542 complete_files_and_dirs(ans, prefix, 'Files') 543 elif opt['dest'] == 'config': 544 complete_files_and_dirs(ans, prefix, 'Config Files', config_file_predicate) 545 else: 546 complete_basic_option_args(ans, opt, prefix) 547 548 549def complete_kitten(ans: Completions, kitten: str, words: Sequence[str], new_word: bool) -> None: 550 try: 551 completer = get_kitten_completer(kitten) 552 except SystemExit: 553 completer = None 554 if completer is not None: 555 completer(ans, words, new_word) 556 return 557 try: 558 cd = get_kitten_cli_docs(kitten) 559 except SystemExit: 560 cd = None 561 if cd is None: 562 return 563 options = cd['options']() 564 seq = parse_option_spec(options)[0] 565 option_map = {} 566 for opt in seq: 567 if not isinstance(opt, str): 568 for alias in opt['aliases']: 569 option_map[alias] = opt 570 complete_alias_map(ans, words, new_word, option_map, { 571 'icat': complete_icat_args, 572 'diff': complete_diff_args, 573 'themes': complete_themes_args, 574 }.get(kitten)) 575 576 577def find_completions(words: Sequence[str], new_word: bool, entry_points: Iterable[str], namespaced_entry_points: Iterable[str]) -> Completions: 578 ans = Completions() 579 if not words or words[0] != 'kitty': 580 return ans 581 words = words[1:] 582 if not words or (len(words) == 1 and not new_word): 583 prefix = words[0] if words else '' 584 completions_for_first_word(ans, prefix, entry_points, namespaced_entry_points) 585 kitty_cli_opts(ans, prefix) 586 return ans 587 if words[0] == '@': 588 if len(words) == 1 or (len(words) == 2 and not new_word): 589 prefix = words[1] if len(words) > 1 else '' 590 ans.add_match_group('Remote control commands', {c: '' for c in remote_control_command_names() if c.startswith(prefix)}) 591 else: 592 complete_remote_command(ans, words[1], words[2:], new_word) 593 return ans 594 if words[0].startswith('@'): 595 if len(words) == 1 and not new_word: 596 prefix = words[0] 597 ans.add_match_group('Remote control commands', {'@' + c: '' for c in remote_control_command_names() if c.startswith(prefix)}) 598 else: 599 complete_remote_command(ans, words[0][1:], words[1:], new_word) 600 if words[0] == '+': 601 if len(words) == 1 or (len(words) == 2 and not new_word): 602 prefix = words[1] if len(words) > 1 else '' 603 ans.add_match_group('Entry points', {c: '' for c in namespaced_entry_points if c.startswith(prefix)}) 604 else: 605 if words[1] == 'kitten': 606 if len(words) == 2 or (len(words) == 3 and not new_word): 607 ans.add_match_group('Kittens', (k for k in all_kitten_names() if k.startswith('' if len(words) == 2 else words[2]))) 608 else: 609 complete_kitten(ans, words[2], words[3:], new_word) 610 return ans 611 if words[0].startswith('+'): 612 if len(words) == 1: 613 if new_word: 614 if words[0] == '+kitten': 615 ans.add_match_group('Kittens', all_kitten_names()) 616 else: 617 prefix = words[0] 618 ans.add_match_group('Entry points', (c for c in namespaced_entry_points if c.startswith(prefix))) 619 else: 620 if len(words) == 2 and not new_word: 621 ans.add_match_group('Kittens', (k for k in all_kitten_names() if k.startswith(words[1]))) 622 else: 623 if words[0] == '+kitten': 624 complete_kitten(ans, words[1], words[2:], new_word) 625 else: 626 complete_cli(ans, words, new_word, options_for_completion(), complete_kitty_cli_arg) 627 628 return ans 629 630 631def setup(cstyle: str) -> None: 632 print(completion_scripts[cstyle]) 633 634 635def main(args: Sequence[str], entry_points: Iterable[str], namespaced_entry_points: Iterable[str]) -> None: 636 if not args: 637 raise SystemExit('Must specify completion style') 638 cstyle = args[0] 639 if cstyle == 'setup': 640 return setup(args[1]) 641 data = sys.stdin.read() 642 try: 643 parser = parsers[cstyle] 644 serializer = serializers[cstyle] 645 except KeyError: 646 raise SystemExit('Unknown completion style: {}'.format(cstyle)) 647 words, new_word = parser(data) 648 ans = find_completions(words, new_word, entry_points, namespaced_entry_points) 649 print(serializer(ans), end='') 650