1# -*- test-case-name: twisted.conch.test.test_recvline -*-
2# Copyright (c) Twisted Matrix Laboratories.
3# See LICENSE for details.
4
5"""
6Basic line editing support.
7
8@author: Jp Calderone
9"""
10
11import string
12from typing import Dict
13
14from zope.interface import implementer
15
16from twisted.conch.insults import helper, insults
17from twisted.logger import Logger
18from twisted.python import reflect
19from twisted.python.compat import iterbytes
20
# Number of Logging wrappers created so far, keyed by the fully qualified
# class name of the wrapped object.  Logging.__init__ reads and increments
# the entry to build a distinct log file name for each wrapper.
_counters: Dict[str, int] = {}
22
23
class Logging:
    """
    Debugging proxy which records the name of every attribute looked up on it.

    Each instance wraps another object and opens, for writing, a file named
    after the wrapped object's fully qualified class name followed by a
    per-class counter.  Every attribute lookup on the proxy appends the
    attribute name to that file and is then answered by the wrapped object.

    The file is never explicitly closed by this class.
    """

    def __init__(self, original):
        """
        @param original: The object to wrap; all attribute lookups are
            delegated to it.
        """
        self.original = original
        qualifiedName = reflect.qual(original.__class__)
        # The counter keeps log files of several wrappers around objects of
        # the same class from sharing a name.
        serial = _counters.get(qualifiedName, 0)
        _counters[qualifiedName] = serial + 1
        self._logFile = open(f"{qualifiedName}-{serial}", "w")

    def __str__(self) -> str:
        # Bypass our own __getattribute__ so this lookup is not logged.
        wrapped = super().__getattribute__("original")
        return str(wrapped)

    def __repr__(self) -> str:
        # Bypass our own __getattribute__ so this lookup is not logged.
        wrapped = super().__getattribute__("original")
        return repr(wrapped)

    def __getattribute__(self, name):
        """
        Log C{name}, then return the attribute of that name from the wrapped
        object.
        """
        lookup = super().__getattribute__
        wrapped = lookup("original")
        lookup("_logFile").write(name + "\n")
        return getattr(wrapped, name)
51
52
@implementer(insults.ITerminalTransport)
class TransportSequence:
    """
    An L{ITerminalTransport} implementation which forwards calls to
    one or more other L{ITerminalTransport}s.

    This is a cheap way for servers to keep track of the state they
    expect the client to see, since all terminal manipulations can be
    sent to the real client and to a terminal emulator that lives in
    the server process.

    Every interface method is invoked on each wrapped transport, in the
    order the transports were passed to the initializer, and the value
    returned by the last transport is returned to the caller.

    @ivar transports: The non-empty tuple of transports calls are forwarded to.
    """

    # Define one distinct sentinel object per key ID as a class attribute.
    # exec() without explicit namespaces runs in the class body's namespace,
    # so each assignment below becomes an attribute of the class.
    for keyID in (
        b"UP_ARROW",
        b"DOWN_ARROW",
        b"RIGHT_ARROW",
        b"LEFT_ARROW",
        b"HOME",
        b"INSERT",
        b"DELETE",
        b"END",
        b"PGUP",
        b"PGDN",
        b"F1",
        b"F2",
        b"F3",
        b"F4",
        b"F5",
        b"F6",
        b"F7",
        b"F8",
        b"F9",
        b"F10",
        b"F11",
        b"F12",
    ):
        execBytes = keyID + b" = object()"
        execStr = execBytes.decode("ascii")
        exec(execStr)

    TAB = b"\t"
    BACKSPACE = b"\x7f"

    def __init__(self, *transports):
        """
        @param transports: One or more transports to forward every call to.

        @raise AssertionError: If no transports are given.
        """
        assert transports, "Cannot construct a TransportSequence with no transports"
        self.transports = transports

    # Generic fallback: generate a forwarding method for every name the
    # interface declares.  Any name that is also defined explicitly further
    # down is rebound by that later definition (a class body executes top to
    # bottom), which is why the explicit methods below must forward as well
    # rather than raise.
    for method in insults.ITerminalTransport:
        exec(
            """\
def %s(self, *a, **kw):
    for tpt in self.transports:
        result = tpt.%s(*a, **kw)
    return result
"""
            % (method, method)
        )

    def _forward(self, name, *args, **kwargs):
        """
        Call the method called C{name} on every transport, in order.

        @param name: The name of the transport method to invoke.
        @param args: Positional arguments passed through unchanged.
        @param kwargs: Keyword arguments passed through unchanged.

        @return: The value returned by the last transport.
        """
        result = None
        for tpt in self.transports:
            result = getattr(tpt, name)(*args, **kwargs)
        return result

    # The explicit definitions below give each interface method a concrete
    # signature.  Arguments are forwarded positionally so that the wrapped
    # transports need not agree on parameter names.

    def getHost(self):
        # ITransport.getHost
        return self._forward("getHost")

    def getPeer(self):
        # ITransport.getPeer
        return self._forward("getPeer")

    def loseConnection(self):
        # ITransport.loseConnection
        return self._forward("loseConnection")

    def write(self, data):
        # ITransport.write
        return self._forward("write", data)

    def writeSequence(self, data):
        # ITransport.writeSequence
        return self._forward("writeSequence", data)

    def cursorUp(self, n=1):
        # ITerminalTransport.cursorUp
        return self._forward("cursorUp", n)

    def cursorDown(self, n=1):
        # ITerminalTransport.cursorDown
        return self._forward("cursorDown", n)

    def cursorForward(self, n=1):
        # ITerminalTransport.cursorForward
        return self._forward("cursorForward", n)

    def cursorBackward(self, n=1):
        # ITerminalTransport.cursorBackward
        return self._forward("cursorBackward", n)

    def cursorPosition(self, column, line):
        # ITerminalTransport.cursorPosition
        return self._forward("cursorPosition", column, line)

    def cursorHome(self):
        # ITerminalTransport.cursorHome
        return self._forward("cursorHome")

    def index(self):
        # ITerminalTransport.index
        return self._forward("index")

    def reverseIndex(self):
        # ITerminalTransport.reverseIndex
        return self._forward("reverseIndex")

    def nextLine(self):
        # ITerminalTransport.nextLine
        return self._forward("nextLine")

    def saveCursor(self):
        # ITerminalTransport.saveCursor
        return self._forward("saveCursor")

    def restoreCursor(self):
        # ITerminalTransport.restoreCursor
        return self._forward("restoreCursor")

    def setModes(self, modes):
        # ITerminalTransport.setModes
        return self._forward("setModes", modes)

    def resetModes(self, mode):
        # ITerminalTransport.resetModes
        return self._forward("resetModes", mode)

    def setPrivateModes(self, modes):
        # ITerminalTransport.setPrivateModes
        return self._forward("setPrivateModes", modes)

    def resetPrivateModes(self, modes):
        # ITerminalTransport.resetPrivateModes
        return self._forward("resetPrivateModes", modes)

    def applicationKeypadMode(self):
        # ITerminalTransport.applicationKeypadMode
        return self._forward("applicationKeypadMode")

    def numericKeypadMode(self):
        # ITerminalTransport.numericKeypadMode
        return self._forward("numericKeypadMode")

    def selectCharacterSet(self, charSet, which):
        # ITerminalTransport.selectCharacterSet
        return self._forward("selectCharacterSet", charSet, which)

    def shiftIn(self):
        # ITerminalTransport.shiftIn
        return self._forward("shiftIn")

    def shiftOut(self):
        # ITerminalTransport.shiftOut
        return self._forward("shiftOut")

    def singleShift2(self):
        # ITerminalTransport.singleShift2
        return self._forward("singleShift2")

    def singleShift3(self):
        # ITerminalTransport.singleShift3
        return self._forward("singleShift3")

    def selectGraphicRendition(self, *attributes):
        # ITerminalTransport.selectGraphicRendition
        return self._forward("selectGraphicRendition", *attributes)

    def horizontalTabulationSet(self):
        # ITerminalTransport.horizontalTabulationSet
        return self._forward("horizontalTabulationSet")

    def tabulationClear(self):
        # ITerminalTransport.tabulationClear
        return self._forward("tabulationClear")

    def tabulationClearAll(self):
        # ITerminalTransport.tabulationClearAll
        return self._forward("tabulationClearAll")

    def doubleHeightLine(self, top=True):
        # ITerminalTransport.doubleHeightLine
        return self._forward("doubleHeightLine", top)

    def singleWidthLine(self):
        # ITerminalTransport.singleWidthLine
        return self._forward("singleWidthLine")

    def doubleWidthLine(self):
        # ITerminalTransport.doubleWidthLine
        return self._forward("doubleWidthLine")

    def eraseToLineEnd(self):
        # ITerminalTransport.eraseToLineEnd
        return self._forward("eraseToLineEnd")

    def eraseToLineBeginning(self):
        # ITerminalTransport.eraseToLineBeginning
        return self._forward("eraseToLineBeginning")

    def eraseLine(self):
        # ITerminalTransport.eraseLine
        return self._forward("eraseLine")

    def eraseToDisplayEnd(self):
        # ITerminalTransport.eraseToDisplayEnd
        return self._forward("eraseToDisplayEnd")

    def eraseToDisplayBeginning(self):
        # ITerminalTransport.eraseToDisplayBeginning
        return self._forward("eraseToDisplayBeginning")

    def eraseDisplay(self):
        # ITerminalTransport.eraseDisplay
        return self._forward("eraseDisplay")

    def deleteCharacter(self, n=1):
        # ITerminalTransport.deleteCharacter
        return self._forward("deleteCharacter", n)

    def insertLine(self, n=1):
        # ITerminalTransport.insertLine
        return self._forward("insertLine", n)

    def deleteLine(self, n=1):
        # ITerminalTransport.deleteLine
        return self._forward("deleteLine", n)

    def reportCursorPosition(self):
        # ITerminalTransport.reportCursorPosition
        return self._forward("reportCursorPosition")

    def reset(self):
        # ITerminalTransport.reset
        return self._forward("reset")

    def unhandledControlSequence(self, seq):
        # ITerminalTransport.unhandledControlSequence
        return self._forward("unhandledControlSequence", seq)
308
309
class LocalTerminalBufferMixin:
    """
    Mixin for L{RecvLine} subclasses which keeps a local copy of the terminal.

    The transport handed to C{makeConnection} is combined with a freshly
    created L{helper.TerminalBuffer} in a L{TransportSequence}, and that
    combined transport is what is passed on to the next C{makeConnection} in
    the method resolution order.

    @ivar terminalCopy: The L{helper.TerminalBuffer} created in
        C{makeConnection}, which efforts will be made to keep up to date
        with the actual terminal associated with this protocol instance.
    """

    def makeConnection(self, transport):
        """
        Create the local terminal buffer and connect using a transport that
        addresses both C{transport} and that buffer.

        @param transport: The real transport for this connection.

        @return: Whatever the next C{makeConnection} implementation returns.
        """
        localBuffer = helper.TerminalBuffer()
        self.terminalCopy = localBuffer
        localBuffer.connectionMade()
        combined = TransportSequence(transport, localBuffer)
        return super().makeConnection(combined)

    def __str__(self) -> str:
        """
        Return the string form of the local terminal copy.
        """
        localBuffer = self.terminalCopy
        return str(localBuffer)
329
330
class RecvLine(insults.TerminalProtocol):
    """
    L{TerminalProtocol} which adds line editing features.

    Clients are prompted for lines of input with the usual features:
    character echoing, left and right arrows for moving the cursor within
    the line buffer, home and end, backspace and delete for removing
    characters, and insert for toggling between typeover and insert mode.
    Tab pads the line buffer with spaces up to the next tabstop (every four
    characters by default).  Enter clears the line buffer and passes the line
    to L{lineReceived}, which does nothing by default.  Subclasses are
    responsible for redrawing the input prompt.

    @ivar lineBuffer: A list of the single-byte strings making up the
        current line.
    @ivar lineBufferIndex: The zero-based cursor position within
        C{lineBuffer}.
    @ivar keyHandlers: A mapping of key IDs to the no-argument callables
        which handle them.
    @ivar mode: Either C{"insert"} or C{"typeover"}.
    """

    width = 80
    height = 24

    TABSTOP = 4

    ps = (b">>> ", b"... ")
    pn = 0
    _printableChars = string.printable.encode("ascii")

    _log = Logger()

    def connectionMade(self):
        """
        Reset the editing state, install the default key handlers and draw
        the initial screen.
        """
        self.lineBuffer = []
        self.lineBufferIndex = 0

        t = self.terminal
        bindings = (
            (t.LEFT_ARROW, self.handle_LEFT),
            (t.RIGHT_ARROW, self.handle_RIGHT),
            (t.TAB, self.handle_TAB),
            # Carriage return and line feed are both treated as "enter";
            # it is not known which of the two a given client will send.
            (b"\r", self.handle_RETURN),
            (b"\n", self.handle_RETURN),
            (t.BACKSPACE, self.handle_BACKSPACE),
            (t.DELETE, self.handle_DELETE),
            (t.INSERT, self.handle_INSERT),
            (t.HOME, self.handle_HOME),
            (t.END, self.handle_END),
        )
        self.keyHandlers = dict(bindings)

        self.initializeScreen()

    def initializeScreen(self):
        """
        Take over the whole terminal: reset it, write the current prompt and
        switch to insert mode.
        """
        self.terminal.reset()
        self.terminal.write(self.ps[self.pn])
        # Insert mode is the initial mode; the unit tests are reported to
        # expect this.  There is currently no way to query the terminal for
        # the mode it is actually in, which may matter to subclasses.
        self.setInsertMode()

    def currentLineBuffer(self):
        """
        Return the current line split at the cursor.

        @return: A two-tuple of bytes: the part of the line before the cursor
            and the part from the cursor onwards.
        """
        wholeLine = b"".join(self.lineBuffer)
        cursor = self.lineBufferIndex
        return wholeLine[:cursor], wholeLine[cursor:]

    def setInsertMode(self):
        """
        Record insert mode and set the terminal's IRM mode.
        """
        self.mode = "insert"
        self.terminal.setModes([insults.modes.IRM])

    def setTypeoverMode(self):
        """
        Record typeover mode and reset the terminal's IRM mode.
        """
        self.mode = "typeover"
        self.terminal.resetModes([insults.modes.IRM])

    def drawInputLine(self):
        """
        Write the current input prompt followed by the current line buffer
        at the current cursor position.
        """
        prompt = self.ps[self.pn]
        self.terminal.write(prompt + b"".join(self.lineBuffer))

    def terminalSize(self, width, height):
        """
        Handle a change of terminal size: clear the display, record the new
        dimensions and redraw the input line from the home position.

        @param width: The new width of the terminal.
        @param height: The new height of the terminal.
        """
        # XXX - The previous input line is not redrawn at the position the
        # cursor previously had within it.
        self.terminal.eraseDisplay()
        self.terminal.cursorHome()
        self.width, self.height = width, height
        self.drawInputLine()

    def unhandledControlSequence(self, seq):
        """
        Ignore control sequences which are not otherwise handled.
        """

    def keystrokeReceived(self, keyID, modifier):
        """
        Dispatch a keystroke to its registered handler, treat it as typed
        text if it is printable, or log a warning otherwise.

        @param keyID: The key which was pressed.
        @param modifier: Not used by this implementation.
        """
        handler = self.keyHandlers.get(keyID)
        if handler is not None:
            handler()
            return
        if keyID in self._printableChars:
            self.characterReceived(keyID, False)
            return
        self._log.warn("Received unhandled keyID: {keyID!r}", keyID=keyID)

    def characterReceived(self, ch, moreCharactersComing):
        """
        Place C{ch} at the cursor, inserting or overwriting according to the
        current mode, advance the cursor and echo the character.

        @param ch: The character to add to the line buffer.
        @param moreCharactersComing: Not used by this implementation.
        """
        position = self.lineBufferIndex
        if self.mode == "insert":
            self.lineBuffer.insert(position, ch)
        else:
            self.lineBuffer[position : position + 1] = [ch]
        self.lineBufferIndex = position + 1
        self.terminal.write(ch)

    def handle_TAB(self):
        """
        Pad the line buffer with spaces so that its length reaches the next
        multiple of C{TABSTOP}, moving the cursor forward by the same amount.
        """
        tabstop = self.TABSTOP
        n = tabstop - (len(self.lineBuffer) % tabstop)
        self.terminal.cursorForward(n)
        self.lineBufferIndex += n
        self.lineBuffer.extend(iterbytes(b" " * n))

    def handle_LEFT(self):
        """
        Move the cursor one position left unless it is at the line start.
        """
        if self.lineBufferIndex <= 0:
            return
        self.lineBufferIndex -= 1
        self.terminal.cursorBackward()

    def handle_RIGHT(self):
        """
        Move the cursor one position right unless it is at the line end.
        """
        if self.lineBufferIndex >= len(self.lineBuffer):
            return
        self.lineBufferIndex += 1
        self.terminal.cursorForward()

    def handle_HOME(self):
        """
        Move the cursor to the start of the line buffer.
        """
        distance = self.lineBufferIndex
        if not distance:
            return
        self.terminal.cursorBackward(distance)
        self.lineBufferIndex = 0

    def handle_END(self):
        """
        Move the cursor to the end of the line buffer.
        """
        offset = len(self.lineBuffer) - self.lineBufferIndex
        if not offset:
            return
        self.terminal.cursorForward(offset)
        self.lineBufferIndex = len(self.lineBuffer)

    def handle_BACKSPACE(self):
        """
        Remove the character before the cursor, if there is one.
        """
        if self.lineBufferIndex <= 0:
            return
        self.lineBufferIndex -= 1
        del self.lineBuffer[self.lineBufferIndex]
        self.terminal.cursorBackward()
        self.terminal.deleteCharacter()

    def handle_DELETE(self):
        """
        Remove the character under the cursor, if there is one.
        """
        if self.lineBufferIndex >= len(self.lineBuffer):
            return
        del self.lineBuffer[self.lineBufferIndex]
        self.terminal.deleteCharacter()

    def handle_RETURN(self):
        """
        Clear the line buffer, move the terminal to the next line and hand
        the completed line to L{lineReceived}.
        """
        completed = b"".join(self.lineBuffer)
        self.lineBuffer = []
        self.lineBufferIndex = 0
        self.terminal.nextLine()
        self.lineReceived(completed)

    def handle_INSERT(self):
        """
        Toggle between insert and typeover mode.
        """
        assert self.mode in ("typeover", "insert")
        if self.mode == "typeover":
            toggle = self.setInsertMode
        else:
            toggle = self.setTypeoverMode
        toggle()

    def lineReceived(self, line):
        """
        Called with each completed line; does nothing by default.

        @param line: The bytes of the line which was entered.
        """
504
505
class HistoricRecvLine(RecvLine):
    """
    L{TerminalProtocol} which adds both basic line-editing features and
    input history.

    Everything supported by L{RecvLine} is also supported by this class.  In
    addition, the up and down arrows traverse the input history, and each
    non-empty received line is added to the end of the input history.

    @ivar historyLines: A list of the lines recorded in the history.
    @ivar historyPosition: The index into C{historyLines} of the entry
        currently shown; equal to C{len(historyLines)} when no history entry
        is being shown.
    """

    def connectionMade(self):
        """
        Set up line editing, start with an empty history and register the
        up and down arrow handlers.
        """
        RecvLine.connectionMade(self)

        self.historyLines = []
        self.historyPosition = 0

        t = self.terminal
        self.keyHandlers[t.UP_ARROW] = self.handle_UP
        self.keyHandlers[t.DOWN_ARROW] = self.handle_DOWN

    def currentHistoryBuffer(self):
        """
        Return the history split at the current history position.

        @return: A two-tuple of tuples: the entries before the current
            position and the entries from the current position onwards.
        """
        entries = tuple(self.historyLines)
        position = self.historyPosition
        return entries[:position], entries[position:]

    def _deliverBuffer(self, buf):
        """
        Feed C{buf} to L{characterReceived} one byte at a time, flagging
        every byte but the last as having more characters coming.

        @param buf: The bytes to deliver; nothing happens if it is empty.
        """
        if not buf:
            return
        leading, final = buf[:-1], buf[-1:]
        for ch in iterbytes(leading):
            self.characterReceived(ch, True)
        self.characterReceived(final, False)

    def handle_UP(self):
        """
        Replace the input line with the previous history entry, if any.

        A non-empty line being edited at the end of the history is appended
        to the history first.
        """
        if self.lineBuffer and self.historyPosition == len(self.historyLines):
            self.historyLines.append(b"".join(self.lineBuffer))
        if self.historyPosition <= 0:
            return

        self.handle_HOME()
        self.terminal.eraseToLineEnd()

        self.lineBuffer = []
        self.historyPosition -= 1

        self._deliverBuffer(self.historyLines[self.historyPosition])

    def handle_DOWN(self):
        """
        Replace the input line with the next history entry, or with an empty
        line when there is no later entry.
        """
        # Decide before touching the line which of the two cases applies.
        hasLaterEntry = self.historyPosition < len(self.historyLines) - 1

        # Both cases begin by wiping the displayed input line.
        self.handle_HOME()
        self.terminal.eraseToLineEnd()
        self.lineBuffer = []

        if hasLaterEntry:
            self.historyPosition += 1
            self._deliverBuffer(self.historyLines[self.historyPosition])
        else:
            self.historyPosition = len(self.historyLines)
            self.lineBufferIndex = 0

    def handle_RETURN(self):
        """
        Record a non-empty line in the history, move the history position to
        the end and then process the line as L{RecvLine} does.
        """
        if self.lineBuffer:
            entered = b"".join(self.lineBuffer)
            self.historyLines.append(entered)
        self.historyPosition = len(self.historyLines)
        return RecvLine.handle_RETURN(self)
570