import greenlet
import time
from curtsies import events

from ..translations import _
from ..repl import Interaction
from ..curtsiesfrontend.events import RefreshRequestEvent
from ..curtsiesfrontend.manual_readline import edit_keys


class StatusBar(Interaction):
    """StatusBar and Interaction for Repl

    Passing of control back and forth between calls that use the interact
    api (notify, confirm, file_prompt) like bpython.Repl.write2file and
    events on the main thread happens via those calls and
    self.process_event.

    Calling one of these three is required for the main thread to regain
    control!

    This is probably a terrible idea, and better would be rewriting this
    functionality in an evented or callback style, but this approach makes
    it possible to integrate the existing bpython.Repl code.
    """

    def __init__(
        self,
        config,
        permanent_text="",
        request_refresh=lambda: None,
        schedule_refresh=lambda when: None,
    ):
        """Set up an idle status bar.

        config is passed through to Interaction.__init__.
        permanent_text, if non-empty, becomes the first permanent message.
        request_refresh is called with no arguments by notify().
        schedule_refresh is called by message() with the time.time() value
        at which the temporary message is due to expire.
        """
        # Text typed so far into a file prompt, and the insertion point in it.
        self._current_line = ""
        self.cursor_offset_in_line = 0
        # Which kind of request (if any) currently owns the status bar.
        self.in_prompt = False
        self.in_confirm = False
        self.waiting_for_refresh = False
        self.prompt = ""
        # Temporary message, when it was set, and how many seconds it lasts.
        self._message = ""
        self.message_start_time = time.time()
        self.message_time = 3
        # Fallback messages; the most recently pushed one is displayed.
        self.permanent_stack = []
        if permanent_text:
            self.permanent_stack.append(permanent_text)
        # Greenlet that created the status bar; notify/confirm/file_prompt
        # switch back to it.
        self.main_context = greenlet.getcurrent()
        # Greenlet of the most recent notify/confirm/file_prompt caller.
        self.request_context = None
        self.request_refresh = request_refresh
        self.schedule_refresh = schedule_refresh

        super().__init__(config)

    def push_permanent_message(self, msg):
        """Clear any temporary message and make msg the permanent message."""
        self._message = ""
        self.permanent_stack.append(msg)

    def pop_permanent_message(self, msg):
        """Remove the first occurrence of msg from the permanent stack.

        Raises ValueError if msg is not on the stack.
        """
        if msg in self.permanent_stack:
            self.permanent_stack.remove(msg)
        else:
            # !r formatting (rather than "%r" % msg) stays correct even when
            # msg happens to be a tuple.
            raise ValueError(f"Message {msg!r} was not in permanent_stack")

    @property
    def has_focus(self):
        """True while a prompt, a confirmation or a notify is pending."""
        return self.in_prompt or self.in_confirm or self.waiting_for_refresh

    def message(self, msg, schedule_refresh=True):
        """Sets a temporary message

        When schedule_refresh is true, self.schedule_refresh is called with
        the time at which the message expires.
        """
        now = time.time()
        self.message_start_time = now
        self._message = msg
        if schedule_refresh:
            self.schedule_refresh(now + self.message_time)

    def _check_for_expired_message(self):
        """Drop the temporary message once message_time seconds have passed."""
        if (
            self._message
            and time.time() > self.message_start_time + self.message_time
        ):
            self._message = ""

    def process_event(self, e) -> None:
        """Handle one event while the status bar has focus.

        Depending on the event this edits the prompt line, or hands a result
        back to the greenlet stored in self.request_context: the entered line
        for a file prompt, True/False for a confirmation, False on <ESC> or
        SIGINT, and no value for a refresh request.

        Raises KeyboardInterrupt on "<Ctrl-c>" and SystemExit on "<Ctrl-d>"
        when those keys are not handled by edit_keys.
        """
        assert self.in_prompt or self.in_confirm or self.waiting_for_refresh
        if isinstance(e, RefreshRequestEvent):
            self.waiting_for_refresh = False
            self.request_context.switch()
        elif isinstance(e, events.PasteEvent):
            for ee in e.events:
                # strip control seq
                self.add_normal_character(ee if len(ee) == 1 else ee[-1])
        elif e == "<ESC>" or isinstance(e, events.SigIntEvent):
            # Reset the prompt state *before* resuming the requester: it may
            # open a new prompt right away, and a later escape() would wipe
            # that new prompt's state.
            self.escape()
            self.request_context.switch(False)
        elif e in edit_keys:
            self.cursor_offset_in_line, self._current_line = edit_keys[e](
                self.cursor_offset_in_line, self._current_line
            )
        elif e == "<Ctrl-c>":  # TODO can this be removed?
            raise KeyboardInterrupt()
        elif e == "<Ctrl-d>":  # TODO this isn't a very intuitive behavior
            raise SystemExit()
        elif self.in_prompt and e in ("\n", "\r", "<Ctrl-j>", "<Ctrl-m>"):
            line = self._current_line
            self.escape()
            self.request_context.switch(line)
        elif self.in_confirm:
            answer = e.lower() == _("y")
            # Same ordering as above: clear state first, then resume.
            self.escape()
            self.request_context.switch(answer)
        else:  # add normal character
            self.add_normal_character(e)

    def add_normal_character(self, e):
        """Insert the single character e at the cursor.

        "<SPACE>" is treated as " "; any other multi-character event name is
        ignored.
        """
        if e == "<SPACE>":
            e = " "
        if len(e) > 1:
            return
        self._current_line = (
            self._current_line[: self.cursor_offset_in_line]
            + e
            + self._current_line[self.cursor_offset_in_line :]
        )
        self.cursor_offset_in_line += 1

    def escape(self):
        """unfocus from statusbar, clear prompt state, wait for notify call"""
        self.in_prompt = False
        self.in_confirm = False
        self.prompt = ""
        self._current_line = ""
        # Keep the cursor consistent with the now-empty line so the next
        # prompt does not start with a stale offset.
        self.cursor_offset_in_line = 0

    @property
    def current_line(self):
        """Text the status bar should currently display.

        Priority: active file prompt (prompt plus typed text), active
        confirmation prompt, unexpired temporary message, most recent
        permanent message, then the empty string.
        """
        self._check_for_expired_message()
        if self.in_prompt:
            return self.prompt + self._current_line
        if self.in_confirm:
            return self.prompt
        if self._message:
            return self._message
        if self.permanent_stack:
            return self.permanent_stack[-1]
        return ""

    @property
    def should_show_message(self):
        """True when current_line is non-empty."""
        return bool(self.current_line)

    # interaction interface - should be called from other greenlets
    def notify(self, msg, n=3, wait_for_keypress=False):
        """Show msg as a temporary message lasting n seconds.

        Requests a refresh and switches to the main greenlet; the calling
        greenlet is resumed from process_event.
        """
        self.request_context = greenlet.getcurrent()
        self.message_time = n
        self.message(msg, schedule_refresh=wait_for_keypress)
        self.waiting_for_refresh = True
        self.request_refresh()
        self.main_context.switch(msg)

    # below really ought to be called from greenlets other than main because
    # they block
    def confirm(self, q):
        """Expected to return True or False, given question prompt q"""
        self.request_context = greenlet.getcurrent()
        self.prompt = q
        self.in_confirm = True
        return self.main_context.switch(q)

    def file_prompt(self, s):
        """Expected to return a file name, given prompt text s

        process_event resumes the caller with False instead if the prompt is
        cancelled with <ESC> or SIGINT.
        """
        self.request_context = greenlet.getcurrent()
        self.prompt = s
        self.in_prompt = True
        return self.main_context.switch(s)