1import unittest 2from test import test_support 3 4import errno 5import itertools 6import socket 7import select 8import time 9import traceback 10import Queue 11import sys 12import os 13import array 14import contextlib 15import signal 16import math 17import weakref 18try: 19 import _socket 20except ImportError: 21 _socket = None 22 23 24MAIN_TIMEOUT = 60.0 25 26 27def try_address(host, port=0, family=socket.AF_INET): 28 """Try to bind a socket on the given host:port and return True 29 if that has been possible.""" 30 try: 31 sock = socket.socket(family, socket.SOCK_STREAM) 32 sock.bind((host, port)) 33 except (socket.error, socket.gaierror): 34 return False 35 else: 36 sock.close() 37 return True 38 39HOST = test_support.HOST 40MSG = b'Michael Gilfix was here\n' 41SUPPORTS_IPV6 = test_support.IPV6_ENABLED 42 43try: 44 import thread 45 import threading 46except ImportError: 47 thread = None 48 threading = None 49 50HOST = test_support.HOST 51MSG = 'Michael Gilfix was here\n' 52 53class SocketTCPTest(unittest.TestCase): 54 55 def setUp(self): 56 self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 57 self.port = test_support.bind_port(self.serv) 58 self.serv.listen(1) 59 60 def tearDown(self): 61 self.serv.close() 62 self.serv = None 63 64class SocketUDPTest(unittest.TestCase): 65 66 def setUp(self): 67 self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 68 self.port = test_support.bind_port(self.serv) 69 70 def tearDown(self): 71 self.serv.close() 72 self.serv = None 73 74class ThreadableTest: 75 """Threadable Test class 76 77 The ThreadableTest class makes it easy to create a threaded 78 client/server pair from an existing unit test. 
To create a 79 new threaded class from an existing unit test, use multiple 80 inheritance: 81 82 class NewClass (OldClass, ThreadableTest): 83 pass 84 85 This class defines two new fixture functions with obvious 86 purposes for overriding: 87 88 clientSetUp () 89 clientTearDown () 90 91 Any new test functions within the class must then define 92 tests in pairs, where the test name is preceded with a 93 '_' to indicate the client portion of the test. Ex: 94 95 def testFoo(self): 96 # Server portion 97 98 def _testFoo(self): 99 # Client portion 100 101 Any exceptions raised by the clients during their tests 102 are caught and transferred to the main thread to alert 103 the testing framework. 104 105 Note, the server setup function cannot call any blocking 106 functions that rely on the client thread during setup, 107 unless serverExplicitReady() is called just before 108 the blocking call (such as in setting up a client/server 109 connection and performing the accept() in setUp(). 110 """ 111 112 def __init__(self): 113 # Swap the true setup function 114 self.__setUp = self.setUp 115 self.__tearDown = self.tearDown 116 self.setUp = self._setUp 117 self.tearDown = self._tearDown 118 119 def serverExplicitReady(self): 120 """This method allows the server to explicitly indicate that 121 it wants the client thread to proceed. This is useful if the 122 server is about to execute a blocking routine that is 123 dependent upon the client thread during its setup routine.""" 124 self.server_ready.set() 125 126 def _setUp(self): 127 self.server_ready = threading.Event() 128 self.client_ready = threading.Event() 129 self.done = threading.Event() 130 self.queue = Queue.Queue(1) 131 132 # Do some munging to start the client test. 
133 methodname = self.id() 134 i = methodname.rfind('.') 135 methodname = methodname[i+1:] 136 test_method = getattr(self, '_' + methodname) 137 self.client_thread = thread.start_new_thread( 138 self.clientRun, (test_method,)) 139 140 self.__setUp() 141 if not self.server_ready.is_set(): 142 self.server_ready.set() 143 self.client_ready.wait() 144 145 def _tearDown(self): 146 self.__tearDown() 147 self.done.wait() 148 149 if not self.queue.empty(): 150 msg = self.queue.get() 151 self.fail(msg) 152 153 def clientRun(self, test_func): 154 self.server_ready.wait() 155 self.clientSetUp() 156 self.client_ready.set() 157 if not callable(test_func): 158 raise TypeError("test_func must be a callable function.") 159 try: 160 test_func() 161 except Exception, strerror: 162 self.queue.put(strerror) 163 self.clientTearDown() 164 165 def clientSetUp(self): 166 raise NotImplementedError("clientSetUp must be implemented.") 167 168 def clientTearDown(self): 169 self.done.set() 170 thread.exit() 171 172class ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest): 173 174 def __init__(self, methodName='runTest'): 175 SocketTCPTest.__init__(self, methodName=methodName) 176 ThreadableTest.__init__(self) 177 178 def clientSetUp(self): 179 self.cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 180 181 def clientTearDown(self): 182 self.cli.close() 183 self.cli = None 184 ThreadableTest.clientTearDown(self) 185 186class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest): 187 188 def __init__(self, methodName='runTest'): 189 SocketUDPTest.__init__(self, methodName=methodName) 190 ThreadableTest.__init__(self) 191 192 def clientSetUp(self): 193 self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 194 195 def clientTearDown(self): 196 self.cli.close() 197 self.cli = None 198 ThreadableTest.clientTearDown(self) 199 200class SocketConnectedTest(ThreadedTCPSocketTest): 201 202 def __init__(self, methodName='runTest'): 203 ThreadedTCPSocketTest.__init__(self, methodName=methodName) 
204 205 def setUp(self): 206 ThreadedTCPSocketTest.setUp(self) 207 # Indicate explicitly we're ready for the client thread to 208 # proceed and then perform the blocking call to accept 209 self.serverExplicitReady() 210 conn, addr = self.serv.accept() 211 self.cli_conn = conn 212 213 def tearDown(self): 214 self.cli_conn.close() 215 self.cli_conn = None 216 ThreadedTCPSocketTest.tearDown(self) 217 218 def clientSetUp(self): 219 ThreadedTCPSocketTest.clientSetUp(self) 220 self.cli.connect((HOST, self.port)) 221 self.serv_conn = self.cli 222 223 def clientTearDown(self): 224 self.serv_conn.close() 225 self.serv_conn = None 226 ThreadedTCPSocketTest.clientTearDown(self) 227 228class SocketPairTest(unittest.TestCase, ThreadableTest): 229 230 def __init__(self, methodName='runTest'): 231 unittest.TestCase.__init__(self, methodName=methodName) 232 ThreadableTest.__init__(self) 233 234 def setUp(self): 235 self.serv, self.cli = socket.socketpair() 236 237 def tearDown(self): 238 self.serv.close() 239 self.serv = None 240 241 def clientSetUp(self): 242 pass 243 244 def clientTearDown(self): 245 self.cli.close() 246 self.cli = None 247 ThreadableTest.clientTearDown(self) 248 249 250####################################################################### 251## Begin Tests 252 253class GeneralModuleTests(unittest.TestCase): 254 255 @unittest.skipUnless(_socket is not None, 'need _socket module') 256 def test_csocket_repr(self): 257 s = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM) 258 try: 259 expected = ('<socket object, fd=%s, family=%s, type=%s, protocol=%s>' 260 % (s.fileno(), s.family, s.type, s.proto)) 261 self.assertEqual(repr(s), expected) 262 finally: 263 s.close() 264 expected = ('<socket object, fd=-1, family=%s, type=%s, protocol=%s>' 265 % (s.family, s.type, s.proto)) 266 self.assertEqual(repr(s), expected) 267 268 def test_weakref(self): 269 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 270 p = weakref.proxy(s) 271 self.assertEqual(p.fileno(), 
s.fileno()) 272 s.close() 273 s = None 274 try: 275 p.fileno() 276 except ReferenceError: 277 pass 278 else: 279 self.fail('Socket proxy still exists') 280 281 def test_weakref__sock(self): 282 s = socket.socket()._sock 283 w = weakref.ref(s) 284 self.assertIs(w(), s) 285 del s 286 test_support.gc_collect() 287 self.assertIsNone(w()) 288 289 def testSocketError(self): 290 # Testing socket module exceptions 291 def raise_error(*args, **kwargs): 292 raise socket.error 293 def raise_herror(*args, **kwargs): 294 raise socket.herror 295 def raise_gaierror(*args, **kwargs): 296 raise socket.gaierror 297 self.assertRaises(socket.error, raise_error, 298 "Error raising socket exception.") 299 self.assertRaises(socket.error, raise_herror, 300 "Error raising socket exception.") 301 self.assertRaises(socket.error, raise_gaierror, 302 "Error raising socket exception.") 303 304 def testSendtoErrors(self): 305 # Testing that sendto doesn't mask failures. See #10169. 306 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 307 self.addCleanup(s.close) 308 s.bind(('', 0)) 309 sockname = s.getsockname() 310 # 2 args 311 with self.assertRaises(UnicodeEncodeError): 312 s.sendto(u'\u2620', sockname) 313 with self.assertRaises(TypeError) as cm: 314 s.sendto(5j, sockname) 315 self.assertIn('not complex', str(cm.exception)) 316 with self.assertRaises(TypeError) as cm: 317 s.sendto('foo', None) 318 self.assertIn('not NoneType', str(cm.exception)) 319 # 3 args 320 with self.assertRaises(UnicodeEncodeError): 321 s.sendto(u'\u2620', 0, sockname) 322 with self.assertRaises(TypeError) as cm: 323 s.sendto(5j, 0, sockname) 324 self.assertIn('not complex', str(cm.exception)) 325 with self.assertRaises(TypeError) as cm: 326 s.sendto('foo', 0, None) 327 self.assertIn('not NoneType', str(cm.exception)) 328 with self.assertRaises(TypeError) as cm: 329 s.sendto('foo', 'bar', sockname) 330 self.assertIn('an integer is required', str(cm.exception)) 331 with self.assertRaises(TypeError) as cm: 332 
s.sendto('foo', None, None) 333 self.assertIn('an integer is required', str(cm.exception)) 334 # wrong number of args 335 with self.assertRaises(TypeError) as cm: 336 s.sendto('foo') 337 self.assertIn('(1 given)', str(cm.exception)) 338 with self.assertRaises(TypeError) as cm: 339 s.sendto('foo', 0, sockname, 4) 340 self.assertIn('(4 given)', str(cm.exception)) 341 342 343 def testCrucialConstants(self): 344 # Testing for mission critical constants 345 socket.AF_INET 346 socket.SOCK_STREAM 347 socket.SOCK_DGRAM 348 socket.SOCK_RAW 349 socket.SOCK_RDM 350 socket.SOCK_SEQPACKET 351 socket.SOL_SOCKET 352 socket.SO_REUSEADDR 353 354 def testHostnameRes(self): 355 # Testing hostname resolution mechanisms 356 hostname = socket.gethostname() 357 try: 358 ip = socket.gethostbyname(hostname) 359 except socket.error: 360 # Probably name lookup wasn't set up right; skip this test 361 self.skipTest('name lookup failure') 362 self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.") 363 try: 364 hname, aliases, ipaddrs = socket.gethostbyaddr(ip) 365 except socket.error: 366 # Probably a similar problem as above; skip this test 367 self.skipTest('address lookup failure') 368 all_host_names = [hostname, hname] + aliases 369 fqhn = socket.getfqdn(ip) 370 if not fqhn in all_host_names: 371 self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names))) 372 373 @unittest.skipUnless(hasattr(sys, 'getrefcount'), 374 'test needs sys.getrefcount()') 375 def testRefCountGetNameInfo(self): 376 # Testing reference count for getnameinfo 377 try: 378 # On some versions, this loses a reference 379 orig = sys.getrefcount(__name__) 380 socket.getnameinfo(__name__,0) 381 except TypeError: 382 self.assertEqual(sys.getrefcount(__name__), orig, 383 "socket.getnameinfo loses a reference") 384 385 def testInterpreterCrash(self): 386 # Making sure getnameinfo doesn't crash the interpreter 387 try: 388 # On some versions, this crashes the interpreter. 
389 socket.getnameinfo(('x', 0, 0, 0), 0) 390 except socket.error: 391 pass 392 393 def testNtoH(self): 394 # This just checks that htons etc. are their own inverse, 395 # when looking at the lower 16 or 32 bits. 396 sizes = {socket.htonl: 32, socket.ntohl: 32, 397 socket.htons: 16, socket.ntohs: 16} 398 for func, size in sizes.items(): 399 mask = (1L<<size) - 1 400 for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210): 401 self.assertEqual(i & mask, func(func(i&mask)) & mask) 402 403 swapped = func(mask) 404 self.assertEqual(swapped & mask, mask) 405 self.assertRaises(OverflowError, func, 1L<<34) 406 407 def testNtoHErrors(self): 408 good_values = [ 1, 2, 3, 1L, 2L, 3L ] 409 bad_values = [ -1, -2, -3, -1L, -2L, -3L ] 410 for k in good_values: 411 socket.ntohl(k) 412 socket.ntohs(k) 413 socket.htonl(k) 414 socket.htons(k) 415 for k in bad_values: 416 self.assertRaises(OverflowError, socket.ntohl, k) 417 self.assertRaises(OverflowError, socket.ntohs, k) 418 self.assertRaises(OverflowError, socket.htonl, k) 419 self.assertRaises(OverflowError, socket.htons, k) 420 421 def testGetServBy(self): 422 eq = self.assertEqual 423 # Find one service that exists, then check all the related interfaces. 424 # I've ordered this by protocols that have both a tcp and udp 425 # protocol, at least for modern Linuxes. 
426 if (sys.platform.startswith('linux') or 427 sys.platform.startswith('freebsd') or 428 sys.platform.startswith('netbsd') or 429 sys.platform == 'darwin'): 430 # avoid the 'echo' service on this platform, as there is an 431 # assumption breaking non-standard port/protocol entry 432 services = ('daytime', 'qotd', 'domain') 433 else: 434 services = ('echo', 'daytime', 'domain') 435 for service in services: 436 try: 437 port = socket.getservbyname(service, 'tcp') 438 break 439 except socket.error: 440 pass 441 else: 442 raise socket.error 443 # Try same call with optional protocol omitted 444 port2 = socket.getservbyname(service) 445 eq(port, port2) 446 # Try udp, but don't barf if it doesn't exist 447 try: 448 udpport = socket.getservbyname(service, 'udp') 449 except socket.error: 450 udpport = None 451 else: 452 eq(udpport, port) 453 # Now make sure the lookup by port returns the same service name 454 eq(socket.getservbyport(port2), service) 455 eq(socket.getservbyport(port, 'tcp'), service) 456 if udpport is not None: 457 eq(socket.getservbyport(udpport, 'udp'), service) 458 # Make sure getservbyport does not accept out of range ports. 
459 self.assertRaises(OverflowError, socket.getservbyport, -1) 460 self.assertRaises(OverflowError, socket.getservbyport, 65536) 461 462 def testDefaultTimeout(self): 463 # Testing default timeout 464 # The default timeout should initially be None 465 self.assertEqual(socket.getdefaulttimeout(), None) 466 s = socket.socket() 467 self.assertEqual(s.gettimeout(), None) 468 s.close() 469 470 # Set the default timeout to 10, and see if it propagates 471 socket.setdefaulttimeout(10) 472 self.assertEqual(socket.getdefaulttimeout(), 10) 473 s = socket.socket() 474 self.assertEqual(s.gettimeout(), 10) 475 s.close() 476 477 # Reset the default timeout to None, and see if it propagates 478 socket.setdefaulttimeout(None) 479 self.assertEqual(socket.getdefaulttimeout(), None) 480 s = socket.socket() 481 self.assertEqual(s.gettimeout(), None) 482 s.close() 483 484 # Check that setting it to an invalid value raises ValueError 485 self.assertRaises(ValueError, socket.setdefaulttimeout, -1) 486 487 # Check that setting it to an invalid type raises TypeError 488 self.assertRaises(TypeError, socket.setdefaulttimeout, "spam") 489 490 @unittest.skipUnless(hasattr(socket, 'inet_aton'), 491 'test needs socket.inet_aton()') 492 def testIPv4_inet_aton_fourbytes(self): 493 # Test that issue1008086 and issue767150 are fixed. 494 # It must return 4 bytes. 
495 self.assertEqual('\x00'*4, socket.inet_aton('0.0.0.0')) 496 self.assertEqual('\xff'*4, socket.inet_aton('255.255.255.255')) 497 498 @unittest.skipUnless(hasattr(socket, 'inet_pton'), 499 'test needs socket.inet_pton()') 500 def testIPv4toString(self): 501 from socket import inet_aton as f, inet_pton, AF_INET 502 g = lambda a: inet_pton(AF_INET, a) 503 504 self.assertEqual('\x00\x00\x00\x00', f('0.0.0.0')) 505 self.assertEqual('\xff\x00\xff\x00', f('255.0.255.0')) 506 self.assertEqual('\xaa\xaa\xaa\xaa', f('170.170.170.170')) 507 self.assertEqual('\x01\x02\x03\x04', f('1.2.3.4')) 508 self.assertEqual('\xff\xff\xff\xff', f('255.255.255.255')) 509 510 self.assertEqual('\x00\x00\x00\x00', g('0.0.0.0')) 511 self.assertEqual('\xff\x00\xff\x00', g('255.0.255.0')) 512 self.assertEqual('\xaa\xaa\xaa\xaa', g('170.170.170.170')) 513 self.assertEqual('\xff\xff\xff\xff', g('255.255.255.255')) 514 515 @unittest.skipUnless(hasattr(socket, 'inet_pton'), 516 'test needs socket.inet_pton()') 517 def testIPv6toString(self): 518 try: 519 from socket import inet_pton, AF_INET6, has_ipv6 520 if not has_ipv6: 521 self.skipTest('IPv6 not available') 522 except ImportError: 523 self.skipTest('could not import needed symbols from socket') 524 f = lambda a: inet_pton(AF_INET6, a) 525 526 self.assertEqual('\x00' * 16, f('::')) 527 self.assertEqual('\x00' * 16, f('0::0')) 528 self.assertEqual('\x00\x01' + '\x00' * 14, f('1::')) 529 self.assertEqual( 530 '\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae', 531 f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae') 532 ) 533 534 @unittest.skipUnless(hasattr(socket, 'inet_ntop'), 535 'test needs socket.inet_ntop()') 536 def testStringToIPv4(self): 537 from socket import inet_ntoa as f, inet_ntop, AF_INET 538 g = lambda a: inet_ntop(AF_INET, a) 539 540 self.assertEqual('1.0.1.0', f('\x01\x00\x01\x00')) 541 self.assertEqual('170.85.170.85', f('\xaa\x55\xaa\x55')) 542 self.assertEqual('255.255.255.255', f('\xff\xff\xff\xff')) 543 
self.assertEqual('1.2.3.4', f('\x01\x02\x03\x04')) 544 545 self.assertEqual('1.0.1.0', g('\x01\x00\x01\x00')) 546 self.assertEqual('170.85.170.85', g('\xaa\x55\xaa\x55')) 547 self.assertEqual('255.255.255.255', g('\xff\xff\xff\xff')) 548 549 @unittest.skipUnless(hasattr(socket, 'inet_ntop'), 550 'test needs socket.inet_ntop()') 551 def testStringToIPv6(self): 552 try: 553 from socket import inet_ntop, AF_INET6, has_ipv6 554 if not has_ipv6: 555 self.skipTest('IPv6 not available') 556 except ImportError: 557 self.skipTest('could not import needed symbols from socket') 558 f = lambda a: inet_ntop(AF_INET6, a) 559 560 self.assertEqual('::', f('\x00' * 16)) 561 self.assertEqual('::1', f('\x00' * 15 + '\x01')) 562 self.assertEqual( 563 'aef:b01:506:1001:ffff:9997:55:170', 564 f('\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70') 565 ) 566 567 # XXX The following don't test module-level functionality... 568 569 def _get_unused_port(self, bind_address='0.0.0.0'): 570 """Use a temporary socket to elicit an unused ephemeral port. 571 572 Args: 573 bind_address: Hostname or IP address to search for a port on. 574 575 Returns: A most likely to be unused port. 576 """ 577 tempsock = socket.socket() 578 tempsock.bind((bind_address, 0)) 579 host, port = tempsock.getsockname() 580 tempsock.close() 581 return port 582 583 def testSockName(self): 584 # Testing getsockname() 585 port = self._get_unused_port() 586 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 587 self.addCleanup(sock.close) 588 sock.bind(("0.0.0.0", port)) 589 name = sock.getsockname() 590 # XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate 591 # it reasonable to get the host's addr in addition to 0.0.0.0. 592 # At least for eCos. This is required for the S/390 to pass. 
593 try: 594 my_ip_addr = socket.gethostbyname(socket.gethostname()) 595 except socket.error: 596 # Probably name lookup wasn't set up right; skip this test 597 self.skipTest('name lookup failure') 598 self.assertIn(name[0], ("0.0.0.0", my_ip_addr), '%s invalid' % name[0]) 599 self.assertEqual(name[1], port) 600 601 def testGetSockOpt(self): 602 # Testing getsockopt() 603 # We know a socket should start without reuse==0 604 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 605 self.addCleanup(sock.close) 606 reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) 607 self.assertFalse(reuse != 0, "initial mode is reuse") 608 609 def testSetSockOpt(self): 610 # Testing setsockopt() 611 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 612 self.addCleanup(sock.close) 613 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 614 reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) 615 self.assertFalse(reuse == 0, "failed to set reuse mode") 616 617 def testSendAfterClose(self): 618 # testing send() after close() with timeout 619 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 620 sock.settimeout(1) 621 sock.close() 622 self.assertRaises(socket.error, sock.send, "spam") 623 624 def testNewAttributes(self): 625 # testing .family, .type and .protocol 626 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 627 self.assertEqual(sock.family, socket.AF_INET) 628 self.assertEqual(sock.type, socket.SOCK_STREAM) 629 self.assertEqual(sock.proto, 0) 630 sock.close() 631 632 def test_getsockaddrarg(self): 633 sock = socket.socket() 634 self.addCleanup(sock.close) 635 port = test_support.find_unused_port() 636 big_port = port + 65536 637 neg_port = port - 65536 638 self.assertRaises(OverflowError, sock.bind, (HOST, big_port)) 639 self.assertRaises(OverflowError, sock.bind, (HOST, neg_port)) 640 # Since find_unused_port() is inherently subject to race conditions, we 641 # call it a couple times if necessary. 
642 for i in itertools.count(): 643 port = test_support.find_unused_port() 644 try: 645 sock.bind((HOST, port)) 646 except OSError as e: 647 if e.errno != errno.EADDRINUSE or i == 5: 648 raise 649 else: 650 break 651 652 @unittest.skipUnless(os.name == "nt", "Windows specific") 653 def test_sock_ioctl(self): 654 self.assertTrue(hasattr(socket.socket, 'ioctl')) 655 self.assertTrue(hasattr(socket, 'SIO_RCVALL')) 656 self.assertTrue(hasattr(socket, 'RCVALL_ON')) 657 self.assertTrue(hasattr(socket, 'RCVALL_OFF')) 658 self.assertTrue(hasattr(socket, 'SIO_KEEPALIVE_VALS')) 659 s = socket.socket() 660 self.addCleanup(s.close) 661 self.assertRaises(ValueError, s.ioctl, -1, None) 662 s.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 100, 100)) 663 664 def testGetaddrinfo(self): 665 try: 666 socket.getaddrinfo('localhost', 80) 667 except socket.gaierror as err: 668 if err.errno == socket.EAI_SERVICE: 669 # see http://bugs.python.org/issue1282647 670 self.skipTest("buggy libc version") 671 raise 672 # len of every sequence is supposed to be == 5 673 for info in socket.getaddrinfo(HOST, None): 674 self.assertEqual(len(info), 5) 675 # host can be a domain name, a string representation of an 676 # IPv4/v6 address or None 677 socket.getaddrinfo('localhost', 80) 678 socket.getaddrinfo('127.0.0.1', 80) 679 socket.getaddrinfo(None, 80) 680 if SUPPORTS_IPV6: 681 socket.getaddrinfo('::1', 80) 682 # port can be a string service name such as "http", a numeric 683 # port number (int or long), or None 684 socket.getaddrinfo(HOST, "http") 685 socket.getaddrinfo(HOST, 80) 686 socket.getaddrinfo(HOST, 80L) 687 socket.getaddrinfo(HOST, None) 688 # test family and socktype filters 689 infos = socket.getaddrinfo(HOST, None, socket.AF_INET) 690 for family, _, _, _, _ in infos: 691 self.assertEqual(family, socket.AF_INET) 692 infos = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM) 693 for _, socktype, _, _, _ in infos: 694 self.assertEqual(socktype, socket.SOCK_STREAM) 695 # test proto and flags 
arguments 696 socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP) 697 socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE) 698 # a server willing to support both IPv4 and IPv6 will 699 # usually do this 700 socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, 701 socket.AI_PASSIVE) 702 703 # Issue 17269: test workaround for OS X platform bug segfault 704 if hasattr(socket, 'AI_NUMERICSERV'): 705 try: 706 # The arguments here are undefined and the call may succeed 707 # or fail. All we care here is that it doesn't segfault. 708 socket.getaddrinfo("localhost", None, 0, 0, 0, 709 socket.AI_NUMERICSERV) 710 except socket.gaierror: 711 pass 712 713 def check_sendall_interrupted(self, with_timeout): 714 # socketpair() is not strictly required, but it makes things easier. 715 if not hasattr(signal, 'alarm') or not hasattr(socket, 'socketpair'): 716 self.skipTest("signal.alarm and socket.socketpair required for this test") 717 # Our signal handlers clobber the C errno by calling a math function 718 # with an invalid domain value. 
719 def ok_handler(*args): 720 self.assertRaises(ValueError, math.acosh, 0) 721 def raising_handler(*args): 722 self.assertRaises(ValueError, math.acosh, 0) 723 1 // 0 724 c, s = socket.socketpair() 725 old_alarm = signal.signal(signal.SIGALRM, raising_handler) 726 try: 727 if with_timeout: 728 # Just above the one second minimum for signal.alarm 729 c.settimeout(1.5) 730 with self.assertRaises(ZeroDivisionError): 731 signal.alarm(1) 732 c.sendall(b"x" * test_support.SOCK_MAX_SIZE) 733 if with_timeout: 734 signal.signal(signal.SIGALRM, ok_handler) 735 signal.alarm(1) 736 self.assertRaises(socket.timeout, c.sendall, 737 b"x" * test_support.SOCK_MAX_SIZE) 738 finally: 739 signal.alarm(0) 740 signal.signal(signal.SIGALRM, old_alarm) 741 c.close() 742 s.close() 743 744 def test_sendall_interrupted(self): 745 self.check_sendall_interrupted(False) 746 747 def test_sendall_interrupted_with_timeout(self): 748 self.check_sendall_interrupted(True) 749 750 def test_listen_backlog(self): 751 for backlog in 0, -1: 752 srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 753 srv.bind((HOST, 0)) 754 srv.listen(backlog) 755 srv.close() 756 757 @test_support.cpython_only 758 def test_listen_backlog_overflow(self): 759 # Issue 15989 760 import _testcapi 761 srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 762 srv.bind((HOST, 0)) 763 self.assertRaises(OverflowError, srv.listen, _testcapi.INT_MAX + 1) 764 srv.close() 765 766 @unittest.skipUnless(SUPPORTS_IPV6, 'IPv6 required for this test.') 767 def test_flowinfo(self): 768 self.assertRaises(OverflowError, socket.getnameinfo, 769 ('::1',0, 0xffffffff), 0) 770 s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) 771 try: 772 self.assertRaises(OverflowError, s.bind, ('::1', 0, -10)) 773 finally: 774 s.close() 775 776 777@unittest.skipUnless(thread, 'Threading required for this test.') 778class BasicTCPTest(SocketConnectedTest): 779 780 def __init__(self, methodName='runTest'): 781 SocketConnectedTest.__init__(self, 
methodName=methodName) 782 783 def testRecv(self): 784 # Testing large receive over TCP 785 msg = self.cli_conn.recv(1024) 786 self.assertEqual(msg, MSG) 787 788 def _testRecv(self): 789 self.serv_conn.send(MSG) 790 791 def testOverFlowRecv(self): 792 # Testing receive in chunks over TCP 793 seg1 = self.cli_conn.recv(len(MSG) - 3) 794 seg2 = self.cli_conn.recv(1024) 795 msg = seg1 + seg2 796 self.assertEqual(msg, MSG) 797 798 def _testOverFlowRecv(self): 799 self.serv_conn.send(MSG) 800 801 def testRecvFrom(self): 802 # Testing large recvfrom() over TCP 803 msg, addr = self.cli_conn.recvfrom(1024) 804 self.assertEqual(msg, MSG) 805 806 def _testRecvFrom(self): 807 self.serv_conn.send(MSG) 808 809 def testOverFlowRecvFrom(self): 810 # Testing recvfrom() in chunks over TCP 811 seg1, addr = self.cli_conn.recvfrom(len(MSG)-3) 812 seg2, addr = self.cli_conn.recvfrom(1024) 813 msg = seg1 + seg2 814 self.assertEqual(msg, MSG) 815 816 def _testOverFlowRecvFrom(self): 817 self.serv_conn.send(MSG) 818 819 def testSendAll(self): 820 # Testing sendall() with a 2048 byte string over TCP 821 msg = '' 822 while 1: 823 read = self.cli_conn.recv(1024) 824 if not read: 825 break 826 msg += read 827 self.assertEqual(msg, 'f' * 2048) 828 829 def _testSendAll(self): 830 big_chunk = 'f' * 2048 831 self.serv_conn.sendall(big_chunk) 832 833 @unittest.skipUnless(hasattr(socket, 'fromfd'), 834 'socket.fromfd not available') 835 def testFromFd(self): 836 # Testing fromfd() 837 fd = self.cli_conn.fileno() 838 sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM) 839 self.addCleanup(sock.close) 840 msg = sock.recv(1024) 841 self.assertEqual(msg, MSG) 842 843 def _testFromFd(self): 844 self.serv_conn.send(MSG) 845 846 def testDup(self): 847 # Testing dup() 848 sock = self.cli_conn.dup() 849 self.addCleanup(sock.close) 850 msg = sock.recv(1024) 851 self.assertEqual(msg, MSG) 852 853 def _testDup(self): 854 self.serv_conn.send(MSG) 855 856 def testShutdown(self): 857 # Testing shutdown() 
858 msg = self.cli_conn.recv(1024) 859 self.assertEqual(msg, MSG) 860 # wait for _testShutdown to finish: on OS X, when the server 861 # closes the connection the client also becomes disconnected, 862 # and the client's shutdown call will fail. (Issue #4397.) 863 self.done.wait() 864 865 def _testShutdown(self): 866 self.serv_conn.send(MSG) 867 self.serv_conn.shutdown(2) 868 869 testShutdown_overflow = test_support.cpython_only(testShutdown) 870 871 @test_support.cpython_only 872 def _testShutdown_overflow(self): 873 import _testcapi 874 self.serv_conn.send(MSG) 875 # Issue 15989 876 self.assertRaises(OverflowError, self.serv_conn.shutdown, 877 _testcapi.INT_MAX + 1) 878 self.assertRaises(OverflowError, self.serv_conn.shutdown, 879 2 + (_testcapi.UINT_MAX + 1)) 880 self.serv_conn.shutdown(2) 881 882@unittest.skipUnless(thread, 'Threading required for this test.') 883class BasicUDPTest(ThreadedUDPSocketTest): 884 885 def __init__(self, methodName='runTest'): 886 ThreadedUDPSocketTest.__init__(self, methodName=methodName) 887 888 def testSendtoAndRecv(self): 889 # Testing sendto() and Recv() over UDP 890 msg = self.serv.recv(len(MSG)) 891 self.assertEqual(msg, MSG) 892 893 def _testSendtoAndRecv(self): 894 self.cli.sendto(MSG, 0, (HOST, self.port)) 895 896 def testRecvFrom(self): 897 # Testing recvfrom() over UDP 898 msg, addr = self.serv.recvfrom(len(MSG)) 899 self.assertEqual(msg, MSG) 900 901 def _testRecvFrom(self): 902 self.cli.sendto(MSG, 0, (HOST, self.port)) 903 904 def testRecvFromNegative(self): 905 # Negative lengths passed to recvfrom should give ValueError. 
906 self.assertRaises(ValueError, self.serv.recvfrom, -1) 907 908 def _testRecvFromNegative(self): 909 self.cli.sendto(MSG, 0, (HOST, self.port)) 910 911@unittest.skipUnless(thread, 'Threading required for this test.') 912class TCPCloserTest(ThreadedTCPSocketTest): 913 914 def testClose(self): 915 conn, addr = self.serv.accept() 916 conn.close() 917 918 sd = self.cli 919 read, write, err = select.select([sd], [], [], 1.0) 920 self.assertEqual(read, [sd]) 921 self.assertEqual(sd.recv(1), '') 922 923 def _testClose(self): 924 self.cli.connect((HOST, self.port)) 925 time.sleep(1.0) 926 927@unittest.skipUnless(hasattr(socket, 'socketpair'), 928 'test needs socket.socketpair()') 929@unittest.skipUnless(thread, 'Threading required for this test.') 930class BasicSocketPairTest(SocketPairTest): 931 932 def __init__(self, methodName='runTest'): 933 SocketPairTest.__init__(self, methodName=methodName) 934 935 def testRecv(self): 936 msg = self.serv.recv(1024) 937 self.assertEqual(msg, MSG) 938 939 def _testRecv(self): 940 self.cli.send(MSG) 941 942 def testSend(self): 943 self.serv.send(MSG) 944 945 def _testSend(self): 946 msg = self.cli.recv(1024) 947 self.assertEqual(msg, MSG) 948 949@unittest.skipUnless(thread, 'Threading required for this test.') 950class NonBlockingTCPTests(ThreadedTCPSocketTest): 951 952 def __init__(self, methodName='runTest'): 953 self.event = threading.Event() 954 ThreadedTCPSocketTest.__init__(self, methodName=methodName) 955 956 def testSetBlocking(self): 957 # Testing whether set blocking works 958 self.serv.setblocking(True) 959 self.assertIsNone(self.serv.gettimeout()) 960 self.serv.setblocking(False) 961 self.assertEqual(self.serv.gettimeout(), 0.0) 962 start = time.time() 963 try: 964 self.serv.accept() 965 except socket.error: 966 pass 967 end = time.time() 968 self.assertTrue((end - start) < 1.0, "Error setting non-blocking mode.") 969 970 def _testSetBlocking(self): 971 pass 972 973 @test_support.cpython_only 974 def 
testSetBlocking_overflow(self): 975 # Issue 15989 976 import _testcapi 977 if _testcapi.UINT_MAX >= _testcapi.ULONG_MAX: 978 self.skipTest('needs UINT_MAX < ULONG_MAX') 979 self.serv.setblocking(False) 980 self.assertEqual(self.serv.gettimeout(), 0.0) 981 self.serv.setblocking(_testcapi.UINT_MAX + 1) 982 self.assertIsNone(self.serv.gettimeout()) 983 984 _testSetBlocking_overflow = test_support.cpython_only(_testSetBlocking) 985 986 def testAccept(self): 987 # Testing non-blocking accept 988 self.serv.setblocking(0) 989 990 # connect() didn't start: non-blocking accept() fails 991 with self.assertRaises(socket.error): 992 conn, addr = self.serv.accept() 993 994 self.event.set() 995 996 read, write, err = select.select([self.serv], [], [], MAIN_TIMEOUT) 997 if self.serv not in read: 998 self.fail("Error trying to do accept after select.") 999 1000 # connect() completed: non-blocking accept() doesn't block 1001 conn, addr = self.serv.accept() 1002 self.addCleanup(conn.close) 1003 self.assertIsNone(conn.gettimeout()) 1004 1005 def _testAccept(self): 1006 # don't connect before event is set to check 1007 # that non-blocking accept() raises socket.error 1008 self.event.wait() 1009 1010 self.cli.connect((HOST, self.port)) 1011 1012 def testConnect(self): 1013 # Testing non-blocking connect 1014 conn, addr = self.serv.accept() 1015 conn.close() 1016 1017 def _testConnect(self): 1018 self.cli.settimeout(10) 1019 self.cli.connect((HOST, self.port)) 1020 1021 def testRecv(self): 1022 # Testing non-blocking recv 1023 conn, addr = self.serv.accept() 1024 self.addCleanup(conn.close) 1025 conn.setblocking(0) 1026 1027 # the server didn't send data yet: non-blocking recv() fails 1028 with self.assertRaises(socket.error): 1029 msg = conn.recv(len(MSG)) 1030 1031 self.event.set() 1032 1033 read, write, err = select.select([conn], [], [], MAIN_TIMEOUT) 1034 if conn not in read: 1035 self.fail("Error during select call to non-blocking socket.") 1036 1037 # the server sent data yet: 
non-blocking recv() doesn't block 1038 msg = conn.recv(len(MSG)) 1039 self.assertEqual(msg, MSG) 1040 1041 def _testRecv(self): 1042 self.cli.connect((HOST, self.port)) 1043 1044 # don't send anything before event is set to check 1045 # that non-blocking recv() raises socket.error 1046 self.event.wait() 1047 1048 # send data: recv() will no longer block 1049 self.cli.sendall(MSG) 1050 1051@unittest.skipUnless(thread, 'Threading required for this test.') 1052class FileObjectClassTestCase(SocketConnectedTest): 1053 1054 bufsize = -1 # Use default buffer size 1055 1056 def __init__(self, methodName='runTest'): 1057 SocketConnectedTest.__init__(self, methodName=methodName) 1058 1059 def setUp(self): 1060 SocketConnectedTest.setUp(self) 1061 self.serv_file = self.cli_conn.makefile('rb', self.bufsize) 1062 1063 def tearDown(self): 1064 self.serv_file.close() 1065 self.assertTrue(self.serv_file.closed) 1066 SocketConnectedTest.tearDown(self) 1067 self.serv_file = None 1068 1069 def clientSetUp(self): 1070 SocketConnectedTest.clientSetUp(self) 1071 self.cli_file = self.serv_conn.makefile('wb') 1072 1073 def clientTearDown(self): 1074 self.cli_file.close() 1075 self.assertTrue(self.cli_file.closed) 1076 self.cli_file = None 1077 SocketConnectedTest.clientTearDown(self) 1078 1079 def testSmallRead(self): 1080 # Performing small file read test 1081 first_seg = self.serv_file.read(len(MSG)-3) 1082 second_seg = self.serv_file.read(3) 1083 msg = first_seg + second_seg 1084 self.assertEqual(msg, MSG) 1085 1086 def _testSmallRead(self): 1087 self.cli_file.write(MSG) 1088 self.cli_file.flush() 1089 1090 def testFullRead(self): 1091 # read until EOF 1092 msg = self.serv_file.read() 1093 self.assertEqual(msg, MSG) 1094 1095 def _testFullRead(self): 1096 self.cli_file.write(MSG) 1097 self.cli_file.close() 1098 1099 def testUnbufferedRead(self): 1100 # Performing unbuffered file read test 1101 buf = '' 1102 while 1: 1103 char = self.serv_file.read(1) 1104 if not char: 1105 break 1106 
buf += char 1107 self.assertEqual(buf, MSG) 1108 1109 def _testUnbufferedRead(self): 1110 self.cli_file.write(MSG) 1111 self.cli_file.flush() 1112 1113 def testReadline(self): 1114 # Performing file readline test 1115 line = self.serv_file.readline() 1116 self.assertEqual(line, MSG) 1117 1118 def _testReadline(self): 1119 self.cli_file.write(MSG) 1120 self.cli_file.flush() 1121 1122 def testReadlineAfterRead(self): 1123 a_baloo_is = self.serv_file.read(len("A baloo is")) 1124 self.assertEqual("A baloo is", a_baloo_is) 1125 _a_bear = self.serv_file.read(len(" a bear")) 1126 self.assertEqual(" a bear", _a_bear) 1127 line = self.serv_file.readline() 1128 self.assertEqual("\n", line) 1129 line = self.serv_file.readline() 1130 self.assertEqual("A BALOO IS A BEAR.\n", line) 1131 line = self.serv_file.readline() 1132 self.assertEqual(MSG, line) 1133 1134 def _testReadlineAfterRead(self): 1135 self.cli_file.write("A baloo is a bear\n") 1136 self.cli_file.write("A BALOO IS A BEAR.\n") 1137 self.cli_file.write(MSG) 1138 self.cli_file.flush() 1139 1140 def testReadlineAfterReadNoNewline(self): 1141 end_of_ = self.serv_file.read(len("End Of ")) 1142 self.assertEqual("End Of ", end_of_) 1143 line = self.serv_file.readline() 1144 self.assertEqual("Line", line) 1145 1146 def _testReadlineAfterReadNoNewline(self): 1147 self.cli_file.write("End Of Line") 1148 1149 def testClosedAttr(self): 1150 self.assertTrue(not self.serv_file.closed) 1151 1152 def _testClosedAttr(self): 1153 self.assertTrue(not self.cli_file.closed) 1154 1155 1156class FileObjectInterruptedTestCase(unittest.TestCase): 1157 """Test that the file object correctly handles EINTR internally.""" 1158 1159 class MockSocket(object): 1160 def __init__(self, recv_funcs=()): 1161 # A generator that returns callables that we'll call for each 1162 # call to recv(). 
1163 self._recv_step = iter(recv_funcs) 1164 1165 def recv(self, size): 1166 return self._recv_step.next()() 1167 1168 @staticmethod 1169 def _raise_eintr(): 1170 raise socket.error(errno.EINTR) 1171 1172 def _test_readline(self, size=-1, **kwargs): 1173 mock_sock = self.MockSocket(recv_funcs=[ 1174 lambda : "This is the first line\nAnd the sec", 1175 self._raise_eintr, 1176 lambda : "ond line is here\n", 1177 lambda : "", 1178 ]) 1179 fo = socket._fileobject(mock_sock, **kwargs) 1180 self.assertEqual(fo.readline(size), "This is the first line\n") 1181 self.assertEqual(fo.readline(size), "And the second line is here\n") 1182 1183 def _test_read(self, size=-1, **kwargs): 1184 mock_sock = self.MockSocket(recv_funcs=[ 1185 lambda : "This is the first line\nAnd the sec", 1186 self._raise_eintr, 1187 lambda : "ond line is here\n", 1188 lambda : "", 1189 ]) 1190 fo = socket._fileobject(mock_sock, **kwargs) 1191 self.assertEqual(fo.read(size), "This is the first line\n" 1192 "And the second line is here\n") 1193 1194 def test_default(self): 1195 self._test_readline() 1196 self._test_readline(size=100) 1197 self._test_read() 1198 self._test_read(size=100) 1199 1200 def test_with_1k_buffer(self): 1201 self._test_readline(bufsize=1024) 1202 self._test_readline(size=100, bufsize=1024) 1203 self._test_read(bufsize=1024) 1204 self._test_read(size=100, bufsize=1024) 1205 1206 def _test_readline_no_buffer(self, size=-1): 1207 mock_sock = self.MockSocket(recv_funcs=[ 1208 lambda : "aa", 1209 lambda : "\n", 1210 lambda : "BB", 1211 self._raise_eintr, 1212 lambda : "bb", 1213 lambda : "", 1214 ]) 1215 fo = socket._fileobject(mock_sock, bufsize=0) 1216 self.assertEqual(fo.readline(size), "aa\n") 1217 self.assertEqual(fo.readline(size), "BBbb") 1218 1219 def test_no_buffer(self): 1220 self._test_readline_no_buffer() 1221 self._test_readline_no_buffer(size=4) 1222 self._test_read(bufsize=0) 1223 self._test_read(size=100, bufsize=0) 1224 1225 1226class 
UnbufferedFileObjectClassTestCase(FileObjectClassTestCase): 1227 1228 """Repeat the tests from FileObjectClassTestCase with bufsize==0. 1229 1230 In this case (and in this case only), it should be possible to 1231 create a file object, read a line from it, create another file 1232 object, read another line from it, without loss of data in the 1233 first file object's buffer. Note that httplib relies on this 1234 when reading multiple requests from the same socket.""" 1235 1236 bufsize = 0 # Use unbuffered mode 1237 1238 def testUnbufferedReadline(self): 1239 # Read a line, create a new file object, read another line with it 1240 line = self.serv_file.readline() # first line 1241 self.assertEqual(line, "A. " + MSG) # first line 1242 self.serv_file = self.cli_conn.makefile('rb', 0) 1243 line = self.serv_file.readline() # second line 1244 self.assertEqual(line, "B. " + MSG) # second line 1245 1246 def _testUnbufferedReadline(self): 1247 self.cli_file.write("A. " + MSG) 1248 self.cli_file.write("B. 
" + MSG) 1249 self.cli_file.flush() 1250 1251class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase): 1252 1253 bufsize = 1 # Default-buffered for reading; line-buffered for writing 1254 1255 class SocketMemo(object): 1256 """A wrapper to keep track of sent data, needed to examine write behaviour""" 1257 def __init__(self, sock): 1258 self._sock = sock 1259 self.sent = [] 1260 1261 def send(self, data, flags=0): 1262 n = self._sock.send(data, flags) 1263 self.sent.append(data[:n]) 1264 return n 1265 1266 def sendall(self, data, flags=0): 1267 self._sock.sendall(data, flags) 1268 self.sent.append(data) 1269 1270 def __getattr__(self, attr): 1271 return getattr(self._sock, attr) 1272 1273 def getsent(self): 1274 return [e.tobytes() if isinstance(e, memoryview) else e for e in self.sent] 1275 1276 def setUp(self): 1277 FileObjectClassTestCase.setUp(self) 1278 self.serv_file._sock = self.SocketMemo(self.serv_file._sock) 1279 1280 def testLinebufferedWrite(self): 1281 # Write two lines, in small chunks 1282 msg = MSG.strip() 1283 print >> self.serv_file, msg, 1284 print >> self.serv_file, msg 1285 1286 # second line: 1287 print >> self.serv_file, msg, 1288 print >> self.serv_file, msg, 1289 print >> self.serv_file, msg 1290 1291 # third line 1292 print >> self.serv_file, '' 1293 1294 self.serv_file.flush() 1295 1296 msg1 = "%s %s\n"%(msg, msg) 1297 msg2 = "%s %s %s\n"%(msg, msg, msg) 1298 msg3 = "\n" 1299 self.assertEqual(self.serv_file._sock.getsent(), [msg1, msg2, msg3]) 1300 1301 def _testLinebufferedWrite(self): 1302 msg = MSG.strip() 1303 msg1 = "%s %s\n"%(msg, msg) 1304 msg2 = "%s %s %s\n"%(msg, msg, msg) 1305 msg3 = "\n" 1306 l1 = self.cli_file.readline() 1307 self.assertEqual(l1, msg1) 1308 l2 = self.cli_file.readline() 1309 self.assertEqual(l2, msg2) 1310 l3 = self.cli_file.readline() 1311 self.assertEqual(l3, msg3) 1312 1313 1314class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase): 1315 1316 bufsize = 2 # Exercise the buffering code 
class NetworkConnectionTest(object):
    """Mixin whose client side connects via socket.create_connection()."""

    def clientSetUp(self):
        # We're inherited below by BasicTCPTest2, which also inherits
        # BasicTCPTest, which defines self.port referenced below.
        self.cli = socket.create_connection((HOST, self.port))
        self.serv_conn = self.cli


class BasicTCPTest2(NetworkConnectionTest, BasicTCPTest):
    """Tests that NetworkConnection does not break existing TCP functionality.
    """


class NetworkConnectionNoServer(unittest.TestCase):
    """Check the errors raised when connecting while no server listens."""

    class MockSocket(socket.socket):
        """Socket whose connect() always raises socket.timeout."""

        def connect(self, *args):
            raise socket.timeout('timed out')

    @contextlib.contextmanager
    def mocked_socket_module(self):
        """Temporarily replace socket.socket with MockSocket.

        The original class is put back when the ``with`` block exits,
        whether or not the body raised.
        """
        old_socket = socket.socket
        socket.socket = self.MockSocket
        try:
            yield
        finally:
            socket.socket = old_socket

    def test_connect(self):
        """connect() to an unused port fails with ECONNREFUSED."""
        port = test_support.find_unused_port()
        cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.addCleanup(cli.close)
        with self.assertRaises(socket.error) as cm:
            cli.connect((HOST, port))
        self.assertEqual(cm.exception.errno, errno.ECONNREFUSED)

    def test_create_connection(self):
        """create_connection() errors carry one of the expected errnos."""
        # Issue #9792: errors raised by create_connection() should have
        # a proper errno attribute.
        port = test_support.find_unused_port()
        with self.assertRaises(socket.error) as cm:
            socket.create_connection((HOST, port))

        # Issue #16257: create_connection() calls getaddrinfo() against
        # 'localhost'.  This may result in an IPV6 addr being returned
        # as well as an IPV4 one:
        #   >>> socket.getaddrinfo('localhost', port, 0, SOCK_STREAM)
        #   >>> [(2,  2, 0, '', ('127.0.0.1', 41230)),
        #        (26, 2, 0, '', ('::1', 41230, 0, 0))]
        #
        # create_connection() enumerates through all the addresses returned
        # and if it doesn't successfully connect to any of them, it
        # propagates the last exception it encountered.
        #
        # On Solaris, ENETUNREACH is returned in this circumstance instead
        # of ECONNREFUSED.  So, if that errno exists, add it to our list of
        # expected errnos.
        expected_errnos = [errno.ECONNREFUSED]
        if hasattr(errno, 'ENETUNREACH'):
            expected_errnos.append(errno.ENETUNREACH)
        if hasattr(errno, 'EADDRNOTAVAIL'):
            # bpo-31910: socket.create_connection() fails randomly
            # with EADDRNOTAVAIL on Travis CI
            expected_errnos.append(errno.EADDRNOTAVAIL)

        self.assertIn(cm.exception.errno, expected_errnos)

    def test_create_connection_timeout(self):
        """A timeout raised by connect() propagates as socket.timeout."""
        # Issue #9792: create_connection() should not recast timeout errors
        # as generic socket errors.
        with self.mocked_socket_module():
            with self.assertRaises(socket.timeout):
                socket.create_connection((HOST, 1234))


@unittest.skipUnless(thread, 'Threading required for this test.')
class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest):
    """Check attributes of sockets returned by create_connection().

    Every server half simply accepts and closes the connection; the
    assertions live in the client halves (the ``_test*`` methods).
    """

    def __init__(self, methodName='runTest'):
        SocketTCPTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        self.source_port = test_support.find_unused_port()

    def clientTearDown(self):
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)

    def _justAccept(self):
        """Server half shared by all tests: accept one connection, close it."""
        conn, addr = self.serv.accept()
        conn.close()

    testFamily = _justAccept
    def _testFamily(self):
        self.cli = socket.create_connection((HOST, self.port), timeout=30)
        self.addCleanup(self.cli.close)
        self.assertEqual(self.cli.family, 2)

    testSourceAddress = _justAccept
    def _testSourceAddress(self):
        self.cli = socket.create_connection((HOST, self.port), timeout=30,
                source_address=('', self.source_port))
        self.addCleanup(self.cli.close)
        self.assertEqual(self.cli.getsockname()[1], self.source_port)
        # The port number being used is sufficient to show that the bind()
        # call happened.

    testTimeoutDefault = _justAccept
    def _testTimeoutDefault(self):
        # passing no explicit timeout uses socket's global default
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(42)
        try:
            self.cli = socket.create_connection((HOST, self.port))
            self.addCleanup(self.cli.close)
        finally:
            # Always restore the process-wide default for later tests.
            socket.setdefaulttimeout(None)
        self.assertEqual(self.cli.gettimeout(), 42)

    testTimeoutNone = _justAccept
    def _testTimeoutNone(self):
        # None timeout means the same as sock.settimeout(None)
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            self.cli = socket.create_connection((HOST, self.port), timeout=None)
            self.addCleanup(self.cli.close)
        finally:
            # Always restore the process-wide default for later tests.
            socket.setdefaulttimeout(None)
        self.assertIsNone(self.cli.gettimeout())

    testTimeoutValueNamed = _justAccept
    def _testTimeoutValueNamed(self):
        self.cli = socket.create_connection((HOST, self.port), timeout=30)
        # Registered like in the sibling tests so the socket is closed even
        # if the assertion below fails.
        self.addCleanup(self.cli.close)
        self.assertEqual(self.cli.gettimeout(), 30)

    testTimeoutValueNonamed = _justAccept
    def _testTimeoutValueNonamed(self):
        self.cli = socket.create_connection((HOST, self.port), 30)
        self.addCleanup(self.cli.close)
        self.assertEqual(self.cli.gettimeout(), 30)


@unittest.skipUnless(thread, 'Threading required for this test.')
class NetworkConnectionBehaviourTest(SocketTCPTest, ThreadableTest):
    """Check recv() on a create_connection() socket with and without timeout.

    The server waits three seconds before sending, so a client without a
    timeout receives the data while a client with a 1 second timeout must
    get socket.timeout.
    """

    def __init__(self, methodName='runTest'):
        SocketTCPTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        pass

    def clientTearDown(self):
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)

    def testInsideTimeout(self):
        conn, addr = self.serv.accept()
        self.addCleanup(conn.close)
        time.sleep(3)
        conn.send("done!")
    # Both tests use the same server half; only the client side differs.
    testOutsideTimeout = testInsideTimeout

    def _testInsideTimeout(self):
        self.cli = sock = socket.create_connection((HOST, self.port))
        data = sock.recv(5)
        self.assertEqual(data, "done!")

    def _testOutsideTimeout(self):
        self.cli = sock = socket.create_connection((HOST, self.port), timeout=1)
        self.assertRaises(socket.timeout, lambda: sock.recv(5))


class Urllib2FileobjectTest(unittest.TestCase):
    """Check when socket._fileobject.close() closes the underlying socket."""

    # urllib2.HTTPHandler has "borrowed" socket._fileobject, and requires that
    # it close the socket if the close c'tor argument is true

    def testClose(self):
        class MockSocket:
            closed = False
            def flush(self): pass
            def close(self): self.closed = True

        # must not close unless we request it: the original use of _fileobject
        # by module socket requires that the underlying socket not be closed until
        # the _socketobject that created the _fileobject is closed
        s = MockSocket()
        f = socket._fileobject(s)
        f.close()
        self.assertFalse(s.closed)

        s = MockSocket()
        f = socket._fileobject(s, close=True)
        f.close()
        self.assertTrue(s.closed)


class TCPTimeoutTest(SocketTCPTest):
    """Timeout behaviour of accept() on a listening TCP socket."""

    def testTCPTimeout(self):
        # The trailing string is forwarded by assertRaises() to
        # raise_timeout(), which accepts and ignores it.
        def raise_timeout(*args, **kwargs):
            self.serv.settimeout(1.0)
            self.serv.accept()
        self.assertRaises(socket.timeout, raise_timeout,
                              "Error generating a timeout exception (TCP)")

    def testTimeoutZero(self):
        """With a zero timeout accept() raises socket.error, not timeout."""
        ok = False
        try:
            self.serv.settimeout(0.0)
            self.serv.accept()
        except socket.timeout:
            # socket.timeout must be tested first: it is checked before the
            # more general socket.error clause below.
            self.fail("caught timeout instead of error (TCP)")
        except socket.error:
            ok = True
        except Exception:
            self.fail("caught unexpected exception (TCP)")
        if not ok:
            self.fail("accept() returned success when we did not expect it")

    @unittest.skipUnless(hasattr(signal, 'alarm'),
                         'test needs signal.alarm()')
    def testInterruptedTimeout(self):
        """A SIGALRM handler's exception interrupts a blocking accept()."""
        # XXX I don't know how to do this test on MSWindows or any other
        # platform that doesn't support signal.alarm() or os.kill(), though
        # the bug should have existed on all platforms.
        self.serv.settimeout(5.0)   # must be longer than alarm
        class Alarm(Exception):
            pass
        def alarm_handler(signum, frame):
            raise Alarm
        old_alarm = signal.signal(signal.SIGALRM, alarm_handler)
        try:
            try:
                signal.alarm(2)    # POSIX allows alarm to be up to 1 second early
                self.serv.accept()
            except socket.timeout:
                self.fail("caught timeout instead of Alarm")
            except Alarm:
                pass
            except Exception:
                self.fail("caught other exception instead of Alarm:"
                          " %s(%s):\n%s" %
                          (sys.exc_info()[:2] + (traceback.format_exc(),)))
            else:
                self.fail("nothing caught")
            finally:
                signal.alarm(0)         # shut off alarm
        except Alarm:
            self.fail("got Alarm in wrong place")
        finally:
            # no alarm can be pending.  Safe to restore old handler.
            signal.signal(signal.SIGALRM, old_alarm)


class UDPTimeoutTest(SocketUDPTest):
    """Timeout behaviour of recv() on a bound UDP socket."""

    def testUDPTimeout(self):
        # The trailing string is forwarded by assertRaises() to
        # raise_timeout(), which accepts and ignores it.
        def raise_timeout(*args, **kwargs):
            self.serv.settimeout(1.0)
            self.serv.recv(1024)
        self.assertRaises(socket.timeout, raise_timeout,
                              "Error generating a timeout exception (UDP)")

    def testTimeoutZero(self):
        """With a zero timeout recv() raises socket.error, not timeout."""
        ok = False
        try:
            self.serv.settimeout(0.0)
            self.serv.recv(1024)
        except socket.timeout:
            self.fail("caught timeout instead of error (UDP)")
        except socket.error:
            ok = True
        except Exception:
            self.fail("caught unexpected exception (UDP)")
        if not ok:
            self.fail("recv() returned success when we did not expect it")


class TestExceptions(unittest.TestCase):
    """Check the inheritance tree of the socket exception classes."""

    def testExceptionTree(self):
        self.assertTrue(issubclass(socket.error, Exception))
        self.assertTrue(issubclass(socket.herror, socket.error))
        self.assertTrue(issubclass(socket.gaierror, socket.error))
        self.assertTrue(issubclass(socket.timeout, socket.error))


# sys.platform is 'linux2' on Python 2, so compare by prefix; an exact match
# against 'linux' would skip these tests on every Python 2 Linux build.
@unittest.skipUnless(sys.platform.startswith('linux'), 'Linux specific test')
class TestLinuxAbstractNamespace(unittest.TestCase):
    """AF_UNIX addresses that start with a NUL byte."""

    UNIX_PATH_MAX = 108

    def _unix_socket(self):
        """Return a new AF_UNIX stream socket that is closed on cleanup."""
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self.addCleanup(sock.close)
        return sock

    def testLinuxAbstractNamespace(self):
        address = "\x00python-test-hello\x00\xff"
        s1 = self._unix_socket()
        s1.bind(address)
        s1.listen(1)
        s2 = self._unix_socket()
        s2.connect(s1.getsockname())
        conn, _ = s1.accept()
        # Close the accepted connection too instead of leaking it.
        self.addCleanup(conn.close)
        self.assertEqual(s1.getsockname(), address)
        self.assertEqual(s2.getpeername(), address)

    def testMaxName(self):
        address = "\x00" + "h" * (self.UNIX_PATH_MAX - 1)
        s = self._unix_socket()
        s.bind(address)
        self.assertEqual(s.getsockname(), address)

    def testNameOverflow(self):
        address = "\x00" + "h" * self.UNIX_PATH_MAX
        s = self._unix_socket()
        self.assertRaises(socket.error, s.bind, address)


@unittest.skipUnless(thread, 'Threading required for this test.')
class BufferIOTest(SocketConnectedTest):
    """
    Test the buffer versions of socket.recv() and socket.send().
    """
    def __init__(self, methodName='runTest'):
        SocketConnectedTest.__init__(self, methodName=methodName)

    def testRecvIntoArray(self):
        buf = array.array('c', ' '*1024)
        nbytes = self.cli_conn.recv_into(buf)
        self.assertEqual(nbytes, len(MSG))
        msg = buf.tostring()[:len(MSG)]
        self.assertEqual(msg, MSG)

    def _testRecvIntoArray(self):
        # Client half shared by every test in this class: send MSG wrapped
        # in a buffer object.
        with test_support.check_py3k_warnings():
            buf = buffer(MSG)
        self.serv_conn.send(buf)

    def testRecvIntoBytearray(self):
        buf = bytearray(1024)
        nbytes = self.cli_conn.recv_into(buf)
        self.assertEqual(nbytes, len(MSG))
        msg = buf[:len(MSG)]
        self.assertEqual(msg, MSG)

    _testRecvIntoBytearray = _testRecvIntoArray

    def testRecvIntoMemoryview(self):
        buf = bytearray(1024)
        nbytes = self.cli_conn.recv_into(memoryview(buf))
        self.assertEqual(nbytes, len(MSG))
        msg = buf[:len(MSG)]
        self.assertEqual(msg, MSG)

    _testRecvIntoMemoryview = _testRecvIntoArray

    def testRecvFromIntoArray(self):
        buf = array.array('c', ' '*1024)
        nbytes, addr = self.cli_conn.recvfrom_into(buf)
        self.assertEqual(nbytes, len(MSG))
        msg = buf.tostring()[:len(MSG)]
        self.assertEqual(msg, MSG)

    _testRecvFromIntoArray = _testRecvIntoArray

    def testRecvFromIntoBytearray(self):
        buf = bytearray(1024)
        nbytes, addr = self.cli_conn.recvfrom_into(buf)
        self.assertEqual(nbytes, len(MSG))
        msg = buf[:len(MSG)]
        self.assertEqual(msg, MSG)

    _testRecvFromIntoBytearray = _testRecvFromIntoArray

    def testRecvFromIntoMemoryview(self):
        buf = bytearray(1024)
        nbytes, addr = self.cli_conn.recvfrom_into(memoryview(buf))
        self.assertEqual(nbytes, len(MSG))
        msg = buf[:len(MSG)]
        self.assertEqual(msg, MSG)

    _testRecvFromIntoMemoryview = _testRecvFromIntoArray

    def testRecvFromIntoSmallBuffer(self):
        # See issue #20246.
        buf = bytearray(8)
        self.assertRaises(ValueError, self.cli_conn.recvfrom_into, buf, 1024)

    _testRecvFromIntoSmallBuffer = _testRecvIntoArray

    def testRecvFromIntoEmptyBuffer(self):
        buf = bytearray()
        self.cli_conn.recvfrom_into(buf)
        self.cli_conn.recvfrom_into(buf, 0)

    _testRecvFromIntoEmptyBuffer = _testRecvFromIntoArray


TIPC_STYPE = 2000
TIPC_LOWER = 200
TIPC_UPPER = 210

def isTipcAvailable():
    """Check if the TIPC module is loaded

    The TIPC module is not loaded automatically on Ubuntu and probably
    other Linux distros.

    Returns True only if the socket module exposes AF_TIPC and a line of
    /proc/modules starts with "tipc ".
    """
    if not hasattr(socket, "AF_TIPC"):
        return False
    try:
        f = open("/proc/modules")
    except IOError as e:
        # It's ok if the file does not exist, is a directory or if we
        # have not the permission to read it. In any other case it's a
        # real error, so raise it again.
        if e.errno in (errno.ENOENT, errno.EISDIR, errno.EACCES):
            return False
        else:
            raise
    with f:
        for line in f:
            if line.startswith("tipc "):
                return True
    return False

@unittest.skipUnless(isTipcAvailable(),
                     "TIPC module is not loaded, please 'sudo modprobe tipc'")
class TIPCTest(unittest.TestCase):
    """Exchange one datagram between two TIPC SOCK_RDM sockets."""

    def testRDM(self):
        srv = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
        self.addCleanup(srv.close)
        cli = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
        self.addCleanup(cli.close)

        srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
                TIPC_LOWER, TIPC_UPPER)
        srv.bind(srvaddr)

        # Address the middle of the bound range; '//' keeps the instance
        # number an integer.
        sendaddr = (socket.TIPC_ADDR_NAME, TIPC_STYPE,
                TIPC_LOWER + (TIPC_UPPER - TIPC_LOWER) // 2, 0)
        cli.sendto(MSG, sendaddr)

        msg, recvaddr = srv.recvfrom(1024)

        self.assertEqual(cli.getsockname(), recvaddr)
        self.assertEqual(msg, MSG)


@unittest.skipUnless(isTipcAvailable(),
                     "TIPC module is not loaded, please 'sudo modprobe tipc'")
class TIPCThreadableTest(unittest.TestCase, ThreadableTest):
    """Send MSG over a TIPC SOCK_STREAM connection between two threads."""

    def __init__(self, methodName = 'runTest'):
        unittest.TestCase.__init__(self, methodName = methodName)
        ThreadableTest.__init__(self)

    def setUp(self):
        self.srv = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
        self.addCleanup(self.srv.close)
        self.srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
                TIPC_LOWER, TIPC_UPPER)
        self.srv.bind(srvaddr)
        self.srv.listen(5)
        # accept() below blocks until the client connects, so let the client
        # thread proceed first.
        self.serverExplicitReady()
        self.conn, self.connaddr = self.srv.accept()
        self.addCleanup(self.conn.close)

    def clientSetUp(self):
        # There is a hittable race between serverExplicitReady() and the
        # accept() call; sleep a little while to avoid it, otherwise
        # we could get an exception
        time.sleep(0.1)
        self.cli = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
        addr = (socket.TIPC_ADDR_NAME, TIPC_STYPE,
                TIPC_LOWER + (TIPC_UPPER - TIPC_LOWER) // 2, 0)
        self.cli.connect(addr)
        self.cliaddr = self.cli.getsockname()

    def testStream(self):
        msg = self.conn.recv(1024)
        self.assertEqual(msg, MSG)
        self.assertEqual(self.cliaddr, self.connaddr)

    def _testStream(self):
        self.cli.send(MSG)
        self.cli.close()


def test_main():
    """Run every test class of this module under test_support."""
    tests = [GeneralModuleTests, BasicTCPTest, TCPCloserTest, TCPTimeoutTest,
             TestExceptions, BufferIOTest, BasicTCPTest2, BasicUDPTest,
             UDPTimeoutTest ]

    tests.extend([
        NonBlockingTCPTests,
        FileObjectClassTestCase,
        FileObjectInterruptedTestCase,
        UnbufferedFileObjectClassTestCase,
        LineBufferedFileObjectClassTestCase,
        SmallBufferedFileObjectClassTestCase,
        Urllib2FileobjectTest,
        NetworkConnectionNoServer,
        NetworkConnectionAttributesTest,
        NetworkConnectionBehaviourTest,
    ])
    tests.append(BasicSocketPairTest)
    tests.append(TestLinuxAbstractNamespace)
    tests.extend([TIPCTest, TIPCThreadableTest])

    thread_info = test_support.threading_setup()
    try:
        test_support.run_unittest(*tests)
    finally:
        # Run the thread cleanup even when run_unittest() raises.
        test_support.threading_cleanup(*thread_info)

if __name__ == "__main__":
    test_main()