1#!/usr/local/bin/python3.8
2# vim:fileencoding=utf-8
3# License: GPLv3 Copyright: 2018, Kovid Goyal <kovid at kovidgoyal.net>
4
5import os
6import shlex
7import sys
8from typing import (
9    Any, Callable, Dict, Iterable, Iterator, List, Optional, Sequence, Tuple,
10    Union
11)
12
13from kittens.runner import (
14    all_kitten_names, get_kitten_cli_docs, get_kitten_completer
15)
16
17from .cli import (
18    OptionDict, OptionSpecSeq, options_for_completion, parse_option_spec,
19    prettify
20)
21from .fast_data_types import truncate_point_for_length, wcswidth
22from .rc.base import all_command_names, command_for_name
23from .shell import options_for_cmd
24from .types import run_once
25from .utils import screen_size_function
26
'''
To add completion for a new shell, you need to:

1) Add an entry to completion_scripts for your shell. This is
a simple function that calls kitty's completion code and passes the
results to the shell's completion system. It can be output by
`kitty +complete setup shell_name` and its output goes into
your shell's rc file.

2) Add an input_parser function. This takes the input from
the shell for the text being completed and returns a list of words
and a boolean indicating if we are on a new word or not. This
is passed to kitty's completion system.

3) Add an output_serializer function that is responsible for
taking the results from kitty's completion system and converting
them into something your shell will understand.
'''
45
# Registry mapping a shell name to the function that parses that shell's
# completion input. Filled in by the input_parser decorator.
parsers: Dict[str, Callable] = {}
# Registry mapping a shell name to the function that serializes the computed
# completions for that shell. Filled in by the output_serializer decorator.
serializers: Dict[str, Callable] = {}
48
49
class MatchGroup:
    '''
    A group of completion candidates.

    Candidates are stored in ``mdict``, which maps each candidate word to its
    description (an empty string when there is none). ``word_transforms``
    optionally maps a candidate to the text returned in its place by
    transformed_words() and transformed_items().
    '''

    def __init__(
        self, x: Union[Dict[str, str], Iterable[str]],
        trailing_space: bool = True,
        is_files: bool = False,
        word_transforms: Optional[Dict[str, str]] = None,
    ):
        if isinstance(x, dict):
            # A dict is used as is (not copied)
            self.mdict = x
        else:
            # Plain words get an empty description
            self.mdict = dict.fromkeys(x, '')
        self.trailing_space = trailing_space
        self.is_files = is_files
        self.word_transforms = word_transforms if word_transforms else {}

    def __iter__(self) -> Iterator[str]:
        '''Iterate over the candidate words.'''
        return iter(self.mdict.keys())

    def transformed_words(self) -> Iterator[str]:
        '''Yield every candidate, replaced by its transform when one exists.'''
        for word in self:
            yield self.word_transforms.get(word, word)

    def transformed_items(self) -> Iterator[Tuple[str, str]]:
        '''Yield (transformed word, description) pairs.'''
        for word, description in self.items():
            yield self.word_transforms.get(word, word), description

    def items(self) -> Iterator[Tuple[str, str]]:
        '''Iterate over (word, description) pairs.'''
        pairs = self.mdict.items()
        return iter(pairs)

    def values(self) -> Iterator[str]:
        '''Iterate over the descriptions.'''
        descriptions = self.mdict.values()
        return iter(descriptions)
79
80
def debug(*a: Any, **kw: Any) -> None:
    '''Forward all positional and keyword arguments to kittens.tui.loop.debug_write().'''
    # The import is done lazily, on first use, instead of at module load time
    from kittens.tui.loop import debug_write
    debug_write(*a, **kw)
84
85
class Delegate:
    '''
    Describes the part of the command line that kitty does not complete itself.

    ``words`` is the word list, ``pos`` the index in it of the first unknown
    word (-1 means there is none) and ``new_word`` whether a new word is being
    started. An instance is truthy only when there is something to delegate.
    '''

    def __init__(self, words: Sequence[str] = (), pos: int = -1, new_word: bool = False):
        self.words: Sequence[str] = words
        self.pos = pos
        self.new_word = new_word
        # Number of words from pos to the end of the word list
        self.num_of_unknown_args = len(words) - pos

    def __bool__(self) -> bool:
        if self.pos <= -1:
            return False
        return self.num_of_unknown_args > 0

    @property
    def precommand(self) -> str:
        '''The word at ``pos``, or an empty string if ``pos`` is out of range.'''
        try:
            word = self.words[self.pos]
        except IndexError:
            word = ''
        return word
103
104
class Completions:
    '''
    The result of a completion request: named groups of matches plus a
    Delegate describing what, if anything, is left for the shell to complete.
    '''

    def __init__(self) -> None:
        self.delegate: Delegate = Delegate()
        self.match_groups: Dict[str, MatchGroup] = {}

    def add_match_group(
        self, name: str, x: Union[Dict[str, str], Iterable[str]],
        trailing_space: bool = True,
        is_files: bool = False,
        word_transforms: Optional[Dict[str, str]] = None
    ) -> MatchGroup:
        '''
        Create a MatchGroup from the arguments, store it under ``name``
        (replacing any group already stored under that name) and return it.
        '''
        group = MatchGroup(x, trailing_space, is_files, word_transforms)
        self.match_groups[name] = group
        return group
119
120
@run_once
def remote_control_command_names() -> Tuple[str, ...]:
    '''Return the sorted remote control command names, with underscores replaced by hyphens.'''
    names = [name.replace('_', '-') for name in all_command_names()]
    names.sort()
    return tuple(names)
124
125
126# Shell specific code {{{
127
128
# Per-shell setup scripts, keyed by shell name. setup() prints the entry for
# the requested shell. Each script defines a shell function that pipes the
# words of the current command line to `kitty +complete <shell>`; the zsh and
# bash versions eval the output, the fish version hands it to `complete -a`.
completion_scripts = {
    'zsh': '''#compdef kitty

_kitty() {
    local src
    # Send all words up to the word the cursor is currently on
    src=$(printf "%s\n" "${(@)words[1,$CURRENT]}" | kitty +complete zsh)
    if [[ $? == 0 ]]; then
        eval ${src}
    fi
}
compdef _kitty kitty
''',
    'bash': '''
_kitty_completions() {
    local src
    local limit
    # Send all words up to the word the cursor is currently on
    let limit=1+$COMP_CWORD
    src=$(printf "%s\n" "${COMP_WORDS[@]: 0:$limit}" | kitty +complete bash)
    if [[ $? == 0 ]]; then
        eval ${src}
    fi
}

complete -o nospace -F _kitty_completions kitty
''',
    'fish': '''
function __kitty_completions
    # Send all words up to the one before the cursor
    commandline -cop | kitty +complete fish
end

complete -f -c kitty -a "(__kitty_completions)"
''',
}
165
# (words, new_word): the words parsed from the shell's input and a flag saying
# whether the cursor is at the start of a new word.
ParseResult = Tuple[List[str], bool]
# Converts the raw text received from the shell into a ParseResult.
ParserFunc = Callable[[str], ParseResult]
# Converts the computed Completions into the text handed back to the shell.
SerializerFunc = Callable[[Completions], str]
169
170
def input_parser(func: ParserFunc) -> ParserFunc:
    '''
    Decorator that registers ``func`` in ``parsers``. The registry key is the
    part of the function name before the first underscore (the shell name).
    '''
    shell_name, _, _ = func.__name__.partition('_')
    parsers[shell_name] = func
    return func
175
176
def output_serializer(func: SerializerFunc) -> SerializerFunc:
    '''
    Decorator that registers ``func`` in ``serializers``. The registry key is
    the part of the function name before the first underscore (the shell name).
    '''
    shell_name, _, _ = func.__name__.partition('_')
    serializers[shell_name] = func
    return func
181
182
@input_parser
def zsh_input_parser(data: str) -> ParseResult:
    '''
    Parse the newline separated words sent by the zsh completion function.

    Input that ends with an empty line (two consecutive newlines) means that
    the word under the cursor is empty, i.e. a new word is being started.
    '''
    words = data.rstrip().splitlines()
    return words, data.endswith('\n\n')
188
189
@input_parser
def bash_input_parser(data: str) -> ParseResult:
    '''
    Parse the newline separated words sent by the bash completion function.

    Input that ends with an empty line (two consecutive newlines) means that
    the word under the cursor is empty, i.e. a new word is being started.
    '''
    starts_new_word = data.endswith('\n\n')
    stripped = data.rstrip()
    return stripped.splitlines(), starts_new_word
195
196
@input_parser
def fish_input_parser(data: str) -> ParseResult:
    '''
    Parse the newline separated words sent by the fish completion function.
    The result always reports that a new word is being started.
    '''
    words = data.rstrip().splitlines()
    new_word = True
    return words, new_word
200
201
@output_serializer
def zsh_output_serializer(ans: Completions) -> str:
    '''
    Serialize ``ans`` as zsh code for the zsh completion function to eval.

    Every match group becomes one ``compadd`` invocation, preceded by a
    ``compdescriptions`` array when the group has descriptions or word
    transforms. A truthy delegate is turned into calls to zsh's own completion
    functions (``_command_names`` or ``_normal``).

    Every word placed in the output is passed through shlex.quote(), including
    the delegate's precommand, because the output is evaluated by the shell.
    '''
    lines = []

    screen = screen_size_function(sys.stderr.fileno())()
    width = screen.cols

    def fmt_desc(word: str, desc: str, max_word_len: int) -> Iterator[str]:
        # Yields the display text for one match: the word alone when there is
        # no description, otherwise the word followed by the first line of the
        # description, truncated with an ellipsis to fit the screen width.
        if not desc:
            yield word
            return
        desc = prettify(desc.splitlines()[0])
        multiline = False
        if wcswidth(word) > max_word_len:
            # Word is too wide for the word column: description goes on its own line
            max_desc_len = width - 2
            multiline = True
        else:
            word = word.ljust(max_word_len)
            max_desc_len = width - max_word_len - 3
        if wcswidth(desc) > max_desc_len:
            desc = desc[:truncate_point_for_length(desc, max_desc_len - 2)]
            desc += '…'

        if multiline:
            yield f'{word}\n  {desc}'
        else:
            yield f'{word}  {desc}'

    for description, matches in ans.match_groups.items():
        cmd = ['compadd', '-U', '-J', shlex.quote(description), '-X', shlex.quote('%B' + description + '%b')]
        if not matches.trailing_space:
            cmd += ['-S', '""']
        if matches.is_files:
            cmd.append('-f')
            common_prefix = os.path.commonprefix(tuple(matches))
            if common_prefix:
                # Pass the shared prefix via -p and strip it from the words
                cmd.extend(('-p', shlex.quote(common_prefix)))
                matches = MatchGroup({k[len(common_prefix):]: v for k, v in matches.items()})
        has_descriptions = any(matches.values())
        if has_descriptions or matches.word_transforms:
            lines.append('compdescriptions=(')
            # default=0 keeps an empty group that has word transforms from
            # raising ValueError
            sz = max(map(wcswidth, matches.transformed_words()), default=0)
            limit = min(16, sz)
            for word, desc in matches.transformed_items():
                lines.extend(map(shlex.quote, fmt_desc(word, desc, limit)))
            lines.append(')')
            if has_descriptions:
                cmd.append('-l')
            cmd.append('-d')
            cmd.append('compdescriptions')
        cmd.append('--')
        for word in matches:
            cmd.append(shlex.quote(word))
        lines.append(' '.join(cmd) + ';')

    if ans.delegate:
        if ans.delegate.num_of_unknown_args == 1 and not ans.delegate.new_word:
            lines.append('_command_names -e')
        elif ans.delegate.precommand:
            # Drop the words up to and including the precommand
            for _ in range(ans.delegate.pos + 1):
                lines.append('shift words')
                lines.append('(( CURRENT-- ))')
            # The precommand is a word typed by the user; quote it like every
            # other word since this text is passed to eval
            lines.append('_normal -p ' + shlex.quote(ans.delegate.precommand))
    result = '\n'.join(lines)
    # debug(result)
    return result
269
270
@output_serializer
def bash_output_serializer(ans: Completions) -> str:
    '''
    Serialize ``ans`` as bash code: one ``COMPREPLY+=(...)`` statement per
    matched word. A space is appended to the word when its group asks for a
    trailing space.
    '''
    statements = []
    for group in ans.match_groups.values():
        suffix = ' ' if group.trailing_space else ''
        statements.extend(
            'COMPREPLY+=({})'.format(shlex.quote(candidate + suffix))
            for candidate in group
        )
    # debug('\n'.join(statements))
    return '\n'.join(statements)
281
282
@output_serializer
def fish_output_serializer(ans: Completions) -> str:
    '''Serialize ``ans`` for fish: every matched word, shell quoted, on its own line.'''
    quoted_words = [
        shlex.quote(candidate)
        for group in ans.match_groups.values()
        for candidate in group
    ]
    # debug('\n'.join(quoted_words))
    return '\n'.join(quoted_words)
291# }}}
292
293
def completions_for_first_word(ans: Completions, prefix: str, entry_points: Iterable[str], namespaced_entry_points: Iterable[str]) -> None:
    '''
    Add the 'Entry points' group for the first word after ``kitty``.

    Candidates are the entry points, the remote control command names prefixed
    with ``@`` and the namespaced entry points prefixed with ``+``, restricted
    to those starting with ``prefix`` when it is non-empty. A non-empty prefix
    is additionally recorded as the delegate.
    '''
    candidates = list(entry_points)
    candidates.extend('@' + name for name in remote_control_command_names())
    candidates.extend('+' + name for name in namespaced_entry_points)
    if prefix:
        candidates = [c for c in candidates if c.startswith(prefix)]
    ans.add_match_group('Entry points', dict.fromkeys(candidates, ''))
    if prefix:
        ans.delegate = Delegate([prefix], 0)
303
304
def kitty_cli_opts(ans: Completions, prefix: Optional[str] = None) -> None:
    '''
    Add an 'Options' group mapping every alias of kitty's command line options
    to the option's stripped help text. When ``prefix`` is non-empty only the
    aliases starting with it are included.
    '''
    found = {}
    for option in options_for_completion():
        if isinstance(option, str):
            # Plain strings in the option sequence are not options
            continue
        if prefix:
            wanted = frozenset(a for a in option['aliases'] if a.startswith(prefix))
        else:
            wanted = option['aliases']
        found.update({alias: option['help'].strip() for alias in wanted})
    ans.add_match_group('Options', found)
314
315
def complete_kitty_cli_arg(ans: Completions, opt: Optional[OptionDict], prefix: str, unknown_args: Delegate) -> None:
    '''
    Complete the value of one of kitty's own command line options.

    :param ans: the Completions object the matches are added to.
    :param opt: the option whose value is being completed. When it is falsy
        the word is not an option value, and ``unknown_args`` is installed as
        the delegate if it contains any unknown arguments.
    :param prefix: the text typed so far for the value (may be empty).
    :param unknown_args: the words kitty does not complete itself.
    '''
    prefix = prefix or ''
    if not opt:
        if unknown_args.num_of_unknown_args > 0:
            ans.delegate = unknown_args
        return
    dest = opt['dest']
    if dest == 'override':
        from kitty.config import option_names_for_completion
        ans.add_match_group(
            'Config directives',
            {name + '=': '' for name in option_names_for_completion() if name.startswith(prefix)},
            trailing_space=False
        )
    elif dest == 'config':

        def is_conf_file(x: str) -> bool:
            if os.path.isdir(x):
                return True
            return x.lower().endswith('.conf')

        complete_files_and_dirs(ans, prefix, files_group_name='Config files', predicate=is_conf_file)
    elif dest == 'session':
        complete_files_and_dirs(ans, prefix, files_group_name='Session files')
    elif dest == 'watcher':
        complete_files_and_dirs(ans, prefix, files_group_name='Watcher files')
    elif dest == 'directory':
        complete_files_and_dirs(ans, prefix, files_group_name='Directories', predicate=os.path.isdir)
    elif dest == 'listen_on':
        if ':' not in prefix:
            ans.add_match_group('Address type', {x: x for x in ('unix:', 'tcp:') if x.startswith(prefix)}, trailing_space=False)
        elif prefix.startswith('unix:') and not prefix.startswith('unix:@'):
            # Addresses of the form unix:@name are skipped, presumably because
            # they name abstract sockets rather than filesystem paths
            # (NOTE(review): confirm against the --listen-on documentation).
            # The test has to include the unix: part: a prefix that starts
            # with unix: can never start with a bare @.
            complete_files_and_dirs(ans, prefix[len('unix:'):], files_group_name='UNIX sockets', add_prefix='unix:')
    else:
        complete_basic_option_args(ans, opt, prefix)
349
350
# Signature of the callbacks used to complete option values and non-option
# arguments: (ans, opt, prefix, unknown_args). complete_alias_map() passes
# opt=None when the word being completed is not the value of an option.
CompleteArgsFunc = Callable[[Completions, Optional[OptionDict], str, Delegate], None]
352
353
def complete_alias_map(
    ans: Completions,
    words: Sequence[str],
    new_word: bool,
    option_map: Dict[str, OptionDict],
    complete_args: Optional[CompleteArgsFunc] = None
) -> None:
    '''
    Complete a command line made of options and arguments.

    :param ans: the Completions object the matches are added to.
    :param words: the words typed so far (without the command itself).
    :param new_word: True if the cursor is at the start of a new, empty word.
    :param option_map: maps every option alias to its option definition.
    :param complete_args: optional callback used to complete option values
        (called with the option) and non-option arguments (called with None).

    The words are scanned left to right, skipping the values of options that
    take one. Completion is generated for the last word, or for the new empty
    word when ``new_word`` is True.
    '''
    expecting_arg = False
    opt: Optional[OptionDict] = None
    last_word = words[-1] if words else ''
    # The last word is identified by position. Comparing the strings by
    # identity or by value would also match an earlier occurrence of the
    # same word.
    last_index = len(words) - 1
    for i, w in enumerate(words):
        completing_this_word = i == last_index and not new_word
        if expecting_arg:
            # w is the value of the preceding option
            if completing_this_word:
                if opt is not None and complete_args is not None:
                    complete_args(ans, opt, w, Delegate())
                return
            expecting_arg = False
            continue
        opt = option_map.get(w)
        if completing_this_word:
            if w.startswith('-'):
                ans.add_match_group('Options', {k: o['help'] for k, o in option_map.items() if k.startswith(last_word)})
            else:
                if complete_args is not None:
                    complete_args(ans, None, last_word, Delegate(words, i))
            return
        if opt is None:
            if complete_args is not None:
                complete_args(ans, None, '' if new_word else last_word, Delegate(words, i, new_word))
            return  # some non-option word encountered
        # Options whose type starts with bool- do not take a value
        expecting_arg = not opt.get('type', '').startswith('bool-')
    if expecting_arg:
        if opt is not None and complete_args is not None:
            complete_args(ans, opt, '' if new_word else last_word, Delegate())
    else:
        prefix = '' if new_word else last_word
        if complete_args is not None:
            complete_args(ans, None, prefix, Delegate())
        ans.add_match_group('Options', {k: o['help'] for k, o in option_map.items() if k.startswith(prefix)})
393
394
def complete_cli(
    ans: Completions,
    words: Sequence[str],
    new_word: bool,
    seq: OptionSpecSeq,
    complete_args: Optional[CompleteArgsFunc] = None
) -> None:
    '''
    Complete a command line described by the option specification ``seq``.

    Builds the alias -> option mapping from the non-string entries of ``seq``
    and hands over to complete_alias_map().
    '''
    alias_to_option = {
        alias: option
        for option in seq if not isinstance(option, str)
        for alias in option['aliases']
    }
    complete_alias_map(ans, words, new_word, alias_to_option, complete_args)
408
409
def complete_remote_command(ans: Completions, cmd_name: str, words: Sequence[str], new_word: bool) -> None:
    '''
    Complete the options and arguments of the remote control command
    ``cmd_name``. Nothing is added when the command has no option aliases.

    The command's ``args_completion`` decides how its arguments are completed:
    a 'files' entry selects file completion and a 'names' entry selects
    completion from a collection of names (or from a callable returning one).
    '''
    _, alias_map = options_for_cmd(cmd_name)
    if not alias_map:
        return
    args_completer: Optional[CompleteArgsFunc] = None
    spec = command_for_name(cmd_name).args_completion
    if spec:
        if 'files' in spec:
            title, matchers = spec['files']
            if isinstance(matchers, tuple):
                args_completer = remote_files_completer(title, matchers)
        elif 'names' in spec:
            title, source = spec['names']
            names = source() if callable(source) else source
            args_completer = remote_args_completer(title, names)
    complete_alias_map(ans, words, new_word, alias_map, complete_args=args_completer)
425
426
def path_completion(prefix: str = '') -> Tuple[List[str], List[str]]:
    '''
    List the filesystem entries that complete ``prefix``.

    :param prefix: the partial path typed so far. Backslash-escaped spaces are
        unescaped, and ``~`` and environment variables are expanded. An empty
        prefix lists the current directory.
    :return: a ``(dirs, files)`` tuple of lists. Directory entries end with
        the path separator. Both lists are empty when the directory to scan
        does not exist, is not a directory or cannot be read.
    '''
    prefix = prefix.replace(r'\ ', ' ')
    dirs: List[str] = []
    files: List[str] = []
    base = '.'
    if prefix.endswith('/'):
        base = prefix
    elif '/' in prefix:
        base = os.path.dirname(prefix)
    src = os.path.expandvars(os.path.expanduser(base))
    src_prefix = os.path.abspath(os.path.expandvars(os.path.expanduser(prefix))) if prefix else ''
    try:
        scanner = os.scandir(src)
    except OSError:
        # Missing directory, a regular file used as a directory, permission
        # denied, ...: there is simply nothing to complete
        return dirs, files
    # The context manager closes the directory handle deterministically
    with scanner as items:
        for x in items:
            abspath = os.path.abspath(x.path)
            if prefix and not abspath.startswith(src_prefix):
                continue
            if prefix:
                # Keep the prefix exactly as typed and append the rest of the name
                q = prefix + abspath[len(src_prefix):].lstrip(os.sep)
                q = os.path.expandvars(os.path.expanduser(q))
            else:
                q = os.path.relpath(abspath)
            if x.is_dir():
                dirs.append(q.rstrip(os.sep) + os.sep)
            else:
                files.append(q)
    return dirs, files
455
456
def complete_files_and_dirs(
    ans: Completions,
    prefix: str,
    files_group_name: str = 'Files',
    predicate: Optional[Callable[[str], bool]] = None,
    add_prefix: Optional[str] = None
) -> None:
    '''
    Add match groups for the directories and files that complete ``prefix``.

    :param ans: the Completions object the groups are added to.
    :param prefix: the partial path typed so far.
    :param files_group_name: name of the group that receives the files.
        Directories always go into the 'Directories' group.
    :param predicate: if given, only files for which it returns a true value
        are offered. It is not applied to directories.
    :param add_prefix: if given, text prepended to every offered path.

    A group is only added when it has at least one entry.
    '''
    dirs, files_ = path_completion(prefix or '')
    # Materialize the filtered files: a lazy filter object is always truthy,
    # which would add an empty group even when nothing matched
    files = list(filter(predicate, files_))
    if add_prefix:
        dirs = [add_prefix + x for x in dirs]
        files = [add_prefix + x for x in files]

    if dirs:
        ans.add_match_group('Directories', dirs, trailing_space=False, is_files=True)
    if files:
        ans.add_match_group(files_group_name, files, is_files=True)
474
475
def complete_basic_option_args(ans: Completions, opt: OptionDict, prefix: str) -> None:
    '''
    For an option that declares choices, add a group with the choices that
    start with ``prefix``. Options without choices add nothing.
    '''
    if not opt['choices']:
        return
    matching = tuple(choice for choice in opt['choices'] if choice.startswith(prefix))
    ans.add_match_group(f'Choices for {opt["dest"]}', matching)
479
480
def complete_icat_args(ans: Completions, opt: Optional[OptionDict], prefix: str, unknown_args: Delegate) -> None:
    '''
    Argument completer for the icat kitten: non-option arguments complete to
    directories and files whose guessed MIME type is an image type; option
    values complete to the option's choices.
    '''
    from .guess_mime_type import guess_type

    def icat_file_predicate(filename: str) -> bool:
        mime = guess_type(filename, allow_filesystem_access=True)
        return bool(mime and mime.startswith('image/'))

    if opt is not None:
        complete_basic_option_args(ans, opt, prefix)
        return
    complete_files_and_dirs(ans, prefix, 'Images', icat_file_predicate)
494
495
def complete_themes_args(ans: Completions, opt: Optional[OptionDict], prefix: str, unknown_args: Delegate) -> None:
    '''
    Argument completer for the themes kitten: non-option arguments complete to
    the names of the loaded themes that start with ``prefix``; option values
    complete to the option's choices.
    '''
    if opt is not None:
        complete_basic_option_args(ans, opt, prefix)
        return
    from kittens.themes.collection import load_themes
    loaded = load_themes(cache_age=-1, ignore_no_cache=True)
    matching = tuple(theme.name for theme in loaded if theme.name.startswith(prefix))
    ans.add_match_group('Themes', matching)
504
505
def remote_files_completer(name: str, matchers: Tuple[str, ...]) -> CompleteArgsFunc:
    '''
    Build an argument completer that offers directories and the files matching
    any of the given patterns.

    :param name: name of the match group that receives the files.
    :param matchers: fnmatch-style patterns. A file is offered when it matches
        at least one of them. Entries that are not strings are ignored.
    :return: a completer that completes files for non-option arguments and the
        option's choices for option values.
    '''
    from fnmatch import fnmatch
    patterns = tuple(m for m in matchers if isinstance(m, str))

    def complete_files_map(ans: Completions, opt: Optional[OptionDict], prefix: str, unknown_args: Delegate) -> None:

        def predicate(filename: str) -> bool:
            # Every pattern is tried, not just the first one
            return any(fnmatch(filename, pattern) for pattern in patterns)

        if opt is None:
            complete_files_and_dirs(ans, prefix, name, predicate)
        else:
            complete_basic_option_args(ans, opt, prefix)
    return complete_files_map
522
523
def remote_args_completer(title: str, words: Iterable[str]) -> CompleteArgsFunc:
    '''
    Build an argument completer that offers, under the group ``title``, the
    entries of ``words`` (sorted once, up front) that start with the typed
    prefix. Option values complete to the option's choices.
    '''
    items = sorted(words)

    def complete_names_for_arg(ans: Completions, opt: Optional[OptionDict], prefix: str, unknown_args: Delegate) -> None:
        if opt is not None:
            complete_basic_option_args(ans, opt, prefix)
            return
        matching = (candidate for candidate in items if candidate.startswith(prefix))
        ans.add_match_group(title, dict.fromkeys(matching, ''))

    return complete_names_for_arg
534
535
def config_file_predicate(filename: str) -> bool:
    '''Return True if ``filename`` ends with ``.conf`` (case-sensitive).'''
    suffix = '.conf'
    return filename.endswith(suffix)
538
539
def complete_diff_args(ans: Completions, opt: Optional[OptionDict], prefix: str, unknown_args: Delegate) -> None:
    '''
    Argument completer for the diff kitten: non-option arguments complete to
    any file, the value of the config option to files ending in .conf, and
    other option values to the option's choices.
    '''
    if opt is None:
        group_name, predicate = 'Files', None
    elif opt['dest'] == 'config':
        group_name, predicate = 'Config Files', config_file_predicate
    else:
        complete_basic_option_args(ans, opt, prefix)
        return
    complete_files_and_dirs(ans, prefix, group_name, predicate)
547
548
def complete_kitten(ans: Completions, kitten: str, words: Sequence[str], new_word: bool) -> None:
    '''
    Complete the command line of the kitten named ``kitten``.

    A completer provided by the kitten itself takes precedence. Otherwise the
    kitten's CLI docs, if available, are used to complete its options, with
    the icat, diff and themes kittens also getting argument completion.
    A SystemExit raised while looking either of these up is treated as
    "not available".
    '''
    try:
        completer = get_kitten_completer(kitten)
    except SystemExit:
        completer = None
    if completer is not None:
        completer(ans, words, new_word)
        return

    try:
        cli_docs = get_kitten_cli_docs(kitten)
    except SystemExit:
        cli_docs = None
    if cli_docs is None:
        return

    seq = parse_option_spec(cli_docs['options']())[0]
    args_completers = {
        'icat': complete_icat_args,
        'diff': complete_diff_args,
        'themes': complete_themes_args,
    }
    # complete_cli() builds the alias -> option map from seq
    complete_cli(ans, words, new_word, seq, args_completers.get(kitten))
575
576
def find_completions(words: Sequence[str], new_word: bool, entry_points: Iterable[str], namespaced_entry_points: Iterable[str]) -> Completions:
    '''
    Compute the completions for a kitty command line.

    :param words: the words of the command line, starting with ``kitty``.
    :param new_word: True if the cursor is at the start of a new, empty word.
    :param entry_points: names of the plain entry points.
    :param namespaced_entry_points: names of the entry points that are invoked
        as ``+name`` or ``+ name``.
    :return: a Completions object. It is empty when ``words`` is empty or does
        not start with ``kitty``.
    '''
    ans = Completions()
    if not words or words[0] != 'kitty':
        return ans
    words = words[1:]
    if not words or (len(words) == 1 and not new_word):
        prefix = words[0] if words else ''
        completions_for_first_word(ans, prefix, entry_points, namespaced_entry_points)
        kitty_cli_opts(ans, prefix)
        return ans
    # From here on the first word is complete: either there are further words
    # or a new word is being started.
    first = words[0]
    if first == '@':
        if len(words) == 1 or (len(words) == 2 and not new_word):
            prefix = words[1] if len(words) > 1 else ''
            ans.add_match_group('Remote control commands', {c: '' for c in remote_control_command_names() if c.startswith(prefix)})
        else:
            complete_remote_command(ans, words[1], words[2:], new_word)
        return ans
    if first.startswith('@'):
        complete_remote_command(ans, first[1:], words[1:], new_word)
        # Return here, otherwise the command line would additionally be
        # completed as a normal kitty invocation
        return ans
    if first == '+':
        if len(words) == 1 or (len(words) == 2 and not new_word):
            prefix = words[1] if len(words) > 1 else ''
            ans.add_match_group('Entry points', {c: '' for c in namespaced_entry_points if c.startswith(prefix)})
        else:
            if words[1] == 'kitten':
                if len(words) == 2 or (len(words) == 3 and not new_word):
                    ans.add_match_group('Kittens', (k for k in all_kitten_names() if k.startswith('' if len(words) == 2 else words[2])))
                else:
                    complete_kitten(ans, words[2], words[3:], new_word)
        return ans
    if first.startswith('+'):
        # Only +kitten has completion; kitten names must not be offered after
        # other entry points
        if first == '+kitten':
            if len(words) == 1:
                ans.add_match_group('Kittens', all_kitten_names())
            elif len(words) == 2 and not new_word:
                ans.add_match_group('Kittens', (k for k in all_kitten_names() if k.startswith(words[1])))
            else:
                complete_kitten(ans, words[1], words[2:], new_word)
        return ans
    complete_cli(ans, words, new_word, options_for_completion(), complete_kitty_cli_arg)
    return ans
629
630
def setup(cstyle: str) -> None:
    '''Print the completion setup script stored in completion_scripts for the shell ``cstyle``.'''
    # A shell name without an entry is not handled here: the lookup raises KeyError
    print(completion_scripts[cstyle])
633
634
def main(args: Sequence[str], entry_points: Iterable[str], namespaced_entry_points: Iterable[str]) -> None:
    '''
    Entry point for ``kitty +complete``.

    :param args: ``[style]`` to read the command line words from stdin and
        print the completions for the shell ``style``, or ``['setup', shell]``
        to print the setup script for ``shell``.
    :param entry_points: names of the plain entry points.
    :param namespaced_entry_points: names of the ``+`` entry points.
    :raises SystemExit: if the completion style is missing or unknown, or if
        ``setup`` is requested without a shell name.
    '''
    if not args:
        raise SystemExit('Must specify completion style')
    cstyle = args[0]
    if cstyle == 'setup':
        if len(args) < 2:
            raise SystemExit('Must specify the shell to output the completion setup script for')
        return setup(args[1])
    # Validate the style before reading stdin so that an unknown style is
    # reported without waiting for input
    try:
        parser = parsers[cstyle]
        serializer = serializers[cstyle]
    except KeyError:
        raise SystemExit('Unknown completion style: {}'.format(cstyle))
    data = sys.stdin.read()
    words, new_word = parser(data)
    ans = find_completions(words, new_word, entry_points, namespaced_entry_points)
    print(serializer(ans), end='')
650