1#!/usr/local/bin/python3.8 2# vim:fileencoding=utf-8 3# License: GPLv3 Copyright: 2020, Kovid Goyal <kovid at kovidgoyal.net> 4 5 6import json 7import os 8import shlex 9import shutil 10import subprocess 11import sys 12import tempfile 13import time 14from typing import Any, List, Optional 15 16from kitty.cli import parse_args 17from kitty.cli_stub import RemoteFileCLIOptions 18from kitty.constants import cache_dir 19from kitty.typing import BossType 20from kitty.utils import ( 21 SSHConnectionData, command_for_open, get_editor, open_cmd 22) 23 24from ..tui.handler import result_handler 25from ..tui.operations import ( 26 faint, raw_mode, reset_terminal, styled 27) 28from ..tui.utils import get_key_press 29 30 31def key(x: str) -> str: 32 return styled(x, bold=True, fg='green') 33 34 35def option_text() -> str: 36 return '''\ 37--mode -m 38choices=ask,edit 39default=ask 40Which mode to operate in. 41 42 43--path -p 44Path to the remote file. 45 46 47--hostname -h 48Hostname of the remote host. 49 50 51--ssh-connection-data 52The data used to connect over ssh. 53''' 54 55 56def show_error(msg: str) -> None: 57 print(styled(msg, fg='red')) 58 print() 59 print('Press any key to exit...') 60 sys.stdout.flush() 61 with raw_mode(): 62 while True: 63 try: 64 q = sys.stdin.buffer.read(1) 65 if q: 66 break 67 except (KeyboardInterrupt, EOFError): 68 break 69 70 71def ask_action(opts: RemoteFileCLIOptions) -> str: 72 print('What would you like to do with the remote file on {}:'.format(styled(opts.hostname or 'unknown', bold=True, fg='magenta'))) 73 print(styled(opts.path or '', fg='yellow', fg_intense=True)) 74 print() 75 76 def help_text(x: str) -> str: 77 return faint(x) 78 79 print('{}dit the file'.format(key('E'))) 80 print(help_text('The file will be downloaded and opened in an editor. Any changes you save will' 81 ' be automatically sent back to the remote machine')) 82 print() 83 84 print('{}pen the file'.format(key('O'))) 85 print(help_text('The file will be downloaded and opened by the default open program')) 86 print() 87 88 print('{}ave the file'.format(key('S'))) 89 print(help_text('The file will be downloaded to a destination you select')) 90 print() 91 92 print('{}ancel'.format(key('C'))) 93 print() 94 95 sys.stdout.flush() 96 response = get_key_press('ceos', 'c') 97 return {'e': 'edit', 'o': 'open', 's': 'save'}.get(response, 'cancel') 98 99 100def hostname_matches(from_hyperlink: str, actual: str) -> bool: 101 if from_hyperlink == actual: 102 return True 103 if from_hyperlink.partition('.')[0] == actual.partition('.')[0]: 104 return True 105 return False 106 107 108class ControlMaster: 109 110 def __init__(self, conn_data: SSHConnectionData, remote_path: str, cli_opts: RemoteFileCLIOptions, dest: str = ''): 111 self.conn_data = conn_data 112 self.cli_opts = cli_opts 113 self.remote_path = remote_path 114 self.dest = dest 115 self.tdir = '' 116 self.cmd_prefix = cmd = [ 117 conn_data.binary, '-o', f'ControlPath=~/.ssh/kitty-master-{os.getpid()}-%r@%h:%p', 118 '-o', 'TCPKeepAlive=yes', '-o', 'ControlPersist=yes' 119 ] 120 if conn_data.port: 121 cmd += ['-p', str(conn_data.port)] 122 self.batch_cmd_prefix = cmd + ['-o', 'BatchMode=yes'] 123 124 def __enter__(self) -> 'ControlMaster': 125 subprocess.check_call( 126 self.cmd_prefix + ['-o', 'ControlMaster=auto', '-fN', self.conn_data.hostname]) 127 subprocess.check_call( 128 self.batch_cmd_prefix + ['-O', 'check', self.conn_data.hostname]) 129 if not self.dest: 130 self.tdir = tempfile.mkdtemp() 131 self.dest = os.path.join(self.tdir, os.path.basename(self.remote_path)) 132 return self 133 134 def __exit__(self, *a: Any) -> bool: 135 subprocess.Popen( 136 self.batch_cmd_prefix + ['-O', 'exit', self.conn_data.hostname], 137 stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, stdin=subprocess.DEVNULL 138 ).wait() 139 if self.tdir: 140 shutil.rmtree(self.tdir) 141 142 @property 143 def is_alive(self) -> bool: 144 return subprocess.Popen( 145 self.batch_cmd_prefix + ['-O', 'check', self.conn_data.hostname], 146 stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, stdin=subprocess.DEVNULL 147 ).wait() == 0 148 149 def check_hostname_matches(self) -> bool: 150 cp = subprocess.run(self.batch_cmd_prefix + [self.conn_data.hostname, 'hostname', '-f'], stdout=subprocess.PIPE, 151 stderr=subprocess.DEVNULL, stdin=subprocess.DEVNULL) 152 if cp.returncode == 0: 153 q = tuple(filter(None, cp.stdout.decode('utf-8').strip().splitlines()))[-1] 154 if not hostname_matches(self.cli_opts.hostname or '', q): 155 print(reset_terminal(), end='') 156 print(f'The remote hostname {styled(q, fg="green")} does not match the') 157 print(f'hostname in the hyperlink {styled(self.cli_opts.hostname or "", fg="red")}') 158 print('This indicates that kitty has not connected to the correct remote machine.') 159 print('This can happen, for example, when using nested SSH sessions.') 160 print(f'The hostname kitty used to connect was: {styled(self.conn_data.hostname, fg="yellow")}', end='') 161 if self.conn_data.port is not None: 162 print(f' with port: {self.conn_data.port}') 163 print() 164 print() 165 print('Do you want to continue anyway?') 166 print( 167 f'{styled("Y", fg="green")}es', 168 f'{styled("N", fg="red")}o', sep='\t' 169 ) 170 sys.stdout.flush() 171 response = get_key_press('yn', 'n') 172 print(reset_terminal(), end='') 173 return response == 'y' 174 return True 175 176 def download(self) -> bool: 177 with open(self.dest, 'wb') as f: 178 return subprocess.run( 179 self.batch_cmd_prefix + [self.conn_data.hostname, 'cat', self.remote_path], 180 stdout=f, stderr=subprocess.DEVNULL, stdin=subprocess.DEVNULL 181 ).returncode == 0 182 183 def upload(self, suppress_output: bool = True) -> bool: 184 cmd_prefix = self.cmd_prefix if suppress_output else self.batch_cmd_prefix 185 cmd = cmd_prefix + [self.conn_data.hostname, 'cat', '>', self.remote_path] 186 if not suppress_output: 187 print(' '.join(map(shlex.quote, cmd))) 188 redirect = subprocess.DEVNULL if suppress_output else None 189 with open(self.dest, 'rb') as f: 190 return subprocess.run(cmd, stdout=redirect, stderr=redirect, stdin=f).returncode == 0 191 192 193Result = Optional[str] 194 195 196def main(args: List[str]) -> Result: 197 msg = 'Ask the user what to do with the remote file' 198 try: 199 cli_opts, items = parse_args(args[1:], option_text, '', msg, 'kitty remote_file', result_class=RemoteFileCLIOptions) 200 except SystemExit as e: 201 if e.code != 0: 202 print(e.args[0]) 203 input('Press enter to quit...') 204 raise SystemExit(e.code) 205 206 try: 207 action = ask_action(cli_opts) 208 finally: 209 print(reset_terminal(), end='', flush=True) 210 try: 211 return handle_action(action, cli_opts) 212 except Exception: 213 print(reset_terminal(), end='', flush=True) 214 import traceback 215 traceback.print_exc() 216 show_error('Failed with unhandled exception') 217 218 219def save_as(conn_data: SSHConnectionData, remote_path: str, cli_opts: RemoteFileCLIOptions) -> None: 220 ddir = cache_dir() 221 os.makedirs(ddir, exist_ok=True) 222 last_used_store_path = os.path.join(ddir, 'remote-file-last-used.txt') 223 try: 224 with open(last_used_store_path) as f: 225 last_used_path = f.read() 226 except FileNotFoundError: 227 last_used_path = tempfile.gettempdir() 228 last_used_file = os.path.join(last_used_path, os.path.basename(remote_path)) 229 print( 230 'Where do you wish to save the file? Leaving it blank will save it as:', 231 styled(last_used_file, fg='yellow') 232 ) 233 print('Relative paths will be resolved from:', styled(os.getcwd(), fg_intense=True, bold=True)) 234 print() 235 from ..tui.path_completer import PathCompleter 236 try: 237 dest = PathCompleter().input() 238 except (KeyboardInterrupt, EOFError): 239 return 240 if dest: 241 dest = os.path.expandvars(os.path.expanduser(dest)) 242 if os.path.isdir(dest): 243 dest = os.path.join(dest, os.path.basename(remote_path)) 244 with open(last_used_store_path, 'w') as f: 245 f.write(os.path.dirname(os.path.abspath(dest))) 246 else: 247 dest = last_used_file 248 if os.path.exists(dest): 249 print(reset_terminal(), end='') 250 print(f'The file {styled(dest, fg="yellow")} already exists. What would you like to do?') 251 print(f'{key("O")}verwrite {key("A")}bort Auto {key("R")}ename {key("N")}ew name') 252 response = get_key_press('anor', 'a') 253 if response == 'a': 254 return 255 if response == 'n': 256 print(reset_terminal(), end='') 257 return save_as(conn_data, remote_path, cli_opts) 258 259 if response == 'r': 260 q = dest 261 c = 0 262 while os.path.exists(q): 263 c += 1 264 b, ext = os.path.splitext(dest) 265 q = f'{b}-{c}{ext}' 266 dest = q 267 if os.path.dirname(dest): 268 os.makedirs(os.path.dirname(dest), exist_ok=True) 269 with ControlMaster(conn_data, remote_path, cli_opts, dest=dest) as master: 270 if master.check_hostname_matches(): 271 if not master.download(): 272 show_error('Failed to copy file from remote machine') 273 274 275def handle_action(action: str, cli_opts: RemoteFileCLIOptions) -> Result: 276 conn_data = SSHConnectionData(*json.loads(cli_opts.ssh_connection_data or '')) 277 remote_path = cli_opts.path or '' 278 if action == 'open': 279 print('Opening', cli_opts.path, 'from', cli_opts.hostname) 280 dest = os.path.join(tempfile.mkdtemp(), os.path.basename(remote_path)) 281 with ControlMaster(conn_data, remote_path, cli_opts, dest=dest) as master: 282 if master.check_hostname_matches(): 283 if master.download(): 284 return dest 285 show_error('Failed to copy file from remote machine') 286 elif action == 'edit': 287 print('Editing', cli_opts.path, 'from', cli_opts.hostname) 288 editor = get_editor() 289 with ControlMaster(conn_data, remote_path, cli_opts) as master: 290 if not master.check_hostname_matches(): 291 return None 292 if not master.download(): 293 show_error(f'Failed to download {remote_path}') 294 return None 295 mtime = os.path.getmtime(master.dest) 296 print(reset_terminal(), end='', flush=True) 297 editor_process = subprocess.Popen(editor + [master.dest]) 298 while editor_process.poll() is None: 299 time.sleep(0.1) 300 newmtime = os.path.getmtime(master.dest) 301 if newmtime > mtime: 302 mtime = newmtime 303 if master.is_alive: 304 master.upload() 305 print(reset_terminal(), end='', flush=True) 306 if master.is_alive: 307 if not master.upload(suppress_output=False): 308 show_error(f'Failed to upload {remote_path}') 309 else: 310 show_error(f'Failed to upload {remote_path}, SSH master process died') 311 elif action == 'save': 312 print('Saving', cli_opts.path, 'from', cli_opts.hostname) 313 save_as(conn_data, remote_path, cli_opts) 314 315 316@result_handler() 317def handle_result(args: List[str], data: Result, target_window_id: int, boss: BossType) -> None: 318 if data: 319 from kitty.fast_data_types import get_options 320 cmd = command_for_open(get_options().open_url_with) 321 open_cmd(cmd, data) 322 323 324if __name__ == '__main__': 325 main(sys.argv) 326