1# -*- test-case-name: twisted.conch.test.test_manhole -*-
2# Copyright (c) Twisted Matrix Laboratories.
3# See LICENSE for details.
4
5"""
6Line-input oriented interactive interpreter loop.
7
8Provides classes for handling Python source input and arbitrary output
9interactively from a Twisted application.  Also included is syntax coloring
10code with support for VT102 terminals, control code handling (^C, ^D, ^Q),
11and reasonable handling of Deferreds.
12
13@author: Jp Calderone
14"""
15
16import code
17import sys
18import tokenize
19from io import BytesIO
20
21from twisted.conch import recvline
22from twisted.internet import defer
23from twisted.python.compat import _get_async_param
24from twisted.python.htmlizer import TokenPrinter
25
26
27class FileWrapper:
28    """
29    Minimal write-file-like object.
30
31    Writes are translated into addOutput calls on an object passed to
32    __init__.  Newlines are also converted from network to local style.
33    """
34
35    softspace = 0
36    state = "normal"
37
38    def __init__(self, o):
39        self.o = o
40
41    def flush(self):
42        pass
43
44    def write(self, data):
45        self.o.addOutput(data.replace("\r\n", "\n"))
46
47    def writelines(self, lines):
48        self.write("".join(lines))
49
50
51class ManholeInterpreter(code.InteractiveInterpreter):
52    """
53    Interactive Interpreter with special output and Deferred support.
54
55    Aside from the features provided by L{code.InteractiveInterpreter}, this
56    class captures sys.stdout output and redirects it to the appropriate
57    location (the Manhole protocol instance).  It also treats Deferreds
58    which reach the top-level specially: each is formatted to the user with
59    a unique identifier and a new callback and errback added to it, each of
60    which will format the unique identifier and the result with which the
61    Deferred fires and then pass it on to the next participant in the
62    callback chain.
63    """
64
65    numDeferreds = 0
66
67    def __init__(self, handler, locals=None, filename="<console>"):
68        code.InteractiveInterpreter.__init__(self, locals)
69        self._pendingDeferreds = {}
70        self.handler = handler
71        self.filename = filename
72        self.resetBuffer()
73
74    def resetBuffer(self):
75        """
76        Reset the input buffer.
77        """
78        self.buffer = []
79
80    def push(self, line):
81        """
82        Push a line to the interpreter.
83
84        The line should not have a trailing newline; it may have
85        internal newlines.  The line is appended to a buffer and the
86        interpreter's runsource() method is called with the
87        concatenated contents of the buffer as source.  If this
88        indicates that the command was executed or invalid, the buffer
89        is reset; otherwise, the command is incomplete, and the buffer
90        is left as it was after the line was appended.  The return
91        value is 1 if more input is required, 0 if the line was dealt
92        with in some way (this is the same as runsource()).
93
94        @param line: line of text
95        @type line: L{bytes}
96        @return: L{bool} from L{code.InteractiveInterpreter.runsource}
97        """
98        self.buffer.append(line)
99        source = b"\n".join(self.buffer)
100        source = source.decode("utf-8")
101        more = self.runsource(source, self.filename)
102        if not more:
103            self.resetBuffer()
104        return more
105
106    def runcode(self, *a, **kw):
107        orighook, sys.displayhook = sys.displayhook, self.displayhook
108        try:
109            origout, sys.stdout = sys.stdout, FileWrapper(self.handler)
110            try:
111                code.InteractiveInterpreter.runcode(self, *a, **kw)
112            finally:
113                sys.stdout = origout
114        finally:
115            sys.displayhook = orighook
116
117    def displayhook(self, obj):
118        self.locals["_"] = obj
119        if isinstance(obj, defer.Deferred):
120            # XXX Ick, where is my "hasFired()" interface?
121            if hasattr(obj, "result"):
122                self.write(repr(obj))
123            elif id(obj) in self._pendingDeferreds:
124                self.write("<Deferred #%d>" % (self._pendingDeferreds[id(obj)][0],))
125            else:
126                d = self._pendingDeferreds
127                k = self.numDeferreds
128                d[id(obj)] = (k, obj)
129                self.numDeferreds += 1
130                obj.addCallbacks(
131                    self._cbDisplayDeferred,
132                    self._ebDisplayDeferred,
133                    callbackArgs=(k, obj),
134                    errbackArgs=(k, obj),
135                )
136                self.write("<Deferred #%d>" % (k,))
137        elif obj is not None:
138            self.write(repr(obj))
139
140    def _cbDisplayDeferred(self, result, k, obj):
141        self.write("Deferred #%d called back: %r" % (k, result), True)
142        del self._pendingDeferreds[id(obj)]
143        return result
144
145    def _ebDisplayDeferred(self, failure, k, obj):
146        self.write("Deferred #%d failed: %r" % (k, failure.getErrorMessage()), True)
147        del self._pendingDeferreds[id(obj)]
148        return failure
149
150    def write(self, data, isAsync=None, **kwargs):
151        isAsync = _get_async_param(isAsync, **kwargs)
152        self.handler.addOutput(data, isAsync)
153
154
155CTRL_C = b"\x03"
156CTRL_D = b"\x04"
157CTRL_BACKSLASH = b"\x1c"
158CTRL_L = b"\x0c"
159CTRL_A = b"\x01"
160CTRL_E = b"\x05"
161
162
163class Manhole(recvline.HistoricRecvLine):
164    r"""
165    Mediator between a fancy line source and an interactive interpreter.
166
167    This accepts lines from its transport and passes them on to a
168    L{ManholeInterpreter}.  Control commands (^C, ^D, ^\) are also handled
169    with something approximating their normal terminal-mode behavior.  It
170    can optionally be constructed with a dict which will be used as the
171    local namespace for any code executed.
172    """
173
174    namespace = None
175
176    def __init__(self, namespace=None):
177        recvline.HistoricRecvLine.__init__(self)
178        if namespace is not None:
179            self.namespace = namespace.copy()
180
181    def connectionMade(self):
182        recvline.HistoricRecvLine.connectionMade(self)
183        self.interpreter = ManholeInterpreter(self, self.namespace)
184        self.keyHandlers[CTRL_C] = self.handle_INT
185        self.keyHandlers[CTRL_D] = self.handle_EOF
186        self.keyHandlers[CTRL_L] = self.handle_FF
187        self.keyHandlers[CTRL_A] = self.handle_HOME
188        self.keyHandlers[CTRL_E] = self.handle_END
189        self.keyHandlers[CTRL_BACKSLASH] = self.handle_QUIT
190
191    def handle_INT(self):
192        """
193        Handle ^C as an interrupt keystroke by resetting the current input
194        variables to their initial state.
195        """
196        self.pn = 0
197        self.lineBuffer = []
198        self.lineBufferIndex = 0
199        self.interpreter.resetBuffer()
200
201        self.terminal.nextLine()
202        self.terminal.write(b"KeyboardInterrupt")
203        self.terminal.nextLine()
204        self.terminal.write(self.ps[self.pn])
205
206    def handle_EOF(self):
207        if self.lineBuffer:
208            self.terminal.write(b"\a")
209        else:
210            self.handle_QUIT()
211
212    def handle_FF(self):
213        """
214        Handle a 'form feed' byte - generally used to request a screen
215        refresh/redraw.
216        """
217        self.terminal.eraseDisplay()
218        self.terminal.cursorHome()
219        self.drawInputLine()
220
221    def handle_QUIT(self):
222        self.terminal.loseConnection()
223
224    def _needsNewline(self):
225        w = self.terminal.lastWrite
226        return not w.endswith(b"\n") and not w.endswith(b"\x1bE")
227
228    def addOutput(self, data, isAsync=None, **kwargs):
229        isAsync = _get_async_param(isAsync, **kwargs)
230        if isAsync:
231            self.terminal.eraseLine()
232            self.terminal.cursorBackward(len(self.lineBuffer) + len(self.ps[self.pn]))
233
234        self.terminal.write(data)
235
236        if isAsync:
237            if self._needsNewline():
238                self.terminal.nextLine()
239
240            self.terminal.write(self.ps[self.pn])
241
242            if self.lineBuffer:
243                oldBuffer = self.lineBuffer
244                self.lineBuffer = []
245                self.lineBufferIndex = 0
246
247                self._deliverBuffer(oldBuffer)
248
249    def lineReceived(self, line):
250        more = self.interpreter.push(line)
251        self.pn = bool(more)
252        if self._needsNewline():
253            self.terminal.nextLine()
254        self.terminal.write(self.ps[self.pn])
255
256
257class VT102Writer:
258    """
259    Colorizer for Python tokens.
260
261    A series of tokens are written to instances of this object.  Each is
262    colored in a particular way.  The final line of the result of this is
263    generally added to the output.
264    """
265
266    typeToColor = {
267        "identifier": b"\x1b[31m",
268        "keyword": b"\x1b[32m",
269        "parameter": b"\x1b[33m",
270        "variable": b"\x1b[1;33m",
271        "string": b"\x1b[35m",
272        "number": b"\x1b[36m",
273        "op": b"\x1b[37m",
274    }
275
276    normalColor = b"\x1b[0m"
277
278    def __init__(self):
279        self.written = []
280
281    def color(self, type):
282        r = self.typeToColor.get(type, b"")
283        return r
284
285    def write(self, token, type=None):
286        if token and token != b"\r":
287            c = self.color(type)
288            if c:
289                self.written.append(c)
290            self.written.append(token)
291            if c:
292                self.written.append(self.normalColor)
293
294    def __bytes__(self):
295        s = b"".join(self.written)
296        return s.strip(b"\n").splitlines()[-1]
297
298    if bytes == str:
299        # Compat with Python 2.7
300        __str__ = __bytes__
301
302
303def lastColorizedLine(source):
304    """
305    Tokenize and colorize the given Python source.
306
307    Returns a VT102-format colorized version of the last line of C{source}.
308
309    @param source: Python source code
310    @type source: L{str} or L{bytes}
311    @return: L{bytes} of colorized source
312    """
313    if not isinstance(source, bytes):
314        source = source.encode("utf-8")
315    w = VT102Writer()
316    p = TokenPrinter(w.write).printtoken
317    s = BytesIO(source)
318
319    for token in tokenize.tokenize(s.readline):
320        (tokenType, string, start, end, line) = token
321        p(tokenType, string, start, end, line)
322
323    return bytes(w)
324
325
326class ColoredManhole(Manhole):
327    """
328    A REPL which syntax colors input as users type it.
329    """
330
331    def getSource(self):
332        """
333        Return a string containing the currently entered source.
334
335        This is only the code which will be considered for execution
336        next.
337        """
338        return b"\n".join(self.interpreter.buffer) + b"\n" + b"".join(self.lineBuffer)
339
340    def characterReceived(self, ch, moreCharactersComing):
341        if self.mode == "insert":
342            self.lineBuffer.insert(self.lineBufferIndex, ch)
343        else:
344            self.lineBuffer[self.lineBufferIndex : self.lineBufferIndex + 1] = [ch]
345        self.lineBufferIndex += 1
346
347        if moreCharactersComing:
348            # Skip it all, we'll get called with another character in
349            # like 2 femtoseconds.
350            return
351
352        if ch == b" ":
353            # Don't bother to try to color whitespace
354            self.terminal.write(ch)
355            return
356
357        source = self.getSource()
358
359        # Try to write some junk
360        try:
361            coloredLine = lastColorizedLine(source)
362        except tokenize.TokenError:
363            # We couldn't do it.  Strange.  Oh well, just add the character.
364            self.terminal.write(ch)
365        else:
366            # Success!  Clear the source on this line.
367            self.terminal.eraseLine()
368            self.terminal.cursorBackward(
369                len(self.lineBuffer) + len(self.ps[self.pn]) - 1
370            )
371
372            # And write a new, colorized one.
373            self.terminal.write(self.ps[self.pn] + coloredLine)
374
375            # And move the cursor to where it belongs
376            n = len(self.lineBuffer) - self.lineBufferIndex
377            if n:
378                self.terminal.cursorBackward(n)
379