1#!/usr/local/bin/python3.8
2# vim:fileencoding=utf-8
3# License: GPLv3 Copyright: 2021, Kovid Goyal <kovid at kovidgoyal.net>
4
5import os
6import re
7import subprocess
8from typing import Callable, Dict, Iterable, Iterator, Sequence, Tuple
9
10from kitty.complete import Completions, complete_files_and_dirs, debug
11from kitty.types import run_once
12
debug  # NOTE(review): bare reference, presumably only to keep the `debug` import from being flagged as unused -- confirm
14
15
def lines_from_file(path: str) -> Iterator[str]:
    """Yield the lines of the text file at *path*, or nothing if it cannot be opened.

    A leading ``~`` in *path* is expanded to the user's home directory.
    Lines are yielded exactly as read, including their line terminators.
    The file is closed once iteration finishes or the generator is closed.
    """
    try:
        f = open(os.path.expanduser(path))
    except OSError:
        # Missing or unreadable files are expected; treat them as empty.
        return
    # Only the open() call is guarded above: errors raised while reading
    # still propagate to the caller, as before.
    with f:
        yield from f
23
24
def lines_from_command(*cmd: str) -> Iterator[str]:
    """Run *cmd* and yield the lines of its standard output.

    The output is decoded as UTF-8 and split with ``str.splitlines``, so the
    yielded lines carry no terminators. Any failure (command missing,
    non-zero exit status, undecodable output, ...) results in nothing being
    yielded.
    """
    try:
        raw = subprocess.check_output(cmd)
        text = raw.decode('utf-8')
    except Exception:
        # Best effort: a failing command simply contributes no lines.
        return
    for line in text.splitlines():
        yield line
31
32
def parts_yielder(lines: Iterable[str], pfilter: Callable[[str], Iterator[str]]) -> Iterator[str]:
    """Apply *pfilter* to each line in *lines* and yield every item it produces, in order."""
    for current in lines:
        for part in pfilter(current):
            yield part
36
37
def hosts_from_config_lines(line: str) -> Iterator[str]:
    """Yield the host patterns declared by a ``Host`` line of an ssh config file.

    A ``Host`` directive may list several whitespace-separated patterns; all
    of them are yielded. The keyword is matched case-insensitively. Negated
    patterns (prefixed with ``!``) are skipped, and anything from a token
    starting with ``#`` onwards is treated as a trailing comment. Lines that
    are not ``Host`` directives yield nothing.
    """
    parts = line.strip().split()
    if len(parts) < 2 or parts[0].lower() != 'host':
        return
    for pattern in parts[1:]:
        if pattern.startswith('#'):
            # Trailing comment: nothing after this is a pattern.
            break
        if pattern.startswith('!'):
            # A negated pattern excludes hosts, it never names one.
            continue
        yield pattern
42
43
def hosts_from_known_hosts(line: str) -> Iterator[str]:
    """Yield the host names listed on one line of a known_hosts file.

    Handles the parts of the format that the host field can take:

    * comment lines (starting with ``#``) and blank lines yield nothing;
    * an optional leading marker such as ``@cert-authority`` is skipped;
    * the host field is a comma-separated list, each entry yielded on its own;
    * hashed entries (starting with ``|``) carry no usable name and are skipped;
    * ``[host]:port`` entries are reduced to ``host``.

    Wildcard patterns are yielded unchanged; filtering them is left to the
    caller.
    """
    parts = line.strip().split()
    if not parts or parts[0].startswith('#'):
        return
    field = parts[0]
    if field.startswith('@'):
        # Marker present: the host list is the following field.
        if len(parts) < 2:
            return
        field = parts[1]
    for host in field.split(','):
        if not host or host.startswith('|'):
            continue
        bracketed = re.match(r'^\[(.+)\]:\d+$', host)
        if bracketed is not None:
            host = bracketed.group(1)
        elif host.count(':') == 1:
            # A single colon can only be a host:port separator. Entries with
            # more colons are IPv6 addresses and must not be truncated.
            host = re.sub(r':\d+$', '', host)
        yield host
48
49
def hosts_from_hosts(line: str) -> Iterator[str]:
    """Yield every whitespace-separated field of one hosts-database line.

    This is used for both ``/etc/hosts`` and ``getent hosts`` output, where a
    line is an address followed by a host name and any number of aliases.
    The address itself is yielded too. Everything from the first ``#``
    onwards is treated as a comment, so comment-only and blank lines yield
    nothing.
    """
    content = line.partition('#')[0]
    yield from content.split()
60
61
def iter_known_hosts() -> Iterator[str]:
    """Yield candidate host names gathered from the ssh and system host databases.

    Sources are consulted lazily and in a fixed order: the user's ssh config,
    the user and system known_hosts files, ``/etc/hosts`` and finally the
    output of ``getent hosts``. Duplicates are not removed here.
    """
    file_sources = (
        ('~/.ssh/config', hosts_from_config_lines),
        ('~/.ssh/known_hosts', hosts_from_known_hosts),
        ('/etc/ssh/ssh_known_hosts', hosts_from_known_hosts),
        ('/etc/hosts', hosts_from_hosts),
    )
    for location, extractor in file_sources:
        yield from parts_yielder(lines_from_file(location), extractor)
    yield from parts_yielder(lines_from_command('getent', 'hosts'), hosts_from_hosts)
68
69
@run_once
def known_hosts() -> Tuple[str, ...]:
    """Return the sorted, de-duplicated host names from ``iter_known_hosts()``.

    Entries containing ``*`` or ``[`` are dropped.
    """
    unique = set(iter_known_hosts())
    usable = [host for host in unique if not ('*' in host or '[' in host)]
    usable.sort()
    return tuple(usable)
73
74
def _parse_ssh_usage(raw: str) -> Dict[str, str]:
    """Extract option flags from the bracketed groups of ssh's usage text.

    Every top-level ``[...]`` group whose content starts with ``-`` is
    considered. ``[-x name]`` maps flag ``x`` to the argument description
    ``name``; ``[-abc]`` maps each of ``a``, ``b`` and ``c`` to ``''``
    (flags that take no argument). Nested brackets are kept as part of the
    enclosing group. If a group is never closed, parsing stops and the
    options found so far are returned.
    """
    ans: Dict[str, str] = {}
    limit = len(raw)
    pos = 0
    while True:
        pos = raw.find('[', pos)
        if pos < 0:
            break
        # Scan forward to the bracket that closes this group.
        depth = 1
        epos = pos
        while depth > 0:
            epos += 1
            if epos >= limit:
                # Unbalanced brackets (truncated or unexpected output).
                return ans
            ch = raw[epos]
            if ch == '[':
                depth += 1
            elif ch == ']':
                depth -= 1
        q = raw[pos+1:epos]
        pos = epos
        if len(q) < 2 or q[0] != '-':
            continue
        if ' ' in q:
            opt, desc = q.split(' ', 1)
            ans[opt[1:]] = desc
        else:
            ans.update(dict.fromkeys(q[1:], ''))
    return ans


@run_once
def ssh_options() -> Dict[str, str]:
    """Return the options accepted by the local ``ssh`` binary.

    ``ssh`` is run without arguments and the usage text it writes to stderr
    is parsed. Keys are option letters without the leading ``-``; values are
    the argument description from the usage text, or ``''`` for flags that
    take no argument. An empty mapping is returned if ``ssh`` cannot be run.
    """
    try:
        # run() waits for the process and closes the pipe, so no child
        # process or file descriptor is left behind.
        proc = subprocess.run(['ssh'], stderr=subprocess.PIPE)
    except OSError:
        return {}
    return _parse_ssh_usage(proc.stderr.decode('utf-8', 'replace'))
103
104
105# option help {{{
@run_once
def option_help_map() -> Dict[str, str]:
    """Return a mapping from each ssh command line flag (e.g. ``-p``) to its help text."""
    help_text = '''
-4  -- force ssh to use IPv4 addresses only
-6  -- force ssh to use IPv6 addresses only
-a  -- disable forwarding of authentication agent connection
-A  -- enable forwarding of the authentication agent connection
-B  -- bind to specified interface before attempting to connect
-b  -- specify interface to transmit on
-C  -- compress data
-c  -- select encryption cipher
-D  -- specify a dynamic port forwarding
-E  -- append log output to file instead of stderr
-e  -- set escape character
-f  -- go to background
-F  -- specify alternate config file
-g  -- allow remote hosts to connect to local forwarded ports
-G  -- output configuration and exit
-i  -- select identity file
-I  -- specify smartcard device
-J  -- connect via a jump host
-k  -- disable forwarding of GSSAPI credentials
-K  -- enable GSSAPI-based authentication and forwarding
-L  -- specify local port forwarding
-l  -- specify login name
-M  -- master mode for connection sharing
-m  -- specify mac algorithms
-N  -- don't execute a remote command
-n  -- redirect stdin from /dev/null
-O  -- control an active connection multiplexing master process
-o  -- specify extra options
-p  -- specify port on remote host
-P  -- use non privileged port
-Q  -- query parameters
-q  -- quiet operation
-R  -- specify remote port forwarding
-s  -- invoke subsystem
-S  -- specify location of control socket for connection sharing
-T  -- disable pseudo-tty allocation
-t  -- force pseudo-tty allocation
-V  -- show version number
-v  -- verbose mode (multiple increase verbosity, up to 3)
-W  -- forward standard input and output to host
-w  -- request tunnel device forwarding
-x  -- disable X11 forwarding
-X  -- enable (untrusted) X11 forwarding
-Y  -- enable trusted X11 forwarding
-y  -- send log info via syslog instead of stderr
'''
    # Each entry is "<flag>  -- <description>": field 0 is the flag, field 1
    # the "--" separator and field 2 the remainder of the line.
    entries = (row.strip().split(maxsplit=2) for row in help_text.splitlines())
    return {fields[0]: fields[2] for fields in entries if fields}
162# }}}
163
164
165# option names {{{
@run_once
def option_names() -> Tuple[str, ...]:
    """Return the ssh configuration option names offered when completing ``-o``."""
    listing = '''
AddKeysToAgent
AddressFamily
BatchMode
BindAddress
CanonicalDomains
CanonicalizeFallbackLocal
CanonicalizeHostname
CanonicalizeMaxDots
CanonicalizePermittedCNAMEs
CASignatureAlgorithms
CertificateFile
ChallengeResponseAuthentication
CheckHostIP
Ciphers
ClearAllForwardings
Compression
ConnectionAttempts
ConnectTimeout
ControlMaster
ControlPath
ControlPersist
DynamicForward
EscapeChar
ExitOnForwardFailure
FingerprintHash
ForwardAgent
ForwardX11
ForwardX11Timeout
ForwardX11Trusted
GatewayPorts
GlobalKnownHostsFile
GSSAPIAuthentication
GSSAPIDelegateCredentials
HashKnownHosts
Host
HostbasedAcceptedAlgorithms
HostbasedAuthentication
HostKeyAlgorithms
HostKeyAlias
Hostname
IdentitiesOnly
IdentityAgent
IdentityFile
IPQoS
KbdInteractiveAuthentication
KbdInteractiveDevices
KexAlgorithms
KnownHostsCommand
LocalCommand
LocalForward
LogLevel
MACs
Match
NoHostAuthenticationForLocalhost
NumberOfPasswordPrompts
PasswordAuthentication
PermitLocalCommand
PermitRemoteOpen
PKCS11Provider
Port
PreferredAuthentications
ProxyCommand
ProxyJump
ProxyUseFdpass
PubkeyAcceptedAlgorithms
PubkeyAuthentication
RekeyLimit
RemoteCommand
RemoteForward
RequestTTY
SendEnv
ServerAliveInterval
ServerAliveCountMax
SetEnv
StreamLocalBindMask
StreamLocalBindUnlink
StrictHostKeyChecking
TCPKeepAlive
Tunnel
TunnelDevice
UpdateHostKeys
User
UserKnownHostsFile
VerifyHostKeyDNS
VisualHostKey
XAuthLocation
'''
    names = []
    for row in listing.splitlines():
        row = row.strip()
        if row:
            names.append(row)
    return tuple(names)
257# }}}
258
259
def complete_choices(ans: Completions, prefix: str, title: str, choices: Iterable[str], comma_separated: bool = False) -> None:
    """Add a match group titled *title* with the *choices* that start with *prefix*.

    When *comma_separated* is true, *prefix* is treated as a comma-separated
    list: only the text after the last comma is matched against *choices*.
    Each match is then emitted as the already-typed items plus the choice
    plus a trailing comma, with a word transform mapping that full word back
    to the bare choice, and no trailing space is requested.
    """
    if comma_separated:
        head, sep, needle = prefix.rpartition(',')
        # Items typed before the last comma are carried over verbatim.
        carried = head + sep if head else ''
    else:
        needle, carried = prefix, ''
    matches: Dict[str, str] = {}
    word_transforms = {}
    for choice in choices:
        if not choice.startswith(needle):
            continue
        if comma_separated:
            word = carried + choice + ','
            word_transforms[word] = choice
        else:
            word = choice
        matches[word] = ''
    ans.add_match_group(title, matches, trailing_space=not comma_separated, word_transforms=word_transforms)
278
279
def complete_q_choices(ans: Completions, prefix: str, title: str, key: str, comma_separated: bool) -> None:
    """Complete *prefix* against the values printed by ``ssh -Q <key>``."""
    supported = map(str.strip, lines_from_command('ssh', '-Q', key))
    complete_choices(ans, prefix, title, supported, comma_separated)
283
284
def complete_arg(ans: Completions, option_flag: str, prefix: str = '') -> None:
    """Complete the argument of the ssh option *option_flag* (e.g. ``-p``).

    The kind of argument is looked up in ``ssh_options()`` and decides which
    completer is used. *prefix* is the text typed so far for the argument.
    Nothing is added for flags that are unknown or take no argument, or for
    argument kinds without a dedicated completer.
    """
    option_name = ssh_options().get(option_flag[1:])
    if not option_name:
        # None: flag not listed by ssh; '': flag takes no argument.
        return
    if option_name.endswith(('file', 'path')):
        return complete_files_and_dirs(ans, prefix, option_name)
    # argument kind -> (group title, key passed to `ssh -Q`, comma separated?)
    choices = {
        'mac_spec': ('MAC algorithm', 'mac', True),
        'cipher_spec': ('encryption cipher', 'cipher', True),
        'query_option': ('query option', 'help', False),
    }
    if option_name in choices:
        return complete_q_choices(ans, prefix, *choices[option_name])
    if option_name == 'destination':
        return complete_destination(ans, prefix)
    if option_name == 'ctl_cmd':
        return complete_choices(ans, prefix, 'control command', ('check', 'forward', 'cancel', 'exit'))
    if option_name == 'option':
        # Offer "Name=" so the value can be typed straight after it.
        matches = (x+'=' for x in option_names() if x.startswith(prefix))
        word_transforms = {x+'=': x for x in option_names()}
        ans.add_match_group('configure file option', matches, trailing_space=False, word_transforms=word_transforms)
305
306
def complete_destination(ans: Completions, prefix: str = '') -> None:
    """Add a match group with the known host names that start with *prefix*."""
    hosts = known_hosts()
    matching = (name for name in hosts if name.startswith(prefix))
    ans.add_match_group('remote host name', matching)
310
311
def complete_option(ans: Completions, prefix: str = '-') -> None:
    """Add an 'option' match group for the partially typed flag *prefix*.

    With at most one character typed, every flag from ``option_help_map()``
    that starts with *prefix* is offered together with its help text.
    Otherwise *prefix* itself is offered as the only match.
    """
    help_by_flag = option_help_map()
    if len(prefix) > 1:
        ans.add_match_group('option', {prefix: ''})
        return
    candidates = {}
    for flag, text in help_by_flag.items():
        if flag.startswith(prefix):
            candidates[flag] = text
    ans.add_match_group('option', candidates)
319
320
def complete(ans: Completions, words: Sequence[str], new_word: bool) -> None:
    """Entry point: add completions for an ssh command line to *ans*.

    *words* are the words typed so far; *new_word* is true when a fresh word
    is being started rather than the last one being extended. Words are
    classified left to right as option, option argument or destination, and
    classification stops at the first destination.
    """
    options = ssh_options()
    awaiting_value = False
    kinds = [''] * len(words)
    for idx, token in enumerate(words):
        if awaiting_value:
            kinds[idx] = 'arg'
            awaiting_value = False
        elif token.startswith('-'):
            kinds[idx] = 'option'
            # Only a lone "-x" whose usage entry names an argument consumes
            # the following word.
            awaiting_value = bool(len(token) == 2 and options.get(token[1]))
        else:
            kinds[idx] = 'destination'
            break

    if new_word:
        if words and awaiting_value:
            return complete_arg(ans, words[-1])
        return complete_destination(ans)

    if not words:
        return
    last_kind = kinds[-1]
    if last_kind == 'arg' and len(words) > 1:
        return complete_arg(ans, words[-2], words[-1])
    if last_kind == 'destination':
        return complete_destination(ans, words[-1])
    if last_kind == 'option':
        return complete_option(ans, words[-1])
349