1#!/usr/local/bin/python3.8 2# vim:fileencoding=utf-8 3# License: GPLv3 Copyright: 2021, Kovid Goyal <kovid at kovidgoyal.net> 4 5import os 6import re 7import subprocess 8from typing import Callable, Dict, Iterable, Iterator, Sequence, Tuple 9 10from kitty.complete import Completions, complete_files_and_dirs, debug 11from kitty.types import run_once 12 13debug 14 15 16def lines_from_file(path: str) -> Iterator[str]: 17 try: 18 f = open(os.path.expanduser(path)) 19 except OSError: 20 pass 21 else: 22 yield from f 23 24 25def lines_from_command(*cmd: str) -> Iterator[str]: 26 try: 27 output = subprocess.check_output(cmd).decode('utf-8') 28 except Exception: 29 return 30 yield from output.splitlines() 31 32 33def parts_yielder(lines: Iterable[str], pfilter: Callable[[str], Iterator[str]]) -> Iterator[str]: 34 for line in lines: 35 yield from pfilter(line) 36 37 38def hosts_from_config_lines(line: str) -> Iterator[str]: 39 parts = line.strip().split() 40 if len(parts) > 1 and parts[0] == 'Host': 41 yield parts[1] 42 43 44def hosts_from_known_hosts(line: str) -> Iterator[str]: 45 parts = line.strip().split() 46 if parts: 47 yield re.sub(r':\d+$', '', parts[0]) 48 49 50def hosts_from_hosts(line: str) -> Iterator[str]: 51 line = line.strip() 52 if not line.startswith('#'): 53 parts = line.split() 54 if parts: 55 yield parts[0] 56 if len(parts) > 1: 57 yield parts[1] 58 if len(parts) > 2: 59 yield parts[2] 60 61 62def iter_known_hosts() -> Iterator[str]: 63 yield from parts_yielder(lines_from_file('~/.ssh/config'), hosts_from_config_lines) 64 yield from parts_yielder(lines_from_file('~/.ssh/known_hosts'), hosts_from_known_hosts) 65 yield from parts_yielder(lines_from_file('/etc/ssh/ssh_known_hosts'), hosts_from_known_hosts) 66 yield from parts_yielder(lines_from_file('/etc/hosts'), hosts_from_hosts) 67 yield from parts_yielder(lines_from_command('getent', 'hosts'), hosts_from_hosts) 68 69 70@run_once 71def known_hosts() -> Tuple[str, ...]: 72 return tuple(sorted(filter(lambda x: '*' not in x and '[' not in x, set(iter_known_hosts())))) 73 74 75@run_once 76def ssh_options() -> Dict[str, str]: 77 stderr = subprocess.Popen(['ssh'], stderr=subprocess.PIPE).stderr 78 assert stderr is not None 79 raw = stderr.read().decode('utf-8') 80 ans: Dict[str, str] = {} 81 pos = 0 82 while True: 83 pos = raw.find('[', pos) 84 if pos < 0: 85 break 86 num = 1 87 epos = pos 88 while num > 0: 89 epos += 1 90 if raw[epos] not in '[]': 91 continue 92 num += 1 if raw[epos] == '[' else -1 93 q = raw[pos+1:epos] 94 pos = epos 95 if len(q) < 2 or q[0] != '-': 96 continue 97 if ' ' in q: 98 opt, desc = q.split(' ', 1) 99 ans[opt[1:]] = desc 100 else: 101 ans.update(dict.fromkeys(q[1:], '')) 102 return ans 103 104 105# option help {{{ 106@run_once 107def option_help_map() -> Dict[str, str]: 108 ans: Dict[str, str] = {} 109 lines = ''' 110-4 -- force ssh to use IPv4 addresses only 111-6 -- force ssh to use IPv6 addresses only 112-a -- disable forwarding of authentication agent connection 113-A -- enable forwarding of the authentication agent connection 114-B -- bind to specified interface before attempting to connect 115-b -- specify interface to transmit on 116-C -- compress data 117-c -- select encryption cipher 118-D -- specify a dynamic port forwarding 119-E -- append log output to file instead of stderr 120-e -- set escape character 121-f -- go to background 122-F -- specify alternate config file 123-g -- allow remote hosts to connect to local forwarded ports 124-G -- output configuration and exit 125-i -- select identity file 126-I -- specify smartcard device 127-J -- connect via a jump host 128-k -- disable forwarding of GSSAPI credentials 129-K -- enable GSSAPI-based authentication and forwarding 130-L -- specify local port forwarding 131-l -- specify login name 132-M -- master mode for connection sharing 133-m -- specify mac algorithms 134-N -- don't execute a remote command 135-n -- redirect stdin from /dev/null 136-O -- control an active connection multiplexing master process 137-o -- specify extra options 138-p -- specify port on remote host 139-P -- use non privileged port 140-Q -- query parameters 141-q -- quiet operation 142-R -- specify remote port forwarding 143-s -- invoke subsystem 144-S -- specify location of control socket for connection sharing 145-T -- disable pseudo-tty allocation 146-t -- force pseudo-tty allocation 147-V -- show version number 148-v -- verbose mode (multiple increase verbosity, up to 3) 149-W -- forward standard input and output to host 150-w -- request tunnel device forwarding 151-x -- disable X11 forwarding 152-X -- enable (untrusted) X11 forwarding 153-Y -- enable trusted X11 forwarding 154-y -- send log info via syslog instead of stderr 155'''.splitlines() 156 for line in lines: 157 line = line.strip() 158 if line: 159 parts = line.split(maxsplit=2) 160 ans[parts[0]] = parts[2] 161 return ans 162# }}} 163 164 165# option names {{{ 166@run_once 167def option_names() -> Tuple[str, ...]: 168 return tuple(filter(None, ( 169 line.strip() for line in ''' 170AddKeysToAgent 171AddressFamily 172BatchMode 173BindAddress 174CanonicalDomains 175CanonicalizeFallbackLocal 176CanonicalizeHostname 177CanonicalizeMaxDots 178CanonicalizePermittedCNAMEs 179CASignatureAlgorithms 180CertificateFile 181ChallengeResponseAuthentication 182CheckHostIP 183Ciphers 184ClearAllForwardings 185Compression 186ConnectionAttempts 187ConnectTimeout 188ControlMaster 189ControlPath 190ControlPersist 191DynamicForward 192EscapeChar 193ExitOnForwardFailure 194FingerprintHash 195ForwardAgent 196ForwardX11 197ForwardX11Timeout 198ForwardX11Trusted 199GatewayPorts 200GlobalKnownHostsFile 201GSSAPIAuthentication 202GSSAPIDelegateCredentials 203HashKnownHosts 204Host 205HostbasedAcceptedAlgorithms 206HostbasedAuthentication 207HostKeyAlgorithms 208HostKeyAlias 209Hostname 210IdentitiesOnly 211IdentityAgent 212IdentityFile 213IPQoS 214KbdInteractiveAuthentication 215KbdInteractiveDevices 216KexAlgorithms 217KnownHostsCommand 218LocalCommand 219LocalForward 220LogLevel 221MACs 222Match 223NoHostAuthenticationForLocalhost 224NumberOfPasswordPrompts 225PasswordAuthentication 226PermitLocalCommand 227PermitRemoteOpen 228PKCS11Provider 229Port 230PreferredAuthentications 231ProxyCommand 232ProxyJump 233ProxyUseFdpass 234PubkeyAcceptedAlgorithms 235PubkeyAuthentication 236RekeyLimit 237RemoteCommand 238RemoteForward 239RequestTTY 240SendEnv 241ServerAliveInterval 242ServerAliveCountMax 243SetEnv 244StreamLocalBindMask 245StreamLocalBindUnlink 246StrictHostKeyChecking 247TCPKeepAlive 248Tunnel 249TunnelDevice 250UpdateHostKeys 251User 252UserKnownHostsFile 253VerifyHostKeyDNS 254VisualHostKey 255XAuthLocation 256'''.splitlines()))) 257# }}} 258 259 260def complete_choices(ans: Completions, prefix: str, title: str, choices: Iterable[str], comma_separated: bool = False) -> None: 261 matches: Dict[str, str] = {} 262 word_transforms = {} 263 effective_prefix = prefix 264 hidden_prefix = '' 265 if comma_separated: 266 effective_prefix = prefix.split(',')[-1] 267 hidden_prefix = ','.join(prefix.split(',')[:-1]) 268 if hidden_prefix: 269 hidden_prefix += ',' 270 for q in choices: 271 if q.startswith(effective_prefix): 272 if comma_separated: 273 tq = q 274 q = hidden_prefix + q + ',' 275 word_transforms[q] = tq 276 matches[q] = '' 277 ans.add_match_group(title, matches, trailing_space=not comma_separated, word_transforms=word_transforms) 278 279 280def complete_q_choices(ans: Completions, prefix: str, title: str, key: str, comma_separated: bool) -> None: 281 choices = (line.strip() for line in lines_from_command('ssh', '-Q', key)) 282 complete_choices(ans, prefix, title, choices, comma_separated) 283 284 285def complete_arg(ans: Completions, option_flag: str, prefix: str = '') -> None: 286 options = ssh_options() 287 option_name = options.get(option_flag[1:]) 288 if option_name.endswith('file') or option_name.endswith('path'): 289 return complete_files_and_dirs(ans, prefix, option_name) 290 choices = { 291 'mac_spec': ('MAC algorithm', 'mac', True), 292 'cipher_spec': ('encryption cipher', 'cipher', True), 293 'query_option': ('query option', 'help', False), 294 } 295 if option_name in choices: 296 return complete_q_choices(ans, prefix, *choices[option_name]) 297 if option_name == 'destination': 298 return complete_destination(ans, prefix) 299 if option_name == 'ctl_cmd': 300 return complete_choices(ans, prefix, 'control command', ('check', 'forward', 'cancel', 'exit')) 301 if option_name == 'option': 302 matches = (x+'=' for x in option_names() if x.startswith(prefix)) 303 word_transforms = {x+'=': x for x in option_names()} 304 ans.add_match_group('configure file option', matches, trailing_space=False, word_transforms=word_transforms) 305 306 307def complete_destination(ans: Completions, prefix: str = '') -> None: 308 result = (k for k in known_hosts() if k.startswith(prefix)) 309 ans.add_match_group('remote host name', result) 310 311 312def complete_option(ans: Completions, prefix: str = '-') -> None: 313 hm = option_help_map() 314 if len(prefix) <= 1: 315 result = {k: v for k, v in hm.items() if k.startswith(prefix)} 316 ans.add_match_group('option', result) 317 else: 318 ans.add_match_group('option', {prefix: ''}) 319 320 321def complete(ans: Completions, words: Sequence[str], new_word: bool) -> None: 322 options = ssh_options() 323 expecting_arg = False 324 types = ['' for i in range(len(words))] 325 for i, word in enumerate(words): 326 if expecting_arg: 327 types[i] = 'arg' 328 expecting_arg = False 329 continue 330 if word.startswith('-'): 331 types[i] = 'option' 332 if len(word) == 2 and options.get(word[1]): 333 expecting_arg = True 334 continue 335 types[i] = 'destination' 336 break 337 if new_word: 338 if words: 339 if expecting_arg: 340 return complete_arg(ans, words[-1]) 341 return complete_destination(ans) 342 if words: 343 if types[-1] == 'arg' and len(words) > 1: 344 return complete_arg(ans, words[-2], words[-1]) 345 if types[-1] == 'destination': 346 return complete_destination(ans, words[-1]) 347 if types[-1] == 'option': 348 return complete_option(ans, words[-1]) 349