1# -*- test-case-name: twisted.conch.test.test_helper -*- 2# Copyright (c) Twisted Matrix Laboratories. 3# See LICENSE for details. 4 5""" 6Partial in-memory terminal emulator 7 8@author: Jp Calderone 9""" 10 11 12import re 13import string 14 15from zope.interface import implementer 16 17from incremental import Version 18 19from twisted.conch.insults import insults 20from twisted.internet import defer, protocol, reactor 21from twisted.logger import Logger 22from twisted.python import _textattributes 23from twisted.python.compat import iterbytes 24from twisted.python.deprecate import deprecated, deprecatedModuleAttribute 25 26FOREGROUND = 30 27BACKGROUND = 40 28BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE, N_COLORS = range(9) 29 30 31class _FormattingState(_textattributes._FormattingStateMixin): 32 """ 33 Represents the formatting state/attributes of a single character. 34 35 Character set, intensity, underlinedness, blinkitude, video 36 reversal, as well as foreground and background colors made up a 37 character's attributes. 38 """ 39 40 compareAttributes = ( 41 "charset", 42 "bold", 43 "underline", 44 "blink", 45 "reverseVideo", 46 "foreground", 47 "background", 48 "_subtracting", 49 ) 50 51 def __init__( 52 self, 53 charset=insults.G0, 54 bold=False, 55 underline=False, 56 blink=False, 57 reverseVideo=False, 58 foreground=WHITE, 59 background=BLACK, 60 _subtracting=False, 61 ): 62 self.charset = charset 63 self.bold = bold 64 self.underline = underline 65 self.blink = blink 66 self.reverseVideo = reverseVideo 67 self.foreground = foreground 68 self.background = background 69 self._subtracting = _subtracting 70 71 @deprecated(Version("Twisted", 13, 1, 0)) 72 def wantOne(self, **kw): 73 """ 74 Add a character attribute to a copy of this formatting state. 75 76 @param kw: An optional attribute name and value can be provided with 77 a keyword argument. 78 79 @return: A formatting state instance with the new attribute. 80 81 @see: L{DefaultFormattingState._withAttribute}. 82 """ 83 k, v = kw.popitem() 84 return self._withAttribute(k, v) 85 86 def toVT102(self): 87 # Spit out a vt102 control sequence that will set up 88 # all the attributes set here. Except charset. 89 attrs = [] 90 if self._subtracting: 91 attrs.append(0) 92 if self.bold: 93 attrs.append(insults.BOLD) 94 if self.underline: 95 attrs.append(insults.UNDERLINE) 96 if self.blink: 97 attrs.append(insults.BLINK) 98 if self.reverseVideo: 99 attrs.append(insults.REVERSE_VIDEO) 100 if self.foreground != WHITE: 101 attrs.append(FOREGROUND + self.foreground) 102 if self.background != BLACK: 103 attrs.append(BACKGROUND + self.background) 104 if attrs: 105 return "\x1b[" + ";".join(map(str, attrs)) + "m" 106 return "" 107 108 109CharacterAttribute = _FormattingState 110 111deprecatedModuleAttribute( 112 Version("Twisted", 13, 1, 0), 113 "Use twisted.conch.insults.text.assembleFormattedText instead.", 114 "twisted.conch.insults.helper", 115 "CharacterAttribute", 116) 117 118 119# XXX - need to support scroll regions and scroll history 120@implementer(insults.ITerminalTransport) 121class TerminalBuffer(protocol.Protocol): 122 """ 123 An in-memory terminal emulator. 124 """ 125 126 for keyID in ( 127 b"UP_ARROW", 128 b"DOWN_ARROW", 129 b"RIGHT_ARROW", 130 b"LEFT_ARROW", 131 b"HOME", 132 b"INSERT", 133 b"DELETE", 134 b"END", 135 b"PGUP", 136 b"PGDN", 137 b"F1", 138 b"F2", 139 b"F3", 140 b"F4", 141 b"F5", 142 b"F6", 143 b"F7", 144 b"F8", 145 b"F9", 146 b"F10", 147 b"F11", 148 b"F12", 149 ): 150 execBytes = keyID + b" = object()" 151 execStr = execBytes.decode("ascii") 152 exec(execStr) 153 154 TAB = b"\t" 155 BACKSPACE = b"\x7f" 156 157 width = 80 158 height = 24 159 160 fill = b" " 161 void = object() 162 _log = Logger() 163 164 def getCharacter(self, x, y): 165 return self.lines[y][x] 166 167 def connectionMade(self): 168 self.reset() 169 170 def write(self, data): 171 """ 172 Add the given printable bytes to the terminal. 173 174 Line feeds in L{bytes} will be replaced with carriage return / line 175 feed pairs. 176 """ 177 for b in iterbytes(data.replace(b"\n", b"\r\n")): 178 self.insertAtCursor(b) 179 180 def _currentFormattingState(self): 181 return _FormattingState(self.activeCharset, **self.graphicRendition) 182 183 def insertAtCursor(self, b): 184 """ 185 Add one byte to the terminal at the cursor and make consequent state 186 updates. 187 188 If b is a carriage return, move the cursor to the beginning of the 189 current row. 190 191 If b is a line feed, move the cursor to the next row or scroll down if 192 the cursor is already in the last row. 193 194 Otherwise, if b is printable, put it at the cursor position (inserting 195 or overwriting as dictated by the current mode) and move the cursor. 196 """ 197 if b == b"\r": 198 self.x = 0 199 elif b == b"\n": 200 self._scrollDown() 201 elif b in string.printable.encode("ascii"): 202 if self.x >= self.width: 203 self.nextLine() 204 ch = (b, self._currentFormattingState()) 205 if self.modes.get(insults.modes.IRM): 206 self.lines[self.y][self.x : self.x] = [ch] 207 self.lines[self.y].pop() 208 else: 209 self.lines[self.y][self.x] = ch 210 self.x += 1 211 212 def _emptyLine(self, width): 213 return [(self.void, self._currentFormattingState()) for i in range(width)] 214 215 def _scrollDown(self): 216 self.y += 1 217 if self.y >= self.height: 218 self.y -= 1 219 del self.lines[0] 220 self.lines.append(self._emptyLine(self.width)) 221 222 def _scrollUp(self): 223 self.y -= 1 224 if self.y < 0: 225 self.y = 0 226 del self.lines[-1] 227 self.lines.insert(0, self._emptyLine(self.width)) 228 229 def cursorUp(self, n=1): 230 self.y = max(0, self.y - n) 231 232 def cursorDown(self, n=1): 233 self.y = min(self.height - 1, self.y + n) 234 235 def cursorBackward(self, n=1): 236 self.x = max(0, self.x - n) 237 238 def cursorForward(self, n=1): 239 self.x = min(self.width, self.x + n) 240 241 def cursorPosition(self, column, line): 242 self.x = column 243 self.y = line 244 245 def cursorHome(self): 246 self.x = self.home.x 247 self.y = self.home.y 248 249 def index(self): 250 self._scrollDown() 251 252 def reverseIndex(self): 253 self._scrollUp() 254 255 def nextLine(self): 256 """ 257 Update the cursor position attributes and scroll down if appropriate. 258 """ 259 self.x = 0 260 self._scrollDown() 261 262 def saveCursor(self): 263 self._savedCursor = (self.x, self.y) 264 265 def restoreCursor(self): 266 self.x, self.y = self._savedCursor 267 del self._savedCursor 268 269 def setModes(self, modes): 270 for m in modes: 271 self.modes[m] = True 272 273 def resetModes(self, modes): 274 for m in modes: 275 try: 276 del self.modes[m] 277 except KeyError: 278 pass 279 280 def setPrivateModes(self, modes): 281 """ 282 Enable the given modes. 283 284 Track which modes have been enabled so that the implementations of 285 other L{insults.ITerminalTransport} methods can be properly implemented 286 to respect these settings. 287 288 @see: L{resetPrivateModes} 289 @see: L{insults.ITerminalTransport.setPrivateModes} 290 """ 291 for m in modes: 292 self.privateModes[m] = True 293 294 def resetPrivateModes(self, modes): 295 """ 296 Disable the given modes. 297 298 @see: L{setPrivateModes} 299 @see: L{insults.ITerminalTransport.resetPrivateModes} 300 """ 301 for m in modes: 302 try: 303 del self.privateModes[m] 304 except KeyError: 305 pass 306 307 def applicationKeypadMode(self): 308 self.keypadMode = "app" 309 310 def numericKeypadMode(self): 311 self.keypadMode = "num" 312 313 def selectCharacterSet(self, charSet, which): 314 self.charsets[which] = charSet 315 316 def shiftIn(self): 317 self.activeCharset = insults.G0 318 319 def shiftOut(self): 320 self.activeCharset = insults.G1 321 322 def singleShift2(self): 323 oldActiveCharset = self.activeCharset 324 self.activeCharset = insults.G2 325 f = self.insertAtCursor 326 327 def insertAtCursor(b): 328 f(b) 329 del self.insertAtCursor 330 self.activeCharset = oldActiveCharset 331 332 self.insertAtCursor = insertAtCursor 333 334 def singleShift3(self): 335 oldActiveCharset = self.activeCharset 336 self.activeCharset = insults.G3 337 f = self.insertAtCursor 338 339 def insertAtCursor(b): 340 f(b) 341 del self.insertAtCursor 342 self.activeCharset = oldActiveCharset 343 344 self.insertAtCursor = insertAtCursor 345 346 def selectGraphicRendition(self, *attributes): 347 for a in attributes: 348 if a == insults.NORMAL: 349 self.graphicRendition = { 350 "bold": False, 351 "underline": False, 352 "blink": False, 353 "reverseVideo": False, 354 "foreground": WHITE, 355 "background": BLACK, 356 } 357 elif a == insults.BOLD: 358 self.graphicRendition["bold"] = True 359 elif a == insults.UNDERLINE: 360 self.graphicRendition["underline"] = True 361 elif a == insults.BLINK: 362 self.graphicRendition["blink"] = True 363 elif a == insults.REVERSE_VIDEO: 364 self.graphicRendition["reverseVideo"] = True 365 else: 366 try: 367 v = int(a) 368 except ValueError: 369 self._log.error( 370 "Unknown graphic rendition attribute: {attr!r}", attr=a 371 ) 372 else: 373 if FOREGROUND <= v <= FOREGROUND + N_COLORS: 374 self.graphicRendition["foreground"] = v - FOREGROUND 375 elif BACKGROUND <= v <= BACKGROUND + N_COLORS: 376 self.graphicRendition["background"] = v - BACKGROUND 377 else: 378 self._log.error( 379 "Unknown graphic rendition attribute: {attr!r}", attr=a 380 ) 381 382 def eraseLine(self): 383 self.lines[self.y] = self._emptyLine(self.width) 384 385 def eraseToLineEnd(self): 386 width = self.width - self.x 387 self.lines[self.y][self.x :] = self._emptyLine(width) 388 389 def eraseToLineBeginning(self): 390 self.lines[self.y][: self.x + 1] = self._emptyLine(self.x + 1) 391 392 def eraseDisplay(self): 393 self.lines = [self._emptyLine(self.width) for i in range(self.height)] 394 395 def eraseToDisplayEnd(self): 396 self.eraseToLineEnd() 397 height = self.height - self.y - 1 398 self.lines[self.y + 1 :] = [self._emptyLine(self.width) for i in range(height)] 399 400 def eraseToDisplayBeginning(self): 401 self.eraseToLineBeginning() 402 self.lines[: self.y] = [self._emptyLine(self.width) for i in range(self.y)] 403 404 def deleteCharacter(self, n=1): 405 del self.lines[self.y][self.x : self.x + n] 406 self.lines[self.y].extend(self._emptyLine(min(self.width - self.x, n))) 407 408 def insertLine(self, n=1): 409 self.lines[self.y : self.y] = [self._emptyLine(self.width) for i in range(n)] 410 del self.lines[self.height :] 411 412 def deleteLine(self, n=1): 413 del self.lines[self.y : self.y + n] 414 self.lines.extend([self._emptyLine(self.width) for i in range(n)]) 415 416 def reportCursorPosition(self): 417 return (self.x, self.y) 418 419 def reset(self): 420 self.home = insults.Vector(0, 0) 421 self.x = self.y = 0 422 self.modes = {} 423 self.privateModes = {} 424 self.setPrivateModes( 425 [insults.privateModes.AUTO_WRAP, insults.privateModes.CURSOR_MODE] 426 ) 427 self.numericKeypad = "app" 428 self.activeCharset = insults.G0 429 self.graphicRendition = { 430 "bold": False, 431 "underline": False, 432 "blink": False, 433 "reverseVideo": False, 434 "foreground": WHITE, 435 "background": BLACK, 436 } 437 self.charsets = { 438 insults.G0: insults.CS_US, 439 insults.G1: insults.CS_US, 440 insults.G2: insults.CS_ALTERNATE, 441 insults.G3: insults.CS_ALTERNATE_SPECIAL, 442 } 443 self.eraseDisplay() 444 445 def unhandledControlSequence(self, buf): 446 print("Could not handle", repr(buf)) 447 448 def __bytes__(self): 449 lines = [] 450 for L in self.lines: 451 buf = [] 452 length = 0 453 for (ch, attr) in L: 454 if ch is not self.void: 455 buf.append(ch) 456 length = len(buf) 457 else: 458 buf.append(self.fill) 459 lines.append(b"".join(buf[:length])) 460 return b"\n".join(lines) 461 462 def getHost(self): 463 # ITransport.getHost 464 raise NotImplementedError("Unimplemented: TerminalBuffer.getHost") 465 466 def getPeer(self): 467 # ITransport.getPeer 468 raise NotImplementedError("Unimplemented: TerminalBuffer.getPeer") 469 470 def loseConnection(self): 471 # ITransport.loseConnection 472 raise NotImplementedError("Unimplemented: TerminalBuffer.loseConnection") 473 474 def writeSequence(self, data): 475 # ITransport.writeSequence 476 raise NotImplementedError("Unimplemented: TerminalBuffer.writeSequence") 477 478 def horizontalTabulationSet(self): 479 # ITerminalTransport.horizontalTabulationSet 480 raise NotImplementedError( 481 "Unimplemented: TerminalBuffer.horizontalTabulationSet" 482 ) 483 484 def tabulationClear(self): 485 # TerminalTransport.tabulationClear 486 raise NotImplementedError("Unimplemented: TerminalBuffer.tabulationClear") 487 488 def tabulationClearAll(self): 489 # TerminalTransport.tabulationClearAll 490 raise NotImplementedError("Unimplemented: TerminalBuffer.tabulationClearAll") 491 492 def doubleHeightLine(self, top=True): 493 # ITerminalTransport.doubleHeightLine 494 raise NotImplementedError("Unimplemented: TerminalBuffer.doubleHeightLine") 495 496 def singleWidthLine(self): 497 # ITerminalTransport.singleWidthLine 498 raise NotImplementedError("Unimplemented: TerminalBuffer.singleWidthLine") 499 500 def doubleWidthLine(self): 501 # ITerminalTransport.doubleWidthLine 502 raise NotImplementedError("Unimplemented: TerminalBuffer.doubleWidthLine") 503 504 505class ExpectationTimeout(Exception): 506 pass 507 508 509class ExpectableBuffer(TerminalBuffer): 510 _mark = 0 511 512 def connectionMade(self): 513 TerminalBuffer.connectionMade(self) 514 self._expecting = [] 515 516 def write(self, data): 517 TerminalBuffer.write(self, data) 518 self._checkExpected() 519 520 def cursorHome(self): 521 TerminalBuffer.cursorHome(self) 522 self._mark = 0 523 524 def _timeoutExpected(self, d): 525 d.errback(ExpectationTimeout()) 526 self._checkExpected() 527 528 def _checkExpected(self): 529 s = self.__bytes__()[self._mark :] 530 while self._expecting: 531 expr, timer, deferred = self._expecting[0] 532 if timer and not timer.active(): 533 del self._expecting[0] 534 continue 535 for match in expr.finditer(s): 536 if timer: 537 timer.cancel() 538 del self._expecting[0] 539 self._mark += match.end() 540 s = s[match.end() :] 541 deferred.callback(match) 542 break 543 else: 544 return 545 546 def expect(self, expression, timeout=None, scheduler=reactor): 547 d = defer.Deferred() 548 timer = None 549 if timeout: 550 timer = scheduler.callLater(timeout, self._timeoutExpected, d) 551 self._expecting.append((re.compile(expression), timer, d)) 552 self._checkExpected() 553 return d 554 555 556__all__ = ["CharacterAttribute", "TerminalBuffer", "ExpectableBuffer"] 557