1# The MIT License 2# 3# Copyright (c) 2010-2011 Marien Zwart 4# 5# Permission is hereby granted, free of charge, to any person obtaining a copy 6# of this software and associated documentation files (the "Software"), to deal 7# in the Software without restriction, including without limitation the rights 8# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 9# copies of the Software, and to permit persons to whom the Software is 10# furnished to do so, subject to the following conditions: 11# 12# The above copyright notice and this permission notice shall be included in 13# all copies or substantial portions of the Software. 14# 15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 21# THE SOFTWARE. 22 23# This whole file typing TODO 24# type: ignore 25 26"""bpython backend based on Urwid. 27 28Based on Urwid 0.9.9. 29 30This steals many things from bpython's "cli" backend. 31 32This is still *VERY* rough. 33""" 34 35import sys 36import os 37import time 38import locale 39import signal 40import urwid 41 42from . import args as bpargs, repl, translations 43from .formatter import theme_map 44from .translations import _ 45from .keys import urwid_key_dispatch as key_dispatch 46 47# Urwid colors are: 48# 'black', 'dark red', 'dark green', 'brown', 'dark blue', 49# 'dark magenta', 'dark cyan', 'light gray', 'dark gray', 50# 'light red', 'light green', 'yellow', 'light blue', 51# 'light magenta', 'light cyan', 'white' 52# and bpython has: 53# blacK, Red, Green, Yellow, Blue, Magenta, Cyan, White, Default 54 55COLORMAP = { 56 "k": "black", 57 "r": "dark red", # or light red? 58 "g": "dark green", # or light green? 59 "y": "yellow", 60 "b": "dark blue", # or light blue? 61 "m": "dark magenta", # or light magenta? 62 "c": "dark cyan", # or light cyan? 63 "w": "white", 64 "d": "default", 65} 66 67 68try: 69 from twisted.internet import protocol 70 from twisted.protocols import basic 71except ImportError: 72 pass 73else: 74 75 class EvalProtocol(basic.LineOnlyReceiver): 76 77 delimiter = "\n" 78 79 def __init__(self, myrepl): 80 self.repl = myrepl 81 82 def lineReceived(self, line): 83 # HACK! 84 # TODO: deal with encoding issues here... 85 self.repl.main_loop.process_input(line) 86 self.repl.main_loop.process_input(["enter"]) 87 88 class EvalFactory(protocol.ServerFactory): 89 def __init__(self, myrepl): 90 self.repl = myrepl 91 92 def buildProtocol(self, addr): 93 return EvalProtocol(self.repl) 94 95 96# If Twisted is not available urwid has no TwistedEventLoop attribute. 97# Code below will try to import reactor before using TwistedEventLoop. 98# I assume TwistedEventLoop will be available if that import succeeds. 99if urwid.VERSION < (1, 0, 0) and hasattr(urwid, "TwistedEventLoop"): 100 101 class TwistedEventLoop(urwid.TwistedEventLoop): 102 103 """TwistedEventLoop modified to properly stop the reactor. 104 105 urwid 0.9.9 and 0.9.9.1 crash the reactor on ExitMainLoop instead 106 of stopping it. One obvious way this breaks is if anything used 107 the reactor's thread pool: that thread pool is not shut down if 108 the reactor is not stopped, which means python hangs on exit 109 (joining the non-daemon threadpool threads that never exit). And 110 the default resolver is the ThreadedResolver, so if we looked up 111 any names we hang on exit. That is bad enough that we hack up 112 urwid a bit here to exit properly. 113 """ 114 115 def handle_exit(self, f): 116 def wrapper(*args, **kwargs): 117 try: 118 return f(*args, **kwargs) 119 except urwid.ExitMainLoop: 120 # This is our change. 121 self.reactor.stop() 122 except: 123 # This is the same as in urwid. 124 # We are obviously not supposed to ever hit this. 125 print(sys.exc_info()) 126 self._exc_info = sys.exc_info() 127 self.reactor.crash() 128 129 return wrapper 130 131 132else: 133 TwistedEventLoop = getattr(urwid, "TwistedEventLoop", None) 134 135 136class StatusbarEdit(urwid.Edit): 137 """Wrapper around urwid.Edit used for the prompt in Statusbar. 138 139 This class only adds a single signal that is emitted if the user presses 140 Enter.""" 141 142 signals = urwid.Edit.signals + ["prompt_enter"] 143 144 def __init__(self, *args, **kwargs): 145 self.single = False 146 super().__init__(*args, **kwargs) 147 148 def keypress(self, size, key): 149 if self.single: 150 urwid.emit_signal(self, "prompt_enter", self, key) 151 elif key == "enter": 152 urwid.emit_signal(self, "prompt_enter", self, self.get_edit_text()) 153 else: 154 return super().keypress(size, key) 155 156 157urwid.register_signal(StatusbarEdit, "prompt_enter") 158 159 160class Statusbar: 161 """Statusbar object, ripped off from bpython.cli. 162 163 This class provides the status bar at the bottom of the screen. 164 It has message() and prompt() methods for user interactivity, as 165 well as settext() and clear() methods for changing its appearance. 166 167 The check() method needs to be called repeatedly if the statusbar is 168 going to be aware of when it should update its display after a message() 169 has been called (it'll display for a couple of seconds and then disappear). 170 171 It should be called as: 172 foo = Statusbar('Initial text to display') 173 or, for a blank statusbar: 174 foo = Statusbar() 175 176 The "widget" attribute is an urwid widget. 177 """ 178 179 signals = ["prompt_result"] 180 181 def __init__(self, config, s=None, main_loop=None): 182 self.config = config 183 self.timer = None 184 self.main_loop = main_loop 185 self.s = s or "" 186 187 self.text = urwid.Text(("main", self.s)) 188 # use wrap mode 'clip' to just cut off at the end of line 189 self.text.set_wrap_mode("clip") 190 191 self.edit = StatusbarEdit(("main", "")) 192 urwid.connect_signal(self.edit, "prompt_enter", self._on_prompt_enter) 193 194 self.widget = urwid.Columns([self.text, self.edit]) 195 196 def _check(self, callback, userdata=None): 197 """This is the method is called from the timer to reset the status bar.""" 198 self.timer = None 199 self.settext(self.s) 200 201 def message(self, s, n=3): 202 """Display a message for a short n seconds on the statusbar and return 203 it to its original state.""" 204 205 self.settext(s) 206 self.timer = self.main_loop.set_alarm_in(n, self._check) 207 208 def _reset_timer(self): 209 """Reset the timer from message.""" 210 if self.timer is not None: 211 self.main_loop.remove_alarm(self.timer) 212 self.timer = None 213 214 def prompt(self, s=None, single=False): 215 """Prompt the user for some input (with the optional prompt 's'). After 216 the user hit enter the signal 'prompt_result' will be emitted and the 217 status bar will be reset. If single is True, the first keypress will be 218 returned.""" 219 220 self._reset_timer() 221 222 self.edit.single = single 223 self.edit.set_caption(("main", s or "?")) 224 self.edit.set_edit_text("") 225 # hide the text and display the edit widget 226 if self.edit not in self.widget.widget_list: 227 self.widget.widget_list.append(self.edit) 228 if self.text in self.widget.widget_list: 229 self.widget.widget_list.remove(self.text) 230 self.widget.set_focus_column(0) 231 232 def settext(self, s, permanent=False): 233 """Set the text on the status bar to a new value. If permanent is True, 234 the new value will be permanent. If that status bar is in prompt mode, 235 the prompt will be aborted.""" 236 237 self._reset_timer() 238 239 # hide the edit and display the text widget 240 if self.edit in self.widget.widget_list: 241 self.widget.widget_list.remove(self.edit) 242 if self.text not in self.widget.widget_list: 243 self.widget.widget_list.append(self.text) 244 245 self.text.set_text(("main", s)) 246 if permanent: 247 self.s = s 248 249 def clear(self): 250 """Clear the status bar.""" 251 self.settext("") 252 253 def _on_prompt_enter(self, edit, new_text): 254 """Reset the statusbar and pass the input from the prompt to the caller 255 via 'prompt_result'.""" 256 self.settext(self.s) 257 urwid.emit_signal(self, "prompt_result", new_text) 258 259 260urwid.register_signal(Statusbar, "prompt_result") 261 262 263def decoding_input_filter(keys, raw): 264 """Input filter for urwid which decodes each key with the locale's 265 preferred encoding.'""" 266 encoding = locale.getpreferredencoding() 267 converted_keys = list() 268 for key in keys: 269 if isinstance(key, str): 270 converted_keys.append(key.decode(encoding)) 271 else: 272 converted_keys.append(key) 273 return converted_keys 274 275 276def format_tokens(tokensource): 277 for token, text in tokensource: 278 if text == "\n": 279 continue 280 281 # TODO: something about inversing Parenthesis 282 while token not in theme_map: 283 token = token.parent 284 yield (theme_map[token], text) 285 286 287class BPythonEdit(urwid.Edit): 288 """Customized editor *very* tightly interwoven with URWIDRepl. 289 290 Changes include: 291 292 - The edit text supports markup, not just the caption. 293 This works by calling set_edit_markup from the change event 294 as well as whenever markup changes while text does not. 295 296 - The widget can be made readonly, which currently just means 297 it is no longer selectable and stops drawing the cursor. 298 299 This is currently a one-way operation, but that is just because 300 I only need and test the readwrite->readonly transition. 301 302 - move_cursor_to_coords is ignored 303 (except for internal calls from keypress or mouse_event). 304 305 - arrow up/down are ignored. 306 307 - an "edit-pos-changed" signal is emitted when edit_pos changes. 308 """ 309 310 signals = ["edit-pos-changed"] 311 312 def __init__(self, config, *args, **kwargs): 313 self._bpy_text = "" 314 self._bpy_attr = [] 315 self._bpy_selectable = True 316 self._bpy_may_move_cursor = False 317 self.config = config 318 self.tab_length = config.tab_length 319 super().__init__(*args, **kwargs) 320 321 def set_edit_pos(self, pos): 322 super().set_edit_pos(pos) 323 self._emit("edit-pos-changed", self.edit_pos) 324 325 def get_edit_pos(self): 326 return self._edit_pos 327 328 edit_pos = property(get_edit_pos, set_edit_pos) 329 330 def make_readonly(self): 331 self._bpy_selectable = False 332 # This is necessary to prevent the listbox we are in getting 333 # fresh cursor coords of None from get_cursor_coords 334 # immediately after we go readonly and then getting a cached 335 # canvas that still has the cursor set. It spots that 336 # inconsistency and raises. 337 self._invalidate() 338 339 def set_edit_markup(self, markup): 340 """Call this when markup changes but the underlying text does not. 341 342 You should arrange for this to be called from the 'change' signal. 343 """ 344 if markup: 345 self._bpy_text, self._bpy_attr = urwid.decompose_tagmarkup(markup) 346 else: 347 # decompose_tagmarkup in some urwids fails on the empty list 348 self._bpy_text, self._bpy_attr = "", [] 349 # This is redundant when we're called off the 'change' signal. 350 # I'm assuming this is cheap, making that ok. 351 self._invalidate() 352 353 def get_text(self): 354 return self._caption + self._bpy_text, self._attrib + self._bpy_attr 355 356 def selectable(self): 357 return self._bpy_selectable 358 359 def get_cursor_coords(self, *args, **kwargs): 360 # urwid gets confused if a nonselectable widget has a cursor position. 361 if not self._bpy_selectable: 362 return None 363 return super().get_cursor_coords(*args, **kwargs) 364 365 def render(self, size, focus=False): 366 # XXX I do not want to have to do this, but listbox gets confused 367 # if I do not (getting None out of get_cursor_coords because 368 # we just became unselectable, then having this render a cursor) 369 if not self._bpy_selectable: 370 focus = False 371 return super().render(size, focus=focus) 372 373 def get_pref_col(self, size): 374 # Need to make this deal with us being nonselectable 375 if not self._bpy_selectable: 376 return "left" 377 return super().get_pref_col(size) 378 379 def move_cursor_to_coords(self, *args): 380 if self._bpy_may_move_cursor: 381 return super().move_cursor_to_coords(*args) 382 return False 383 384 def keypress(self, size, key): 385 if urwid.command_map[key] in ("cursor up", "cursor down"): 386 # Do not handle up/down arrow, leave them for the repl. 387 return key 388 389 self._bpy_may_move_cursor = True 390 try: 391 if urwid.command_map[key] == "cursor max left": 392 self.edit_pos = 0 393 elif urwid.command_map[key] == "cursor max right": 394 self.edit_pos = len(self.get_edit_text()) 395 elif urwid.command_map[key] == "clear word": 396 # ^w 397 if self.edit_pos == 0: 398 return 399 line = self.get_edit_text() 400 # delete any space left of the cursor 401 p = len(line[: self.edit_pos].strip()) 402 line = line[:p] + line[self.edit_pos :] 403 # delete a full word 404 np = line.rfind(" ", 0, p) 405 if np == -1: 406 line = line[p:] 407 np = 0 408 else: 409 line = line[:np] + line[p:] 410 self.set_edit_text(line) 411 self.edit_pos = np 412 elif urwid.command_map[key] == "clear line": 413 line = self.get_edit_text() 414 self.set_edit_text(line[self.edit_pos :]) 415 self.edit_pos = 0 416 elif key == "backspace": 417 line = self.get_edit_text() 418 cpos = len(line) - self.edit_pos 419 if not (cpos or len(line) % self.tab_length or line.strip()): 420 self.set_edit_text(line[: -self.tab_length]) 421 else: 422 return super().keypress(size, key) 423 else: 424 # TODO: Add in specific keypress fetching code here 425 return super().keypress(size, key) 426 return None 427 finally: 428 self._bpy_may_move_cursor = False 429 430 def mouse_event(self, *args): 431 self._bpy_may_move_cursor = True 432 try: 433 return super().mouse_event(*args) 434 finally: 435 self._bpy_may_move_cursor = False 436 437 438class BPythonListBox(urwid.ListBox): 439 """Like `urwid.ListBox`, except that it does not eat up and 440 down keys. 441 """ 442 443 def keypress(self, size, key): 444 if key not in ("up", "down"): 445 return urwid.ListBox.keypress(self, size, key) 446 return key 447 448 449class Tooltip(urwid.BoxWidget): 450 """Container inspired by Overlay to position our tooltip. 451 452 bottom_w should be a BoxWidget. 453 The top window currently has to be a listbox to support shrinkwrapping. 454 455 This passes keyboard events to the bottom instead of the top window. 456 457 It also positions the top window relative to the cursor position 458 from the bottom window and hides it if there is no cursor. 459 """ 460 461 def __init__(self, bottom_w, listbox): 462 super().__init__() 463 464 self.bottom_w = bottom_w 465 self.listbox = listbox 466 # TODO: this linebox should use the 'main' color. 467 self.top_w = urwid.LineBox(listbox) 468 self.tooltip_focus = False 469 470 def selectable(self): 471 return self.bottom_w.selectable() 472 473 def keypress(self, size, key): 474 return self.bottom_w.keypress(size, key) 475 476 def mouse_event(self, size, event, button, col, row, focus): 477 # TODO: pass to top widget if visible and inside it. 478 if not hasattr(self.bottom_w, "mouse_event"): 479 return False 480 481 return self.bottom_w.mouse_event(size, event, button, col, row, focus) 482 483 def get_cursor_coords(self, size): 484 return self.bottom_w.get_cursor_coords(size) 485 486 def render(self, size, focus=False): 487 maxcol, maxrow = size 488 bottom_c = self.bottom_w.render(size, focus) 489 cursor = bottom_c.cursor 490 if not cursor: 491 # Hide the tooltip if there is no cursor. 492 return bottom_c 493 494 cursor_x, cursor_y = cursor 495 if cursor_y * 2 < maxrow: 496 # Cursor is in the top half. Tooltip goes below it: 497 y = cursor_y + 1 498 rows = maxrow - y 499 else: 500 # Cursor is in the bottom half. Tooltip fills the area above: 501 y = 0 502 rows = cursor_y 503 504 # HACK: shrink-wrap the tooltip. This is ugly in multiple ways: 505 # - It only works on a listbox. 506 # - It assumes the wrapping LineBox eats one char on each edge. 507 # - It is a loop. 508 # (ideally it would check how much free space there is, 509 # instead of repeatedly trying smaller sizes) 510 while "bottom" in self.listbox.ends_visible((maxcol - 2, rows - 3)): 511 rows -= 1 512 513 # If we're displaying above the cursor move the top edge down: 514 if not y: 515 y = cursor_y - rows 516 517 # Render *both* windows focused. This is probably not normal in urwid, 518 # but it works nicely. 519 top_c = self.top_w.render((maxcol, rows), focus and self.tooltip_focus) 520 521 combi_c = urwid.CanvasOverlay(top_c, bottom_c, 0, y) 522 # Use the cursor coordinates from the bottom canvas. 523 canvas = urwid.CompositeCanvas(combi_c) 524 canvas.cursor = cursor 525 return canvas 526 527 528class URWIDInteraction(repl.Interaction): 529 def __init__(self, config, statusbar, frame): 530 super().__init__(config, statusbar) 531 self.frame = frame 532 urwid.connect_signal(statusbar, "prompt_result", self._prompt_result) 533 self.callback = None 534 535 def confirm(self, q, callback): 536 """Ask for yes or no and call callback to return the result""" 537 538 def callback_wrapper(result): 539 callback(result.lower() in (_("y"), _("yes"))) 540 541 self.prompt(q, callback_wrapper, single=True) 542 543 def notify(self, s, n=10, wait_for_keypress=False): 544 return self.statusbar.message(s, n) 545 546 def prompt(self, s, callback=None, single=False): 547 """Prompt the user for input. The result will be returned via calling 548 callback. Note that there can only be one prompt active. But the 549 callback can already start a new prompt.""" 550 551 if self.callback is not None: 552 raise Exception("Prompt already in progress") 553 554 self.callback = callback 555 self.statusbar.prompt(s, single=single) 556 self.frame.set_focus("footer") 557 558 def _prompt_result(self, text): 559 self.frame.set_focus("body") 560 if self.callback is not None: 561 # The callback might want to start another prompt, so reset it 562 # before calling the callback. 563 callback = self.callback 564 self.callback = None 565 callback(text) 566 567 568class URWIDRepl(repl.Repl): 569 570 _time_between_redraws = 0.05 # seconds 571 572 def __init__(self, event_loop, palette, interpreter, config): 573 super().__init__(interpreter, config) 574 575 self._redraw_handle = None 576 self._redraw_pending = False 577 self._redraw_time = 0 578 579 self.listbox = BPythonListBox(urwid.SimpleListWalker([])) 580 581 self.tooltip = urwid.ListBox(urwid.SimpleListWalker([])) 582 self.tooltip.grid = None 583 self.overlay = Tooltip(self.listbox, self.tooltip) 584 self.stdout_hist = "" # native str (unicode in Py3) 585 586 self.frame = urwid.Frame(self.overlay) 587 588 if urwid.get_encoding_mode() == "narrow": 589 input_filter = decoding_input_filter 590 else: 591 input_filter = None 592 593 # This constructs a raw_display.Screen, which nabs sys.stdin/out. 594 self.main_loop = urwid.MainLoop( 595 self.frame, 596 palette, 597 event_loop=event_loop, 598 unhandled_input=self.handle_input, 599 input_filter=input_filter, 600 handle_mouse=False, 601 ) 602 603 # String is straight from bpython.cli 604 self.statusbar = Statusbar( 605 config, 606 _( 607 " <%s> Rewind <%s> Save <%s> Pastebin " 608 " <%s> Pager <%s> Show Source " 609 ) 610 % ( 611 config.undo_key, 612 config.save_key, 613 config.pastebin_key, 614 config.last_output_key, 615 config.show_source_key, 616 ), 617 self.main_loop, 618 ) 619 self.frame.set_footer(self.statusbar.widget) 620 self.interact = URWIDInteraction( 621 self.config, self.statusbar, self.frame 622 ) 623 624 self.edits = [] 625 self.edit = None 626 self.current_output = None 627 self._completion_update_suppressed = False 628 629 # Bulletproof: this is a value extract_exit_value accepts. 630 self.exit_value = () 631 632 load_urwid_command_map(config) 633 634 # Subclasses of Repl need to implement echo, current_line, cw 635 def echo(self, orig_s): 636 s = orig_s.rstrip("\n") 637 if s: 638 if self.current_output is None: 639 self.current_output = urwid.Text(("output", s)) 640 if self.edit is None: 641 self.listbox.body.append(self.current_output) 642 # Focus the widget we just added to force the 643 # listbox to scroll. This causes output to scroll 644 # if the user runs a blocking call that prints 645 # more than a screenful, instead of staying 646 # scrolled to the previous input line and then 647 # jumping to the bottom when done. 648 self.listbox.set_focus(len(self.listbox.body) - 1) 649 else: 650 self.listbox.body.insert(-1, self.current_output) 651 # The edit widget should be focused and *stay* focused. 652 # XXX TODO: make sure the cursor stays in the same spot. 653 self.listbox.set_focus(len(self.listbox.body) - 1) 654 else: 655 # XXX this assumes this all has "output" markup applied. 656 self.current_output.set_text( 657 ("output", self.current_output.text + s) 658 ) 659 if orig_s.endswith("\n"): 660 self.current_output = None 661 662 # If we hit this repeatedly in a loop the redraw is rather 663 # slow (testcase: pprint(__builtins__). So if we have recently 664 # drawn the screen already schedule a call in the future. 665 # 666 # Unfortunately we may hit this function repeatedly through a 667 # blocking call triggered by the user, in which case our 668 # timeout will not run timely as we do not return to urwid's 669 # eventloop. So we manually check if our timeout has long 670 # since expired, and redraw synchronously if it has. 671 if self._redraw_handle is None: 672 self.main_loop.draw_screen() 673 674 def maybe_redraw(loop, self): 675 if self._redraw_pending: 676 loop.draw_screen() 677 self._redraw_pending = False 678 679 self._redraw_handle = None 680 681 self._redraw_handle = self.main_loop.set_alarm_in( 682 self._time_between_redraws, maybe_redraw, self 683 ) 684 self._redraw_time = time.time() 685 else: 686 self._redraw_pending = True 687 now = time.time() 688 if now - self._redraw_time > 2 * self._time_between_redraws: 689 # The timeout is well past expired, assume we're 690 # blocked and redraw synchronously. 691 self.main_loop.draw_screen() 692 self._redraw_time = now 693 694 def _get_current_line(self): 695 if self.edit is None: 696 return "" 697 return self.edit.get_edit_text() 698 699 def _set_current_line(self, line): 700 self.edit.set_edit_text(line) 701 702 current_line = property( 703 _get_current_line, 704 _set_current_line, 705 None, 706 "Return the current line (the one the cursor is in).", 707 ) 708 709 def cw(self): 710 """Return the current word (incomplete word left of cursor).""" 711 if self.edit is None: 712 return 713 714 pos = self.edit.edit_pos 715 text = self.edit.get_edit_text() 716 if pos != len(text): 717 # Disable autocomplete if not at end of line, like cli does. 718 return 719 720 # Stolen from cli. TODO: clean up and split out. 721 if not text or (not text[-1].isalnum() and text[-1] not in (".", "_")): 722 return 723 724 # Seek backwards in text for the first non-identifier char: 725 for i, c in enumerate(reversed(text)): 726 if not c.isalnum() and c not in (".", "_"): 727 break 728 else: 729 # No non-identifiers, return everything. 730 return text 731 # Return everything to the right of the non-identifier. 732 return text[-i:] 733 734 @property 735 def cpos(self): 736 if self.edit is not None: 737 return len(self.current_line) - self.edit.edit_pos 738 return 0 739 740 def _get_cursor_offset(self): 741 return self.edit.edit_pos 742 743 def _set_cursor_offset(self, offset): 744 self.edit.edit_pos = offset 745 746 cursor_offset = property( 747 _get_cursor_offset, 748 _set_cursor_offset, 749 None, 750 "The cursor offset from the beginning of the line", 751 ) 752 753 def _populate_completion(self): 754 widget_list = self.tooltip.body 755 while widget_list: 756 widget_list.pop() 757 # This is just me flailing around wildly. TODO: actually write. 758 if self.complete(): 759 if self.funcprops: 760 # This is mostly just stolen from the cli module. 761 func_name, args, is_bound = self.funcprops 762 in_arg = self.arg_pos 763 args, varargs, varkw, defaults = args[:4] 764 kwonly = self.funcprops.argspec.kwonly 765 kwonly_defaults = self.funcprops.argspec.kwonly_defaults or {} 766 markup = [("bold name", func_name), ("name", ": (")] 767 768 # the isinstance checks if we're in a positional arg 769 # (instead of a keyword arg), I think 770 if is_bound and isinstance(in_arg, int): 771 in_arg += 1 772 773 # bpython.cli checks if this goes off the edge and 774 # does clever wrapping. I do not (yet). 775 for k, i in enumerate(args): 776 if defaults and k + 1 > len(args) - len(defaults): 777 kw = repr(defaults[k - (len(args) - len(defaults))]) 778 else: 779 kw = None 780 781 if not k and str(i) == "self": 782 color = "name" 783 else: 784 color = "token" 785 786 if k == in_arg or i == in_arg: 787 color = "bold " + color 788 789 markup.append((color, str(i))) 790 if kw is not None: 791 markup.extend([("punctuation", "="), ("token", kw)]) 792 if k != len(args) - 1: 793 markup.append(("punctuation", ", ")) 794 795 if varargs: 796 if args: 797 markup.append(("punctuation", ", ")) 798 markup.append(("token", "*" + varargs)) 799 800 if kwonly: 801 if not varargs: 802 if args: 803 markup.append(("punctuation", ", ")) 804 markup.append(("punctuation", "*")) 805 for arg in kwonly: 806 if arg == in_arg: 807 color = "bold token" 808 else: 809 color = "token" 810 markup.extend([("punctuation", ", "), (color, arg)]) 811 if arg in kwonly_defaults: 812 markup.extend( 813 [ 814 ("punctuation", "="), 815 ("token", repr(kwonly_defaults[arg])), 816 ] 817 ) 818 819 if varkw: 820 if args or varargs or kwonly: 821 markup.append(("punctuation", ", ")) 822 markup.append(("token", "**" + varkw)) 823 markup.append(("punctuation", ")")) 824 widget_list.append(urwid.Text(markup)) 825 if self.matches_iter.matches: 826 attr_map = {} 827 focus_map = {"main": "operator"} 828 texts = [ 829 urwid.AttrMap( 830 urwid.Text(("main", match)), attr_map, focus_map 831 ) 832 for match in self.matches_iter.matches 833 ] 834 width = max(text.original_widget.pack()[0] for text in texts) 835 gridflow = urwid.GridFlow(texts, width, 1, 0, "left") 836 widget_list.append(gridflow) 837 self.tooltip.grid = gridflow 838 self.overlay.tooltip_focus = False 839 else: 840 self.tooltip.grid = None 841 self.frame.body = self.overlay 842 else: 843 self.frame.body = self.listbox 844 self.tooltip.grid = None 845 846 if self.docstring: 847 # TODO: use self.format_docstring? needs a width/height... 848 docstring = self.docstring 849 widget_list.append(urwid.Text(("comment", docstring))) 850 851 def reprint_line(self, lineno, tokens): 852 edit = self.edits[-len(self.buffer) + lineno - 1] 853 edit.set_edit_markup(list(format_tokens(tokens))) 854 855 def getstdout(self): 856 """This method returns the 'spoofed' stdout buffer, for writing to a 857 file or sending to a pastebin or whatever.""" 858 859 return self.stdout_hist + "\n" 860 861 def ask_confirmation(self, q): 862 """Ask for yes or no and return boolean""" 863 try: 864 reply = self.statusbar.prompt(q) 865 except ValueError: 866 return False 867 868 return reply.lower() in ("y", "yes") 869 870 def reevaluate(self): 871 """Clear the buffer, redraw the screen and re-evaluate the history""" 872 873 self.evaluating = True 874 self.stdout_hist = "" 875 self.f_string = "" 876 self.buffer = [] 877 self.scr.erase() 878 # Set cursor position to -1 to prevent paren matching 879 self.cpos = -1 880 881 self.prompt(False) 882 883 self.iy, self.ix = self.scr.getyx() 884 for line in self.history: 885 self.stdout_hist += line + "\n" 886 self.print_line(line) 887 # I decided it was easier to just do this manually 888 # than to make the print_line and history stuff more flexible. 889 self.scr.addstr("\n") 890 more = self.push(line) 891 self.prompt(more) 892 self.iy, self.ix = self.scr.getyx() 893 894 self.cpos = 0 895 indent = repl.next_indentation(self.s, self.config.tab_length) 896 self.s = "" 897 self.scr.refresh() 898 899 if self.buffer: 900 for unused in range(indent): 901 self.tab() 902 903 self.evaluating = False 904 # map(self.push, self.history) 905 # ^-- That's how simple this method was at first :( 906 907 def write(self, s): 908 """For overriding stdout defaults""" 909 if "\x04" in s: 910 for block in s.split("\x04"): 911 self.write(block) 912 return 913 if s.rstrip() and "\x03" in s: 914 t = s.split("\x03")[1] 915 else: 916 t = s 917 918 if not self.stdout_hist: 919 self.stdout_hist = t 920 else: 921 self.stdout_hist += t 922 923 self.echo(s) 924 925 def push(self, s, insert_into_history=True): 926 # Restore the original SIGINT handler. This is needed to be able 927 # to break out of infinite loops. If the interpreter itself 928 # sees this it prints 'KeyboardInterrupt' and returns (good). 929 orig_handler = signal.getsignal(signal.SIGINT) 930 signal.signal(signal.SIGINT, signal.default_int_handler) 931 # Pretty blindly adapted from bpython.cli 932 try: 933 return repl.Repl.push(self, s, insert_into_history) 934 except SystemExit as e: 935 self.exit_value = e.args 936 raise urwid.ExitMainLoop() 937 except KeyboardInterrupt: 938 # KeyboardInterrupt happened between the except block around 939 # user code execution and this code. This should be rare, 940 # but make sure to not kill bpython here, so leaning on 941 # ctrl+c to kill buggy code running inside bpython is safe. 942 self.keyboard_interrupt() 943 finally: 944 signal.signal(signal.SIGINT, orig_handler) 945 946 def start(self): 947 self.prompt(False) 948 949 def keyboard_interrupt(self): 950 # If the user is currently editing, interrupt him. This 951 # mirrors what the regular python REPL does. 952 if self.edit is not None: 953 # XXX this is a lot of code, and I am not sure it is 954 # actually enough code. Needs some testing. 955 self.edit.make_readonly() 956 self.edit = None 957 self.buffer = [] 958 self.echo("KeyboardInterrupt") 959 self.prompt(False) 960 else: 961 # I do not quite remember if this is reachable, but let's 962 # be safe. 963 self.echo("KeyboardInterrupt") 964 965 def prompt(self, more): 966 # Clear current output here, or output resulting from the 967 # current prompt run will end up appended to the edit widget 968 # sitting above this prompt: 969 self.current_output = None 970 # XXX is this the right place? 971 self.rl_history.reset() 972 973 # We need the caption to use unicode as urwid normalizes later 974 # input to be the same type, using ascii as encoding. If the 975 # caption is bytes this breaks typing non-ascii into bpython. 976 if not more: 977 caption = ("prompt", self.ps1) 978 self.stdout_hist += self.ps1 979 else: 980 caption = ("prompt_more", self.ps2) 981 self.stdout_hist += self.ps2 982 self.edit = BPythonEdit(self.config, caption=caption) 983 984 urwid.connect_signal(self.edit, "change", self.on_input_change) 985 urwid.connect_signal( 986 self.edit, "edit-pos-changed", self.on_edit_pos_changed 987 ) 988 # Do this after connecting the change signal handler: 989 self.edit.insert_text(4 * self.next_indentation() * " ") 990 self.edits.append(self.edit) 991 self.listbox.body.append(self.edit) 992 self.listbox.set_focus(len(self.listbox.body) - 1) 993 # Hide the tooltip 994 self.frame.body = self.listbox 995 996 def on_input_change(self, edit, text): 997 # TODO: we get very confused here if "text" contains newlines, 998 # so we cannot put our edit widget in multiline mode yet. 999 # That is probably fixable... 1000 tokens = self.tokenize(text, False) 1001 edit.set_edit_markup(list(format_tokens(tokens))) 1002 if not self._completion_update_suppressed: 1003 # If we call this synchronously the get_edit_text() in repl.cw 1004 # still returns the old text... 1005 self.main_loop.set_alarm_in( 1006 0, lambda *args: self._populate_completion() 1007 ) 1008 1009 def on_edit_pos_changed(self, edit, position): 1010 """Gets called when the cursor position inside the edit changed. 1011 Rehighlight the current line because there might be a paren under 1012 the cursor now.""" 1013 tokens = self.tokenize(self.current_line, False) 1014 edit.set_edit_markup(list(format_tokens(tokens))) 1015 1016 def handle_input(self, event): 1017 # Since most of the input handling here should be handled in the edit 1018 # instead, we return here early if the edit doesn't have the focus. 1019 if self.frame.get_focus() != "body": 1020 return 1021 1022 if event == "enter": 1023 inp = self.edit.get_edit_text() 1024 self.history.append(inp) 1025 self.edit.make_readonly() 1026 self.stdout_hist += inp 1027 self.stdout_hist += "\n" 1028 self.edit = None 1029 # This may take a while, so force a redraw first: 1030 self.main_loop.draw_screen() 1031 more = self.push(inp) 1032 self.prompt(more) 1033 elif event == "ctrl d": 1034 # ctrl+d on an empty line exits, otherwise deletes 1035 if self.edit is not None: 1036 if not self.edit.get_edit_text(): 1037 raise urwid.ExitMainLoop() 1038 else: 1039 self.main_loop.process_input(["delete"]) 1040 elif urwid.command_map[event] == "cursor up": 1041 # "back" from bpython.cli 1042 self.rl_history.enter(self.edit.get_edit_text()) 1043 self.edit.set_edit_text("") 1044 self.edit.insert_text(self.rl_history.back()) 1045 elif urwid.command_map[event] == "cursor down": 1046 # "fwd" from bpython.cli 1047 self.rl_history.enter(self.edit.get_edit_text()) 1048 self.edit.set_edit_text("") 1049 self.edit.insert_text(self.rl_history.forward()) 1050 elif urwid.command_map[event] == "next selectable": 1051 self.tab() 1052 elif urwid.command_map[event] == "prev selectable": 1053 self.tab(True) 1054 # else: 1055 # self.echo(repr(event)) 1056 1057 def tab(self, back=False): 1058 """Process the tab key being hit. 1059 1060 If the line is blank or has only whitespace: indent. 1061 1062 If there is text before the cursor: cycle completions. 1063 1064 If `back` is True cycle backwards through completions, and return 1065 instead of indenting. 1066 1067 Returns True if the key was handled. 1068 """ 1069 self._completion_update_suppressed = True 1070 try: 1071 # Heavily inspired by cli's tab. 1072 text = self.edit.get_edit_text() 1073 if not text.lstrip() and not back: 1074 x_pos = len(text) - self.cpos 1075 num_spaces = x_pos % self.config.tab_length 1076 if not num_spaces: 1077 num_spaces = self.config.tab_length 1078 1079 self.edit.insert_text(" " * num_spaces) 1080 return True 1081 1082 if not self.matches_iter: 1083 self.complete(tab=True) 1084 cw = self.current_string() or self.cw() 1085 if not cw: 1086 return True 1087 1088 if self.matches_iter.is_cseq(): 1089 cursor, text = self.matches_iter.substitute_cseq() 1090 self.edit.set_edit_text(text) 1091 self.edit.edit_pos = cursor 1092 elif self.matches_iter.matches: 1093 if back: 1094 self.matches_iter.previous() 1095 else: 1096 next(self.matches_iter) 1097 cursor, text = self.matches_iter.cur_line() 1098 self.edit.set_edit_text(text) 1099 self.edit.edit_pos = cursor 1100 self.overlay.tooltip_focus = True 1101 if self.tooltip.grid: 1102 self.tooltip.grid.set_focus(self.matches_iter.index) 1103 return True 1104 finally: 1105 self._completion_update_suppressed = False 1106 1107 1108def main(args=None, locals_=None, banner=None): 1109 translations.init() 1110 1111 def options_callback(group): 1112 group.add_argument( 1113 "--twisted", 1114 "-T", 1115 action="store_true", 1116 help=_("Run twisted reactor."), 1117 ) 1118 group.add_argument( 1119 "--reactor", 1120 "-r", 1121 help=_( 1122 "Select specific reactor (see --help-reactors). " 1123 "Implies --twisted." 1124 ), 1125 ) 1126 group.add_argument( 1127 "--help-reactors", 1128 action="store_true", 1129 help=_("List available reactors for -r."), 1130 ) 1131 group.add_argument( 1132 "--plugin", 1133 "-p", 1134 help=_( 1135 "twistd plugin to run (use twistd for a list). " 1136 'Use "--" to pass further options to the plugin.' 1137 ), 1138 ) 1139 group.add_argument( 1140 "--server", 1141 "-s", 1142 type=int, 1143 help=_("Port to run an eval server on (forces Twisted)."), 1144 ) 1145 1146 # TODO: maybe support displays other than raw_display? 1147 config, options, exec_args = bpargs.parse( 1148 args, 1149 ( 1150 "Urwid options", 1151 None, 1152 options_callback, 1153 ), 1154 ) 1155 1156 if options.help_reactors: 1157 try: 1158 from twisted.application import reactors 1159 1160 # Stolen from twisted.application.app (twistd). 1161 for r in reactors.getReactorTypes(): 1162 print(f" {r.shortName:<4}\t{r.description}") 1163 except ImportError: 1164 sys.stderr.write( 1165 "No reactors are available. Please install " 1166 "twisted for reactor support.\n" 1167 ) 1168 return 1169 1170 palette = [ 1171 ( 1172 name, 1173 COLORMAP[color.lower()], 1174 "default", 1175 "bold" if color.isupper() else "default", 1176 ) 1177 for name, color in config.color_scheme.items() 1178 ] 1179 palette.extend( 1180 [ 1181 ("bold " + name, color + ",bold", background, monochrome) 1182 for name, color, background, monochrome in palette 1183 ] 1184 ) 1185 1186 if options.server or options.plugin: 1187 options.twisted = True 1188 1189 if options.reactor: 1190 try: 1191 from twisted.application import reactors 1192 except ImportError: 1193 sys.stderr.write( 1194 "No reactors are available. Please install " 1195 "twisted for reactor support.\n" 1196 ) 1197 return 1198 try: 1199 # XXX why does this not just return the reactor it installed? 1200 reactor = reactors.installReactor(options.reactor) 1201 if reactor is None: 1202 from twisted.internet import reactor 1203 except reactors.NoSuchReactor: 1204 sys.stderr.write(f"Reactor {options.reactor} does not exist\n") 1205 return 1206 event_loop = TwistedEventLoop(reactor) 1207 elif options.twisted: 1208 try: 1209 from twisted.internet import reactor 1210 except ImportError: 1211 sys.stderr.write( 1212 "No reactors are available. Please install " 1213 "twisted for reactor support.\n" 1214 ) 1215 return 1216 event_loop = TwistedEventLoop(reactor) 1217 else: 1218 # None, not urwid.SelectEventLoop(), to work with 1219 # screens that do not support external event loops. 1220 event_loop = None 1221 # TODO: there is also a glib event loop. Do we want that one? 1222 1223 extend_locals = {} 1224 if options.plugin: 1225 try: 1226 from twisted import plugin 1227 from twisted.application import service 1228 except ImportError: 1229 sys.stderr.write( 1230 "No twisted plugins are available. Please install " 1231 "twisted for twisted plugin support.\n" 1232 ) 1233 return 1234 1235 for plug in plugin.getPlugins(service.IServiceMaker): 1236 if plug.tapname == options.plugin: 1237 break 1238 else: 1239 sys.stderr.write(f"Plugin {options.plugin} does not exist\n") 1240 return 1241 plugopts = plug.options() 1242 plugopts.parseOptions(exec_args) 1243 serv = plug.makeService(plugopts) 1244 extend_locals["service"] = serv 1245 reactor.callWhenRunning(serv.startService) 1246 exec_args = [] 1247 interpreter = repl.Interpreter(locals_, locale.getpreferredencoding()) 1248 # TODO: replace with something less hack-ish 1249 interpreter.locals.update(extend_locals) 1250 1251 # This nabs sys.stdin/out via urwid.MainLoop 1252 myrepl = URWIDRepl(event_loop, palette, interpreter, config) 1253 1254 if options.server: 1255 factory = EvalFactory(myrepl) 1256 reactor.listenTCP(options.server, factory, interface="127.0.0.1") 1257 1258 if options.reactor: 1259 # Twisted sets a sigInt handler that stops the reactor unless 1260 # it sees a different custom signal handler. 1261 def sigint(*args): 1262 reactor.callFromThread(myrepl.keyboard_interrupt) 1263 1264 signal.signal(signal.SIGINT, sigint) 1265 1266 # Save stdin, stdout and stderr for later restoration 1267 orig_stdin = sys.stdin 1268 orig_stdout = sys.stdout 1269 orig_stderr = sys.stderr 1270 # urwid's screen start() and stop() calls currently hit sys.stdin 1271 # directly (via RealTerminal.tty_signal_keys), so start the screen 1272 # before swapping sys.std*, and swap them back before restoring 1273 # the screen. This also avoids crashes if our redirected sys.std* 1274 # are called before we get around to starting the mainloop 1275 # (urwid raises an exception if we try to draw to the screen 1276 # before starting it). 1277 1278 def run_with_screen_before_mainloop(): 1279 try: 1280 # Currently we just set this to None because I do not 1281 # expect code hitting stdin to work. For example: exit() 1282 # (not sys.exit, site.py's exit) tries to close sys.stdin, 1283 # which breaks urwid's shutdown. bpython.cli sets this to 1284 # a fake object that reads input through curses and 1285 # returns it. When using twisted I do not think we can do 1286 # that because sys.stdin.read and friends block, and we 1287 # cannot re-enter the reactor. If using urwid's own 1288 # mainloop we *might* be able to do something similar and 1289 # re-enter its mainloop. 1290 sys.stdin = None # FakeStdin(myrepl) 1291 sys.stdout = myrepl 1292 sys.stderr = myrepl 1293 1294 myrepl.main_loop.set_alarm_in(0, start) 1295 1296 while True: 1297 try: 1298 myrepl.main_loop.run() 1299 except KeyboardInterrupt: 1300 # HACK: if we run under a twisted mainloop this should 1301 # never happen: we have a SIGINT handler set. 1302 # If we use the urwid select-based loop we just restart 1303 # that loop if interrupted, instead of trying to cook 1304 # up an equivalent to reactor.callFromThread (which 1305 # is what our Twisted sigint handler does) 1306 myrepl.main_loop.set_alarm_in( 1307 0, lambda *args: myrepl.keyboard_interrupt() 1308 ) 1309 continue 1310 break 1311 1312 finally: 1313 sys.stdin = orig_stdin 1314 sys.stderr = orig_stderr 1315 sys.stdout = orig_stdout 1316 1317 # This needs more thought. What needs to happen inside the mainloop? 1318 def start(main_loop, user_data): 1319 if exec_args: 1320 bpargs.exec_code(interpreter, exec_args) 1321 if not options.interactive: 1322 raise urwid.ExitMainLoop() 1323 if not exec_args: 1324 sys.path.insert(0, "") 1325 # this is CLIRepl.startup inlined. 1326 filename = os.environ.get("PYTHONSTARTUP") 1327 if filename and os.path.isfile(filename): 1328 with open(filename) as f: 1329 interpreter.runsource(f.read(), filename, "exec") 1330 1331 if banner is not None: 1332 myrepl.write(banner) 1333 myrepl.write("\n") 1334 1335 # XXX these deprecation warnings need to go at some point 1336 myrepl.write( 1337 _( 1338 "WARNING: You are using `bpython-urwid`, the urwid backend for `bpython`. This backend has been deprecated in version 0.19 and might disappear in a future version." 1339 ) 1340 ) 1341 myrepl.write("\n") 1342 1343 myrepl.start() 1344 1345 # This bypasses main_loop.set_alarm_in because we must *not* 1346 # hit the draw_screen call (it's unnecessary and slow). 1347 def run_find_coroutine(): 1348 if myrepl.module_gatherer.find_coroutine(): 1349 main_loop.event_loop.alarm(0, run_find_coroutine) 1350 1351 run_find_coroutine() 1352 1353 myrepl.main_loop.screen.run_wrapper(run_with_screen_before_mainloop) 1354 1355 if config.flush_output and not options.quiet: 1356 sys.stdout.write(myrepl.getstdout()) 1357 if hasattr(sys.stdout, "flush"): 1358 sys.stdout.flush() 1359 return repl.extract_exit_value(myrepl.exit_value) 1360 1361 1362def load_urwid_command_map(config): 1363 urwid.command_map[key_dispatch[config.up_one_line_key]] = "cursor up" 1364 urwid.command_map[key_dispatch[config.down_one_line_key]] = "cursor down" 1365 urwid.command_map[key_dispatch["C-a"]] = "cursor max left" 1366 urwid.command_map[key_dispatch["C-e"]] = "cursor max right" 1367 urwid.command_map[key_dispatch[config.pastebin_key]] = "pastebin" 1368 urwid.command_map[key_dispatch["C-f"]] = "cursor right" 1369 urwid.command_map[key_dispatch["C-b"]] = "cursor left" 1370 urwid.command_map[key_dispatch["C-d"]] = "delete" 1371 urwid.command_map[key_dispatch[config.clear_word_key]] = "clear word" 1372 urwid.command_map[key_dispatch[config.clear_line_key]] = "clear line" 1373 1374 1375""" 1376 'clear_screen': 'C-l', 1377 'cut_to_buffer': 'C-k', 1378 'down_one_line': 'C-n', 1379 'exit': '', 1380 'last_output': 'F9', 1381 'pastebin': 'F8', 1382 'save': 'C-s', 1383 'show_source': 'F2', 1384 'suspend': 'C-z', 1385 'undo': 'C-r', 1386 'up_one_line': 'C-p', 1387 'yank_from_buffer': 'C-y'}, 1388""" 1389if __name__ == "__main__": 1390 sys.exit(main()) 1391