# -*- test-case-name: twisted.test.test_stdio -*-

"""Standard input/out/err support.

Future Plans::

    support for stderr, perhaps
    Rewrite to use the reactor instead of an ad-hoc mechanism for connecting
        protocols to transport.

Maintainer: James Y Knight
"""

from zope.interface import implements

from twisted.internet import process, error, interfaces
from twisted.python import log, failure


class PipeAddress(object):
    """
    Address object handed out by L{StandardIO.getPeer} and
    L{StandardIO.getHost}.  It carries no state of its own.
    """
    implements(interfaces.IAddress)


class StandardIO(object):
    """
    A transport which hooks a protocol up to a pair of file descriptors
    (by default 0 for reading and 1 for writing) using
    L{process.ProcessReader} and L{process.ProcessWriter}.

    @ivar _reader: the reading half, or C{None} once it has gone away.
    @ivar _writer: the writing half, or C{None} once it has gone away.
    @ivar disconnected: set to C{True} by L{connectionLost}.
    @ivar disconnecting: set to C{True} by L{loseConnection}.
    """
    implements(interfaces.ITransport, interfaces.IProducer,
               interfaces.IConsumer, interfaces.IHalfCloseableDescriptor)

    _reader = None
    _writer = None
    disconnected = False
    disconnecting = False

    def __init__(self, proto, stdin=0, stdout=1, reactor=None):
        """
        Create both halves, start reading, and connect C{proto} to this
        transport.

        @param proto: the protocol to connect; its C{makeConnection} is
            called with this transport before C{__init__} returns.
        @param stdin: file descriptor given to the reading half.
        @param stdout: file descriptor given to the writing half.
        @param reactor: the reactor passed to both halves.  When C{None},
            the global L{twisted.internet.reactor} is imported and used.
        """
        if reactor is None:
            from twisted.internet import reactor
        self.protocol = proto

        # The writer is built before the reader, and reading starts before
        # the protocol is told about the connection.
        self._writer = process.ProcessWriter(reactor, self, 'write', stdout)
        self._reader = process.ProcessReader(reactor, self, 'read', stdin)
        self._reader.startReading()
        self.protocol.makeConnection(self)

    # ITransport

    # XXX Actually, see #3597.
    def loseWriteConnection(self):
        """
        Ask the writing half to close.  Does nothing if it is already gone.
        """
        writer = self._writer
        if writer is None:
            return
        writer.loseConnection()

    def write(self, data):
        """
        Pass C{data} to the writing half.  If the writing half is gone the
        data is dropped without error.
        """
        writer = self._writer
        if writer is None:
            return
        writer.write(data)

    def writeSequence(self, data):
        """
        Pass the sequence C{data} to the writing half.  If the writing half
        is gone the data is dropped without error.
        """
        writer = self._writer
        if writer is None:
            return
        writer.writeSequence(data)

    def loseConnection(self):
        """
        Mark the transport as disconnecting, ask the writing half to close
        and stop reading from the reading half.
        """
        self.disconnecting = True

        writer = self._writer
        if writer is not None:
            writer.loseConnection()

        reader = self._reader
        if reader is not None:
            # Don't loseConnection, because we don't want to SIGPIPE it.
            reader.stopReading()

    def getPeer(self):
        """
        Return a new L{PipeAddress}.
        """
        return PipeAddress()

    def getHost(self):
        """
        Return a new L{PipeAddress}.
        """
        return PipeAddress()

    # Callbacks from process.ProcessReader/ProcessWriter
    def childDataReceived(self, fd, data):
        """
        Forward C{data} to the protocol's C{dataReceived}.  C{fd} is not
        consulted.
        """
        self.protocol.dataReceived(data)

    def childConnectionLost(self, fd, reason):
        """
        Handle the loss of one half.

        Ignored once the transport is disconnected.  A reason whose value
        is exactly an L{error.ConnectionDone} is routed to the half-close
        handler matching C{fd} (C{'read'} selects the read handler, anything
        else the write handler); any other reason tears down the whole
        transport.
        """
        if self.disconnected:
            return

        normalClose = reason.value.__class__ == error.ConnectionDone
        if not normalClose:
            self.connectionLost(reason)
        elif fd == 'read':
            self._readConnectionLost(reason)
        else:
            self._writeConnectionLost(reason)

    def connectionLost(self, reason):
        """
        Tear down the whole transport and notify the protocol.

        Any exception raised by the protocol's C{connectionLost} is logged
        rather than propagated.
        """
        self.disconnected = True

        # Make sure to cleanup the other half: detach everything from self
        # before invoking any callbacks.
        reader, writer, proto = self._reader, self._writer, self.protocol
        self._reader = self._writer = None
        self.protocol = None

        # Writer first, then reader; halves already marked disconnected
        # are left alone.
        for half in (writer, reader):
            if half is not None and not half.disconnected:
                half.connectionLost(reason)

        try:
            proto.connectionLost(reason)
        except:
            log.err()

    def _writeConnectionLost(self, reason):
        """
        Handle a normal close of the writing half.

        If L{loseConnection} was called, the whole transport is torn down.
        Otherwise, a protocol adaptable to
        L{interfaces.IHalfCloseableProtocol} has C{writeConnectionLost}
        called; if that raises, the error is logged and the transport is
        torn down with the resulting failure.
        """
        self._writer = None
        if self.disconnecting:
            self.connectionLost(reason)
            return

        halfCloseable = interfaces.IHalfCloseableProtocol(self.protocol, None)
        if not halfCloseable:
            return
        try:
            halfCloseable.writeConnectionLost()
        except:
            log.err()
            self.connectionLost(failure.Failure())

    def _readConnectionLost(self, reason):
        """
        Handle a normal close of the reading half.

        A protocol adaptable to L{interfaces.IHalfCloseableProtocol} has
        C{readConnectionLost} called; if that raises, the error is logged
        and the transport is torn down with the resulting failure.  For any
        other protocol the whole transport is torn down with C{reason}.
        """
        self._reader = None
        halfCloseable = interfaces.IHalfCloseableProtocol(self.protocol, None)
        if not halfCloseable:
            self.connectionLost(reason)
            return
        try:
            halfCloseable.readConnectionLost()
        except:
            log.err()
            self.connectionLost(failure.Failure())

    # IConsumer
    def registerProducer(self, producer, streaming):
        """
        Register C{producer} with the writing half.  If the writing half is
        gone, C{producer.stopProducing()} is called instead.
        """
        writer = self._writer
        if writer is not None:
            writer.registerProducer(producer, streaming)
        else:
            producer.stopProducing()

    def unregisterProducer(self):
        """
        Unregister the producer from the writing half, if there is one.
        """
        writer = self._writer
        if writer is None:
            return
        writer.unregisterProducer()

    # IProducer
    def stopProducing(self):
        """
        Equivalent to L{loseConnection}.
        """
        self.loseConnection()

    def pauseProducing(self):
        """
        Pause the reading half, if there is one.
        """
        reader = self._reader
        if reader is None:
            return
        reader.pauseProducing()

    def resumeProducing(self):
        """
        Resume the reading half, if there is one.
        """
        reader = self._reader
        if reader is None:
            return
        reader.resumeProducing()

    def stopReading(self):
        """Compatibility only, don't use. Call pauseProducing."""
        self.pauseProducing()

    def startReading(self):
        """Compatibility only, don't use. Call resumeProducing."""
        self.resumeProducing()