1#!/usr/local/bin/python3.8 2# vim:fileencoding=utf-8 3# License: GPL v3 Copyright: 2018, Kovid Goyal <kovid at kovidgoyal.net> 4 5import asyncio 6import codecs 7import io 8import os 9import re 10import selectors 11import signal 12import sys 13import termios 14from contextlib import contextmanager 15from functools import partial 16from typing import Any, Callable, Dict, Generator, List, NamedTuple, Optional 17 18from kitty.constants import is_macos 19from kitty.fast_data_types import ( 20 close_tty, normal_tty, open_tty, parse_input_from_terminal, raw_tty 21) 22from kitty.key_encoding import ( 23 ALT, CTRL, PRESS, RELEASE, REPEAT, SHIFT, backspace_key, decode_key_event, 24 enter_key 25) 26from kitty.typing import ImageManagerType, KeyEventType, Protocol 27from kitty.utils import ScreenSizeGetter, screen_size_function, write_all 28 29from .handler import Handler 30from .operations import init_state, reset_state 31 32 33class BinaryWrite(Protocol): 34 35 def write(self, data: bytes) -> None: 36 pass 37 38 def flush(self) -> None: 39 pass 40 41 42def debug_write(*a: Any, **kw: Any) -> None: 43 from base64 import standard_b64encode 44 fobj = kw.pop('file', sys.stderr.buffer) 45 buf = io.StringIO() 46 kw['file'] = buf 47 print(*a, **kw) 48 stext = buf.getvalue() 49 for i in range(0, len(stext), 256): 50 chunk = stext[i:i + 256] 51 text = b'\x1bP@kitty-print|' + standard_b64encode(chunk.encode('utf-8')) + b'\x1b\\' 52 fobj.write(text) 53 fobj.flush() 54 55 56class Debug: 57 58 fobj: Optional[BinaryWrite] = None 59 60 def __call__(self, *a: Any, **kw: Any) -> None: 61 kw['file'] = self.fobj or sys.stdout.buffer 62 debug_write(*a, **kw) 63 64 65debug = Debug() 66 67 68class TermManager: 69 70 def __init__(self, optional_actions: int = termios.TCSANOW) -> None: 71 self.extra_finalize: Optional[str] = None 72 self.optional_actions = optional_actions 73 74 def set_state_for_loop(self, set_raw: bool = True) -> None: 75 if set_raw: 76 raw_tty(self.tty_fd, self.original_termios) 77 write_all(self.tty_fd, init_state()) 78 79 def reset_state_to_original(self) -> None: 80 normal_tty(self.tty_fd, self.original_termios) 81 if self.extra_finalize: 82 write_all(self.tty_fd, self.extra_finalize) 83 write_all(self.tty_fd, reset_state()) 84 85 @contextmanager 86 def suspend(self) -> Generator['TermManager', None, None]: 87 self.reset_state_to_original() 88 yield self 89 self.set_state_for_loop() 90 91 def __enter__(self) -> 'TermManager': 92 self.tty_fd, self.original_termios = open_tty(False, self.optional_actions) 93 self.set_state_for_loop(set_raw=False) 94 return self 95 96 def __exit__(self, *a: object) -> None: 97 self.reset_state_to_original() 98 close_tty(self.tty_fd, self.original_termios) 99 del self.tty_fd, self.original_termios 100 101 102LEFT, MIDDLE, RIGHT, FOURTH, FIFTH = 1, 2, 4, 8, 16 103DRAG = REPEAT 104bmap = {0: LEFT, 1: MIDDLE, 2: RIGHT} 105MOTION_INDICATOR = 1 << 5 106EXTRA_BUTTON_INDICATOR = 1 << 6 107SHIFT_INDICATOR = 1 << 2 108ALT_INDICATOR = 1 << 3 109CTRL_INDICATOR = 1 << 4 110 111 112class MouseEvent(NamedTuple): 113 x: int 114 y: int 115 type: int 116 buttons: int 117 mods: int 118 119 120def decode_sgr_mouse(text: str) -> MouseEvent: 121 cb_, x_, y_ = text.split(';') 122 m, y_ = y_[-1], y_[:-1] 123 cb, x, y = map(int, (cb_, x_, y_)) 124 typ = RELEASE if m == 'm' else (DRAG if cb & MOTION_INDICATOR else PRESS) 125 buttons = 0 126 cb3 = cb & 3 127 if cb3 != 3: 128 if cb & EXTRA_BUTTON_INDICATOR: 129 buttons |= FIFTH if cb3 & 1 else FOURTH 130 else: 131 buttons |= bmap[cb3] 132 mods = 0 133 if cb & SHIFT_INDICATOR: 134 mods |= SHIFT 135 if cb & ALT_INDICATOR: 136 mods |= ALT 137 if cb & CTRL_INDICATOR: 138 mods |= CTRL 139 return MouseEvent(x, y, typ, buttons, mods) 140 141 142class UnhandledException(Handler): 143 144 def __init__(self, tb: str) -> None: 145 self.tb = tb 146 147 def initialize(self) -> None: 148 self.cmd.clear_screen() 149 self.cmd.set_scrolling_region() 150 self.cmd.set_cursor_visible(True) 151 self.cmd.set_default_colors() 152 self.write(self.tb.replace('\n', '\r\n')) 153 self.write('\r\n') 154 self.write('Press the Enter key to quit') 155 156 def on_key(self, key_event: KeyEventType) -> None: 157 if key_event.key == 'ENTER': 158 self.quit_loop(1) 159 160 def on_interrupt(self) -> None: 161 self.quit_loop(1) 162 on_eot = on_term = on_interrupt 163 164 165class SignalManager: 166 167 def __init__( 168 self, 169 loop: asyncio.AbstractEventLoop, 170 on_winch: Callable, 171 on_interrupt: Callable, 172 on_term: Callable 173 ) -> None: 174 self.asycio_loop = loop 175 self.on_winch, self.on_interrupt, self.on_term = on_winch, on_interrupt, on_term 176 177 def __enter__(self) -> None: 178 tuple(map(lambda x: self.asycio_loop.add_signal_handler(*x), ( 179 (signal.SIGWINCH, self.on_winch), 180 (signal.SIGINT, self.on_interrupt), 181 (signal.SIGTERM, self.on_term) 182 ))) 183 184 def __exit__(self, *a: Any) -> None: 185 tuple(map(self.asycio_loop.remove_signal_handler, ( 186 signal.SIGWINCH, signal.SIGINT, signal.SIGTERM))) 187 188 189sanitize_bracketed_paste: str = '[\x03\x04\x0e\x0f\r\x07\x7f\x8d\x8e\x8f\x90\x9b\x9d\x9e\x9f]' 190 191 192class Loop: 193 194 def __init__( 195 self, 196 sanitize_bracketed_paste: str = sanitize_bracketed_paste, 197 optional_actions: int = termios.TCSADRAIN 198 ): 199 if is_macos: 200 # On macOS PTY devices are not supported by the KqueueSelector and 201 # the PollSelector is broken, causes 100% CPU usage 202 self.asycio_loop: asyncio.AbstractEventLoop = asyncio.SelectorEventLoop(selectors.SelectSelector()) 203 asyncio.set_event_loop(self.asycio_loop) 204 else: 205 self.asycio_loop = asyncio.get_event_loop() 206 self.return_code = 0 207 self.optional_actions = optional_actions 208 self.read_buf = '' 209 self.decoder = codecs.getincrementaldecoder('utf-8')('ignore') 210 try: 211 self.iov_limit = max(os.sysconf('SC_IOV_MAX') - 1, 255) 212 except Exception: 213 self.iov_limit = 255 214 self.parse_input_from_terminal = partial(parse_input_from_terminal, self._on_text, self._on_dcs, self._on_csi, self._on_osc, self._on_pm, self._on_apc) 215 self.ebs_pat = re.compile('([\177\r\x03\x04])') 216 self.in_bracketed_paste = False 217 self.sanitize_bracketed_paste = bool(sanitize_bracketed_paste) 218 if self.sanitize_bracketed_paste: 219 self.sanitize_ibp_pat = re.compile(sanitize_bracketed_paste) 220 221 def _read_ready(self, handler: Handler, fd: int) -> None: 222 try: 223 bdata = os.read(fd, io.DEFAULT_BUFFER_SIZE) 224 except BlockingIOError: 225 return 226 if not bdata: 227 raise EOFError('The input stream is closed') 228 data = self.decoder.decode(bdata) 229 if self.read_buf: 230 data = self.read_buf + data 231 self.read_buf = data 232 self.handler = handler 233 try: 234 self.read_buf = self.parse_input_from_terminal(self.read_buf, self.in_bracketed_paste) 235 except Exception: 236 self.read_buf = '' 237 raise 238 finally: 239 del self.handler 240 241 # terminal input callbacks {{{ 242 def _on_text(self, text: str) -> None: 243 if self.in_bracketed_paste and self.sanitize_bracketed_paste: 244 text = self.sanitize_ibp_pat.sub('', text) 245 246 for chunk in self.ebs_pat.split(text): 247 if len(chunk) == 1: 248 if chunk == '\r': 249 self.handler.on_key(enter_key) 250 elif chunk == '\177': 251 self.handler.on_key(backspace_key) 252 elif chunk == '\x03': 253 self.handler.on_interrupt() 254 elif chunk == '\x04': 255 self.handler.on_eot() 256 else: 257 self.handler.on_text(chunk, self.in_bracketed_paste) 258 elif chunk: 259 self.handler.on_text(chunk, self.in_bracketed_paste) 260 261 def _on_dcs(self, dcs: str) -> None: 262 if dcs.startswith('@kitty-cmd'): 263 import json 264 self.handler.on_kitty_cmd_response(json.loads(dcs[len('@kitty-cmd'):])) 265 elif dcs.startswith('1+r'): 266 from binascii import unhexlify 267 vals = dcs[3:].split(';') 268 for q in vals: 269 parts = q.split('=', 1) 270 try: 271 name, val = parts[0], unhexlify(parts[1]).decode('utf-8', 'replace') 272 except Exception: 273 continue 274 self.handler.on_capability_response(name, val) 275 276 def _on_csi(self, csi: str) -> None: 277 q = csi[-1] 278 if q in 'mM': 279 if csi.startswith('<'): 280 # SGR mouse event 281 try: 282 ev = decode_sgr_mouse(csi[1:]) 283 except Exception: 284 pass 285 else: 286 self.handler.on_mouse(ev) 287 elif q in 'u~ABCDEHFPQRS': 288 if csi == '200~': 289 self.in_bracketed_paste = True 290 return 291 elif csi == '201~': 292 self.in_bracketed_paste = False 293 return 294 try: 295 k = decode_key_event(csi[:-1], q) 296 except Exception: 297 pass 298 else: 299 if k.matches('ctrl+c'): 300 self.handler.on_interrupt() 301 return 302 if k.matches('ctrl+d'): 303 self.handler.on_eot() 304 return 305 self.handler.on_key_event(k) 306 307 def _on_pm(self, pm: str) -> None: 308 pass 309 310 def _on_osc(self, osc: str) -> None: 311 m = re.match(r'(\d+);', osc) 312 if m is not None: 313 code = int(m.group(1)) 314 rest = osc[m.end():] 315 if code == 52: 316 where, rest = rest.partition(';')[::2] 317 from_primary = 'p' in where 318 from base64 import standard_b64decode 319 self.handler.on_clipboard_response(standard_b64decode(rest).decode('utf-8'), from_primary) 320 321 def _on_apc(self, apc: str) -> None: 322 if apc.startswith('G'): 323 if self.handler.image_manager is not None: 324 self.handler.image_manager.handle_response(apc) 325 # }}} 326 327 def _write_ready(self, handler: Handler, fd: int) -> None: 328 if len(self.write_buf) > self.iov_limit: 329 self.write_buf[self.iov_limit - 1] = b''.join(self.write_buf[self.iov_limit - 1:]) 330 del self.write_buf[self.iov_limit:] 331 sizes = tuple(map(len, self.write_buf)) 332 total_size = sum(sizes) 333 if total_size: 334 try: 335 written = os.writev(fd, self.write_buf) 336 except BlockingIOError: 337 return 338 if not written: 339 raise EOFError('The output stream is closed') 340 else: 341 written = 0 342 if written >= total_size: 343 self.write_buf: List[bytes] = [] 344 self.asycio_loop.remove_writer(fd) 345 self.waiting_for_writes = False 346 handler.on_writing_finished() 347 else: 348 consumed = 0 349 for i, buf in enumerate(self.write_buf): 350 if not written: 351 break 352 if len(buf) <= written: 353 written -= len(buf) 354 consumed += 1 355 continue 356 self.write_buf[i] = buf[written:] 357 break 358 del self.write_buf[:consumed] 359 360 def quit(self, return_code: Optional[int] = None) -> None: 361 if return_code is not None: 362 self.return_code = return_code 363 self.asycio_loop.stop() 364 365 def loop_impl(self, handler: Handler, term_manager: TermManager, image_manager: Optional[ImageManagerType] = None) -> Optional[str]: 366 self.write_buf = [] 367 tty_fd = term_manager.tty_fd 368 tb = None 369 self.waiting_for_writes = True 370 371 def schedule_write(data: bytes) -> None: 372 self.write_buf.append(data) 373 if not self.waiting_for_writes: 374 self.asycio_loop.add_writer(tty_fd, self._write_ready, handler, tty_fd) 375 self.waiting_for_writes = True 376 377 def handle_exception(loop: asyncio.AbstractEventLoop, context: Dict[str, Any]) -> None: 378 nonlocal tb 379 loop.stop() 380 tb = context['message'] 381 exc = context.get('exception') 382 if exc is not None: 383 import traceback 384 tb += '\n' + ''.join(traceback.format_exception(exc.__class__, exc, exc.__traceback__)) 385 386 self.asycio_loop.set_exception_handler(handle_exception) 387 handler._initialize(self._get_screen_size(), term_manager, schedule_write, self, debug, image_manager) 388 with handler: 389 self.asycio_loop.add_reader( 390 tty_fd, self._read_ready, handler, tty_fd) 391 self.asycio_loop.add_writer( 392 tty_fd, self._write_ready, handler, tty_fd) 393 self.asycio_loop.run_forever() 394 self.asycio_loop.remove_reader(tty_fd) 395 if self.waiting_for_writes: 396 self.asycio_loop.remove_writer(tty_fd) 397 return tb 398 399 def loop(self, handler: Handler) -> None: 400 tb: Optional[str] = None 401 402 def _on_sigwinch() -> None: 403 self._get_screen_size.changed = True 404 handler.screen_size = self._get_screen_size() 405 handler.on_resize(handler.screen_size) 406 407 signal_manager = SignalManager(self.asycio_loop, _on_sigwinch, handler.on_interrupt, handler.on_term) 408 with TermManager(self.optional_actions) as term_manager, signal_manager: 409 self._get_screen_size: ScreenSizeGetter = screen_size_function(term_manager.tty_fd) 410 image_manager = None 411 if handler.image_manager_class is not None: 412 image_manager = handler.image_manager_class(handler) 413 try: 414 tb = self.loop_impl(handler, term_manager, image_manager) 415 except Exception: 416 import traceback 417 tb = traceback.format_exc() 418 419 term_manager.extra_finalize = b''.join(self.write_buf).decode('utf-8') 420 if tb is not None: 421 self.return_code = 1 422 self._report_error_loop(tb, term_manager) 423 424 def _report_error_loop(self, tb: str, term_manager: TermManager) -> None: 425 self.loop_impl(UnhandledException(tb), term_manager) 426