1# -*- test-case-name: twisted.conch.test.test_helper -*-
2# Copyright (c) Twisted Matrix Laboratories.
3# See LICENSE for details.
4
5"""
6Partial in-memory terminal emulator
7
8@author: Jp Calderone
9"""
10
11
12import re
13import string
14
15from zope.interface import implementer
16
17from incremental import Version
18
19from twisted.conch.insults import insults
20from twisted.internet import defer, protocol, reactor
21from twisted.logger import Logger
22from twisted.python import _textattributes
23from twisted.python.compat import iterbytes
24from twisted.python.deprecate import deprecated, deprecatedModuleAttribute
25
26FOREGROUND = 30
27BACKGROUND = 40
28BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE, N_COLORS = range(9)
29
30
31class _FormattingState(_textattributes._FormattingStateMixin):
32    """
33    Represents the formatting state/attributes of a single character.
34
35    Character set, intensity, underlinedness, blinkitude, video
36    reversal, as well as foreground and background colors made up a
37    character's attributes.
38    """
39
40    compareAttributes = (
41        "charset",
42        "bold",
43        "underline",
44        "blink",
45        "reverseVideo",
46        "foreground",
47        "background",
48        "_subtracting",
49    )
50
51    def __init__(
52        self,
53        charset=insults.G0,
54        bold=False,
55        underline=False,
56        blink=False,
57        reverseVideo=False,
58        foreground=WHITE,
59        background=BLACK,
60        _subtracting=False,
61    ):
62        self.charset = charset
63        self.bold = bold
64        self.underline = underline
65        self.blink = blink
66        self.reverseVideo = reverseVideo
67        self.foreground = foreground
68        self.background = background
69        self._subtracting = _subtracting
70
71    @deprecated(Version("Twisted", 13, 1, 0))
72    def wantOne(self, **kw):
73        """
74        Add a character attribute to a copy of this formatting state.
75
76        @param kw: An optional attribute name and value can be provided with
77            a keyword argument.
78
79        @return: A formatting state instance with the new attribute.
80
81        @see: L{DefaultFormattingState._withAttribute}.
82        """
83        k, v = kw.popitem()
84        return self._withAttribute(k, v)
85
86    def toVT102(self):
87        # Spit out a vt102 control sequence that will set up
88        # all the attributes set here.  Except charset.
89        attrs = []
90        if self._subtracting:
91            attrs.append(0)
92        if self.bold:
93            attrs.append(insults.BOLD)
94        if self.underline:
95            attrs.append(insults.UNDERLINE)
96        if self.blink:
97            attrs.append(insults.BLINK)
98        if self.reverseVideo:
99            attrs.append(insults.REVERSE_VIDEO)
100        if self.foreground != WHITE:
101            attrs.append(FOREGROUND + self.foreground)
102        if self.background != BLACK:
103            attrs.append(BACKGROUND + self.background)
104        if attrs:
105            return "\x1b[" + ";".join(map(str, attrs)) + "m"
106        return ""
107
108
109CharacterAttribute = _FormattingState
110
111deprecatedModuleAttribute(
112    Version("Twisted", 13, 1, 0),
113    "Use twisted.conch.insults.text.assembleFormattedText instead.",
114    "twisted.conch.insults.helper",
115    "CharacterAttribute",
116)
117
118
119# XXX - need to support scroll regions and scroll history
120@implementer(insults.ITerminalTransport)
121class TerminalBuffer(protocol.Protocol):
122    """
123    An in-memory terminal emulator.
124    """
125
126    for keyID in (
127        b"UP_ARROW",
128        b"DOWN_ARROW",
129        b"RIGHT_ARROW",
130        b"LEFT_ARROW",
131        b"HOME",
132        b"INSERT",
133        b"DELETE",
134        b"END",
135        b"PGUP",
136        b"PGDN",
137        b"F1",
138        b"F2",
139        b"F3",
140        b"F4",
141        b"F5",
142        b"F6",
143        b"F7",
144        b"F8",
145        b"F9",
146        b"F10",
147        b"F11",
148        b"F12",
149    ):
150        execBytes = keyID + b" = object()"
151        execStr = execBytes.decode("ascii")
152        exec(execStr)
153
154    TAB = b"\t"
155    BACKSPACE = b"\x7f"
156
157    width = 80
158    height = 24
159
160    fill = b" "
161    void = object()
162    _log = Logger()
163
164    def getCharacter(self, x, y):
165        return self.lines[y][x]
166
167    def connectionMade(self):
168        self.reset()
169
170    def write(self, data):
171        """
172        Add the given printable bytes to the terminal.
173
174        Line feeds in L{bytes} will be replaced with carriage return / line
175        feed pairs.
176        """
177        for b in iterbytes(data.replace(b"\n", b"\r\n")):
178            self.insertAtCursor(b)
179
180    def _currentFormattingState(self):
181        return _FormattingState(self.activeCharset, **self.graphicRendition)
182
183    def insertAtCursor(self, b):
184        """
185        Add one byte to the terminal at the cursor and make consequent state
186        updates.
187
188        If b is a carriage return, move the cursor to the beginning of the
189        current row.
190
191        If b is a line feed, move the cursor to the next row or scroll down if
192        the cursor is already in the last row.
193
194        Otherwise, if b is printable, put it at the cursor position (inserting
195        or overwriting as dictated by the current mode) and move the cursor.
196        """
197        if b == b"\r":
198            self.x = 0
199        elif b == b"\n":
200            self._scrollDown()
201        elif b in string.printable.encode("ascii"):
202            if self.x >= self.width:
203                self.nextLine()
204            ch = (b, self._currentFormattingState())
205            if self.modes.get(insults.modes.IRM):
206                self.lines[self.y][self.x : self.x] = [ch]
207                self.lines[self.y].pop()
208            else:
209                self.lines[self.y][self.x] = ch
210            self.x += 1
211
212    def _emptyLine(self, width):
213        return [(self.void, self._currentFormattingState()) for i in range(width)]
214
215    def _scrollDown(self):
216        self.y += 1
217        if self.y >= self.height:
218            self.y -= 1
219            del self.lines[0]
220            self.lines.append(self._emptyLine(self.width))
221
222    def _scrollUp(self):
223        self.y -= 1
224        if self.y < 0:
225            self.y = 0
226            del self.lines[-1]
227            self.lines.insert(0, self._emptyLine(self.width))
228
229    def cursorUp(self, n=1):
230        self.y = max(0, self.y - n)
231
232    def cursorDown(self, n=1):
233        self.y = min(self.height - 1, self.y + n)
234
235    def cursorBackward(self, n=1):
236        self.x = max(0, self.x - n)
237
238    def cursorForward(self, n=1):
239        self.x = min(self.width, self.x + n)
240
241    def cursorPosition(self, column, line):
242        self.x = column
243        self.y = line
244
245    def cursorHome(self):
246        self.x = self.home.x
247        self.y = self.home.y
248
249    def index(self):
250        self._scrollDown()
251
252    def reverseIndex(self):
253        self._scrollUp()
254
255    def nextLine(self):
256        """
257        Update the cursor position attributes and scroll down if appropriate.
258        """
259        self.x = 0
260        self._scrollDown()
261
262    def saveCursor(self):
263        self._savedCursor = (self.x, self.y)
264
265    def restoreCursor(self):
266        self.x, self.y = self._savedCursor
267        del self._savedCursor
268
269    def setModes(self, modes):
270        for m in modes:
271            self.modes[m] = True
272
273    def resetModes(self, modes):
274        for m in modes:
275            try:
276                del self.modes[m]
277            except KeyError:
278                pass
279
280    def setPrivateModes(self, modes):
281        """
282        Enable the given modes.
283
284        Track which modes have been enabled so that the implementations of
285        other L{insults.ITerminalTransport} methods can be properly implemented
286        to respect these settings.
287
288        @see: L{resetPrivateModes}
289        @see: L{insults.ITerminalTransport.setPrivateModes}
290        """
291        for m in modes:
292            self.privateModes[m] = True
293
294    def resetPrivateModes(self, modes):
295        """
296        Disable the given modes.
297
298        @see: L{setPrivateModes}
299        @see: L{insults.ITerminalTransport.resetPrivateModes}
300        """
301        for m in modes:
302            try:
303                del self.privateModes[m]
304            except KeyError:
305                pass
306
307    def applicationKeypadMode(self):
308        self.keypadMode = "app"
309
310    def numericKeypadMode(self):
311        self.keypadMode = "num"
312
313    def selectCharacterSet(self, charSet, which):
314        self.charsets[which] = charSet
315
316    def shiftIn(self):
317        self.activeCharset = insults.G0
318
319    def shiftOut(self):
320        self.activeCharset = insults.G1
321
322    def singleShift2(self):
323        oldActiveCharset = self.activeCharset
324        self.activeCharset = insults.G2
325        f = self.insertAtCursor
326
327        def insertAtCursor(b):
328            f(b)
329            del self.insertAtCursor
330            self.activeCharset = oldActiveCharset
331
332        self.insertAtCursor = insertAtCursor
333
334    def singleShift3(self):
335        oldActiveCharset = self.activeCharset
336        self.activeCharset = insults.G3
337        f = self.insertAtCursor
338
339        def insertAtCursor(b):
340            f(b)
341            del self.insertAtCursor
342            self.activeCharset = oldActiveCharset
343
344        self.insertAtCursor = insertAtCursor
345
346    def selectGraphicRendition(self, *attributes):
347        for a in attributes:
348            if a == insults.NORMAL:
349                self.graphicRendition = {
350                    "bold": False,
351                    "underline": False,
352                    "blink": False,
353                    "reverseVideo": False,
354                    "foreground": WHITE,
355                    "background": BLACK,
356                }
357            elif a == insults.BOLD:
358                self.graphicRendition["bold"] = True
359            elif a == insults.UNDERLINE:
360                self.graphicRendition["underline"] = True
361            elif a == insults.BLINK:
362                self.graphicRendition["blink"] = True
363            elif a == insults.REVERSE_VIDEO:
364                self.graphicRendition["reverseVideo"] = True
365            else:
366                try:
367                    v = int(a)
368                except ValueError:
369                    self._log.error(
370                        "Unknown graphic rendition attribute: {attr!r}", attr=a
371                    )
372                else:
373                    if FOREGROUND <= v <= FOREGROUND + N_COLORS:
374                        self.graphicRendition["foreground"] = v - FOREGROUND
375                    elif BACKGROUND <= v <= BACKGROUND + N_COLORS:
376                        self.graphicRendition["background"] = v - BACKGROUND
377                    else:
378                        self._log.error(
379                            "Unknown graphic rendition attribute: {attr!r}", attr=a
380                        )
381
382    def eraseLine(self):
383        self.lines[self.y] = self._emptyLine(self.width)
384
385    def eraseToLineEnd(self):
386        width = self.width - self.x
387        self.lines[self.y][self.x :] = self._emptyLine(width)
388
389    def eraseToLineBeginning(self):
390        self.lines[self.y][: self.x + 1] = self._emptyLine(self.x + 1)
391
392    def eraseDisplay(self):
393        self.lines = [self._emptyLine(self.width) for i in range(self.height)]
394
395    def eraseToDisplayEnd(self):
396        self.eraseToLineEnd()
397        height = self.height - self.y - 1
398        self.lines[self.y + 1 :] = [self._emptyLine(self.width) for i in range(height)]
399
400    def eraseToDisplayBeginning(self):
401        self.eraseToLineBeginning()
402        self.lines[: self.y] = [self._emptyLine(self.width) for i in range(self.y)]
403
404    def deleteCharacter(self, n=1):
405        del self.lines[self.y][self.x : self.x + n]
406        self.lines[self.y].extend(self._emptyLine(min(self.width - self.x, n)))
407
408    def insertLine(self, n=1):
409        self.lines[self.y : self.y] = [self._emptyLine(self.width) for i in range(n)]
410        del self.lines[self.height :]
411
412    def deleteLine(self, n=1):
413        del self.lines[self.y : self.y + n]
414        self.lines.extend([self._emptyLine(self.width) for i in range(n)])
415
416    def reportCursorPosition(self):
417        return (self.x, self.y)
418
419    def reset(self):
420        self.home = insults.Vector(0, 0)
421        self.x = self.y = 0
422        self.modes = {}
423        self.privateModes = {}
424        self.setPrivateModes(
425            [insults.privateModes.AUTO_WRAP, insults.privateModes.CURSOR_MODE]
426        )
427        self.numericKeypad = "app"
428        self.activeCharset = insults.G0
429        self.graphicRendition = {
430            "bold": False,
431            "underline": False,
432            "blink": False,
433            "reverseVideo": False,
434            "foreground": WHITE,
435            "background": BLACK,
436        }
437        self.charsets = {
438            insults.G0: insults.CS_US,
439            insults.G1: insults.CS_US,
440            insults.G2: insults.CS_ALTERNATE,
441            insults.G3: insults.CS_ALTERNATE_SPECIAL,
442        }
443        self.eraseDisplay()
444
445    def unhandledControlSequence(self, buf):
446        print("Could not handle", repr(buf))
447
448    def __bytes__(self):
449        lines = []
450        for L in self.lines:
451            buf = []
452            length = 0
453            for (ch, attr) in L:
454                if ch is not self.void:
455                    buf.append(ch)
456                    length = len(buf)
457                else:
458                    buf.append(self.fill)
459            lines.append(b"".join(buf[:length]))
460        return b"\n".join(lines)
461
462    def getHost(self):
463        # ITransport.getHost
464        raise NotImplementedError("Unimplemented: TerminalBuffer.getHost")
465
466    def getPeer(self):
467        # ITransport.getPeer
468        raise NotImplementedError("Unimplemented: TerminalBuffer.getPeer")
469
470    def loseConnection(self):
471        # ITransport.loseConnection
472        raise NotImplementedError("Unimplemented: TerminalBuffer.loseConnection")
473
474    def writeSequence(self, data):
475        # ITransport.writeSequence
476        raise NotImplementedError("Unimplemented: TerminalBuffer.writeSequence")
477
478    def horizontalTabulationSet(self):
479        # ITerminalTransport.horizontalTabulationSet
480        raise NotImplementedError(
481            "Unimplemented: TerminalBuffer.horizontalTabulationSet"
482        )
483
484    def tabulationClear(self):
485        # TerminalTransport.tabulationClear
486        raise NotImplementedError("Unimplemented: TerminalBuffer.tabulationClear")
487
488    def tabulationClearAll(self):
489        # TerminalTransport.tabulationClearAll
490        raise NotImplementedError("Unimplemented: TerminalBuffer.tabulationClearAll")
491
492    def doubleHeightLine(self, top=True):
493        # ITerminalTransport.doubleHeightLine
494        raise NotImplementedError("Unimplemented: TerminalBuffer.doubleHeightLine")
495
496    def singleWidthLine(self):
497        # ITerminalTransport.singleWidthLine
498        raise NotImplementedError("Unimplemented: TerminalBuffer.singleWidthLine")
499
500    def doubleWidthLine(self):
501        # ITerminalTransport.doubleWidthLine
502        raise NotImplementedError("Unimplemented: TerminalBuffer.doubleWidthLine")
503
504
505class ExpectationTimeout(Exception):
506    pass
507
508
509class ExpectableBuffer(TerminalBuffer):
510    _mark = 0
511
512    def connectionMade(self):
513        TerminalBuffer.connectionMade(self)
514        self._expecting = []
515
516    def write(self, data):
517        TerminalBuffer.write(self, data)
518        self._checkExpected()
519
520    def cursorHome(self):
521        TerminalBuffer.cursorHome(self)
522        self._mark = 0
523
524    def _timeoutExpected(self, d):
525        d.errback(ExpectationTimeout())
526        self._checkExpected()
527
528    def _checkExpected(self):
529        s = self.__bytes__()[self._mark :]
530        while self._expecting:
531            expr, timer, deferred = self._expecting[0]
532            if timer and not timer.active():
533                del self._expecting[0]
534                continue
535            for match in expr.finditer(s):
536                if timer:
537                    timer.cancel()
538                del self._expecting[0]
539                self._mark += match.end()
540                s = s[match.end() :]
541                deferred.callback(match)
542                break
543            else:
544                return
545
546    def expect(self, expression, timeout=None, scheduler=reactor):
547        d = defer.Deferred()
548        timer = None
549        if timeout:
550            timer = scheduler.callLater(timeout, self._timeoutExpected, d)
551        self._expecting.append((re.compile(expression), timer, d))
552        self._checkExpected()
553        return d
554
555
556__all__ = ["CharacterAttribute", "TerminalBuffer", "ExpectableBuffer"]
557