1# -*- test-case-name: twisted.test.test_stdio -*-
2
3"""Standard input/out/err support.
4
5Future Plans::
6
7    support for stderr, perhaps
8    Rewrite to use the reactor instead of an ad-hoc mechanism for connecting
9        protocols to transport.
10
11Maintainer: James Y Knight
12"""
13
14from zope.interface import implements
15
16from twisted.internet import process, error, interfaces
17from twisted.python import log, failure
18
19
class PipeAddress(object):
    """
    Address object handed out by L{StandardIO.getPeer} and
    L{StandardIO.getHost}.

    It declares that it implements L{interfaces.IAddress} and defines no
    attributes or methods of its own.
    """
    implements(interfaces.IAddress)
22
23
class StandardIO(object):
    """
    Transport connecting a protocol to a pair of file descriptors, standard
    input and standard output by default.

    Reading is delegated to a L{process.ProcessReader} and writing to a
    L{process.ProcessWriter}; both are constructed with this object as their
    owner and report back through L{childDataReceived} and
    L{childConnectionLost}.
    """
    implements(interfaces.ITransport, interfaces.IProducer,
               interfaces.IConsumer, interfaces.IHalfCloseableDescriptor)

    # The two halves; each one is reset to None once it has been lost.
    _reader = None
    _writer = None
    # Becomes True when connectionLost has run.
    disconnected = False
    # Becomes True when loseConnection has been requested.
    disconnecting = False

    def __init__(self, proto, stdin=0, stdout=1, reactor=None):
        """
        Attach C{proto} to the given descriptors and start reading.

        @param proto: the protocol to drive; its C{makeConnection} is called
            with this transport once both halves exist.
        @param stdin: descriptor handed to the reader half (default C{0}).
        @param stdout: descriptor handed to the writer half (default C{1}).
        @param reactor: reactor handed to both halves; when C{None} the
            global C{twisted.internet.reactor} is imported and used.
        """
        if reactor is None:
            from twisted.internet import reactor
        self.protocol = proto

        # The writer half is created first, then the reader half.
        self._writer = process.ProcessWriter(reactor, self, 'write', stdout)
        readerHalf = process.ProcessReader(reactor, self, 'read', stdin)
        self._reader = readerHalf
        readerHalf.startReading()
        self.protocol.makeConnection(self)

    # ITransport

    # XXX Actually, see #3597.
    def loseWriteConnection(self):
        """
        Close only the writing half, if it still exists.
        """
        writerHalf = self._writer
        if writerHalf is None:
            return
        writerHalf.loseConnection()

    def write(self, data):
        """
        Pass C{data} to the writer half; a no-op once the writer is gone.
        """
        writerHalf = self._writer
        if writerHalf is None:
            return
        writerHalf.write(data)

    def writeSequence(self, data):
        """
        Pass the sequence C{data} to the writer half; a no-op once the
        writer is gone.
        """
        writerHalf = self._writer
        if writerHalf is None:
            return
        writerHalf.writeSequence(data)

    def loseConnection(self):
        """
        Mark the transport as disconnecting, close the writer half and stop
        reading from the reader half.
        """
        self.disconnecting = True

        writerHalf = self._writer
        if writerHalf is not None:
            writerHalf.loseConnection()

        # The reader is looked up only after the writer has been handled.
        readerHalf = self._reader
        if readerHalf is not None:
            # Only stop reading rather than calling loseConnection on the
            # reader: we don't want to SIGPIPE it.
            readerHalf.stopReading()

    def getPeer(self):
        """
        Return a new L{PipeAddress}.
        """
        return PipeAddress()

    def getHost(self):
        """
        Return a new L{PipeAddress}.
        """
        return PipeAddress()


    # Callbacks from process.ProcessReader/ProcessWriter
    def childDataReceived(self, fd, data):
        """
        Forward C{data} to the protocol's C{dataReceived}; C{fd} is unused.
        """
        self.protocol.dataReceived(data)

    def childConnectionLost(self, fd, reason):
        """
        Handle the loss of one half.

        Ignored once the transport is disconnected.  When the class of
        C{reason.value} is exactly L{error.ConnectionDone} only the affected
        half is torn down (C{fd == 'read'} selects the reader, anything else
        the writer); any other reason closes the whole transport.
        """
        if self.disconnected:
            return

        normalClose = reason.value.__class__ == error.ConnectionDone
        if not normalClose:
            self.connectionLost(reason)
            return

        if fd == 'read':
            self._readConnectionLost(reason)
        else:
            self._writeConnectionLost(reason)

    def connectionLost(self, reason):
        """
        Tear down the whole transport and notify the protocol.

        Both halves and the protocol are detached from this object before
        any of them is called.  Errors raised while notifying the protocol
        are logged rather than propagated.
        """
        self.disconnected = True

        # Detach everything up front, keeping local references for cleanup.
        readerHalf, writerHalf, proto = (
            self._reader, self._writer, self.protocol)
        self._reader = None
        self._writer = None
        self.protocol = None

        # Make sure the other half is cleaned up too: writer first, then
        # reader, skipping any half that is already disconnected.
        for half in (writerHalf, readerHalf):
            if half is not None and not half.disconnected:
                half.connectionLost(reason)

        try:
            proto.connectionLost(reason)
        except:
            log.err()

    def _writeConnectionLost(self, reason):
        """
        The writer half closed normally.

        If a full disconnect was requested, finish it.  Otherwise tell the
        protocol via C{writeConnectionLost} when it adapts to
        L{interfaces.IHalfCloseableProtocol}; if that call raises, the error
        is logged and the whole transport is closed.
        """
        self._writer = None
        if self.disconnecting:
            self.connectionLost(reason)
            return

        halfCloseable = interfaces.IHalfCloseableProtocol(self.protocol, None)
        if not halfCloseable:
            return
        try:
            halfCloseable.writeConnectionLost()
        except:
            log.err()
            self.connectionLost(failure.Failure())

    def _readConnectionLost(self, reason):
        """
        The reader half closed normally.

        A protocol that adapts to L{interfaces.IHalfCloseableProtocol} is
        told via C{readConnectionLost} (an error there is logged and closes
        the whole transport); for any other protocol the whole transport is
        closed with C{reason}.
        """
        self._reader = None
        halfCloseable = interfaces.IHalfCloseableProtocol(self.protocol, None)
        if not halfCloseable:
            self.connectionLost(reason)
            return
        try:
            halfCloseable.readConnectionLost()
        except:
            log.err()
            self.connectionLost(failure.Failure())

    # IConsumer
    def registerProducer(self, producer, streaming):
        """
        Register C{producer} with the writer half, or tell it to stop
        producing right away when there is no writer left.
        """
        writerHalf = self._writer
        if writerHalf is not None:
            writerHalf.registerProducer(producer, streaming)
            return
        producer.stopProducing()

    def unregisterProducer(self):
        """
        Unregister the producer from the writer half, if one still exists.
        """
        writerHalf = self._writer
        if writerHalf is None:
            return
        writerHalf.unregisterProducer()

    # IProducer
    def stopProducing(self):
        """
        Stop producing by closing the connection.
        """
        self.loseConnection()

    def pauseProducing(self):
        """
        Pause the reader half, if one still exists.
        """
        readerHalf = self._reader
        if readerHalf is None:
            return
        readerHalf.pauseProducing()

    def resumeProducing(self):
        """
        Resume the reader half, if one still exists.
        """
        readerHalf = self._reader
        if readerHalf is None:
            return
        readerHalf.resumeProducing()

    def stopReading(self):
        """Compatibility alias for pauseProducing; don't use in new code."""
        self.pauseProducing()

    def startReading(self):
        """Compatibility alias for resumeProducing; don't use in new code."""
        self.resumeProducing()
168