# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for L{twisted.internet.stdio}.
"""

from twisted.python.runtime import platform
from twisted.internet.test.reactormixins import ReactorBuilder
from twisted.internet.protocol import Protocol
if not platform.isWindows():
    from twisted.internet._posixstdio import StandardIO



class StdioFilesTests(ReactorBuilder):
    """
    L{StandardIO} supports reading and writing to filesystem files.

    Every test passes L{StandardIO} the descriptors of temporary files in
    place of the process's real standard input and output.
    """

    def setUp(self):
        """
        Create an empty temporary file and keep it open as C{self.extraFile}.

        Its descriptor is handed to L{StandardIO} as whichever of C{stdin} or
        C{stdout} the running test is not exercising.
        """
        path = self.mktemp()
        open(path, "wb").close()
        self.extraFile = open(path, "rb+")
        self.addCleanup(self.extraFile.close)


    def _openFile(self, path, mode):
        """
        Open C{path} and arrange for the file to be closed during cleanup.

        The registered cleanup also keeps a reference to the file object, so
        it cannot be garbage collected (and its descriptor implicitly closed)
        while the test is still running.  Cleanups run in reverse order of
        registration, so a file opened before a reactor's C{unbuildReactor}
        cleanup is registered is closed only after that reactor is torn down.

        @param path: The name of the file to open.
        @param mode: The mode to pass to C{open}.

        @return: The open file object.
        """
        f = open(path, mode)
        self.addCleanup(f.close)
        return f


    def test_addReader(self):
        """
        Adding a filesystem file reader to a reactor will make sure it is
        polled.
        """
        reactor = self.buildReactor()

        class DataProtocol(Protocol):
            data = b""
            def dataReceived(self, data):
                self.data += data
                # It'd be better to stop reactor on connectionLost, but that
                # fails on FreeBSD, probably due to
                # http://bugs.python.org/issue9591:
                if self.data == b"hello!":
                    reactor.stop()

        path = self.mktemp()
        with open(path, "wb") as f:
            f.write(b"hello!")
        f = self._openFile(path, "rb")

        # Read bytes from a file, deliver them to a protocol instance:
        protocol = DataProtocol()
        StandardIO(protocol, stdin=f.fileno(),
                   stdout=self.extraFile.fileno(),
                   reactor=reactor)

        self.runReactor(reactor)
        self.assertEqual(protocol.data, b"hello!")


    def test_addWriter(self):
        """
        Adding a filesystem file writer to a reactor will make sure it is
        polled.
        """
        reactor = self.buildReactor()

        class DisconnectProtocol(Protocol):
            def connectionLost(self, reason):
                reactor.stop()

        path = self.mktemp()
        f = self._openFile(path, "wb")

        # Write bytes to a transport, hopefully have them written to a file:
        protocol = DisconnectProtocol()
        StandardIO(protocol, stdout=f.fileno(),
                   stdin=self.extraFile.fileno(), reactor=reactor)
        protocol.transport.write(b"hello")
        protocol.transport.write(b", world")
        protocol.transport.loseConnection()

        self.runReactor(reactor)
        # Close the writing side before reading the result back; the cleanup
        # registered by _openFile then becomes a no-op.
        f.close()
        with open(path, "rb") as written:
            self.assertEqual(written.read(), b"hello, world")


    def test_removeReader(self):
        """
        Removing a filesystem file reader from a reactor will make sure it is
        no longer polled.
        """
        path = self.mktemp()
        open(path, "wb").close()
        # Opened before the reactor cleanup is registered so that the file
        # outlives the reactor; cleanup might fail if it is closed too soon:
        f = self._openFile(path, "rb")

        reactor = self.buildReactor()
        self.addCleanup(self.unbuildReactor, reactor)

        # Have the reader added:
        stdio = StandardIO(Protocol(), stdin=f.fileno(),
                           stdout=self.extraFile.fileno(),
                           reactor=reactor)
        self.assertIn(stdio._reader, reactor.getReaders())
        stdio._reader.stopReading()
        self.assertNotIn(stdio._reader, reactor.getReaders())


    def test_removeWriter(self):
        """
        Removing a filesystem file writer from a reactor will make sure it is
        no longer polled.
        """
        # Opened before the reactor cleanup is registered so that the file
        # outlives the reactor; cleanup might fail if it is closed too soon:
        f = self._openFile(self.mktemp(), "wb")

        reactor = self.buildReactor()
        self.addCleanup(self.unbuildReactor, reactor)

        # Have the writer added:
        protocol = Protocol()
        stdio = StandardIO(protocol, stdout=f.fileno(),
                           stdin=self.extraFile.fileno(),
                           reactor=reactor)
        protocol.transport.write(b"hello")
        self.assertIn(stdio._writer, reactor.getWriters())
        stdio._writer.stopWriting()
        self.assertNotIn(stdio._writer, reactor.getWriters())


    def test_removeAll(self):
        """
        Calling C{removeAll} on a reactor includes descriptors that are
        filesystem files.
        """
        path = self.mktemp()
        open(path, "wb").close()
        # Opened before the reactor cleanup is registered so that the file
        # outlives the reactor; cleanup might fail if it is closed too soon:
        f = self._openFile(path, "rb")

        reactor = self.buildReactor()
        self.addCleanup(self.unbuildReactor, reactor)

        # Have the reader added:
        stdio = StandardIO(Protocol(), stdin=f.fileno(),
                           stdout=self.extraFile.fileno(), reactor=reactor)
        # And then removed:
        removed = reactor.removeAll()
        self.assertIn(stdio._reader, removed)
        self.assertNotIn(stdio._reader, reactor.getReaders())


    def test_getReaders(self):
        """
        C{reactor.getReaders} includes descriptors that are filesystem files.
        """
        path = self.mktemp()
        open(path, "wb").close()
        # Opened before the reactor cleanup is registered so that the file
        # outlives the reactor; cleanup might fail if it is closed too soon:
        f = self._openFile(path, "rb")

        reactor = self.buildReactor()
        self.addCleanup(self.unbuildReactor, reactor)

        # Have the reader added:
        stdio = StandardIO(Protocol(), stdin=f.fileno(),
                           stdout=self.extraFile.fileno(), reactor=reactor)
        self.assertIn(stdio._reader, reactor.getReaders())


    def test_getWriters(self):
        """
        C{reactor.getWriters} includes descriptors that are filesystem files.
        """
        # Opened before the reactor cleanup is registered so that the file
        # outlives the reactor; cleanup might fail if it is closed too soon:
        f = self._openFile(self.mktemp(), "wb")

        reactor = self.buildReactor()
        self.addCleanup(self.unbuildReactor, reactor)

        # Have the writer added:
        stdio = StandardIO(Protocol(), stdout=f.fileno(),
                           stdin=self.extraFile.fileno(), reactor=reactor)
        # The writer is only polled once it has been asked to start writing:
        self.assertNotIn(stdio._writer, reactor.getWriters())
        stdio._writer.startWriting()
        self.assertIn(stdio._writer, reactor.getWriters())

    if platform.isWindows():
        skip = ("StandardIO does not accept stdout as an argument to Windows. "
                "Testing redirection to a file is therefore harder.")


globals().update(StdioFilesTests.makeTestCaseClasses())