1import socket 2import selectors 3import telnetlib 4import threading 5import contextlib 6 7from test import support 8import unittest 9 10HOST = support.HOST 11 12def server(evt, serv): 13 serv.listen() 14 evt.set() 15 try: 16 conn, addr = serv.accept() 17 conn.close() 18 except socket.timeout: 19 pass 20 finally: 21 serv.close() 22 23class GeneralTests(unittest.TestCase): 24 25 def setUp(self): 26 self.evt = threading.Event() 27 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 28 self.sock.settimeout(60) # Safety net. Look issue 11812 29 self.port = support.bind_port(self.sock) 30 self.thread = threading.Thread(target=server, args=(self.evt,self.sock)) 31 self.thread.setDaemon(True) 32 self.thread.start() 33 self.evt.wait() 34 35 def tearDown(self): 36 self.thread.join() 37 del self.thread # Clear out any dangling Thread objects. 38 39 def testBasic(self): 40 # connects 41 telnet = telnetlib.Telnet(HOST, self.port) 42 telnet.sock.close() 43 44 def testContextManager(self): 45 with telnetlib.Telnet(HOST, self.port) as tn: 46 self.assertIsNotNone(tn.get_socket()) 47 self.assertIsNone(tn.get_socket()) 48 49 def testTimeoutDefault(self): 50 self.assertTrue(socket.getdefaulttimeout() is None) 51 socket.setdefaulttimeout(30) 52 try: 53 telnet = telnetlib.Telnet(HOST, self.port) 54 finally: 55 socket.setdefaulttimeout(None) 56 self.assertEqual(telnet.sock.gettimeout(), 30) 57 telnet.sock.close() 58 59 def testTimeoutNone(self): 60 # None, having other default 61 self.assertTrue(socket.getdefaulttimeout() is None) 62 socket.setdefaulttimeout(30) 63 try: 64 telnet = telnetlib.Telnet(HOST, self.port, timeout=None) 65 finally: 66 socket.setdefaulttimeout(None) 67 self.assertTrue(telnet.sock.gettimeout() is None) 68 telnet.sock.close() 69 70 def testTimeoutValue(self): 71 telnet = telnetlib.Telnet(HOST, self.port, timeout=30) 72 self.assertEqual(telnet.sock.gettimeout(), 30) 73 telnet.sock.close() 74 75 def testTimeoutOpen(self): 76 telnet = telnetlib.Telnet() 77 telnet.open(HOST, self.port, timeout=30) 78 self.assertEqual(telnet.sock.gettimeout(), 30) 79 telnet.sock.close() 80 81 def testGetters(self): 82 # Test telnet getter methods 83 telnet = telnetlib.Telnet(HOST, self.port, timeout=30) 84 t_sock = telnet.sock 85 self.assertEqual(telnet.get_socket(), t_sock) 86 self.assertEqual(telnet.fileno(), t_sock.fileno()) 87 telnet.sock.close() 88 89class SocketStub(object): 90 ''' a socket proxy that re-defines sendall() ''' 91 def __init__(self, reads=()): 92 self.reads = list(reads) # Intentionally make a copy. 93 self.writes = [] 94 self.block = False 95 def sendall(self, data): 96 self.writes.append(data) 97 def recv(self, size): 98 out = b'' 99 while self.reads and len(out) < size: 100 out += self.reads.pop(0) 101 if len(out) > size: 102 self.reads.insert(0, out[size:]) 103 out = out[:size] 104 return out 105 106class TelnetAlike(telnetlib.Telnet): 107 def fileno(self): 108 raise NotImplementedError() 109 def close(self): pass 110 def sock_avail(self): 111 return (not self.sock.block) 112 def msg(self, msg, *args): 113 with support.captured_stdout() as out: 114 telnetlib.Telnet.msg(self, msg, *args) 115 self._messages += out.getvalue() 116 return 117 118class MockSelector(selectors.BaseSelector): 119 120 def __init__(self): 121 self.keys = {} 122 123 @property 124 def resolution(self): 125 return 1e-3 126 127 def register(self, fileobj, events, data=None): 128 key = selectors.SelectorKey(fileobj, 0, events, data) 129 self.keys[fileobj] = key 130 return key 131 132 def unregister(self, fileobj): 133 return self.keys.pop(fileobj) 134 135 def select(self, timeout=None): 136 block = False 137 for fileobj in self.keys: 138 if isinstance(fileobj, TelnetAlike): 139 block = fileobj.sock.block 140 break 141 if block: 142 return [] 143 else: 144 return [(key, key.events) for key in self.keys.values()] 145 146 def get_map(self): 147 return self.keys 148 149 150@contextlib.contextmanager 151def test_socket(reads): 152 def new_conn(*ignored): 153 return SocketStub(reads) 154 try: 155 old_conn = socket.create_connection 156 socket.create_connection = new_conn 157 yield None 158 finally: 159 socket.create_connection = old_conn 160 return 161 162def test_telnet(reads=(), cls=TelnetAlike): 163 ''' return a telnetlib.Telnet object that uses a SocketStub with 164 reads queued up to be read ''' 165 for x in reads: 166 assert type(x) is bytes, x 167 with test_socket(reads): 168 telnet = cls('dummy', 0) 169 telnet._messages = '' # debuglevel output 170 return telnet 171 172class ExpectAndReadTestCase(unittest.TestCase): 173 def setUp(self): 174 self.old_selector = telnetlib._TelnetSelector 175 telnetlib._TelnetSelector = MockSelector 176 def tearDown(self): 177 telnetlib._TelnetSelector = self.old_selector 178 179class ReadTests(ExpectAndReadTestCase): 180 def test_read_until(self): 181 """ 182 read_until(expected, timeout=None) 183 test the blocking version of read_util 184 """ 185 want = [b'xxxmatchyyy'] 186 telnet = test_telnet(want) 187 data = telnet.read_until(b'match') 188 self.assertEqual(data, b'xxxmatch', msg=(telnet.cookedq, telnet.rawq, telnet.sock.reads)) 189 190 reads = [b'x' * 50, b'match', b'y' * 50] 191 expect = b''.join(reads[:-1]) 192 telnet = test_telnet(reads) 193 data = telnet.read_until(b'match') 194 self.assertEqual(data, expect) 195 196 197 def test_read_all(self): 198 """ 199 read_all() 200 Read all data until EOF; may block. 201 """ 202 reads = [b'x' * 500, b'y' * 500, b'z' * 500] 203 expect = b''.join(reads) 204 telnet = test_telnet(reads) 205 data = telnet.read_all() 206 self.assertEqual(data, expect) 207 return 208 209 def test_read_some(self): 210 """ 211 read_some() 212 Read at least one byte or EOF; may block. 213 """ 214 # test 'at least one byte' 215 telnet = test_telnet([b'x' * 500]) 216 data = telnet.read_some() 217 self.assertTrue(len(data) >= 1) 218 # test EOF 219 telnet = test_telnet() 220 data = telnet.read_some() 221 self.assertEqual(b'', data) 222 223 def _read_eager(self, func_name): 224 """ 225 read_*_eager() 226 Read all data available already queued or on the socket, 227 without blocking. 228 """ 229 want = b'x' * 100 230 telnet = test_telnet([want]) 231 func = getattr(telnet, func_name) 232 telnet.sock.block = True 233 self.assertEqual(b'', func()) 234 telnet.sock.block = False 235 data = b'' 236 while True: 237 try: 238 data += func() 239 except EOFError: 240 break 241 self.assertEqual(data, want) 242 243 def test_read_eager(self): 244 # read_eager and read_very_eager make the same guarantees 245 # (they behave differently but we only test the guarantees) 246 self._read_eager('read_eager') 247 self._read_eager('read_very_eager') 248 # NB -- we need to test the IAC block which is mentioned in the 249 # docstring but not in the module docs 250 251 def read_very_lazy(self): 252 want = b'x' * 100 253 telnet = test_telnet([want]) 254 self.assertEqual(b'', telnet.read_very_lazy()) 255 while telnet.sock.reads: 256 telnet.fill_rawq() 257 data = telnet.read_very_lazy() 258 self.assertEqual(want, data) 259 self.assertRaises(EOFError, telnet.read_very_lazy) 260 261 def test_read_lazy(self): 262 want = b'x' * 100 263 telnet = test_telnet([want]) 264 self.assertEqual(b'', telnet.read_lazy()) 265 data = b'' 266 while True: 267 try: 268 read_data = telnet.read_lazy() 269 data += read_data 270 if not read_data: 271 telnet.fill_rawq() 272 except EOFError: 273 break 274 self.assertTrue(want.startswith(data)) 275 self.assertEqual(data, want) 276 277class nego_collector(object): 278 def __init__(self, sb_getter=None): 279 self.seen = b'' 280 self.sb_getter = sb_getter 281 self.sb_seen = b'' 282 283 def do_nego(self, sock, cmd, opt): 284 self.seen += cmd + opt 285 if cmd == tl.SE and self.sb_getter: 286 sb_data = self.sb_getter() 287 self.sb_seen += sb_data 288 289tl = telnetlib 290 291class WriteTests(unittest.TestCase): 292 '''The only thing that write does is replace each tl.IAC for 293 tl.IAC+tl.IAC''' 294 295 def test_write(self): 296 data_sample = [b'data sample without IAC', 297 b'data sample with' + tl.IAC + b' one IAC', 298 b'a few' + tl.IAC + tl.IAC + b' iacs' + tl.IAC, 299 tl.IAC, 300 b''] 301 for data in data_sample: 302 telnet = test_telnet() 303 telnet.write(data) 304 written = b''.join(telnet.sock.writes) 305 self.assertEqual(data.replace(tl.IAC,tl.IAC+tl.IAC), written) 306 307class OptionTests(unittest.TestCase): 308 # RFC 854 commands 309 cmds = [tl.AO, tl.AYT, tl.BRK, tl.EC, tl.EL, tl.GA, tl.IP, tl.NOP] 310 311 def _test_command(self, data): 312 """ helper for testing IAC + cmd """ 313 telnet = test_telnet(data) 314 data_len = len(b''.join(data)) 315 nego = nego_collector() 316 telnet.set_option_negotiation_callback(nego.do_nego) 317 txt = telnet.read_all() 318 cmd = nego.seen 319 self.assertTrue(len(cmd) > 0) # we expect at least one command 320 self.assertIn(cmd[:1], self.cmds) 321 self.assertEqual(cmd[1:2], tl.NOOPT) 322 self.assertEqual(data_len, len(txt + cmd)) 323 nego.sb_getter = None # break the nego => telnet cycle 324 325 def test_IAC_commands(self): 326 for cmd in self.cmds: 327 self._test_command([tl.IAC, cmd]) 328 self._test_command([b'x' * 100, tl.IAC, cmd, b'y'*100]) 329 self._test_command([b'x' * 10, tl.IAC, cmd, b'y'*10]) 330 # all at once 331 self._test_command([tl.IAC + cmd for (cmd) in self.cmds]) 332 333 def test_SB_commands(self): 334 # RFC 855, subnegotiations portion 335 send = [tl.IAC + tl.SB + tl.IAC + tl.SE, 336 tl.IAC + tl.SB + tl.IAC + tl.IAC + tl.IAC + tl.SE, 337 tl.IAC + tl.SB + tl.IAC + tl.IAC + b'aa' + tl.IAC + tl.SE, 338 tl.IAC + tl.SB + b'bb' + tl.IAC + tl.IAC + tl.IAC + tl.SE, 339 tl.IAC + tl.SB + b'cc' + tl.IAC + tl.IAC + b'dd' + tl.IAC + tl.SE, 340 ] 341 telnet = test_telnet(send) 342 nego = nego_collector(telnet.read_sb_data) 343 telnet.set_option_negotiation_callback(nego.do_nego) 344 txt = telnet.read_all() 345 self.assertEqual(txt, b'') 346 want_sb_data = tl.IAC + tl.IAC + b'aabb' + tl.IAC + b'cc' + tl.IAC + b'dd' 347 self.assertEqual(nego.sb_seen, want_sb_data) 348 self.assertEqual(b'', telnet.read_sb_data()) 349 nego.sb_getter = None # break the nego => telnet cycle 350 351 def test_debuglevel_reads(self): 352 # test all the various places that self.msg(...) is called 353 given_a_expect_b = [ 354 # Telnet.fill_rawq 355 (b'a', ": recv b''\n"), 356 # Telnet.process_rawq 357 (tl.IAC + bytes([88]), ": IAC 88 not recognized\n"), 358 (tl.IAC + tl.DO + bytes([1]), ": IAC DO 1\n"), 359 (tl.IAC + tl.DONT + bytes([1]), ": IAC DONT 1\n"), 360 (tl.IAC + tl.WILL + bytes([1]), ": IAC WILL 1\n"), 361 (tl.IAC + tl.WONT + bytes([1]), ": IAC WONT 1\n"), 362 ] 363 for a, b in given_a_expect_b: 364 telnet = test_telnet([a]) 365 telnet.set_debuglevel(1) 366 txt = telnet.read_all() 367 self.assertIn(b, telnet._messages) 368 return 369 370 def test_debuglevel_write(self): 371 telnet = test_telnet() 372 telnet.set_debuglevel(1) 373 telnet.write(b'xxx') 374 expected = "send b'xxx'\n" 375 self.assertIn(expected, telnet._messages) 376 377 def test_debug_accepts_str_port(self): 378 # Issue 10695 379 with test_socket([]): 380 telnet = TelnetAlike('dummy', '0') 381 telnet._messages = '' 382 telnet.set_debuglevel(1) 383 telnet.msg('test') 384 self.assertRegex(telnet._messages, r'0.*test') 385 386 387class ExpectTests(ExpectAndReadTestCase): 388 def test_expect(self): 389 """ 390 expect(expected, [timeout]) 391 Read until the expected string has been seen, or a timeout is 392 hit (default is no timeout); may block. 393 """ 394 want = [b'x' * 10, b'match', b'y' * 10] 395 telnet = test_telnet(want) 396 (_,_,data) = telnet.expect([b'match']) 397 self.assertEqual(data, b''.join(want[:-1])) 398 399 400if __name__ == '__main__': 401 unittest.main() 402