1# -*- test-case-name: twisted.conch.test.test_recvline -*- 2# Copyright (c) Twisted Matrix Laboratories. 3# See LICENSE for details. 4 5""" 6Basic line editing support. 7 8@author: Jp Calderone 9""" 10 11import string 12from typing import Dict 13 14from zope.interface import implementer 15 16from twisted.conch.insults import helper, insults 17from twisted.logger import Logger 18from twisted.python import reflect 19from twisted.python.compat import iterbytes 20 21_counters: Dict[str, int] = {} 22 23 24class Logging: 25 """ 26 Wrapper which logs attribute lookups. 27 28 This was useful in debugging something, I guess. I forget what. 29 It can probably be deleted or moved somewhere more appropriate. 30 Nothing special going on here, really. 31 """ 32 33 def __init__(self, original): 34 self.original = original 35 key = reflect.qual(original.__class__) 36 count = _counters.get(key, 0) 37 _counters[key] = count + 1 38 self._logFile = open(key + "-" + str(count), "w") 39 40 def __str__(self) -> str: 41 return str(super().__getattribute__("original")) 42 43 def __repr__(self) -> str: 44 return repr(super().__getattribute__("original")) 45 46 def __getattribute__(self, name): 47 original = super().__getattribute__("original") 48 logFile = super().__getattribute__("_logFile") 49 logFile.write(name + "\n") 50 return getattr(original, name) 51 52 53@implementer(insults.ITerminalTransport) 54class TransportSequence: 55 """ 56 An L{ITerminalTransport} implementation which forwards calls to 57 one or more other L{ITerminalTransport}s. 58 59 This is a cheap way for servers to keep track of the state they 60 expect the client to see, since all terminal manipulations can be 61 send to the real client and to a terminal emulator that lives in 62 the server process. 63 """ 64 65 for keyID in ( 66 b"UP_ARROW", 67 b"DOWN_ARROW", 68 b"RIGHT_ARROW", 69 b"LEFT_ARROW", 70 b"HOME", 71 b"INSERT", 72 b"DELETE", 73 b"END", 74 b"PGUP", 75 b"PGDN", 76 b"F1", 77 b"F2", 78 b"F3", 79 b"F4", 80 b"F5", 81 b"F6", 82 b"F7", 83 b"F8", 84 b"F9", 85 b"F10", 86 b"F11", 87 b"F12", 88 ): 89 execBytes = keyID + b" = object()" 90 execStr = execBytes.decode("ascii") 91 exec(execStr) 92 93 TAB = b"\t" 94 BACKSPACE = b"\x7f" 95 96 def __init__(self, *transports): 97 assert transports, "Cannot construct a TransportSequence with no transports" 98 self.transports = transports 99 100 for method in insults.ITerminalTransport: 101 exec( 102 """\ 103def %s(self, *a, **kw): 104 for tpt in self.transports: 105 result = tpt.%s(*a, **kw) 106 return result 107""" 108 % (method, method) 109 ) 110 111 def getHost(self): 112 # ITransport.getHost 113 raise NotImplementedError("Unimplemented: TransportSequence.getHost") 114 115 def getPeer(self): 116 # ITransport.getPeer 117 raise NotImplementedError("Unimplemented: TransportSequence.getPeer") 118 119 def loseConnection(self): 120 # ITransport.loseConnection 121 raise NotImplementedError("Unimplemented: TransportSequence.loseConnection") 122 123 def write(self, data): 124 # ITransport.write 125 raise NotImplementedError("Unimplemented: TransportSequence.write") 126 127 def writeSequence(self, data): 128 # ITransport.writeSequence 129 raise NotImplementedError("Unimplemented: TransportSequence.writeSequence") 130 131 def cursorUp(self, n=1): 132 # ITerminalTransport.cursorUp 133 raise NotImplementedError("Unimplemented: TransportSequence.cursorUp") 134 135 def cursorDown(self, n=1): 136 # ITerminalTransport.cursorDown 137 raise NotImplementedError("Unimplemented: TransportSequence.cursorDown") 138 139 def cursorForward(self, n=1): 140 # ITerminalTransport.cursorForward 141 raise NotImplementedError("Unimplemented: TransportSequence.cursorForward") 142 143 def cursorBackward(self, n=1): 144 # ITerminalTransport.cursorBackward 145 raise NotImplementedError("Unimplemented: TransportSequence.cursorBackward") 146 147 def cursorPosition(self, column, line): 148 # ITerminalTransport.cursorPosition 149 raise NotImplementedError("Unimplemented: TransportSequence.cursorPosition") 150 151 def cursorHome(self): 152 # ITerminalTransport.cursorHome 153 raise NotImplementedError("Unimplemented: TransportSequence.cursorHome") 154 155 def index(self): 156 # ITerminalTransport.index 157 raise NotImplementedError("Unimplemented: TransportSequence.index") 158 159 def reverseIndex(self): 160 # ITerminalTransport.reverseIndex 161 raise NotImplementedError("Unimplemented: TransportSequence.reverseIndex") 162 163 def nextLine(self): 164 # ITerminalTransport.nextLine 165 raise NotImplementedError("Unimplemented: TransportSequence.nextLine") 166 167 def saveCursor(self): 168 # ITerminalTransport.saveCursor 169 raise NotImplementedError("Unimplemented: TransportSequence.saveCursor") 170 171 def restoreCursor(self): 172 # ITerminalTransport.restoreCursor 173 raise NotImplementedError("Unimplemented: TransportSequence.restoreCursor") 174 175 def setModes(self, modes): 176 # ITerminalTransport.setModes 177 raise NotImplementedError("Unimplemented: TransportSequence.setModes") 178 179 def resetModes(self, mode): 180 # ITerminalTransport.resetModes 181 raise NotImplementedError("Unimplemented: TransportSequence.resetModes") 182 183 def setPrivateModes(self, modes): 184 # ITerminalTransport.setPrivateModes 185 raise NotImplementedError("Unimplemented: TransportSequence.setPrivateModes") 186 187 def resetPrivateModes(self, modes): 188 # ITerminalTransport.resetPrivateModes 189 raise NotImplementedError("Unimplemented: TransportSequence.resetPrivateModes") 190 191 def applicationKeypadMode(self): 192 # ITerminalTransport.applicationKeypadMode 193 raise NotImplementedError( 194 "Unimplemented: TransportSequence.applicationKeypadMode" 195 ) 196 197 def numericKeypadMode(self): 198 # ITerminalTransport.numericKeypadMode 199 raise NotImplementedError("Unimplemented: TransportSequence.numericKeypadMode") 200 201 def selectCharacterSet(self, charSet, which): 202 # ITerminalTransport.selectCharacterSet 203 raise NotImplementedError("Unimplemented: TransportSequence.selectCharacterSet") 204 205 def shiftIn(self): 206 # ITerminalTransport.shiftIn 207 raise NotImplementedError("Unimplemented: TransportSequence.shiftIn") 208 209 def shiftOut(self): 210 # ITerminalTransport.shiftOut 211 raise NotImplementedError("Unimplemented: TransportSequence.shiftOut") 212 213 def singleShift2(self): 214 # ITerminalTransport.singleShift2 215 raise NotImplementedError("Unimplemented: TransportSequence.singleShift2") 216 217 def singleShift3(self): 218 # ITerminalTransport.singleShift3 219 raise NotImplementedError("Unimplemented: TransportSequence.singleShift3") 220 221 def selectGraphicRendition(self, *attributes): 222 # ITerminalTransport.selectGraphicRendition 223 raise NotImplementedError( 224 "Unimplemented: TransportSequence.selectGraphicRendition" 225 ) 226 227 def horizontalTabulationSet(self): 228 # ITerminalTransport.horizontalTabulationSet 229 raise NotImplementedError( 230 "Unimplemented: TransportSequence.horizontalTabulationSet" 231 ) 232 233 def tabulationClear(self): 234 # ITerminalTransport.tabulationClear 235 raise NotImplementedError("Unimplemented: TransportSequence.tabulationClear") 236 237 def tabulationClearAll(self): 238 # ITerminalTransport.tabulationClearAll 239 raise NotImplementedError("Unimplemented: TransportSequence.tabulationClearAll") 240 241 def doubleHeightLine(self, top=True): 242 # ITerminalTransport.doubleHeightLine 243 raise NotImplementedError("Unimplemented: TransportSequence.doubleHeightLine") 244 245 def singleWidthLine(self): 246 # ITerminalTransport.singleWidthLine 247 raise NotImplementedError("Unimplemented: TransportSequence.singleWidthLine") 248 249 def doubleWidthLine(self): 250 # ITerminalTransport.doubleWidthLine 251 raise NotImplementedError("Unimplemented: TransportSequence.doubleWidthLine") 252 253 def eraseToLineEnd(self): 254 # ITerminalTransport.eraseToLineEnd 255 raise NotImplementedError("Unimplemented: TransportSequence.eraseToLineEnd") 256 257 def eraseToLineBeginning(self): 258 # ITerminalTransport.eraseToLineBeginning 259 raise NotImplementedError( 260 "Unimplemented: TransportSequence.eraseToLineBeginning" 261 ) 262 263 def eraseLine(self): 264 # ITerminalTransport.eraseLine 265 raise NotImplementedError("Unimplemented: TransportSequence.eraseLine") 266 267 def eraseToDisplayEnd(self): 268 # ITerminalTransport.eraseToDisplayEnd 269 raise NotImplementedError("Unimplemented: TransportSequence.eraseToDisplayEnd") 270 271 def eraseToDisplayBeginning(self): 272 # ITerminalTransport.eraseToDisplayBeginning 273 raise NotImplementedError( 274 "Unimplemented: TransportSequence.eraseToDisplayBeginning" 275 ) 276 277 def eraseDisplay(self): 278 # ITerminalTransport.eraseDisplay 279 raise NotImplementedError("Unimplemented: TransportSequence.eraseDisplay") 280 281 def deleteCharacter(self, n=1): 282 # ITerminalTransport.deleteCharacter 283 raise NotImplementedError("Unimplemented: TransportSequence.deleteCharacter") 284 285 def insertLine(self, n=1): 286 # ITerminalTransport.insertLine 287 raise NotImplementedError("Unimplemented: TransportSequence.insertLine") 288 289 def deleteLine(self, n=1): 290 # ITerminalTransport.deleteLine 291 raise NotImplementedError("Unimplemented: TransportSequence.deleteLine") 292 293 def reportCursorPosition(self): 294 # ITerminalTransport.reportCursorPosition 295 raise NotImplementedError( 296 "Unimplemented: TransportSequence.reportCursorPosition" 297 ) 298 299 def reset(self): 300 # ITerminalTransport.reset 301 raise NotImplementedError("Unimplemented: TransportSequence.reset") 302 303 def unhandledControlSequence(self, seq): 304 # ITerminalTransport.unhandledControlSequence 305 raise NotImplementedError( 306 "Unimplemented: TransportSequence.unhandledControlSequence" 307 ) 308 309 310class LocalTerminalBufferMixin: 311 """ 312 A mixin for RecvLine subclasses which records the state of the terminal. 313 314 This is accomplished by performing all L{ITerminalTransport} operations on both 315 the transport passed to makeConnection and an instance of helper.TerminalBuffer. 316 317 @ivar terminalCopy: A L{helper.TerminalBuffer} instance which efforts 318 will be made to keep up to date with the actual terminal 319 associated with this protocol instance. 320 """ 321 322 def makeConnection(self, transport): 323 self.terminalCopy = helper.TerminalBuffer() 324 self.terminalCopy.connectionMade() 325 return super().makeConnection(TransportSequence(transport, self.terminalCopy)) 326 327 def __str__(self) -> str: 328 return str(self.terminalCopy) 329 330 331class RecvLine(insults.TerminalProtocol): 332 """ 333 L{TerminalProtocol} which adds line editing features. 334 335 Clients will be prompted for lines of input with all the usual 336 features: character echoing, left and right arrow support for 337 moving the cursor to different areas of the line buffer, backspace 338 and delete for removing characters, and insert for toggling 339 between typeover and insert mode. Tabs will be expanded to enough 340 spaces to move the cursor to the next tabstop (every four 341 characters by default). Enter causes the line buffer to be 342 cleared and the line to be passed to the lineReceived() method 343 which, by default, does nothing. Subclasses are responsible for 344 redrawing the input prompt (this will probably change). 345 """ 346 347 width = 80 348 height = 24 349 350 TABSTOP = 4 351 352 ps = (b">>> ", b"... ") 353 pn = 0 354 _printableChars = string.printable.encode("ascii") 355 356 _log = Logger() 357 358 def connectionMade(self): 359 # A list containing the characters making up the current line 360 self.lineBuffer = [] 361 362 # A zero-based (wtf else?) index into self.lineBuffer. 363 # Indicates the current cursor position. 364 self.lineBufferIndex = 0 365 366 t = self.terminal 367 # A map of keyIDs to bound instance methods. 368 self.keyHandlers = { 369 t.LEFT_ARROW: self.handle_LEFT, 370 t.RIGHT_ARROW: self.handle_RIGHT, 371 t.TAB: self.handle_TAB, 372 # Both of these should not be necessary, but figuring out 373 # which is necessary is a huge hassle. 374 b"\r": self.handle_RETURN, 375 b"\n": self.handle_RETURN, 376 t.BACKSPACE: self.handle_BACKSPACE, 377 t.DELETE: self.handle_DELETE, 378 t.INSERT: self.handle_INSERT, 379 t.HOME: self.handle_HOME, 380 t.END: self.handle_END, 381 } 382 383 self.initializeScreen() 384 385 def initializeScreen(self): 386 # Hmm, state sucks. Oh well. 387 # For now we will just take over the whole terminal. 388 self.terminal.reset() 389 self.terminal.write(self.ps[self.pn]) 390 # XXX Note: I would prefer to default to starting in insert 391 # mode, however this does not seem to actually work! I do not 392 # know why. This is probably of interest to implementors 393 # subclassing RecvLine. 394 395 # XXX XXX Note: But the unit tests all expect the initial mode 396 # to be insert right now. Fuck, there needs to be a way to 397 # query the current mode or something. 398 # self.setTypeoverMode() 399 self.setInsertMode() 400 401 def currentLineBuffer(self): 402 s = b"".join(self.lineBuffer) 403 return s[: self.lineBufferIndex], s[self.lineBufferIndex :] 404 405 def setInsertMode(self): 406 self.mode = "insert" 407 self.terminal.setModes([insults.modes.IRM]) 408 409 def setTypeoverMode(self): 410 self.mode = "typeover" 411 self.terminal.resetModes([insults.modes.IRM]) 412 413 def drawInputLine(self): 414 """ 415 Write a line containing the current input prompt and the current line 416 buffer at the current cursor position. 417 """ 418 self.terminal.write(self.ps[self.pn] + b"".join(self.lineBuffer)) 419 420 def terminalSize(self, width, height): 421 # XXX - Clear the previous input line, redraw it at the new 422 # cursor position 423 self.terminal.eraseDisplay() 424 self.terminal.cursorHome() 425 self.width = width 426 self.height = height 427 self.drawInputLine() 428 429 def unhandledControlSequence(self, seq): 430 pass 431 432 def keystrokeReceived(self, keyID, modifier): 433 m = self.keyHandlers.get(keyID) 434 if m is not None: 435 m() 436 elif keyID in self._printableChars: 437 self.characterReceived(keyID, False) 438 else: 439 self._log.warn("Received unhandled keyID: {keyID!r}", keyID=keyID) 440 441 def characterReceived(self, ch, moreCharactersComing): 442 if self.mode == "insert": 443 self.lineBuffer.insert(self.lineBufferIndex, ch) 444 else: 445 self.lineBuffer[self.lineBufferIndex : self.lineBufferIndex + 1] = [ch] 446 self.lineBufferIndex += 1 447 self.terminal.write(ch) 448 449 def handle_TAB(self): 450 n = self.TABSTOP - (len(self.lineBuffer) % self.TABSTOP) 451 self.terminal.cursorForward(n) 452 self.lineBufferIndex += n 453 self.lineBuffer.extend(iterbytes(b" " * n)) 454 455 def handle_LEFT(self): 456 if self.lineBufferIndex > 0: 457 self.lineBufferIndex -= 1 458 self.terminal.cursorBackward() 459 460 def handle_RIGHT(self): 461 if self.lineBufferIndex < len(self.lineBuffer): 462 self.lineBufferIndex += 1 463 self.terminal.cursorForward() 464 465 def handle_HOME(self): 466 if self.lineBufferIndex: 467 self.terminal.cursorBackward(self.lineBufferIndex) 468 self.lineBufferIndex = 0 469 470 def handle_END(self): 471 offset = len(self.lineBuffer) - self.lineBufferIndex 472 if offset: 473 self.terminal.cursorForward(offset) 474 self.lineBufferIndex = len(self.lineBuffer) 475 476 def handle_BACKSPACE(self): 477 if self.lineBufferIndex > 0: 478 self.lineBufferIndex -= 1 479 del self.lineBuffer[self.lineBufferIndex] 480 self.terminal.cursorBackward() 481 self.terminal.deleteCharacter() 482 483 def handle_DELETE(self): 484 if self.lineBufferIndex < len(self.lineBuffer): 485 del self.lineBuffer[self.lineBufferIndex] 486 self.terminal.deleteCharacter() 487 488 def handle_RETURN(self): 489 line = b"".join(self.lineBuffer) 490 self.lineBuffer = [] 491 self.lineBufferIndex = 0 492 self.terminal.nextLine() 493 self.lineReceived(line) 494 495 def handle_INSERT(self): 496 assert self.mode in ("typeover", "insert") 497 if self.mode == "typeover": 498 self.setInsertMode() 499 else: 500 self.setTypeoverMode() 501 502 def lineReceived(self, line): 503 pass 504 505 506class HistoricRecvLine(RecvLine): 507 """ 508 L{TerminalProtocol} which adds both basic line-editing features and input history. 509 510 Everything supported by L{RecvLine} is also supported by this class. In addition, the 511 up and down arrows traverse the input history. Each received line is automatically 512 added to the end of the input history. 513 """ 514 515 def connectionMade(self): 516 RecvLine.connectionMade(self) 517 518 self.historyLines = [] 519 self.historyPosition = 0 520 521 t = self.terminal 522 self.keyHandlers.update( 523 {t.UP_ARROW: self.handle_UP, t.DOWN_ARROW: self.handle_DOWN} 524 ) 525 526 def currentHistoryBuffer(self): 527 b = tuple(self.historyLines) 528 return b[: self.historyPosition], b[self.historyPosition :] 529 530 def _deliverBuffer(self, buf): 531 if buf: 532 for ch in iterbytes(buf[:-1]): 533 self.characterReceived(ch, True) 534 self.characterReceived(buf[-1:], False) 535 536 def handle_UP(self): 537 if self.lineBuffer and self.historyPosition == len(self.historyLines): 538 self.historyLines.append(b"".join(self.lineBuffer)) 539 if self.historyPosition > 0: 540 self.handle_HOME() 541 self.terminal.eraseToLineEnd() 542 543 self.historyPosition -= 1 544 self.lineBuffer = [] 545 546 self._deliverBuffer(self.historyLines[self.historyPosition]) 547 548 def handle_DOWN(self): 549 if self.historyPosition < len(self.historyLines) - 1: 550 self.handle_HOME() 551 self.terminal.eraseToLineEnd() 552 553 self.historyPosition += 1 554 self.lineBuffer = [] 555 556 self._deliverBuffer(self.historyLines[self.historyPosition]) 557 else: 558 self.handle_HOME() 559 self.terminal.eraseToLineEnd() 560 561 self.historyPosition = len(self.historyLines) 562 self.lineBuffer = [] 563 self.lineBufferIndex = 0 564 565 def handle_RETURN(self): 566 if self.lineBuffer: 567 self.historyLines.append(b"".join(self.lineBuffer)) 568 self.historyPosition = len(self.historyLines) 569 return RecvLine.handle_RETURN(self) 570