1#!/usr/local/bin/python3.8
2# -*- coding: utf-8 -*-
3
4"""
5Python 3 library for [rlwrap](https://github.com/hanslub42/rlwrap) filters
6
7* Synopsis
8
9    filter = rlwrapfilter.RlwrapFilter(message_handler=do_something)
10    filter.help_text = 'useful help'
11    filter.output_handler = lambda x: re.sub('apple', 'orange', x) # re−write output
12    filter.prompt_handler = munge_prompt
13    filter.completion_handler = complete_handler
14    filter.history_handler = lambda x: re.sub(r'(identified\s+by\s+)(\S+)', r'\\1xXxXxXxX', x)
15    filter.run()
16
17This is an [RlwrapFilter](https://github.com/hanslub42/rlwrap/wiki/RlwrapFilter.pm-manpage)
18clone written in Python 3. The focus is on providing the same API's
19and usage of the Perl version of [RlwrapFilter](https://github.com/hanslub42/rlwrap/wiki/RlwrapFilter.pm-manpage)
20as possible.
21
22[rlwrap](https://github.com/hanslub42/rlwrap) is a tiny
23utility that sits between the user and any console command, in order
24to bestow readline capabilities (line editing, history recall) to
25commands that don't have them.
26
27Since version 0.32, rlwrap can use filters to script almost every
28aspect of rlwrap's interaction with the user: changing the history,
29re-writing output and input, calling a pager or computing completion
30word lists from the current input.
31
32rlwrapfilter.py makes it very simple to write rlwrap
33filters in Python 3. A filter only needs to instantiate a RlwrapFilter
34object, change a few of its default handlers and then call its 'run'
35method.
36"""
37
38
39import sys
40import os
41import io
42import types
43import time
44import struct
45import select
46import re
47import traceback
48import binascii
49import collections
50import numbers
51
52TAG_INPUT                       = 0
53TAG_OUTPUT                      = 1
54TAG_HISTORY                     = 2
55TAG_COMPLETION                  = 3
56TAG_PROMPT                      = 4
57TAG_HOTKEY                      = 5
58TAG_SIGNAL                      = 6
59TAG_WHAT_ARE_YOUR_INTERESTS     = 127
60TAG_IGNORE                      = 251
61TAG_ADD_TO_COMPLETION_LIST      = 252
62TAG_REMOVE_FROM_COMPLETION_LIST = 253
63TAG_OUTPUT_OUT_OF_BAND          = 254
64TAG_ERROR                       = 255
65
66
67REJECT_PROMPT = '_THIS_CANNOT_BE_A_PROMPT_'
68
69
70# we want to behave differently when running outside rlwrap
71we_are_running_under_rlwrap = 'RLWRAP_COMMAND_PID' in os.environ
72
73# rlwrap version
74rlwrap_version = float(os.environ.get('RLWRAP_VERSION', "0.41"))
75
76# open communication lines with rlwrap (or with the terminal when not running under rlwrap)
77if (we_are_running_under_rlwrap):
78    CMD_IN = int(os.environ['RLWRAP_MASTER_PTY_FD'])
79    CMD_OUT = int(os.environ['RLWRAP_MASTER_PTY_FD'])
80
81    FILTER_IN = int(os.environ['RLWRAP_INPUT_PIPE_FD'])
82    FILTER_OUT = int(os.environ['RLWRAP_OUTPUT_PIPE_FD'])
83else:
84    CMD_IN = sys.stdout.fileno()
85    CMD_OUT = sys.stdin.fileno()
86
87    FILTER_IN = sys.stdout.fileno()
88    FILTER_OUT = sys.stdin.fileno()
89
90
91
92def when_defined(maybe_ref_to_sub, *args):
93    """
94`    when_defined(f, x, y, ...) returns f(x, y, ...) if f is defined, x otherwise
95    """
96    if (maybe_ref_to_sub is not None):
97        try:
98            return maybe_ref_to_sub(*args)
99        except Exception as e:
100            send_error(
101                "improper handler <{0}> of type {1} (expected a ref to a sub)\n{2}"
102                .format(maybe_ref_to_sub, type(maybe_ref_to_sub), traceback.format_exc()),e)
103    else:
104        return args[0]
105
106
107def out_of_band(tag):
108    return tag > 128
109
110
111def read_until(fh, stoptext, timeout,
112               prompt_search_from=0, prompt_search_to=None):
113    """
114    read chunks from pty pointed to by fh until either inactive for timeout or stoptext is seen at end-of-chunk
115    """
116    res = ''
117    while (True):
118        chunk = read_chunk(fh, timeout);
119        if(not chunk):
120            # got "" back: timeout
121            #send_warn("read_until: timeout")
122            return res
123        res = res + chunk
124        # multi-line mode so that "^" matches a head of each line
125        slice = res[prompt_search_from:prompt_search_to]
126        if re.search(stoptext, slice, re.MULTILINE):
127            return res
128
129
130def read_chunk(fh, timeout):
131    """
132    read chunk from pty pointed to by fh with timeout if timed out, returns 0-length string
133    """
134    if (len(select.select([fh], [], [], timeout)[0]) > 0):
135        chunk = os.read(fh, 2**16); # read up-to 2^16=65536 bytes
136        return chunk.decode(sys.stdin.encoding, errors="ignore")
137    return ""
138
139
140def read_patiently(fh, count):
141    """
142    keep reading until count total bytes were read from filehandle fh
143    """
144    already_read = 0
145    buf = bytearray()
146    while(already_read < count):
147        buf += os.read(fh, count-already_read)
148        nread = len(buf)
149        if (nread == 0):
150            break
151        already_read += nread
152    return buf
153
154
155def write_patiently(fh, buffer):
156    """
157    keep writing until all bytes from $buffer were written to $fh
158    """
159    already_written = 0
160    count = len(buffer)
161    while(already_written < count):
162        try:
163            nwritten = os.write(fh, buffer[already_written:])
164            if (nwritten <= 0):
165                send_error("error writing: {0}".format(str(buffer)))
166            already_written = already_written + nwritten
167        except BrokenPipeError: # quit when rlwrap dies
168            sys.exit(1)
169
170def read_message():
171    """
172    read message (tag, length word and contents) from FILTER_IN
173    """
174    if not we_are_running_under_rlwrap:
175        return read_from_stdin()
176
177    tag = int.from_bytes(read_patiently(FILTER_IN,1), sys.byteorder)
178    length = int.from_bytes(read_patiently(FILTER_IN,4), sys.byteorder)
179    message = read_patiently(FILTER_IN, length).decode(sys.stdin.encoding, errors = "ignore")
180    # \Z matches only at the end of the string in python
181    message = re.sub(r'\n\Z', '', str(message or ""))
182    return tag, message
183
184
185def write_message(tag, message):
186    if (not we_are_running_under_rlwrap):
187        return write_to_stdout(tag, message)
188
189    message = '\n' if message is None else message + '\n'  # allow undefined message
190    bmessage = bytearray(message, sys.stdin.encoding)
191    length = len(bmessage)
192
193    write_patiently(FILTER_OUT, tag.to_bytes(1, sys.byteorder, signed=False))
194    write_patiently(FILTER_OUT, length.to_bytes(4, sys.byteorder, signed=False))
195    write_patiently(FILTER_OUT, bmessage)
196
197
198def read_from_stdin():
199    tag = None
200    prompt = None
201    tagname = None
202    while (tag is None):
203        try:
204            m = re.match("(\S+) (.*?)\r?\n", sys.stdin.readline())
205        except KeyboardInterrupt:
206            sys.exit()
207        if not m:
208            sys.exit()
209        tagname, message = m.groups()
210        message.replace("\\t","\t").replace("\\n","\n")
211        tag = name2tag(tagname)
212    return tag, message
213
214def name2tag(name):
215        """
216        Convert a valid tag name like " TAG_PROMPT " to a tag (an integer)
217        """
218        try:
219            tag = eval(name)
220        except Exception as e:
221            raise SystemExit('unknown tagname {0}'.format(name))
222        return tag
223
224
225def tag2name(tag):
226    """
227    Convert the tag (an integer) to its name (e.g. " TAG_PROMPT ")
228    """
229    for name in ['TAG_REMOVE_FROM_COMPLETION_LIST',
230                 'TAG_ADD_TO_COMPLETION_LIST',
231                 'TAG_WHAT_ARE_YOUR_INTERESTS',
232                 'TAG_INPUT',
233                 'TAG_PROMPT',
234                 'TAG_COMPLETION',
235                 'TAG_HOTKEY',
236                 'TAG_SIGNAL',
237                 'TAG_HISTORY',
238                 'TAG_OUTPUT_OUT_OF_BAND',
239                 'TAG_ERROR',
240                 'TAG_IGNORE',
241                 'TAG_OUTPUT']:
242        if (eval('{0} == {1}'.format(str(tag), name))):
243            return name
244
245
246
247
248def write_to_stdout(tag, message):
249    print('{0}: {1}'.format(tag2name(tag), message))
250
251
252def send_warn(message):
253    """
254    send message to rlwrap.
255    """
256    write_message(TAG_OUTPUT_OUT_OF_BAND, "{0}: {1}".format(__name__, message))
257
258
259def send_error(message,e = None):
260    """
261    send message to rlwrap, and raise e.
262    """
263    write_message(TAG_OUTPUT_OUT_OF_BAND if e else TAG_ERROR, "{0}: {1}".format(__name__, message))
264    if e:
265       raise e
266    else:
267       time.sleep(2) # timeout doesn't matter much because rlwrap will kill us anyway
268       exit()
269
270def intercept_error(func):
271    """
272    A decorator to intercept an exception, send the message to rlwrap, and raise an exception.
273    """
274    def wrapper(*args, **kwargs):
275        try:
276            return func(*args, **kwargs)
277        except Exception as e:
278            write_message(
279                TAG_ERROR,
280                "{0}: {1}".format(__name__, '/'.join(map(str,e.args)))
281            )
282            raise e
283    return wrapper
284
285
286
287def intercept_error_with_message(message=None):
288    """
289    A decorator (-factory) to intercept an exception, send the message
290    to rlwrap, print a message and exit (or re-raise the exception, if
291    message = None) N.B: decorators, hence also <message> are evaluated
292    at compile time. @intercept_error_with_message (f"This script
293    crashed after {sec} seconds") doesn't make sense.
294    """
295    def intercept_error_closure(func):
296        def wrapper(*args, **kwargs):
297            try:
298                return func(*args, **kwargs)
299            except Exception as e:
300                complete_message = "{0}: {1}".format(__name__, '/'.join(map(str,e.args))) if message == None else "{0}\n{1}".format(message, e)
301                write_message(TAG_ERROR, complete_message)
302                if message:
303                    exit()
304                else:
305                    raise e
306        return wrapper
307    return intercept_error_closure
308
309def is_string(value):
310    return isinstance(value, str) or value == None
311
312
313def is_boolean(value):
314    return isinstance(value, bool) or value == None
315
316
317def is_integer(value):
318    return isinstance(value, int) or value == None
319
320
321def is_float(value):
322    return isinstance(value, numbers.Number) or value == None
323
324
325def is_callable(value):
326    return isinstance(value, collections.Callable) or value == None
327
328
329@intercept_error
330def test_intercept():
331    print('test intercept!!! + + + +')
332    raise Exception('test exception = = = = = .........')
333
334
335DIGIT_NUMBER=8
336
337def split_rlwrap_message(message):
338    bmessage = bytes(message, sys.stdin.encoding)
339    fields = []
340
341    while(len(bmessage) != 0):
342        blen = bmessage[:DIGIT_NUMBER]
343        bmessage = bmessage[DIGIT_NUMBER:]
344        length = int(str(blen, sys.stdin.encoding), base=16)
345        bfield = bmessage[:length]
346        bmessage = bmessage[length:]
347        fields.append(str(bfield, sys.stdin.encoding))
348    return fields
349
350
351def merge_fields(fields):
352    message = ""
353
354    for field in fields:
355        length = len(bytes(field, sys.stdin.encoding))
356        lenstr = format(length, '0' + str(DIGIT_NUMBER) + 'x')
357        message = message + lenstr + field
358    return message
359
360
361class RlwrapFilterError(Exception):
362    """
363    A custom exception for rlwrap
364    """
365    def __init__(self, *args):
366        super().__init__(args)
367
368
369class RlwrapFilter:
370    """
371    class for rlwrap filters
372    """
373
374    def __setattr__(self, name, value):
375        if not name in self._fields:
376            self.warn("There is no '{0}' attribute in class {1}\n".format(name, self.__class__.__name__))
377
378        is_valid_type = self._field_types[name]
379        if not is_valid_type(value):
380            self.warn("{0} should not be '{1}'\n".format(name, type(value)))
381
382        if name == 'minimal_rlwrap_version' and (value > rlwrap_version):
383            self.error("requires rlwrap version {0} or newer.\n".format(str(value)))
384        self.__dict__[name] = value
385
386
387    """
388    def __getattr__(self, name):
389        if(name in self.fields):
390            return self.__dict__[name]
391        elif(name in self.handlers):
392            return self.__dict__[name]
393        else:
394            send_error("There is no '{0}' attribute in class {1}"
395                       .format(name, self.__class__.__name__))
396    """
397
398
399    def __init__(self, **kwargs):
400        self.__dict__['_field_types'] = {
401            'input_handler':is_callable,
402            'output_handler':is_callable,
403            'prompt_handler':is_callable,
404            'hotkey_handler':is_callable,
405            'signal_handler':is_callable,
406            'echo_handler':is_callable,
407            'message_handler':is_callable,
408            'history_handler':is_callable,
409            'completion_handler':is_callable,
410            'help_text':is_string,
411            'cloak_and_dagger_verbose':is_boolean,
412            'cumulative_output':is_string,
413            'prompts_are_never_empty':is_boolean,
414            'previous_tag':is_integer,
415            'previous_message':is_string,
416            'echo_has_been_handled':is_boolean,
417            'saved_output':is_string,
418            'prompt_rejected':is_string,
419            'command_line':is_string,
420            'running_under_rlwrap':is_boolean,
421            'minimal_rlwrap_version':is_float,
422            'name':is_string
423        }
424
425        self.__dict__['_fields'] = self.__dict__['_field_types'].keys()
426
427        for field in self._fields:
428            self.__dict__[field] = None
429
430        self.previous_tag = -1
431        self.echo_has_been_handled = False
432        self.saved_output = ''
433        self.cumulative_output = ''
434        self.minimal_rlwrap_version = rlwrap_version
435        self.command_line = os.environ.get('RLWRAP_COMMAND_LINE')
436        self.running_under_rlwrap = 'RLWRAP_COMMAND_PID' in os.environ
437        self.name = os.path.basename(sys.argv[0])
438
439        for key in kwargs:
440            exec('self.{0} = kwargs[key]'.format(key))
441
442
443
444    def handle_output(self, message):
445        """
446        split output in echo and the rest and call the appropriate handlers on them
447        """
448        (handled_echo, nl) = ('', '')
449        if (self.previous_tag is not None and self.previous_tag == TAG_INPUT):
450            self.cumulative_output = ""
451            self.echo_has_been_handled = False
452
453        if (not self.echo_has_been_handled):
454            if (not re.search(r'\n', message)):
455                # save all output until we have one *whole* echo line
456                self.saved_output = self.saved_output + message
457                return ""
458            else:
459                # ... then process it
460                message = self.saved_output + message
461                self.echo_has_been_handled = True
462                self.saved_output = ""
463                (echo, nl, message) = re.match(r'^([^\n\r]*)(\r?\n)?(.*)?', message, re.DOTALL).groups()
464                handled_echo = when_defined(self.echo_handler, echo)
465        self.cumulative_output = self.cumulative_output + message
466        return handled_echo + str(nl or "") + str(when_defined(self.output_handler, message))
467
468
469    def add_to_completion_list(self, *args):
470        write_message(TAG_ADD_TO_COMPLETION_LIST, ' '.join(args))
471
472
473    def remove_from_completion_list(self, *args):
474        write_message(TAG_REMOVE_FROM_COMPLETION_LIST, ' '.join(args))
475
476
477    def cloak_and_dagger(self, question, prompt, timeout,
478                         prompt_search_from=0, prompt_search_to=None):
479        """
480        have a private chat with the rlwrapped command. This relies very much om the assumption that command stops.
481        talking, and only listens, when it has displayed the prompt
482        """
483        write_patiently(CMD_IN, bytearray(question + "\n", sys.stdin.encoding))
484        if (self.cloak_and_dagger_verbose):
485            self.send_output_oob("cloak_and_dagger question: {0}\n".format(question))
486        response = read_until(CMD_OUT, prompt, timeout,
487                              prompt_search_from=prompt_search_from,
488                              prompt_search_to=prompt_search_to)
489        response = re.sub('^.*?\n', '', response) # chop off echoed question;
490        response = re.sub('{0}$'.format(prompt), '', response) # chop off prompt;
491        if (self.cloak_and_dagger_verbose):
492            self.send_output_oob("cloak_and_dagger response: {0}\n".format(response))
493        return response
494
495
496    def vacuum_stale_message(self, prompt, timeout):
497        """
498        Some command returns messages asynchronously
499        and tends to delay message when invoking multiple `cloak_and_dagger`.
500        You may want to drop message at such time.
501
502        rlwrap_filter.cloak_and_dagger(command1, prompt, timeout)
503        rlwrap_filter.cloak_and_dagger(command2, prompt, timeout)
504        ...
505        time.sleep(1)
506        rlwrap_filter.vacuum_stale_message(prompt, timeout)
507        """
508        response = read_until(CMD_OUT, prompt, timeout)
509        return response
510
511
512    def add_interests(self, message):
513        interested = list(message)
514        tag2handler = {TAG_OUTPUT      : self.output_handler or self.echo_handler, # echo is the first OUTPUT after an INPUT
515                       TAG_INPUT       : self.input_handler or self.echo_handler,  # so to determine what is ECHO we need to see INPUT...
516                       TAG_HISTORY     : self.history_handler,
517                       TAG_COMPLETION  : self.completion_handler,
518                       TAG_PROMPT      : self.prompt_handler,
519                       TAG_HOTKEY      : self.hotkey_handler,
520                       TAG_SIGNAL      : self.signal_handler}
521
522        for tag in range(0, len(message)):
523            if interested[tag] == 'y':
524                continue   # a preceding filter in the pipeline has already shown interest
525            if tag2handler[tag] is not None:
526                interested[tag] = 'y'
527        return ''.join(interested)
528
529    def name2tag(self, name):
530        """
531        Convert a valid tag name like " TAG_PROMPT " to a tag (an integer)
532        """
533        return name2tag(name)
534
535    def tag2name(self, tag):
536        """
537        Convert the tag (an integer) to its name (e.g. " TAG_PROMPT ")
538        """
539        return tag2name(tag)
540
541    def warn(self, message):
542        """
543        send message to rlwrap.
544        """
545        send_warn(message)
546
547
548    def error(self, message,e = None):
549        """
550        send message to rlwrap, and raise e.
551        """
552        send_error(message, e)
553
554
555    def send_output_oob(self, text):
556        write_message(TAG_OUTPUT_OUT_OF_BAND, text)
557
558
559    def send_ignore_oob(self, text):
560        write_message(TAG_IGNORE, text)
561
562
563    def tweak_readline_oob(self, rl_tweak, *args):
564        nargs = {'rl_variable_bind'                   : 2,
565                 'rl_completer_word_break_characters' : 1,
566                 'rl_completer_quote_characters'      : 1,
567                 'rl_filename_completion_desired'     : 1}
568                 # the above list can be extended in future versions
569        if rl_tweak not in nargs:
570            self.error("tweak_readline_oob() called with unknown/unimplemented readline function '{}'".format(rl_tweak))
571        if len(args) !=  nargs[rl_tweak]:
572            self.error("tweak_readline_oob({},...) should be called with exactly {} args".format(rl_tweak, nargs[rl_tweak] + 1))
573        self.send_ignore_oob("@" + "::".join((rl_tweak,) + args + ("\n",)))
574
575
576    def cwd(self):
577        return os.getcwd()
578
579    def run(self):
580        """
581        event loop
582        """
583
584        # $RLWRAP_COMMAND_PID can be undefined (e.g. when run interactively, or under rlwrap -z listing
585        # or == "0" (when rlwrap is called without a command name, like in rlwrap -z filter.py)
586        # In both cases: print help text
587        if os.environ.get('RLWRAP_COMMAND_PID') in [None, '0']:
588            write_message(TAG_OUTPUT_OUT_OF_BAND, self.help_text + '\n')
589        while(True):
590            tag, message = read_message()
591
592            message = when_defined(self.message_handler, message, tag) # ignore return value
593
594            if (tag == TAG_INPUT):
595                response = when_defined(self.input_handler, message)
596            elif (tag == TAG_OUTPUT):
597                response = self.handle_output(message)
598            elif (tag == TAG_HISTORY):
599                response = when_defined(self.history_handler, message)
600            elif (tag == TAG_COMPLETION):
601                if (self.completion_handler is not None):
602                    params = split_rlwrap_message(message)
603                    (line, prefix, completions) = (params[0], params[1], params[2:])
604                    completions = self.completion_handler(line, prefix, completions)
605                    response = merge_fields([line, prefix] + completions)
606                else:
607                    response = message
608            elif (tag == TAG_HOTKEY):
609                if (self.hotkey_handler is not None):
610                    params = split_rlwrap_message(message)
611                    result = self.hotkey_handler(*params)
612                    response = merge_fields(result)
613                else:
614                    response = message
615            elif (tag == TAG_PROMPT):
616                if (message == REJECT_PROMPT or
617                    (self.prompts_are_never_empty is not None and message == '')):
618                    write_message(tag,REJECT_PROMPT);
619                    # don't update <previous_tag> and don't reset <cumulative_input>
620                    next
621                if (os.environ.get('RLWRAP_IMPATIENT') and not re.search('\n$', self.cumulative_output)):
622                    # cumulative output contains prompt: chop it off!
623                    # s/[^\n]*$// takes way too long on big strings,
624                    # what is the optimal regex to do this?
625                    self.cumulative_output = re.sub('(?<![^\n])[^\n]*$', '', self.cumulative_output)
626
627                response = when_defined(self.prompt_handler, message)
628                if (re.search('\n', response)):
629                    send_error('prompts may not contain newlines!')
630            elif (tag == TAG_SIGNAL):
631                response = when_defined(self.signal_handler, message)
632            elif (tag == TAG_WHAT_ARE_YOUR_INTERESTS):
633                response = self.add_interests(message)
634            else:
635                # No error message, compatible with future rlwrap
636                # versions that may define new tag types
637                response = message
638
639            if (not (out_of_band(tag) and (tag == TAG_PROMPT and response == REJECT_PROMPT))):
640                self.previous_tag = tag
641                self.previous_message = message
642
643            write_message(tag, response)
644
645
646
647
648if __name__ == '__main__':
649    import doctest
650    doctest.testmod()
651