1#!/usr/local/bin/python3.8 2# -*- coding: utf-8 -*- 3 4""" 5Python 3 library for [rlwrap](https://github.com/hanslub42/rlwrap) filters 6 7* Synopsis 8 9 filter = rlwrapfilter.RlwrapFilter(message_handler=do_something) 10 filter.help_text = 'useful help' 11 filter.output_handler = lambda x: re.sub('apple', 'orange', x) # re−write output 12 filter.prompt_handler = munge_prompt 13 filter.completion_handler = complete_handler 14 filter.history_handler = lambda x: re.sub(r'(identified\s+by\s+)(\S+)', r'\\1xXxXxXxX', x) 15 filter.run() 16 17This is an [RlwrapFilter](https://github.com/hanslub42/rlwrap/wiki/RlwrapFilter.pm-manpage) 18clone written in Python 3. The focus is on providing the same API's 19and usage of the Perl version of [RlwrapFilter](https://github.com/hanslub42/rlwrap/wiki/RlwrapFilter.pm-manpage) 20as possible. 21 22[rlwrap](https://github.com/hanslub42/rlwrap) is a tiny 23utility that sits between the user and any console command, in order 24to bestow readline capabilities (line editing, history recall) to 25commands that don't have them. 26 27Since version 0.32, rlwrap can use filters to script almost every 28aspect of rlwrap's interaction with the user: changing the history, 29re-writing output and input, calling a pager or computing completion 30word lists from the current input. 31 32rlwrapfilter.py makes it very simple to write rlwrap 33filters in Python 3. A filter only needs to instantiate a RlwrapFilter 34object, change a few of its default handlers and then call its 'run' 35method. 36""" 37 38 39import sys 40import os 41import io 42import types 43import time 44import struct 45import select 46import re 47import traceback 48import binascii 49import collections 50import numbers 51 52TAG_INPUT = 0 53TAG_OUTPUT = 1 54TAG_HISTORY = 2 55TAG_COMPLETION = 3 56TAG_PROMPT = 4 57TAG_HOTKEY = 5 58TAG_SIGNAL = 6 59TAG_WHAT_ARE_YOUR_INTERESTS = 127 60TAG_IGNORE = 251 61TAG_ADD_TO_COMPLETION_LIST = 252 62TAG_REMOVE_FROM_COMPLETION_LIST = 253 63TAG_OUTPUT_OUT_OF_BAND = 254 64TAG_ERROR = 255 65 66 67REJECT_PROMPT = '_THIS_CANNOT_BE_A_PROMPT_' 68 69 70# we want to behave differently when running outside rlwrap 71we_are_running_under_rlwrap = 'RLWRAP_COMMAND_PID' in os.environ 72 73# rlwrap version 74rlwrap_version = float(os.environ.get('RLWRAP_VERSION', "0.41")) 75 76# open communication lines with rlwrap (or with the terminal when not running under rlwrap) 77if (we_are_running_under_rlwrap): 78 CMD_IN = int(os.environ['RLWRAP_MASTER_PTY_FD']) 79 CMD_OUT = int(os.environ['RLWRAP_MASTER_PTY_FD']) 80 81 FILTER_IN = int(os.environ['RLWRAP_INPUT_PIPE_FD']) 82 FILTER_OUT = int(os.environ['RLWRAP_OUTPUT_PIPE_FD']) 83else: 84 CMD_IN = sys.stdout.fileno() 85 CMD_OUT = sys.stdin.fileno() 86 87 FILTER_IN = sys.stdout.fileno() 88 FILTER_OUT = sys.stdin.fileno() 89 90 91 92def when_defined(maybe_ref_to_sub, *args): 93 """ 94` when_defined(f, x, y, ...) returns f(x, y, ...) if f is defined, x otherwise 95 """ 96 if (maybe_ref_to_sub is not None): 97 try: 98 return maybe_ref_to_sub(*args) 99 except Exception as e: 100 send_error( 101 "improper handler <{0}> of type {1} (expected a ref to a sub)\n{2}" 102 .format(maybe_ref_to_sub, type(maybe_ref_to_sub), traceback.format_exc()),e) 103 else: 104 return args[0] 105 106 107def out_of_band(tag): 108 return tag > 128 109 110 111def read_until(fh, stoptext, timeout, 112 prompt_search_from=0, prompt_search_to=None): 113 """ 114 read chunks from pty pointed to by fh until either inactive for timeout or stoptext is seen at end-of-chunk 115 """ 116 res = '' 117 while (True): 118 chunk = read_chunk(fh, timeout); 119 if(not chunk): 120 # got "" back: timeout 121 #send_warn("read_until: timeout") 122 return res 123 res = res + chunk 124 # multi-line mode so that "^" matches a head of each line 125 slice = res[prompt_search_from:prompt_search_to] 126 if re.search(stoptext, slice, re.MULTILINE): 127 return res 128 129 130def read_chunk(fh, timeout): 131 """ 132 read chunk from pty pointed to by fh with timeout if timed out, returns 0-length string 133 """ 134 if (len(select.select([fh], [], [], timeout)[0]) > 0): 135 chunk = os.read(fh, 2**16); # read up-to 2^16=65536 bytes 136 return chunk.decode(sys.stdin.encoding, errors="ignore") 137 return "" 138 139 140def read_patiently(fh, count): 141 """ 142 keep reading until count total bytes were read from filehandle fh 143 """ 144 already_read = 0 145 buf = bytearray() 146 while(already_read < count): 147 buf += os.read(fh, count-already_read) 148 nread = len(buf) 149 if (nread == 0): 150 break 151 already_read += nread 152 return buf 153 154 155def write_patiently(fh, buffer): 156 """ 157 keep writing until all bytes from $buffer were written to $fh 158 """ 159 already_written = 0 160 count = len(buffer) 161 while(already_written < count): 162 try: 163 nwritten = os.write(fh, buffer[already_written:]) 164 if (nwritten <= 0): 165 send_error("error writing: {0}".format(str(buffer))) 166 already_written = already_written + nwritten 167 except BrokenPipeError: # quit when rlwrap dies 168 sys.exit(1) 169 170def read_message(): 171 """ 172 read message (tag, length word and contents) from FILTER_IN 173 """ 174 if not we_are_running_under_rlwrap: 175 return read_from_stdin() 176 177 tag = int.from_bytes(read_patiently(FILTER_IN,1), sys.byteorder) 178 length = int.from_bytes(read_patiently(FILTER_IN,4), sys.byteorder) 179 message = read_patiently(FILTER_IN, length).decode(sys.stdin.encoding, errors = "ignore") 180 # \Z matches only at the end of the string in python 181 message = re.sub(r'\n\Z', '', str(message or "")) 182 return tag, message 183 184 185def write_message(tag, message): 186 if (not we_are_running_under_rlwrap): 187 return write_to_stdout(tag, message) 188 189 message = '\n' if message is None else message + '\n' # allow undefined message 190 bmessage = bytearray(message, sys.stdin.encoding) 191 length = len(bmessage) 192 193 write_patiently(FILTER_OUT, tag.to_bytes(1, sys.byteorder, signed=False)) 194 write_patiently(FILTER_OUT, length.to_bytes(4, sys.byteorder, signed=False)) 195 write_patiently(FILTER_OUT, bmessage) 196 197 198def read_from_stdin(): 199 tag = None 200 prompt = None 201 tagname = None 202 while (tag is None): 203 try: 204 m = re.match("(\S+) (.*?)\r?\n", sys.stdin.readline()) 205 except KeyboardInterrupt: 206 sys.exit() 207 if not m: 208 sys.exit() 209 tagname, message = m.groups() 210 message.replace("\\t","\t").replace("\\n","\n") 211 tag = name2tag(tagname) 212 return tag, message 213 214def name2tag(name): 215 """ 216 Convert a valid tag name like " TAG_PROMPT " to a tag (an integer) 217 """ 218 try: 219 tag = eval(name) 220 except Exception as e: 221 raise SystemExit('unknown tagname {0}'.format(name)) 222 return tag 223 224 225def tag2name(tag): 226 """ 227 Convert the tag (an integer) to its name (e.g. " TAG_PROMPT ") 228 """ 229 for name in ['TAG_REMOVE_FROM_COMPLETION_LIST', 230 'TAG_ADD_TO_COMPLETION_LIST', 231 'TAG_WHAT_ARE_YOUR_INTERESTS', 232 'TAG_INPUT', 233 'TAG_PROMPT', 234 'TAG_COMPLETION', 235 'TAG_HOTKEY', 236 'TAG_SIGNAL', 237 'TAG_HISTORY', 238 'TAG_OUTPUT_OUT_OF_BAND', 239 'TAG_ERROR', 240 'TAG_IGNORE', 241 'TAG_OUTPUT']: 242 if (eval('{0} == {1}'.format(str(tag), name))): 243 return name 244 245 246 247 248def write_to_stdout(tag, message): 249 print('{0}: {1}'.format(tag2name(tag), message)) 250 251 252def send_warn(message): 253 """ 254 send message to rlwrap. 255 """ 256 write_message(TAG_OUTPUT_OUT_OF_BAND, "{0}: {1}".format(__name__, message)) 257 258 259def send_error(message,e = None): 260 """ 261 send message to rlwrap, and raise e. 262 """ 263 write_message(TAG_OUTPUT_OUT_OF_BAND if e else TAG_ERROR, "{0}: {1}".format(__name__, message)) 264 if e: 265 raise e 266 else: 267 time.sleep(2) # timeout doesn't matter much because rlwrap will kill us anyway 268 exit() 269 270def intercept_error(func): 271 """ 272 A decorator to intercept an exception, send the message to rlwrap, and raise an exception. 273 """ 274 def wrapper(*args, **kwargs): 275 try: 276 return func(*args, **kwargs) 277 except Exception as e: 278 write_message( 279 TAG_ERROR, 280 "{0}: {1}".format(__name__, '/'.join(map(str,e.args))) 281 ) 282 raise e 283 return wrapper 284 285 286 287def intercept_error_with_message(message=None): 288 """ 289 A decorator (-factory) to intercept an exception, send the message 290 to rlwrap, print a message and exit (or re-raise the exception, if 291 message = None) N.B: decorators, hence also <message> are evaluated 292 at compile time. @intercept_error_with_message (f"This script 293 crashed after {sec} seconds") doesn't make sense. 294 """ 295 def intercept_error_closure(func): 296 def wrapper(*args, **kwargs): 297 try: 298 return func(*args, **kwargs) 299 except Exception as e: 300 complete_message = "{0}: {1}".format(__name__, '/'.join(map(str,e.args))) if message == None else "{0}\n{1}".format(message, e) 301 write_message(TAG_ERROR, complete_message) 302 if message: 303 exit() 304 else: 305 raise e 306 return wrapper 307 return intercept_error_closure 308 309def is_string(value): 310 return isinstance(value, str) or value == None 311 312 313def is_boolean(value): 314 return isinstance(value, bool) or value == None 315 316 317def is_integer(value): 318 return isinstance(value, int) or value == None 319 320 321def is_float(value): 322 return isinstance(value, numbers.Number) or value == None 323 324 325def is_callable(value): 326 return isinstance(value, collections.Callable) or value == None 327 328 329@intercept_error 330def test_intercept(): 331 print('test intercept!!! + + + +') 332 raise Exception('test exception = = = = = .........') 333 334 335DIGIT_NUMBER=8 336 337def split_rlwrap_message(message): 338 bmessage = bytes(message, sys.stdin.encoding) 339 fields = [] 340 341 while(len(bmessage) != 0): 342 blen = bmessage[:DIGIT_NUMBER] 343 bmessage = bmessage[DIGIT_NUMBER:] 344 length = int(str(blen, sys.stdin.encoding), base=16) 345 bfield = bmessage[:length] 346 bmessage = bmessage[length:] 347 fields.append(str(bfield, sys.stdin.encoding)) 348 return fields 349 350 351def merge_fields(fields): 352 message = "" 353 354 for field in fields: 355 length = len(bytes(field, sys.stdin.encoding)) 356 lenstr = format(length, '0' + str(DIGIT_NUMBER) + 'x') 357 message = message + lenstr + field 358 return message 359 360 361class RlwrapFilterError(Exception): 362 """ 363 A custom exception for rlwrap 364 """ 365 def __init__(self, *args): 366 super().__init__(args) 367 368 369class RlwrapFilter: 370 """ 371 class for rlwrap filters 372 """ 373 374 def __setattr__(self, name, value): 375 if not name in self._fields: 376 self.warn("There is no '{0}' attribute in class {1}\n".format(name, self.__class__.__name__)) 377 378 is_valid_type = self._field_types[name] 379 if not is_valid_type(value): 380 self.warn("{0} should not be '{1}'\n".format(name, type(value))) 381 382 if name == 'minimal_rlwrap_version' and (value > rlwrap_version): 383 self.error("requires rlwrap version {0} or newer.\n".format(str(value))) 384 self.__dict__[name] = value 385 386 387 """ 388 def __getattr__(self, name): 389 if(name in self.fields): 390 return self.__dict__[name] 391 elif(name in self.handlers): 392 return self.__dict__[name] 393 else: 394 send_error("There is no '{0}' attribute in class {1}" 395 .format(name, self.__class__.__name__)) 396 """ 397 398 399 def __init__(self, **kwargs): 400 self.__dict__['_field_types'] = { 401 'input_handler':is_callable, 402 'output_handler':is_callable, 403 'prompt_handler':is_callable, 404 'hotkey_handler':is_callable, 405 'signal_handler':is_callable, 406 'echo_handler':is_callable, 407 'message_handler':is_callable, 408 'history_handler':is_callable, 409 'completion_handler':is_callable, 410 'help_text':is_string, 411 'cloak_and_dagger_verbose':is_boolean, 412 'cumulative_output':is_string, 413 'prompts_are_never_empty':is_boolean, 414 'previous_tag':is_integer, 415 'previous_message':is_string, 416 'echo_has_been_handled':is_boolean, 417 'saved_output':is_string, 418 'prompt_rejected':is_string, 419 'command_line':is_string, 420 'running_under_rlwrap':is_boolean, 421 'minimal_rlwrap_version':is_float, 422 'name':is_string 423 } 424 425 self.__dict__['_fields'] = self.__dict__['_field_types'].keys() 426 427 for field in self._fields: 428 self.__dict__[field] = None 429 430 self.previous_tag = -1 431 self.echo_has_been_handled = False 432 self.saved_output = '' 433 self.cumulative_output = '' 434 self.minimal_rlwrap_version = rlwrap_version 435 self.command_line = os.environ.get('RLWRAP_COMMAND_LINE') 436 self.running_under_rlwrap = 'RLWRAP_COMMAND_PID' in os.environ 437 self.name = os.path.basename(sys.argv[0]) 438 439 for key in kwargs: 440 exec('self.{0} = kwargs[key]'.format(key)) 441 442 443 444 def handle_output(self, message): 445 """ 446 split output in echo and the rest and call the appropriate handlers on them 447 """ 448 (handled_echo, nl) = ('', '') 449 if (self.previous_tag is not None and self.previous_tag == TAG_INPUT): 450 self.cumulative_output = "" 451 self.echo_has_been_handled = False 452 453 if (not self.echo_has_been_handled): 454 if (not re.search(r'\n', message)): 455 # save all output until we have one *whole* echo line 456 self.saved_output = self.saved_output + message 457 return "" 458 else: 459 # ... then process it 460 message = self.saved_output + message 461 self.echo_has_been_handled = True 462 self.saved_output = "" 463 (echo, nl, message) = re.match(r'^([^\n\r]*)(\r?\n)?(.*)?', message, re.DOTALL).groups() 464 handled_echo = when_defined(self.echo_handler, echo) 465 self.cumulative_output = self.cumulative_output + message 466 return handled_echo + str(nl or "") + str(when_defined(self.output_handler, message)) 467 468 469 def add_to_completion_list(self, *args): 470 write_message(TAG_ADD_TO_COMPLETION_LIST, ' '.join(args)) 471 472 473 def remove_from_completion_list(self, *args): 474 write_message(TAG_REMOVE_FROM_COMPLETION_LIST, ' '.join(args)) 475 476 477 def cloak_and_dagger(self, question, prompt, timeout, 478 prompt_search_from=0, prompt_search_to=None): 479 """ 480 have a private chat with the rlwrapped command. This relies very much om the assumption that command stops. 481 talking, and only listens, when it has displayed the prompt 482 """ 483 write_patiently(CMD_IN, bytearray(question + "\n", sys.stdin.encoding)) 484 if (self.cloak_and_dagger_verbose): 485 self.send_output_oob("cloak_and_dagger question: {0}\n".format(question)) 486 response = read_until(CMD_OUT, prompt, timeout, 487 prompt_search_from=prompt_search_from, 488 prompt_search_to=prompt_search_to) 489 response = re.sub('^.*?\n', '', response) # chop off echoed question; 490 response = re.sub('{0}$'.format(prompt), '', response) # chop off prompt; 491 if (self.cloak_and_dagger_verbose): 492 self.send_output_oob("cloak_and_dagger response: {0}\n".format(response)) 493 return response 494 495 496 def vacuum_stale_message(self, prompt, timeout): 497 """ 498 Some command returns messages asynchronously 499 and tends to delay message when invoking multiple `cloak_and_dagger`. 500 You may want to drop message at such time. 501 502 rlwrap_filter.cloak_and_dagger(command1, prompt, timeout) 503 rlwrap_filter.cloak_and_dagger(command2, prompt, timeout) 504 ... 505 time.sleep(1) 506 rlwrap_filter.vacuum_stale_message(prompt, timeout) 507 """ 508 response = read_until(CMD_OUT, prompt, timeout) 509 return response 510 511 512 def add_interests(self, message): 513 interested = list(message) 514 tag2handler = {TAG_OUTPUT : self.output_handler or self.echo_handler, # echo is the first OUTPUT after an INPUT 515 TAG_INPUT : self.input_handler or self.echo_handler, # so to determine what is ECHO we need to see INPUT... 516 TAG_HISTORY : self.history_handler, 517 TAG_COMPLETION : self.completion_handler, 518 TAG_PROMPT : self.prompt_handler, 519 TAG_HOTKEY : self.hotkey_handler, 520 TAG_SIGNAL : self.signal_handler} 521 522 for tag in range(0, len(message)): 523 if interested[tag] == 'y': 524 continue # a preceding filter in the pipeline has already shown interest 525 if tag2handler[tag] is not None: 526 interested[tag] = 'y' 527 return ''.join(interested) 528 529 def name2tag(self, name): 530 """ 531 Convert a valid tag name like " TAG_PROMPT " to a tag (an integer) 532 """ 533 return name2tag(name) 534 535 def tag2name(self, tag): 536 """ 537 Convert the tag (an integer) to its name (e.g. " TAG_PROMPT ") 538 """ 539 return tag2name(tag) 540 541 def warn(self, message): 542 """ 543 send message to rlwrap. 544 """ 545 send_warn(message) 546 547 548 def error(self, message,e = None): 549 """ 550 send message to rlwrap, and raise e. 551 """ 552 send_error(message, e) 553 554 555 def send_output_oob(self, text): 556 write_message(TAG_OUTPUT_OUT_OF_BAND, text) 557 558 559 def send_ignore_oob(self, text): 560 write_message(TAG_IGNORE, text) 561 562 563 def tweak_readline_oob(self, rl_tweak, *args): 564 nargs = {'rl_variable_bind' : 2, 565 'rl_completer_word_break_characters' : 1, 566 'rl_completer_quote_characters' : 1, 567 'rl_filename_completion_desired' : 1} 568 # the above list can be extended in future versions 569 if rl_tweak not in nargs: 570 self.error("tweak_readline_oob() called with unknown/unimplemented readline function '{}'".format(rl_tweak)) 571 if len(args) != nargs[rl_tweak]: 572 self.error("tweak_readline_oob({},...) should be called with exactly {} args".format(rl_tweak, nargs[rl_tweak] + 1)) 573 self.send_ignore_oob("@" + "::".join((rl_tweak,) + args + ("\n",))) 574 575 576 def cwd(self): 577 return os.getcwd() 578 579 def run(self): 580 """ 581 event loop 582 """ 583 584 # $RLWRAP_COMMAND_PID can be undefined (e.g. when run interactively, or under rlwrap -z listing 585 # or == "0" (when rlwrap is called without a command name, like in rlwrap -z filter.py) 586 # In both cases: print help text 587 if os.environ.get('RLWRAP_COMMAND_PID') in [None, '0']: 588 write_message(TAG_OUTPUT_OUT_OF_BAND, self.help_text + '\n') 589 while(True): 590 tag, message = read_message() 591 592 message = when_defined(self.message_handler, message, tag) # ignore return value 593 594 if (tag == TAG_INPUT): 595 response = when_defined(self.input_handler, message) 596 elif (tag == TAG_OUTPUT): 597 response = self.handle_output(message) 598 elif (tag == TAG_HISTORY): 599 response = when_defined(self.history_handler, message) 600 elif (tag == TAG_COMPLETION): 601 if (self.completion_handler is not None): 602 params = split_rlwrap_message(message) 603 (line, prefix, completions) = (params[0], params[1], params[2:]) 604 completions = self.completion_handler(line, prefix, completions) 605 response = merge_fields([line, prefix] + completions) 606 else: 607 response = message 608 elif (tag == TAG_HOTKEY): 609 if (self.hotkey_handler is not None): 610 params = split_rlwrap_message(message) 611 result = self.hotkey_handler(*params) 612 response = merge_fields(result) 613 else: 614 response = message 615 elif (tag == TAG_PROMPT): 616 if (message == REJECT_PROMPT or 617 (self.prompts_are_never_empty is not None and message == '')): 618 write_message(tag,REJECT_PROMPT); 619 # don't update <previous_tag> and don't reset <cumulative_input> 620 next 621 if (os.environ.get('RLWRAP_IMPATIENT') and not re.search('\n$', self.cumulative_output)): 622 # cumulative output contains prompt: chop it off! 623 # s/[^\n]*$// takes way too long on big strings, 624 # what is the optimal regex to do this? 625 self.cumulative_output = re.sub('(?<![^\n])[^\n]*$', '', self.cumulative_output) 626 627 response = when_defined(self.prompt_handler, message) 628 if (re.search('\n', response)): 629 send_error('prompts may not contain newlines!') 630 elif (tag == TAG_SIGNAL): 631 response = when_defined(self.signal_handler, message) 632 elif (tag == TAG_WHAT_ARE_YOUR_INTERESTS): 633 response = self.add_interests(message) 634 else: 635 # No error message, compatible with future rlwrap 636 # versions that may define new tag types 637 response = message 638 639 if (not (out_of_band(tag) and (tag == TAG_PROMPT and response == REJECT_PROMPT))): 640 self.previous_tag = tag 641 self.previous_message = message 642 643 write_message(tag, response) 644 645 646 647 648if __name__ == '__main__': 649 import doctest 650 doctest.testmod() 651