1# -*- test-case-name: twisted.conch.test.test_manhole -*- 2# Copyright (c) Twisted Matrix Laboratories. 3# See LICENSE for details. 4 5""" 6Line-input oriented interactive interpreter loop. 7 8Provides classes for handling Python source input and arbitrary output 9interactively from a Twisted application. Also included is syntax coloring 10code with support for VT102 terminals, control code handling (^C, ^D, ^Q), 11and reasonable handling of Deferreds. 12 13@author: Jp Calderone 14""" 15 16import code 17import sys 18import tokenize 19from io import BytesIO 20 21from twisted.conch import recvline 22from twisted.internet import defer 23from twisted.python.compat import _get_async_param 24from twisted.python.htmlizer import TokenPrinter 25 26 27class FileWrapper: 28 """ 29 Minimal write-file-like object. 30 31 Writes are translated into addOutput calls on an object passed to 32 __init__. Newlines are also converted from network to local style. 33 """ 34 35 softspace = 0 36 state = "normal" 37 38 def __init__(self, o): 39 self.o = o 40 41 def flush(self): 42 pass 43 44 def write(self, data): 45 self.o.addOutput(data.replace("\r\n", "\n")) 46 47 def writelines(self, lines): 48 self.write("".join(lines)) 49 50 51class ManholeInterpreter(code.InteractiveInterpreter): 52 """ 53 Interactive Interpreter with special output and Deferred support. 54 55 Aside from the features provided by L{code.InteractiveInterpreter}, this 56 class captures sys.stdout output and redirects it to the appropriate 57 location (the Manhole protocol instance). It also treats Deferreds 58 which reach the top-level specially: each is formatted to the user with 59 a unique identifier and a new callback and errback added to it, each of 60 which will format the unique identifier and the result with which the 61 Deferred fires and then pass it on to the next participant in the 62 callback chain. 63 """ 64 65 numDeferreds = 0 66 67 def __init__(self, handler, locals=None, filename="<console>"): 68 code.InteractiveInterpreter.__init__(self, locals) 69 self._pendingDeferreds = {} 70 self.handler = handler 71 self.filename = filename 72 self.resetBuffer() 73 74 def resetBuffer(self): 75 """ 76 Reset the input buffer. 77 """ 78 self.buffer = [] 79 80 def push(self, line): 81 """ 82 Push a line to the interpreter. 83 84 The line should not have a trailing newline; it may have 85 internal newlines. The line is appended to a buffer and the 86 interpreter's runsource() method is called with the 87 concatenated contents of the buffer as source. If this 88 indicates that the command was executed or invalid, the buffer 89 is reset; otherwise, the command is incomplete, and the buffer 90 is left as it was after the line was appended. The return 91 value is 1 if more input is required, 0 if the line was dealt 92 with in some way (this is the same as runsource()). 93 94 @param line: line of text 95 @type line: L{bytes} 96 @return: L{bool} from L{code.InteractiveInterpreter.runsource} 97 """ 98 self.buffer.append(line) 99 source = b"\n".join(self.buffer) 100 source = source.decode("utf-8") 101 more = self.runsource(source, self.filename) 102 if not more: 103 self.resetBuffer() 104 return more 105 106 def runcode(self, *a, **kw): 107 orighook, sys.displayhook = sys.displayhook, self.displayhook 108 try: 109 origout, sys.stdout = sys.stdout, FileWrapper(self.handler) 110 try: 111 code.InteractiveInterpreter.runcode(self, *a, **kw) 112 finally: 113 sys.stdout = origout 114 finally: 115 sys.displayhook = orighook 116 117 def displayhook(self, obj): 118 self.locals["_"] = obj 119 if isinstance(obj, defer.Deferred): 120 # XXX Ick, where is my "hasFired()" interface? 121 if hasattr(obj, "result"): 122 self.write(repr(obj)) 123 elif id(obj) in self._pendingDeferreds: 124 self.write("<Deferred #%d>" % (self._pendingDeferreds[id(obj)][0],)) 125 else: 126 d = self._pendingDeferreds 127 k = self.numDeferreds 128 d[id(obj)] = (k, obj) 129 self.numDeferreds += 1 130 obj.addCallbacks( 131 self._cbDisplayDeferred, 132 self._ebDisplayDeferred, 133 callbackArgs=(k, obj), 134 errbackArgs=(k, obj), 135 ) 136 self.write("<Deferred #%d>" % (k,)) 137 elif obj is not None: 138 self.write(repr(obj)) 139 140 def _cbDisplayDeferred(self, result, k, obj): 141 self.write("Deferred #%d called back: %r" % (k, result), True) 142 del self._pendingDeferreds[id(obj)] 143 return result 144 145 def _ebDisplayDeferred(self, failure, k, obj): 146 self.write("Deferred #%d failed: %r" % (k, failure.getErrorMessage()), True) 147 del self._pendingDeferreds[id(obj)] 148 return failure 149 150 def write(self, data, isAsync=None, **kwargs): 151 isAsync = _get_async_param(isAsync, **kwargs) 152 self.handler.addOutput(data, isAsync) 153 154 155CTRL_C = b"\x03" 156CTRL_D = b"\x04" 157CTRL_BACKSLASH = b"\x1c" 158CTRL_L = b"\x0c" 159CTRL_A = b"\x01" 160CTRL_E = b"\x05" 161 162 163class Manhole(recvline.HistoricRecvLine): 164 r""" 165 Mediator between a fancy line source and an interactive interpreter. 166 167 This accepts lines from its transport and passes them on to a 168 L{ManholeInterpreter}. Control commands (^C, ^D, ^\) are also handled 169 with something approximating their normal terminal-mode behavior. It 170 can optionally be constructed with a dict which will be used as the 171 local namespace for any code executed. 172 """ 173 174 namespace = None 175 176 def __init__(self, namespace=None): 177 recvline.HistoricRecvLine.__init__(self) 178 if namespace is not None: 179 self.namespace = namespace.copy() 180 181 def connectionMade(self): 182 recvline.HistoricRecvLine.connectionMade(self) 183 self.interpreter = ManholeInterpreter(self, self.namespace) 184 self.keyHandlers[CTRL_C] = self.handle_INT 185 self.keyHandlers[CTRL_D] = self.handle_EOF 186 self.keyHandlers[CTRL_L] = self.handle_FF 187 self.keyHandlers[CTRL_A] = self.handle_HOME 188 self.keyHandlers[CTRL_E] = self.handle_END 189 self.keyHandlers[CTRL_BACKSLASH] = self.handle_QUIT 190 191 def handle_INT(self): 192 """ 193 Handle ^C as an interrupt keystroke by resetting the current input 194 variables to their initial state. 195 """ 196 self.pn = 0 197 self.lineBuffer = [] 198 self.lineBufferIndex = 0 199 self.interpreter.resetBuffer() 200 201 self.terminal.nextLine() 202 self.terminal.write(b"KeyboardInterrupt") 203 self.terminal.nextLine() 204 self.terminal.write(self.ps[self.pn]) 205 206 def handle_EOF(self): 207 if self.lineBuffer: 208 self.terminal.write(b"\a") 209 else: 210 self.handle_QUIT() 211 212 def handle_FF(self): 213 """ 214 Handle a 'form feed' byte - generally used to request a screen 215 refresh/redraw. 216 """ 217 self.terminal.eraseDisplay() 218 self.terminal.cursorHome() 219 self.drawInputLine() 220 221 def handle_QUIT(self): 222 self.terminal.loseConnection() 223 224 def _needsNewline(self): 225 w = self.terminal.lastWrite 226 return not w.endswith(b"\n") and not w.endswith(b"\x1bE") 227 228 def addOutput(self, data, isAsync=None, **kwargs): 229 isAsync = _get_async_param(isAsync, **kwargs) 230 if isAsync: 231 self.terminal.eraseLine() 232 self.terminal.cursorBackward(len(self.lineBuffer) + len(self.ps[self.pn])) 233 234 self.terminal.write(data) 235 236 if isAsync: 237 if self._needsNewline(): 238 self.terminal.nextLine() 239 240 self.terminal.write(self.ps[self.pn]) 241 242 if self.lineBuffer: 243 oldBuffer = self.lineBuffer 244 self.lineBuffer = [] 245 self.lineBufferIndex = 0 246 247 self._deliverBuffer(oldBuffer) 248 249 def lineReceived(self, line): 250 more = self.interpreter.push(line) 251 self.pn = bool(more) 252 if self._needsNewline(): 253 self.terminal.nextLine() 254 self.terminal.write(self.ps[self.pn]) 255 256 257class VT102Writer: 258 """ 259 Colorizer for Python tokens. 260 261 A series of tokens are written to instances of this object. Each is 262 colored in a particular way. The final line of the result of this is 263 generally added to the output. 264 """ 265 266 typeToColor = { 267 "identifier": b"\x1b[31m", 268 "keyword": b"\x1b[32m", 269 "parameter": b"\x1b[33m", 270 "variable": b"\x1b[1;33m", 271 "string": b"\x1b[35m", 272 "number": b"\x1b[36m", 273 "op": b"\x1b[37m", 274 } 275 276 normalColor = b"\x1b[0m" 277 278 def __init__(self): 279 self.written = [] 280 281 def color(self, type): 282 r = self.typeToColor.get(type, b"") 283 return r 284 285 def write(self, token, type=None): 286 if token and token != b"\r": 287 c = self.color(type) 288 if c: 289 self.written.append(c) 290 self.written.append(token) 291 if c: 292 self.written.append(self.normalColor) 293 294 def __bytes__(self): 295 s = b"".join(self.written) 296 return s.strip(b"\n").splitlines()[-1] 297 298 if bytes == str: 299 # Compat with Python 2.7 300 __str__ = __bytes__ 301 302 303def lastColorizedLine(source): 304 """ 305 Tokenize and colorize the given Python source. 306 307 Returns a VT102-format colorized version of the last line of C{source}. 308 309 @param source: Python source code 310 @type source: L{str} or L{bytes} 311 @return: L{bytes} of colorized source 312 """ 313 if not isinstance(source, bytes): 314 source = source.encode("utf-8") 315 w = VT102Writer() 316 p = TokenPrinter(w.write).printtoken 317 s = BytesIO(source) 318 319 for token in tokenize.tokenize(s.readline): 320 (tokenType, string, start, end, line) = token 321 p(tokenType, string, start, end, line) 322 323 return bytes(w) 324 325 326class ColoredManhole(Manhole): 327 """ 328 A REPL which syntax colors input as users type it. 329 """ 330 331 def getSource(self): 332 """ 333 Return a string containing the currently entered source. 334 335 This is only the code which will be considered for execution 336 next. 337 """ 338 return b"\n".join(self.interpreter.buffer) + b"\n" + b"".join(self.lineBuffer) 339 340 def characterReceived(self, ch, moreCharactersComing): 341 if self.mode == "insert": 342 self.lineBuffer.insert(self.lineBufferIndex, ch) 343 else: 344 self.lineBuffer[self.lineBufferIndex : self.lineBufferIndex + 1] = [ch] 345 self.lineBufferIndex += 1 346 347 if moreCharactersComing: 348 # Skip it all, we'll get called with another character in 349 # like 2 femtoseconds. 350 return 351 352 if ch == b" ": 353 # Don't bother to try to color whitespace 354 self.terminal.write(ch) 355 return 356 357 source = self.getSource() 358 359 # Try to write some junk 360 try: 361 coloredLine = lastColorizedLine(source) 362 except tokenize.TokenError: 363 # We couldn't do it. Strange. Oh well, just add the character. 364 self.terminal.write(ch) 365 else: 366 # Success! Clear the source on this line. 367 self.terminal.eraseLine() 368 self.terminal.cursorBackward( 369 len(self.lineBuffer) + len(self.ps[self.pn]) - 1 370 ) 371 372 # And write a new, colorized one. 373 self.terminal.write(self.ps[self.pn] + coloredLine) 374 375 # And move the cursor to where it belongs 376 n = len(self.lineBuffer) - self.lineBufferIndex 377 if n: 378 self.terminal.cursorBackward(n) 379