1# This file is part of ranger, the console file manager. 2# License: GNU GPL version 3, see the file "AUTHORS" for details. 3 4"""The Console widget implements a vim-like console""" 5 6from __future__ import (absolute_import, division, print_function) 7 8import curses 9import os 10import re 11from collections import deque 12 13from ranger.gui.widgets import Widget 14from ranger.ext.direction import Direction 15from ranger.ext.widestring import uwid, WideString 16from ranger.container.history import History, HistoryEmptyException 17import ranger 18 19 20class Console(Widget): # pylint: disable=too-many-instance-attributes,too-many-public-methods 21 visible = False 22 last_cursor_mode = None 23 history_search_pattern = None 24 prompt = ':' 25 copy = '' 26 tab_deque = None 27 original_line = None 28 history = None 29 history_backup = None 30 override = None 31 allow_close = False 32 historypath = None 33 wait_for_command_input = False 34 unicode_buffer = "" 35 36 def __init__(self, win): 37 Widget.__init__(self, win) 38 self.pos = 0 39 self.line = '' 40 self.history = History(self.settings.max_console_history_size) 41 # load history from files 42 if not ranger.args.clean: 43 self.historypath = self.fm.datapath('history') 44 if os.path.exists(self.historypath): 45 try: 46 fobj = open(self.historypath, 'r') 47 except OSError as ex: 48 self.fm.notify('Failed to read history file', bad=True, exception=ex) 49 else: 50 try: 51 for line in fobj: 52 self.history.add(line[:-1]) 53 except UnicodeDecodeError as ex: 54 self.fm.notify('Failed to parse corrupt history file', 55 bad=True, exception=ex) 56 fobj.close() 57 self.history_backup = History(self.history) 58 59 # NOTE: the console is considered in the "question mode" when the 60 # question_queue is non-empty. In that case, the console will draw the 61 # question instead of the regular console, and the input you give is 62 # used to answer the question instead of typing in commands. 63 # 64 # A question is a tuple of (question_string, callback_func, 65 # tuple_of_choices). callback_func is a function that is called when 66 # the question is answered which gets the answer as an argument. 67 # tuple_of_choices looks like ('y', 'n'). Only one-letter-answers are 68 # currently supported. Pressing enter uses the first choice whereas 69 # pressing ESC uses the second choice. 70 self.question_queue = [] 71 72 def destroy(self): 73 # save history to files 74 if ranger.args.clean or not self.settings.save_console_history: 75 return 76 if self.historypath: 77 try: 78 fobj = open(self.historypath, 'w') 79 except OSError as ex: 80 self.fm.notify('Failed to write history file', bad=True, exception=ex) 81 else: 82 for entry in self.history_backup: 83 try: 84 fobj.write(entry + '\n') 85 except UnicodeEncodeError: 86 pass 87 fobj.close() 88 Widget.destroy(self) 89 90 def _calculate_offset(self): 91 wid = self.wid - 2 92 whalf = wid // 2 93 if self.pos < whalf or len(self.line) < wid: 94 return 0 95 if self.pos > len(self.line) - (wid - whalf): 96 return len(self.line) - wid 97 return self.pos - whalf 98 99 def draw(self): 100 self.win.erase() 101 if self.question_queue: 102 assert isinstance(self.question_queue[0], tuple) 103 assert len(self.question_queue[0]) == 3 104 self.addstr(0, 0, self.question_queue[0][0][self.pos:]) 105 return 106 107 self.addstr(0, 0, self.prompt) 108 line = WideString(self.line) 109 if line: 110 x = self._calculate_offset() 111 self.addstr(0, len(self.prompt), str(line[x:])) 112 113 def finalize(self): 114 move = self.fm.ui.win.move 115 if self.question_queue: 116 try: 117 move(self.y, len(self.question_queue[0][0])) 118 except curses.error: 119 pass 120 else: 121 try: 122 x = self._calculate_offset() 123 pos = uwid(self.line[x:self.pos]) + len(self.prompt) 124 move(self.y, self.x + min(self.wid - 1, pos)) 125 except curses.error: 126 pass 127 128 def open(self, string='', prompt=None, position=None): 129 if prompt is not None: 130 assert isinstance(prompt, str) 131 self.prompt = prompt 132 elif 'prompt' in self.__dict__: 133 del self.prompt 134 135 if self.last_cursor_mode is None: 136 try: 137 self.last_cursor_mode = curses.curs_set(1) 138 except curses.error: 139 pass 140 self.allow_close = False 141 self.tab_deque = None 142 self.unicode_buffer = "" 143 self.line = string 144 self.history_search_pattern = self.line 145 self.pos = len(string) 146 if position is not None: 147 self.pos = min(self.pos, position) 148 self.history_backup.fast_forward() 149 self.history = History(self.history_backup) 150 self.history.add('') 151 self.wait_for_command_input = True 152 return True 153 154 def close(self, trigger_cancel_function=True): 155 if self.question_queue: 156 question = self.question_queue[0] 157 answers = question[2] 158 if len(answers) >= 2: 159 self._answer_question(answers[1]) 160 else: 161 self._close_command_prompt(trigger_cancel_function) 162 163 def _close_command_prompt(self, trigger_cancel_function=True): 164 if trigger_cancel_function: 165 cmd = self._get_cmd(quiet=True) 166 if cmd: 167 cmd.cancel() 168 if self.last_cursor_mode is not None: 169 try: 170 curses.curs_set(self.last_cursor_mode) 171 except curses.error: 172 pass 173 self.last_cursor_mode = None 174 self.fm.hide_console_info() 175 self.add_to_history() 176 self.tab_deque = None 177 self.clear() 178 self.__class__ = Console 179 self.wait_for_command_input = False 180 181 def clear(self): 182 self.pos = 0 183 self.line = '' 184 185 def press(self, key): 186 self.fm.ui.keymaps.use_keymap('console') 187 if not self.fm.ui.press(key): 188 self.type_key(key) 189 190 def _answer_question(self, answer): 191 if not self.question_queue: 192 return False 193 question = self.question_queue[0] 194 _, callback, answers = question 195 if answer in answers: 196 self.question_queue.pop(0) 197 callback(answer) 198 return True 199 return False 200 201 def type_key(self, key): 202 self.tab_deque = None 203 204 line = "" if self.question_queue else self.line 205 result = self._add_character(key, self.unicode_buffer, line, self.pos) 206 if result[1] == line: 207 # line didn't change, so we don't need to do anything, just update 208 # the unicode _buffer. 209 self.unicode_buffer = result[0] 210 return 211 212 if self.question_queue: 213 self.unicode_buffer, answer, _ = result 214 self._answer_question(answer) 215 else: 216 self.unicode_buffer, self.line, self.pos = result 217 self.on_line_change() 218 219 def _add_character(self, key, unicode_buffer, line, pos): 220 # Takes the pressed key, a string "unicode_buffer" containing a 221 # potentially incomplete unicode character, the current line and the 222 # position of the cursor inside the line. 223 # This function returns the new unicode buffer, the modified line and 224 # position. 225 if isinstance(key, int): 226 try: 227 key = chr(key) 228 except ValueError: 229 return unicode_buffer, line, pos 230 231 if self.fm.py3: 232 if len(unicode_buffer) >= 4: 233 unicode_buffer = "" 234 if ord(key) in range(0, 256): 235 unicode_buffer += key 236 try: 237 decoded = unicode_buffer.encode("latin-1").decode("utf-8") 238 except UnicodeDecodeError: 239 return unicode_buffer, line, pos 240 except UnicodeEncodeError: 241 return unicode_buffer, line, pos 242 else: 243 unicode_buffer = "" 244 if pos == len(line): 245 line += decoded 246 else: 247 line = line[:pos] + decoded + line[pos:] 248 pos += len(decoded) 249 else: 250 if pos == len(line): 251 line += key 252 else: 253 line = line[:pos] + key + line[pos:] 254 pos += len(key) 255 return unicode_buffer, line, pos 256 257 def history_move(self, n): 258 try: 259 current = self.history.current() 260 except HistoryEmptyException: 261 pass 262 else: 263 if self.line != current and self.line != self.history.top(): 264 self.history.modify(self.line) 265 if self.history_search_pattern: 266 self.history.search(self.history_search_pattern, n) 267 else: 268 self.history.move(n) 269 current = self.history.current() 270 if self.line != current: 271 self.line = self.history.current() 272 self.pos = len(self.line) 273 274 def add_to_history(self): 275 self.history_backup.fast_forward() 276 self.history_backup.add(self.line) 277 self.history = History(self.history_backup) 278 279 def move(self, **keywords): 280 direction = Direction(keywords) 281 if direction.horizontal(): 282 # Ensure that the pointer is moved utf-char-wise 283 if self.fm.py3: 284 if self.question_queue: 285 umax = len(self.question_queue[0][0]) + 1 - self.wid 286 else: 287 umax = len(self.line) + 1 288 self.pos = direction.move( 289 direction=direction.right(), 290 minimum=0, 291 maximum=umax, 292 current=self.pos) 293 else: 294 if self.question_queue: 295 uchar = list(self.question_queue[0][0].decode('utf-8', 'ignore')) 296 upos = len(self.question_queue[0][0][:self.pos].decode('utf-8', 'ignore')) 297 umax = len(uchar) + 1 - self.wid 298 else: 299 uchar = list(self.line.decode('utf-8', 'ignore')) 300 upos = len(self.line[:self.pos].decode('utf-8', 'ignore')) 301 umax = len(uchar) + 1 302 newupos = direction.move( 303 direction=direction.right(), 304 minimum=0, 305 maximum=umax, 306 current=upos) 307 self.pos = len(''.join(uchar[:newupos]).encode('utf-8', 'ignore')) 308 309 def move_word(self, **keywords): 310 direction = Direction(keywords) 311 if direction.horizontal(): 312 self.pos = self.move_by_word(self.line, self.pos, direction.right()) 313 self.on_line_change() 314 315 @staticmethod 316 def move_by_word(line, position, direction): 317 """ 318 Returns a new position by moving word-wise in the line 319 320 >>> import sys 321 >>> if sys.version_info < (3, ): 322 ... # Didn't get the unicode test to work on python2, even though 323 ... # it works fine in ranger, even with unicode input... 324 ... line = "ohai world, this is dog" 325 ... else: 326 ... line = "\\u30AA\\u30CF\\u30E8\\u30A6 world, this is dog" 327 >>> Console.move_by_word(line, 0, -1) 328 0 329 >>> Console.move_by_word(line, 0, 1) 330 5 331 >>> Console.move_by_word(line, 2, -1) 332 0 333 >>> Console.move_by_word(line, 2, 1) 334 5 335 >>> Console.move_by_word(line, 15, -2) 336 5 337 >>> Console.move_by_word(line, 15, 2) 338 21 339 >>> Console.move_by_word(line, 24, -1) 340 21 341 >>> Console.move_by_word(line, 24, 1) 342 24 343 """ 344 word_beginnings = [] 345 seen_whitespace = True 346 current_word = None 347 cursor_inside_word = False 348 349 # Scan the line for word boundaries and determine position of cursor 350 for i, char in enumerate(line): 351 if i == position: 352 current_word = len(word_beginnings) 353 if not seen_whitespace: 354 cursor_inside_word = True 355 if char == " ": 356 seen_whitespace = True 357 elif seen_whitespace: 358 seen_whitespace = False 359 word_beginnings.append(i) 360 word_beginnings.append(len(line)) 361 362 # Handle corner cases: 363 if current_word is None: 364 current_word = len(word_beginnings) 365 if direction > 0 and cursor_inside_word: 366 current_word -= 1 367 if direction < 0 and position == len(line): 368 current_word -= 1 369 370 new_word = current_word + direction 371 new_word = max(0, min(len(word_beginnings) - 1, new_word)) 372 373 return word_beginnings[new_word] 374 375 def delete_rest(self, direction): 376 self.tab_deque = None 377 if direction > 0: 378 self.copy = self.line[self.pos:] 379 self.line = self.line[:self.pos] 380 else: 381 self.copy = self.line[:self.pos] 382 self.line = self.line[self.pos:] 383 self.pos = 0 384 self.on_line_change() 385 386 def paste(self): 387 if self.pos == len(self.line): 388 self.line += self.copy 389 else: 390 self.line = self.line[:self.pos] + self.copy + self.line[self.pos:] 391 self.pos += len(self.copy) 392 self.on_line_change() 393 394 def delete_word(self, backward=True): 395 if self.line: 396 self.tab_deque = None 397 if backward: 398 right_part = self.line[self.pos:] 399 i = self.pos - 2 400 while i >= 0 and re.match( 401 r'[\w\d]', self.line[i], re.UNICODE): # pylint: disable=no-member 402 i -= 1 403 self.copy = self.line[i + 1:self.pos] 404 self.line = self.line[:i + 1] + right_part 405 self.pos = i + 1 406 else: 407 left_part = self.line[:self.pos] 408 i = self.pos + 1 409 while i < len(self.line) and re.match( 410 r'[\w\d]', self.line[i], re.UNICODE): # pylint: disable=no-member 411 i += 1 412 self.copy = self.line[self.pos:i] 413 if i >= len(self.line): 414 self.line = left_part 415 self.pos = len(self.line) 416 else: 417 self.line = left_part + self.line[i:] 418 self.pos = len(left_part) 419 self.on_line_change() 420 421 def delete(self, mod): 422 self.tab_deque = None 423 if mod == -1 and self.pos == 0: 424 if not self.line: 425 self.close(trigger_cancel_function=False) 426 return 427 # Delete utf-char-wise 428 if self.fm.py3: 429 left_part = self.line[:self.pos + mod] 430 self.pos = len(left_part) 431 self.line = left_part + self.line[self.pos + 1:] 432 else: 433 uchar = list(self.line.decode('utf-8', 'ignore')) 434 upos = len(self.line[:self.pos].decode('utf-8', 'ignore')) + mod 435 left_part = ''.join(uchar[:upos]).encode('utf-8', 'ignore') 436 self.pos = len(left_part) 437 self.line = left_part + ''.join(uchar[upos + 1:]).encode('utf-8', 'ignore') 438 self.on_line_change() 439 440 def execute(self, cmd=None): 441 if self.question_queue and cmd is None: 442 question = self.question_queue[0] 443 answers = question[2] 444 if len(answers) >= 1: 445 self._answer_question(answers[0]) 446 else: 447 self.question_queue.pop(0) 448 return 449 450 self.allow_close = True 451 if cmd: 452 cmd.execute() 453 else: 454 self.fm.execute_console(self.line) 455 456 if self.allow_close: 457 self._close_command_prompt(trigger_cancel_function=False) 458 459 def _get_cmd(self, quiet=False): 460 try: 461 command_class = self.get_cmd_class() 462 except IndexError: 463 return None 464 except KeyError: 465 if not quiet: 466 self.fm.notify("Command not found: `%s'" % self.line.split()[0], bad=True) 467 return None 468 return command_class(self.line) 469 470 def get_cmd_class(self): 471 return self.fm.commands.get_command(self.line.split()[0], abbrev=True) 472 473 def _get_tab(self, tabnum): 474 if ' ' in self.line: 475 cmd = self._get_cmd() 476 if cmd: 477 return cmd.tab(tabnum) 478 return None 479 480 return self.fm.commands.command_generator(self.line) 481 482 def tab(self, tabnum=1): 483 if self.tab_deque is None: 484 tab_result = self._get_tab(tabnum) 485 486 if tab_result is None: 487 pass 488 elif isinstance(tab_result, str): 489 self.line = tab_result 490 self.pos = len(tab_result) 491 self.on_line_change() 492 elif hasattr(tab_result, '__iter__'): 493 self.tab_deque = deque(tab_result) 494 self.tab_deque.appendleft(self.line) 495 496 if self.tab_deque is not None: 497 self.tab_deque.rotate(-tabnum) 498 self.line = self.tab_deque[0] 499 self.pos = len(self.line) 500 self.on_line_change() 501 502 def on_line_change(self): 503 self.history_search_pattern = self.line 504 try: 505 cls = self.get_cmd_class() 506 except (KeyError, ValueError, IndexError): 507 pass 508 else: 509 cmd = cls(self.line) 510 if cmd and cmd.quick(): 511 cmd.quickly_executed = True 512 self.execute(cmd) 513 514 def ask(self, text, callback, choices=None): 515 """Open a question prompt with predefined choices 516 517 The "text" is displayed as the question text and should include a list 518 of possible keys that the user can type. The "callback" is a function 519 that is called when the question is answered. It only gets the answer 520 as an argument. "choices" is a tuple of one-letter strings that can be 521 typed in by the user. Every other input gets ignored, except <Enter> 522 and <ESC>. 523 524 The first choice is used when the user presses <Enter>, the second 525 choice is used when the user presses <ESC>. 526 """ 527 self.question_queue.append( 528 (text, callback, choices if choices is not None else ['y', 'n'])) 529 530 531if __name__ == '__main__': 532 import doctest 533 import sys 534 sys.exit(doctest.testmod()[0]) 535