1#!/usr/local/bin/python3.8
2# vim:fileencoding=utf-8
3# License: GPLv3 Copyright: 2020, Kovid Goyal <kovid at kovidgoyal.net>
4
5
6import json
7import os
8import shlex
9import shutil
10import subprocess
11import sys
12import tempfile
13import time
14from typing import Any, List, Optional
15
16from kitty.cli import parse_args
17from kitty.cli_stub import RemoteFileCLIOptions
18from kitty.constants import cache_dir
19from kitty.typing import BossType
20from kitty.utils import (
21    SSHConnectionData, command_for_open, get_editor, open_cmd
22)
23
24from ..tui.handler import result_handler
25from ..tui.operations import (
26    faint, raw_mode, reset_terminal, styled
27)
28from ..tui.utils import get_key_press
29
30
def key(x: str) -> str:
    """Return *x* rendered as a shortcut-key highlight (bold, green foreground)."""
    highlighted = styled(x, bold=True, fg='green')
    return highlighted
33
34
def option_text() -> str:
    """Return the command line option specification used by this kitten.

    Four options are declared: ``--mode``, ``--path``, ``--hostname`` and
    ``--ssh-connection-data``.
    """
    mode_spec = (
        '--mode -m\n'
        'choices=ask,edit\n'
        'default=ask\n'
        'Which mode to operate in.'
    )
    path_spec = '--path -p\nPath to the remote file.'
    hostname_spec = '--hostname -h\nHostname of the remote host.'
    conn_spec = '--ssh-connection-data\nThe data used to connect over ssh.'
    # Option definitions are separated by two blank lines and the text ends
    # with a single trailing newline.
    return '\n\n\n'.join((mode_spec, path_spec, hostname_spec, conn_spec)) + '\n'
54
55
def show_error(msg: str) -> None:
    """Print *msg* with a red foreground and wait for the user to press a key.

    The wait also ends on ``KeyboardInterrupt``/``EOFError`` and when stdin
    reaches end-of-file, so a closed terminal cannot leave this spinning.
    """
    print(styled(msg, fg='red'))
    print()
    print('Press any key to exit...')
    sys.stdout.flush()
    with raw_mode():
        while True:
            try:
                pressed = sys.stdin.buffer.read(1)
            except (KeyboardInterrupt, EOFError):
                break
            # read() yields None only when a non-blocking stream has no data
            # yet. A key press, or b'' signalling end-of-file, ends the wait;
            # previously b'' was retried forever in a busy loop.
            if pressed is not None:
                break
69
70
def ask_action(opts: RemoteFileCLIOptions) -> str:
    """Show the action menu for the remote file and return the user's choice.

    Returns one of ``'edit'``, ``'open'``, ``'save'`` or ``'cancel'``.
    """
    host_label = styled(opts.hostname or 'unknown', bold=True, fg='magenta')
    print('What would you like to do with the remote file on ' + host_label + ':')
    print(styled(opts.path or '', fg='yellow', fg_intense=True))
    print()

    # Each entry: (highlighted first letter, rest of the label, explanation or None)
    menu = (
        ('E', 'dit the file',
         'The file will be downloaded and opened in an editor. Any changes you save will'
         ' be automatically sent back to the remote machine'),
        ('O', 'pen the file',
         'The file will be downloaded and opened by the default open program'),
        ('S', 'ave the file',
         'The file will be downloaded to a destination you select'),
        ('C', 'ancel', None),
    )
    for letter, rest, explanation in menu:
        print(key(letter) + rest)
        if explanation is not None:
            print(faint(explanation))
        print()

    sys.stdout.flush()
    pressed = get_key_press('ceos', 'c')
    if pressed == 'e':
        return 'edit'
    if pressed == 'o':
        return 'open'
    if pressed == 's':
        return 'save'
    return 'cancel'
98
99
def hostname_matches(from_hyperlink: str, actual: str) -> bool:
    """Return True when the two host names are equal or share the same first label.

    Only the part before the first ``.`` is compared in the second check, so
    ``host`` matches ``host.example.com``.
    """
    link_label = from_hyperlink.split('.', 1)[0]
    actual_label = actual.split('.', 1)[0]
    return from_hyperlink == actual or link_label == actual_label
106
107
class ControlMaster:
    """Context manager running an ``ssh`` control master used to copy one remote file.

    Entering the context starts a backgrounded master connection (``-fN`` with
    ``ControlMaster=auto``) and verifies it with ``-O check``. Leaving it sends
    ``-O exit`` to the master and deletes the temporary directory, if one was
    created. When no ``dest`` is given, a temporary directory is created on
    entry and ``dest`` is set to a file in it named after the remote file.
    """

    def __init__(self, conn_data: 'SSHConnectionData', remote_path: str, cli_opts: 'RemoteFileCLIOptions', dest: str = ''):
        self.conn_data = conn_data
        self.cli_opts = cli_opts
        self.remote_path = remote_path
        self.dest = dest
        self.tdir = ''
        # The control socket path embeds our pid so every invocation gets its
        # own master connection.
        cmd = [
            conn_data.binary, '-o', f'ControlPath=~/.ssh/kitty-master-{os.getpid()}-%r@%h:%p',
            '-o', 'TCPKeepAlive=yes', '-o', 'ControlPersist=yes'
        ]
        if conn_data.port:
            cmd += ['-p', str(conn_data.port)]
        self.cmd_prefix = cmd
        # Same command line, but ssh never prompts interactively.
        self.batch_cmd_prefix = cmd + ['-o', 'BatchMode=yes']

    def _control_cmd(self, operation: str) -> List[str]:
        """Build the command sending control *operation* (``check``/``exit``) to the master."""
        return self.batch_cmd_prefix + ['-O', operation, self.conn_data.hostname]

    def _remote_cat_cmd(self, prefix: List[str], upload: bool = False) -> List[str]:
        """Build the ssh command that reads (or, with *upload*, writes) the remote file.

        ssh joins the trailing arguments with spaces and hands the result to
        the remote shell, so the path must be quoted for that shell. Otherwise
        paths containing spaces or shell metacharacters break, or are
        executed as commands on the remote host.
        """
        quoted_path = shlex.quote(self.remote_path)
        remote = ['cat', '>', quoted_path] if upload else ['cat', quoted_path]
        return prefix + [self.conn_data.hostname] + remote

    def __enter__(self) -> 'ControlMaster':
        subprocess.check_call(
            self.cmd_prefix + ['-o', 'ControlMaster=auto', '-fN', self.conn_data.hostname])
        try:
            subprocess.check_call(self._control_cmd('check'))
            if not self.dest:
                self.tdir = tempfile.mkdtemp()
                self.dest = os.path.join(self.tdir, os.path.basename(self.remote_path))
        except BaseException:
            # __exit__ is not called when __enter__ raises; shut the master
            # down ourselves so a persistent ssh process is not left behind.
            self.__exit__(None, None, None)
            raise
        return self

    def __exit__(self, *a: Any) -> None:
        try:
            subprocess.run(
                self._control_cmd('exit'),
                stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, stdin=subprocess.DEVNULL
            )
        finally:
            # Remove the temporary directory even if ssh could not be run.
            if self.tdir:
                shutil.rmtree(self.tdir)

    @property
    def is_alive(self) -> bool:
        """True when the master answers an ``-O check`` request with exit status 0."""
        return subprocess.run(
            self._control_cmd('check'),
            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, stdin=subprocess.DEVNULL
        ).returncode == 0

    def check_hostname_matches(self) -> bool:
        """Compare the remote ``hostname -f`` output with the hyperlink's hostname.

        Returns True when they match, when the remote command fails or prints
        nothing (nothing to compare), or when the user chooses to continue
        despite a mismatch. Returns False when the user declines.
        """
        cp = subprocess.run(self.batch_cmd_prefix + [self.conn_data.hostname, 'hostname', '-f'], stdout=subprocess.PIPE,
                            stderr=subprocess.DEVNULL, stdin=subprocess.DEVNULL)
        if cp.returncode != 0:
            return True
        lines = [ln for ln in cp.stdout.decode('utf-8', 'replace').strip().splitlines() if ln]
        if not lines:
            # No output at all: treat like a failed command instead of
            # raising IndexError.
            return True
        # The last line is used so anything printed before it is ignored.
        q = lines[-1]
        if hostname_matches(self.cli_opts.hostname or '', q):
            return True
        print(reset_terminal(), end='')
        print(f'The remote hostname {styled(q, fg="green")} does not match the')
        print(f'hostname in the hyperlink {styled(self.cli_opts.hostname or "", fg="red")}')
        print('This indicates that kitty has not connected to the correct remote machine.')
        print('This can happen, for example, when using nested SSH sessions.')
        print(f'The hostname kitty used to connect was: {styled(self.conn_data.hostname, fg="yellow")}', end='')
        if self.conn_data.port is not None:
            print(f' with port: {self.conn_data.port}')
        print()
        print()
        print('Do you want to continue anyway?')
        print(
            f'{styled("Y", fg="green")}es',
            f'{styled("N", fg="red")}o', sep='\t'
        )
        sys.stdout.flush()
        response = get_key_press('yn', 'n')
        print(reset_terminal(), end='')
        return response == 'y'

    def download(self) -> bool:
        """Copy the remote file into ``self.dest``; return True on exit status 0."""
        with open(self.dest, 'wb') as f:
            return subprocess.run(
                self._remote_cat_cmd(self.batch_cmd_prefix),
                stdout=f, stderr=subprocess.DEVNULL, stdin=subprocess.DEVNULL
            ).returncode == 0

    def upload(self, suppress_output: bool = True) -> bool:
        """Send ``self.dest`` to the remote path; return True on exit status 0.

        With ``suppress_output=False`` the command line is printed and ssh's
        own output is left attached to the terminal.
        """
        cmd_prefix = self.cmd_prefix if suppress_output else self.batch_cmd_prefix
        cmd = self._remote_cat_cmd(cmd_prefix, upload=True)
        if not suppress_output:
            print(' '.join(map(shlex.quote, cmd)))
        redirect = subprocess.DEVNULL if suppress_output else None
        with open(self.dest, 'rb') as f:
            return subprocess.run(cmd, stdout=redirect, stderr=redirect, stdin=f).returncode == 0
191
192
# Return type of main() and type of the ``data`` argument of handle_result():
# a local file path, or None.
Result = Optional[str]
194
195
def main(args: List[str]) -> Result:
    """Parse the command line, pick an action and carry it out.

    ``args[0]`` is skipped; the rest is parsed against ``option_text``. With
    ``--mode ask`` (the default) the user is shown the action menu; any other
    mode is used directly as the action. Returns whatever ``handle_action``
    returns, or None when it raised (the traceback is shown to the user).

    Raises SystemExit when argument parsing exits.
    """
    msg = 'Ask the user what to do with the remote file'
    try:
        cli_opts, _items = parse_args(args[1:], option_text, '', msg, 'kitty remote_file', result_class=RemoteFileCLIOptions)
    except SystemExit as e:
        if e.code != 0:
            # A SystemExit raised without arguments has nothing to print.
            if e.args:
                print(e.args[0])
            input('Press enter to quit...')
        raise SystemExit(e.code)

    # Honor --mode: previously the option was declared but never read, so the
    # menu was shown even when a specific mode had been requested.
    mode = getattr(cli_opts, 'mode', None) or 'ask'
    try:
        action = ask_action(cli_opts) if mode == 'ask' else mode
    finally:
        print(reset_terminal(), end='', flush=True)
    try:
        return handle_action(action, cli_opts)
    except Exception:
        # Top-level boundary: show the failure to the user instead of letting
        # the window close with the traceback unread.
        print(reset_terminal(), end='', flush=True)
        import traceback
        traceback.print_exc()
        show_error('Failed with unhandled exception')
    return None
217
218
def save_as(conn_data: SSHConnectionData, remote_path: str, cli_opts: RemoteFileCLIOptions) -> None:
    """Ask where to save the remote file, then download it there.

    The directory of the last explicitly chosen destination is remembered in
    ``remote-file-last-used.txt`` inside the cache directory and offered as the
    default. If the destination already exists the user may overwrite it,
    abort, have a numeric suffix added automatically, or enter a new name.
    """
    ddir = cache_dir()
    os.makedirs(ddir, exist_ok=True)
    last_used_store_path = os.path.join(ddir, 'remote-file-last-used.txt')
    try:
        with open(last_used_store_path) as f:
            last_used_path = f.read().rstrip('\r\n')
    except (OSError, UnicodeError):
        # Missing, unreadable or undecodable store: use the fallback below.
        last_used_path = ''
    if not last_used_path:
        # An empty store would otherwise make the default a relative path.
        last_used_path = tempfile.gettempdir()
    last_used_file = os.path.join(last_used_path, os.path.basename(remote_path))
    print(
        'Where do you wish to save the file? Leaving it blank will save it as:',
        styled(last_used_file, fg='yellow')
    )
    print('Relative paths will be resolved from:', styled(os.getcwd(), fg_intense=True, bold=True))
    print()
    from ..tui.path_completer import PathCompleter
    try:
        dest = PathCompleter().input()
    except (KeyboardInterrupt, EOFError):
        return
    if dest:
        dest = os.path.expandvars(os.path.expanduser(dest))
        if os.path.isdir(dest):
            dest = os.path.join(dest, os.path.basename(remote_path))
        with open(last_used_store_path, 'w') as f:
            f.write(os.path.dirname(os.path.abspath(dest)))
    else:
        dest = last_used_file
    if os.path.exists(dest):
        print(reset_terminal(), end='')
        print(f'The file {styled(dest, fg="yellow")} already exists. What would you like to do?')
        print(f'{key("O")}verwrite  {key("A")}bort  Auto {key("R")}ename {key("N")}ew name')
        # Flush before waiting for a key, as the other prompts in this file do.
        sys.stdout.flush()
        response = get_key_press('anor', 'a')
        if response == 'a':
            return
        if response == 'n':
            print(reset_terminal(), end='')
            return save_as(conn_data, remote_path, cli_opts)

        if response == 'r':
            # Append -1, -2, ... before the extension until the name is free.
            stem, ext = os.path.splitext(dest)
            counter = 0
            while os.path.exists(dest):
                counter += 1
                dest = f'{stem}-{counter}{ext}'
    if os.path.dirname(dest):
        os.makedirs(os.path.dirname(dest), exist_ok=True)
    with ControlMaster(conn_data, remote_path, cli_opts, dest=dest) as master:
        if master.check_hostname_matches():
            if not master.download():
                show_error('Failed to copy file from remote machine')
273
274
def handle_action(action: str, cli_opts: RemoteFileCLIOptions) -> Result:
    """Carry out *action* (``'open'``, ``'edit'`` or ``'save'``) for the remote file.

    Returns the local path of the downloaded copy for a successful ``'open'``
    and None in every other case, including unknown actions such as
    ``'cancel'``.
    """
    conn_data = SSHConnectionData(*json.loads(cli_opts.ssh_connection_data or ''))
    remote_path = cli_opts.path or ''
    if action == 'open':
        print('Opening', cli_opts.path, 'from', cli_opts.hostname)
        tdir = tempfile.mkdtemp()
        dest = os.path.join(tdir, os.path.basename(remote_path))
        downloaded = False
        try:
            with ControlMaster(conn_data, remote_path, cli_opts, dest=dest) as master:
                if master.check_hostname_matches():
                    downloaded = master.download()
                    if not downloaded:
                        show_error('Failed to copy file from remote machine')
        finally:
            # The directory must outlive this function only when its path is
            # returned; otherwise nobody knows about it, so remove it.
            if not downloaded:
                shutil.rmtree(tdir, ignore_errors=True)
        return dest if downloaded else None
    if action == 'edit':
        print('Editing', cli_opts.path, 'from', cli_opts.hostname)
        editor = get_editor()
        with ControlMaster(conn_data, remote_path, cli_opts) as master:
            if not master.check_hostname_matches():
                return None
            if not master.download():
                show_error(f'Failed to download {remote_path}')
                return None
            mtime = os.path.getmtime(master.dest)
            print(reset_terminal(), end='', flush=True)
            editor_process = subprocess.Popen(editor + [master.dest])
            # While the editor runs, push every saved change to the remote.
            while editor_process.poll() is None:
                time.sleep(0.1)
                try:
                    newmtime = os.path.getmtime(master.dest)
                except OSError:
                    # Editors that save by writing a new file and renaming it
                    # can leave the path briefly absent; check again next tick.
                    continue
                if newmtime > mtime:
                    mtime = newmtime
                    if master.is_alive:
                        master.upload()
            print(reset_terminal(), end='', flush=True)
            # Final sync after the editor exits, this time with visible output.
            if master.is_alive:
                if not master.upload(suppress_output=False):
                    show_error(f'Failed to upload {remote_path}')
            else:
                show_error(f'Failed to upload {remote_path}, SSH master process died')
        return None
    if action == 'save':
        print('Saving', cli_opts.path, 'from', cli_opts.hostname)
        save_as(conn_data, remote_path, cli_opts)
    return None
314
315
@result_handler()
def handle_result(args: List[str], data: Result, target_window_id: int, boss: BossType) -> None:
    """Open the path in *data* with the program configured as ``open_url_with``.

    Does nothing when *data* is empty or None.
    """
    if not data:
        return
    from kitty.fast_data_types import get_options
    opener = command_for_open(get_options().open_url_with)
    open_cmd(opener, data)
322
323
if __name__ == '__main__':
    # Executed as __main__: run main() with the full argument vector
    # (main() itself skips args[0]); the return value is not used here.
    main(sys.argv)
326