1import unittest 2from test import support 3 4import errno 5import io 6import itertools 7import socket 8import select 9import tempfile 10import time 11import traceback 12import queue 13import sys 14import os 15import array 16import contextlib 17from weakref import proxy 18import signal 19import math 20import pickle 21import struct 22import random 23import string 24try: 25 import multiprocessing 26except ImportError: 27 multiprocessing = False 28try: 29 import fcntl 30except ImportError: 31 fcntl = None 32 33HOST = support.HOST 34MSG = 'Michael Gilfix was here\u1234\r\n'.encode('utf-8') ## test unicode string and carriage return 35MAIN_TIMEOUT = 60.0 36 37try: 38 import _thread as thread 39 import threading 40except ImportError: 41 thread = None 42 threading = None 43try: 44 import _socket 45except ImportError: 46 _socket = None 47 48 49def _have_socket_can(): 50 """Check whether CAN sockets are supported on this host.""" 51 try: 52 s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) 53 except (AttributeError, OSError): 54 return False 55 else: 56 s.close() 57 return True 58 59def _have_socket_rds(): 60 """Check whether RDS sockets are supported on this host.""" 61 try: 62 s = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) 63 except (AttributeError, OSError): 64 return False 65 else: 66 s.close() 67 return True 68 69def _have_socket_alg(): 70 """Check whether AF_ALG sockets are supported on this host.""" 71 try: 72 s = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0) 73 except (AttributeError, OSError): 74 return False 75 else: 76 s.close() 77 return True 78 79HAVE_SOCKET_CAN = _have_socket_can() 80 81HAVE_SOCKET_RDS = _have_socket_rds() 82 83HAVE_SOCKET_ALG = _have_socket_alg() 84 85# Size in bytes of the int type 86SIZEOF_INT = array.array("i").itemsize 87 88class SocketTCPTest(unittest.TestCase): 89 90 def setUp(self): 91 self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 92 self.port = support.bind_port(self.serv) 93 self.serv.listen() 94 95 def tearDown(self): 96 self.serv.close() 97 self.serv = None 98 99class SocketUDPTest(unittest.TestCase): 100 101 def setUp(self): 102 self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 103 self.port = support.bind_port(self.serv) 104 105 def tearDown(self): 106 self.serv.close() 107 self.serv = None 108 109class ThreadSafeCleanupTestCase(unittest.TestCase): 110 """Subclass of unittest.TestCase with thread-safe cleanup methods. 111 112 This subclass protects the addCleanup() and doCleanups() methods 113 with a recursive lock. 114 """ 115 116 if threading: 117 def __init__(self, *args, **kwargs): 118 super().__init__(*args, **kwargs) 119 self._cleanup_lock = threading.RLock() 120 121 def addCleanup(self, *args, **kwargs): 122 with self._cleanup_lock: 123 return super().addCleanup(*args, **kwargs) 124 125 def doCleanups(self, *args, **kwargs): 126 with self._cleanup_lock: 127 return super().doCleanups(*args, **kwargs) 128 129class SocketCANTest(unittest.TestCase): 130 131 """To be able to run this test, a `vcan0` CAN interface can be created with 132 the following commands: 133 # modprobe vcan 134 # ip link add dev vcan0 type vcan 135 # ifconfig vcan0 up 136 """ 137 interface = 'vcan0' 138 bufsize = 128 139 140 """The CAN frame structure is defined in <linux/can.h>: 141 142 struct can_frame { 143 canid_t can_id; /* 32 bit CAN_ID + EFF/RTR/ERR flags */ 144 __u8 can_dlc; /* data length code: 0 .. 8 */ 145 __u8 data[8] __attribute__((aligned(8))); 146 }; 147 """ 148 can_frame_fmt = "=IB3x8s" 149 can_frame_size = struct.calcsize(can_frame_fmt) 150 151 """The Broadcast Management Command frame structure is defined 152 in <linux/can/bcm.h>: 153 154 struct bcm_msg_head { 155 __u32 opcode; 156 __u32 flags; 157 __u32 count; 158 struct timeval ival1, ival2; 159 canid_t can_id; 160 __u32 nframes; 161 struct can_frame frames[0]; 162 } 163 164 `bcm_msg_head` must be 8 bytes aligned because of the `frames` member (see 165 `struct can_frame` definition). Must use native not standard types for packing. 166 """ 167 bcm_cmd_msg_fmt = "@3I4l2I" 168 bcm_cmd_msg_fmt += "x" * (struct.calcsize(bcm_cmd_msg_fmt) % 8) 169 170 def setUp(self): 171 self.s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) 172 self.addCleanup(self.s.close) 173 try: 174 self.s.bind((self.interface,)) 175 except OSError: 176 self.skipTest('network interface `%s` does not exist' % 177 self.interface) 178 179 180class SocketRDSTest(unittest.TestCase): 181 182 """To be able to run this test, the `rds` kernel module must be loaded: 183 # modprobe rds 184 """ 185 bufsize = 8192 186 187 def setUp(self): 188 self.serv = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) 189 self.addCleanup(self.serv.close) 190 try: 191 self.port = support.bind_port(self.serv) 192 except OSError: 193 self.skipTest('unable to bind RDS socket') 194 195 196class ThreadableTest: 197 """Threadable Test class 198 199 The ThreadableTest class makes it easy to create a threaded 200 client/server pair from an existing unit test. To create a 201 new threaded class from an existing unit test, use multiple 202 inheritance: 203 204 class NewClass (OldClass, ThreadableTest): 205 pass 206 207 This class defines two new fixture functions with obvious 208 purposes for overriding: 209 210 clientSetUp () 211 clientTearDown () 212 213 Any new test functions within the class must then define 214 tests in pairs, where the test name is preceded with a 215 '_' to indicate the client portion of the test. Ex: 216 217 def testFoo(self): 218 # Server portion 219 220 def _testFoo(self): 221 # Client portion 222 223 Any exceptions raised by the clients during their tests 224 are caught and transferred to the main thread to alert 225 the testing framework. 226 227 Note, the server setup function cannot call any blocking 228 functions that rely on the client thread during setup, 229 unless serverExplicitReady() is called just before 230 the blocking call (such as in setting up a client/server 231 connection and performing the accept() in setUp(). 232 """ 233 234 def __init__(self): 235 # Swap the true setup function 236 self.__setUp = self.setUp 237 self.__tearDown = self.tearDown 238 self.setUp = self._setUp 239 self.tearDown = self._tearDown 240 241 def serverExplicitReady(self): 242 """This method allows the server to explicitly indicate that 243 it wants the client thread to proceed. This is useful if the 244 server is about to execute a blocking routine that is 245 dependent upon the client thread during its setup routine.""" 246 self.server_ready.set() 247 248 def _setUp(self): 249 self.wait_threads = support.wait_threads_exit() 250 self.wait_threads.__enter__() 251 252 self.server_ready = threading.Event() 253 self.client_ready = threading.Event() 254 self.done = threading.Event() 255 self.queue = queue.Queue(1) 256 self.server_crashed = False 257 258 # Do some munging to start the client test. 259 methodname = self.id() 260 i = methodname.rfind('.') 261 methodname = methodname[i+1:] 262 test_method = getattr(self, '_' + methodname) 263 self.client_thread = thread.start_new_thread( 264 self.clientRun, (test_method,)) 265 266 try: 267 self.__setUp() 268 except: 269 self.server_crashed = True 270 raise 271 finally: 272 self.server_ready.set() 273 self.client_ready.wait() 274 275 def _tearDown(self): 276 self.__tearDown() 277 self.done.wait() 278 self.wait_threads.__exit__(None, None, None) 279 280 if self.queue.qsize(): 281 exc = self.queue.get() 282 raise exc 283 284 def clientRun(self, test_func): 285 self.server_ready.wait() 286 try: 287 self.clientSetUp() 288 except BaseException as e: 289 self.queue.put(e) 290 self.clientTearDown() 291 return 292 finally: 293 self.client_ready.set() 294 if self.server_crashed: 295 self.clientTearDown() 296 return 297 if not hasattr(test_func, '__call__'): 298 raise TypeError("test_func must be a callable function") 299 try: 300 test_func() 301 except BaseException as e: 302 self.queue.put(e) 303 finally: 304 self.clientTearDown() 305 306 def clientSetUp(self): 307 raise NotImplementedError("clientSetUp must be implemented.") 308 309 def clientTearDown(self): 310 self.done.set() 311 thread.exit() 312 313class ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest): 314 315 def __init__(self, methodName='runTest'): 316 SocketTCPTest.__init__(self, methodName=methodName) 317 ThreadableTest.__init__(self) 318 319 def clientSetUp(self): 320 self.cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 321 322 def clientTearDown(self): 323 self.cli.close() 324 self.cli = None 325 ThreadableTest.clientTearDown(self) 326 327class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest): 328 329 def __init__(self, methodName='runTest'): 330 SocketUDPTest.__init__(self, methodName=methodName) 331 ThreadableTest.__init__(self) 332 333 def clientSetUp(self): 334 self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 335 336 def clientTearDown(self): 337 self.cli.close() 338 self.cli = None 339 ThreadableTest.clientTearDown(self) 340 341class ThreadedCANSocketTest(SocketCANTest, ThreadableTest): 342 343 def __init__(self, methodName='runTest'): 344 SocketCANTest.__init__(self, methodName=methodName) 345 ThreadableTest.__init__(self) 346 347 def clientSetUp(self): 348 self.cli = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) 349 try: 350 self.cli.bind((self.interface,)) 351 except OSError: 352 # skipTest should not be called here, and will be called in the 353 # server instead 354 pass 355 356 def clientTearDown(self): 357 self.cli.close() 358 self.cli = None 359 ThreadableTest.clientTearDown(self) 360 361class ThreadedRDSSocketTest(SocketRDSTest, ThreadableTest): 362 363 def __init__(self, methodName='runTest'): 364 SocketRDSTest.__init__(self, methodName=methodName) 365 ThreadableTest.__init__(self) 366 367 def clientSetUp(self): 368 self.cli = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) 369 try: 370 # RDS sockets must be bound explicitly to send or receive data 371 self.cli.bind((HOST, 0)) 372 self.cli_addr = self.cli.getsockname() 373 except OSError: 374 # skipTest should not be called here, and will be called in the 375 # server instead 376 pass 377 378 def clientTearDown(self): 379 self.cli.close() 380 self.cli = None 381 ThreadableTest.clientTearDown(self) 382 383class SocketConnectedTest(ThreadedTCPSocketTest): 384 """Socket tests for client-server connection. 385 386 self.cli_conn is a client socket connected to the server. The 387 setUp() method guarantees that it is connected to the server. 388 """ 389 390 def __init__(self, methodName='runTest'): 391 ThreadedTCPSocketTest.__init__(self, methodName=methodName) 392 393 def setUp(self): 394 ThreadedTCPSocketTest.setUp(self) 395 # Indicate explicitly we're ready for the client thread to 396 # proceed and then perform the blocking call to accept 397 self.serverExplicitReady() 398 conn, addr = self.serv.accept() 399 self.cli_conn = conn 400 401 def tearDown(self): 402 self.cli_conn.close() 403 self.cli_conn = None 404 ThreadedTCPSocketTest.tearDown(self) 405 406 def clientSetUp(self): 407 ThreadedTCPSocketTest.clientSetUp(self) 408 self.cli.connect((HOST, self.port)) 409 self.serv_conn = self.cli 410 411 def clientTearDown(self): 412 self.serv_conn.close() 413 self.serv_conn = None 414 ThreadedTCPSocketTest.clientTearDown(self) 415 416class SocketPairTest(unittest.TestCase, ThreadableTest): 417 418 def __init__(self, methodName='runTest'): 419 unittest.TestCase.__init__(self, methodName=methodName) 420 ThreadableTest.__init__(self) 421 422 def setUp(self): 423 self.serv, self.cli = socket.socketpair() 424 425 def tearDown(self): 426 self.serv.close() 427 self.serv = None 428 429 def clientSetUp(self): 430 pass 431 432 def clientTearDown(self): 433 self.cli.close() 434 self.cli = None 435 ThreadableTest.clientTearDown(self) 436 437 438# The following classes are used by the sendmsg()/recvmsg() tests. 439# Combining, for instance, ConnectedStreamTestMixin and TCPTestBase 440# gives a drop-in replacement for SocketConnectedTest, but different 441# address families can be used, and the attributes serv_addr and 442# cli_addr will be set to the addresses of the endpoints. 443 444class SocketTestBase(unittest.TestCase): 445 """A base class for socket tests. 446 447 Subclasses must provide methods newSocket() to return a new socket 448 and bindSock(sock) to bind it to an unused address. 449 450 Creates a socket self.serv and sets self.serv_addr to its address. 451 """ 452 453 def setUp(self): 454 self.serv = self.newSocket() 455 self.bindServer() 456 457 def bindServer(self): 458 """Bind server socket and set self.serv_addr to its address.""" 459 self.bindSock(self.serv) 460 self.serv_addr = self.serv.getsockname() 461 462 def tearDown(self): 463 self.serv.close() 464 self.serv = None 465 466 467class SocketListeningTestMixin(SocketTestBase): 468 """Mixin to listen on the server socket.""" 469 470 def setUp(self): 471 super().setUp() 472 self.serv.listen() 473 474 475class ThreadedSocketTestMixin(ThreadSafeCleanupTestCase, SocketTestBase, 476 ThreadableTest): 477 """Mixin to add client socket and allow client/server tests. 478 479 Client socket is self.cli and its address is self.cli_addr. See 480 ThreadableTest for usage information. 481 """ 482 483 def __init__(self, *args, **kwargs): 484 super().__init__(*args, **kwargs) 485 ThreadableTest.__init__(self) 486 487 def clientSetUp(self): 488 self.cli = self.newClientSocket() 489 self.bindClient() 490 491 def newClientSocket(self): 492 """Return a new socket for use as client.""" 493 return self.newSocket() 494 495 def bindClient(self): 496 """Bind client socket and set self.cli_addr to its address.""" 497 self.bindSock(self.cli) 498 self.cli_addr = self.cli.getsockname() 499 500 def clientTearDown(self): 501 self.cli.close() 502 self.cli = None 503 ThreadableTest.clientTearDown(self) 504 505 506class ConnectedStreamTestMixin(SocketListeningTestMixin, 507 ThreadedSocketTestMixin): 508 """Mixin to allow client/server stream tests with connected client. 509 510 Server's socket representing connection to client is self.cli_conn 511 and client's connection to server is self.serv_conn. (Based on 512 SocketConnectedTest.) 513 """ 514 515 def setUp(self): 516 super().setUp() 517 # Indicate explicitly we're ready for the client thread to 518 # proceed and then perform the blocking call to accept 519 self.serverExplicitReady() 520 conn, addr = self.serv.accept() 521 self.cli_conn = conn 522 523 def tearDown(self): 524 self.cli_conn.close() 525 self.cli_conn = None 526 super().tearDown() 527 528 def clientSetUp(self): 529 super().clientSetUp() 530 self.cli.connect(self.serv_addr) 531 self.serv_conn = self.cli 532 533 def clientTearDown(self): 534 try: 535 self.serv_conn.close() 536 self.serv_conn = None 537 except AttributeError: 538 pass 539 super().clientTearDown() 540 541 542class UnixSocketTestBase(SocketTestBase): 543 """Base class for Unix-domain socket tests.""" 544 545 # This class is used for file descriptor passing tests, so we 546 # create the sockets in a private directory so that other users 547 # can't send anything that might be problematic for a privileged 548 # user running the tests. 549 550 def setUp(self): 551 self.dir_path = tempfile.mkdtemp() 552 self.addCleanup(os.rmdir, self.dir_path) 553 super().setUp() 554 555 def bindSock(self, sock): 556 path = tempfile.mktemp(dir=self.dir_path) 557 support.bind_unix_socket(sock, path) 558 self.addCleanup(support.unlink, path) 559 560class UnixStreamBase(UnixSocketTestBase): 561 """Base class for Unix-domain SOCK_STREAM tests.""" 562 563 def newSocket(self): 564 return socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 565 566 567class InetTestBase(SocketTestBase): 568 """Base class for IPv4 socket tests.""" 569 570 host = HOST 571 572 def setUp(self): 573 super().setUp() 574 self.port = self.serv_addr[1] 575 576 def bindSock(self, sock): 577 support.bind_port(sock, host=self.host) 578 579class TCPTestBase(InetTestBase): 580 """Base class for TCP-over-IPv4 tests.""" 581 582 def newSocket(self): 583 return socket.socket(socket.AF_INET, socket.SOCK_STREAM) 584 585class UDPTestBase(InetTestBase): 586 """Base class for UDP-over-IPv4 tests.""" 587 588 def newSocket(self): 589 return socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 590 591class SCTPStreamBase(InetTestBase): 592 """Base class for SCTP tests in one-to-one (SOCK_STREAM) mode.""" 593 594 def newSocket(self): 595 return socket.socket(socket.AF_INET, socket.SOCK_STREAM, 596 socket.IPPROTO_SCTP) 597 598 599class Inet6TestBase(InetTestBase): 600 """Base class for IPv6 socket tests.""" 601 602 host = support.HOSTv6 603 604class UDP6TestBase(Inet6TestBase): 605 """Base class for UDP-over-IPv6 tests.""" 606 607 def newSocket(self): 608 return socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) 609 610 611# Test-skipping decorators for use with ThreadableTest. 612 613def skipWithClientIf(condition, reason): 614 """Skip decorated test if condition is true, add client_skip decorator. 615 616 If the decorated object is not a class, sets its attribute 617 "client_skip" to a decorator which will return an empty function 618 if the test is to be skipped, or the original function if it is 619 not. This can be used to avoid running the client part of a 620 skipped test when using ThreadableTest. 621 """ 622 def client_pass(*args, **kwargs): 623 pass 624 def skipdec(obj): 625 retval = unittest.skip(reason)(obj) 626 if not isinstance(obj, type): 627 retval.client_skip = lambda f: client_pass 628 return retval 629 def noskipdec(obj): 630 if not (isinstance(obj, type) or hasattr(obj, "client_skip")): 631 obj.client_skip = lambda f: f 632 return obj 633 return skipdec if condition else noskipdec 634 635 636def requireAttrs(obj, *attributes): 637 """Skip decorated test if obj is missing any of the given attributes. 638 639 Sets client_skip attribute as skipWithClientIf() does. 640 """ 641 missing = [name for name in attributes if not hasattr(obj, name)] 642 return skipWithClientIf( 643 missing, "don't have " + ", ".join(name for name in missing)) 644 645 646def requireSocket(*args): 647 """Skip decorated test if a socket cannot be created with given arguments. 648 649 When an argument is given as a string, will use the value of that 650 attribute of the socket module, or skip the test if it doesn't 651 exist. Sets client_skip attribute as skipWithClientIf() does. 652 """ 653 err = None 654 missing = [obj for obj in args if 655 isinstance(obj, str) and not hasattr(socket, obj)] 656 if missing: 657 err = "don't have " + ", ".join(name for name in missing) 658 else: 659 callargs = [getattr(socket, obj) if isinstance(obj, str) else obj 660 for obj in args] 661 try: 662 s = socket.socket(*callargs) 663 except OSError as e: 664 # XXX: check errno? 665 err = str(e) 666 else: 667 s.close() 668 return skipWithClientIf( 669 err is not None, 670 "can't create socket({0}): {1}".format( 671 ", ".join(str(o) for o in args), err)) 672 673 674####################################################################### 675## Begin Tests 676 677class GeneralModuleTests(unittest.TestCase): 678 679 def test_SocketType_is_socketobject(self): 680 import _socket 681 self.assertTrue(socket.SocketType is _socket.socket) 682 s = socket.socket() 683 self.assertIsInstance(s, socket.SocketType) 684 s.close() 685 686 def test_repr(self): 687 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 688 with s: 689 self.assertIn('fd=%i' % s.fileno(), repr(s)) 690 self.assertIn('family=%s' % socket.AF_INET, repr(s)) 691 self.assertIn('type=%s' % socket.SOCK_STREAM, repr(s)) 692 self.assertIn('proto=0', repr(s)) 693 self.assertNotIn('raddr', repr(s)) 694 s.bind(('127.0.0.1', 0)) 695 self.assertIn('laddr', repr(s)) 696 self.assertIn(str(s.getsockname()), repr(s)) 697 self.assertIn('[closed]', repr(s)) 698 self.assertNotIn('laddr', repr(s)) 699 700 @unittest.skipUnless(_socket is not None, 'need _socket module') 701 def test_csocket_repr(self): 702 s = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM) 703 try: 704 expected = ('<socket object, fd=%s, family=%s, type=%s, proto=%s>' 705 % (s.fileno(), s.family, s.type, s.proto)) 706 self.assertEqual(repr(s), expected) 707 finally: 708 s.close() 709 expected = ('<socket object, fd=-1, family=%s, type=%s, proto=%s>' 710 % (s.family, s.type, s.proto)) 711 self.assertEqual(repr(s), expected) 712 713 def test_weakref(self): 714 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 715 p = proxy(s) 716 self.assertEqual(p.fileno(), s.fileno()) 717 s.close() 718 s = None 719 support.gc_collect() 720 try: 721 p.fileno() 722 except ReferenceError: 723 pass 724 else: 725 self.fail('Socket proxy still exists') 726 727 def testSocketError(self): 728 # Testing socket module exceptions 729 msg = "Error raising socket exception (%s)." 730 with self.assertRaises(OSError, msg=msg % 'OSError'): 731 raise OSError 732 with self.assertRaises(OSError, msg=msg % 'socket.herror'): 733 raise socket.herror 734 with self.assertRaises(OSError, msg=msg % 'socket.gaierror'): 735 raise socket.gaierror 736 737 def testSendtoErrors(self): 738 # Testing that sendto doesn't mask failures. See #10169. 739 # PyPy note: made the test accept broader messages: PyPy's 740 # messages are equivalent but worded differently. 741 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 742 self.addCleanup(s.close) 743 s.bind(('', 0)) 744 sockname = s.getsockname() 745 # 2 args 746 with self.assertRaises(TypeError) as cm: 747 s.sendto('\u2620', sockname) 748 self.assertIn(str(cm.exception), 749 ["a bytes-like object is required, not 'str'", # cpython 750 "a bytes-like object is required, not str"]) # pypy 751 with self.assertRaises(TypeError) as cm: 752 s.sendto(5j, sockname) 753 self.assertIn(str(cm.exception), 754 ["a bytes-like object is required, not 'complex'", 755 "a bytes-like object is required, not complex"]) 756 with self.assertRaises(TypeError) as cm: 757 s.sendto(b'foo', None) 758 self.assertIn('NoneType', str(cm.exception)) 759 # 3 args 760 with self.assertRaises(TypeError) as cm: 761 s.sendto('\u2620', 0, sockname) 762 self.assertIn(str(cm.exception), 763 ["a bytes-like object is required, not 'str'", 764 "a bytes-like object is required, not str"]) 765 with self.assertRaises(TypeError) as cm: 766 s.sendto(5j, 0, sockname) 767 self.assertIn(str(cm.exception), 768 ["a bytes-like object is required, not 'complex'", 769 "a bytes-like object is required, not complex"]) 770 with self.assertRaises(TypeError) as cm: 771 s.sendto(b'foo', 0, None) 772 self.assertIn('NoneType', str(cm.exception)) 773 with self.assertRaises(TypeError) as cm: 774 s.sendto(b'foo', 'bar', sockname) 775 self.assertIn('integer', str(cm.exception)) 776 with self.assertRaises(TypeError) as cm: 777 s.sendto(b'foo', None, None) 778 self.assertIn('integer', str(cm.exception)) 779 # wrong number of args 780 with self.assertRaises(TypeError) as cm: 781 s.sendto(b'foo') 782 if support.check_impl_detail(): 783 self.assertIn(' given)', str(cm.exception)) 784 with self.assertRaises(TypeError) as cm: 785 s.sendto(b'foo', 0, sockname, 4) 786 self.assertIn(' given', str(cm.exception)) 787 788 def testCrucialConstants(self): 789 # Testing for mission critical constants 790 socket.AF_INET 791 socket.SOCK_STREAM 792 socket.SOCK_DGRAM 793 socket.SOCK_RAW 794 socket.SOCK_RDM 795 socket.SOCK_SEQPACKET 796 socket.SOL_SOCKET 797 socket.SO_REUSEADDR 798 799 def testHostnameRes(self): 800 # Testing hostname resolution mechanisms 801 hostname = socket.gethostname() 802 try: 803 ip = socket.gethostbyname(hostname) 804 except OSError: 805 # Probably name lookup wasn't set up right; skip this test 806 self.skipTest('name lookup failure') 807 self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.") 808 try: 809 hname, aliases, ipaddrs = socket.gethostbyaddr(ip) 810 except OSError: 811 # Probably a similar problem as above; skip this test 812 self.skipTest('name lookup failure') 813 all_host_names = [hostname, hname] + aliases 814 fqhn = socket.getfqdn(ip) 815 if not fqhn in all_host_names: 816 self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names))) 817 818 def test_host_resolution(self): 819 for addr in [support.HOST, '10.0.0.1', '255.255.255.255']: 820 self.assertEqual(socket.gethostbyname(addr), addr) 821 822 # we don't test support.HOSTv6 because there's a chance it doesn't have 823 # a matching name entry (e.g. 'ip6-localhost') 824 for host in [support.HOST]: 825 self.assertIn(host, socket.gethostbyaddr(host)[2]) 826 827 def test_host_resolution_bad_address(self): 828 # These are all malformed IP addresses and expected not to resolve to 829 # any result. But some ISPs, e.g. AWS, may successfully resolve these 830 # IPs. 831 explanation = ( 832 "resolving an invalid IP address did not raise OSError; " 833 "can be caused by a broken DNS server" 834 ) 835 for addr in ['0.1.1.~1', '1+.1.1.1', '::1q', '::1::2', 836 '1:1:1:1:1:1:1:1:1']: 837 with self.assertRaises(OSError): 838 socket.gethostbyname(addr) 839 with self.assertRaises(OSError, msg=explanation): 840 socket.gethostbyaddr(addr) 841 842 @unittest.skipUnless(hasattr(socket, 'sethostname'), "test needs socket.sethostname()") 843 @unittest.skipUnless(hasattr(socket, 'gethostname'), "test needs socket.gethostname()") 844 def test_sethostname(self): 845 oldhn = socket.gethostname() 846 try: 847 socket.sethostname('new') 848 except OSError as e: 849 if e.errno == errno.EPERM: 850 self.skipTest("test should be run as root") 851 else: 852 raise 853 try: 854 # running test as root! 855 self.assertEqual(socket.gethostname(), 'new') 856 # Should work with bytes objects too 857 socket.sethostname(b'bar') 858 self.assertEqual(socket.gethostname(), 'bar') 859 finally: 860 socket.sethostname(oldhn) 861 862 @unittest.skipUnless(hasattr(socket, 'if_nameindex'), 863 'socket.if_nameindex() not available.') 864 def testInterfaceNameIndex(self): 865 interfaces = socket.if_nameindex() 866 for index, name in interfaces: 867 self.assertIsInstance(index, int) 868 self.assertIsInstance(name, str) 869 # interface indices are non-zero integers 870 self.assertGreater(index, 0) 871 _index = socket.if_nametoindex(name) 872 self.assertIsInstance(_index, int) 873 self.assertEqual(index, _index) 874 _name = socket.if_indextoname(index) 875 self.assertIsInstance(_name, str) 876 self.assertEqual(name, _name) 877 878 @unittest.skipUnless(hasattr(socket, 'if_nameindex'), 879 'socket.if_nameindex() not available.') 880 def testInvalidInterfaceNameIndex(self): 881 # test nonexistent interface index/name 882 self.assertRaises(OSError, socket.if_indextoname, 0) 883 self.assertRaises(OSError, socket.if_nametoindex, '_DEADBEEF') 884 # test with invalid values 885 self.assertRaises(TypeError, socket.if_nametoindex, 0) 886 self.assertRaises(TypeError, socket.if_indextoname, '_DEADBEEF') 887 888 @unittest.skipUnless(hasattr(sys, 'getrefcount'), 889 'test needs sys.getrefcount()') 890 def testRefCountGetNameInfo(self): 891 # Testing reference count for getnameinfo 892 try: 893 # On some versions, this loses a reference 894 orig = sys.getrefcount(__name__) 895 socket.getnameinfo(__name__,0) 896 except TypeError: 897 if sys.getrefcount(__name__) != orig: 898 self.fail("socket.getnameinfo loses a reference") 899 900 def testInterpreterCrash(self): 901 # Making sure getnameinfo doesn't crash the interpreter 902 try: 903 # On some versions, this crashes the interpreter. 904 socket.getnameinfo(('x', 0, 0, 0), 0) 905 except OSError: 906 pass 907 908 def testNtoH(self): 909 # This just checks that htons etc. are their own inverse, 910 # when looking at the lower 16 or 32 bits. 911 sizes = {socket.htonl: 32, socket.ntohl: 32, 912 socket.htons: 16, socket.ntohs: 16} 913 for func, size in sizes.items(): 914 mask = (1<<size) - 1 915 for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210): 916 self.assertEqual(i & mask, func(func(i&mask)) & mask) 917 918 swapped = func(mask) 919 self.assertEqual(swapped & mask, mask) 920 self.assertRaises(OverflowError, func, 1<<34) 921 922 @support.cpython_only 923 def testNtoHErrors(self): 924 good_values = [ 1, 2, 3, 1, 2, 3 ] 925 bad_values = [ -1, -2, -3, -1, -2, -3 ] 926 for k in good_values: 927 socket.ntohl(k) 928 socket.ntohs(k) 929 socket.htonl(k) 930 socket.htons(k) 931 for k in bad_values: 932 self.assertRaises(OverflowError, socket.ntohl, k) 933 self.assertRaises(OverflowError, socket.ntohs, k) 934 self.assertRaises(OverflowError, socket.htonl, k) 935 self.assertRaises(OverflowError, socket.htons, k) 936 937 def testGetServBy(self): 938 eq = self.assertEqual 939 # Find one service that exists, then check all the related interfaces. 940 # I've ordered this by protocols that have both a tcp and udp 941 # protocol, at least for modern Linuxes. 942 if (sys.platform.startswith(('freebsd', 'netbsd', 'gnukfreebsd')) 943 or sys.platform in ('linux', 'darwin')): 944 # avoid the 'echo' service on this platform, as there is an 945 # assumption breaking non-standard port/protocol entry 946 services = ('daytime', 'qotd', 'domain') 947 else: 948 services = ('echo', 'daytime', 'domain') 949 for service in services: 950 try: 951 port = socket.getservbyname(service, 'tcp') 952 break 953 except OSError: 954 pass 955 else: 956 raise OSError 957 # Try same call with optional protocol omitted 958 port2 = socket.getservbyname(service) 959 eq(port, port2) 960 # Try udp, but don't barf if it doesn't exist 961 try: 962 udpport = socket.getservbyname(service, 'udp') 963 except OSError: 964 udpport = None 965 else: 966 eq(udpport, port) 967 # Now make sure the lookup by port returns the same service name 968 eq(socket.getservbyport(port2), service) 969 eq(socket.getservbyport(port, 'tcp'), service) 970 if udpport is not None: 971 eq(socket.getservbyport(udpport, 'udp'), service) 972 # Make sure getservbyport does not accept out of range ports. 973 self.assertRaises(OverflowError, socket.getservbyport, -1) 974 self.assertRaises(OverflowError, socket.getservbyport, 65536) 975 976 def testDefaultTimeout(self): 977 # Testing default timeout 978 # The default timeout should initially be None 979 self.assertEqual(socket.getdefaulttimeout(), None) 980 s = socket.socket() 981 self.assertEqual(s.gettimeout(), None) 982 s.close() 983 984 # Set the default timeout to 10, and see if it propagates 985 socket.setdefaulttimeout(10) 986 self.assertEqual(socket.getdefaulttimeout(), 10) 987 s = socket.socket() 988 self.assertEqual(s.gettimeout(), 10) 989 s.close() 990 991 # Reset the default timeout to None, and see if it propagates 992 socket.setdefaulttimeout(None) 993 self.assertEqual(socket.getdefaulttimeout(), None) 994 s = socket.socket() 995 self.assertEqual(s.gettimeout(), None) 996 s.close() 997 998 # Check that setting it to an invalid value raises ValueError 999 self.assertRaises(ValueError, socket.setdefaulttimeout, -1) 1000 1001 # Check that setting it to an invalid type raises TypeError 1002 self.assertRaises(TypeError, socket.setdefaulttimeout, "spam") 1003 1004 @unittest.skipUnless(hasattr(socket, 'inet_aton'), 1005 'test needs socket.inet_aton()') 1006 def testIPv4_inet_aton_fourbytes(self): 1007 # Test that issue1008086 and issue767150 are fixed. 1008 # It must return 4 bytes. 1009 self.assertEqual(b'\x00'*4, socket.inet_aton('0.0.0.0')) 1010 self.assertEqual(b'\xff'*4, socket.inet_aton('255.255.255.255')) 1011 1012 @unittest.skipUnless(hasattr(socket, 'inet_pton'), 1013 'test needs socket.inet_pton()') 1014 def testIPv4toString(self): 1015 from socket import inet_aton as f, inet_pton, AF_INET 1016 g = lambda a: inet_pton(AF_INET, a) 1017 1018 assertInvalid = lambda func,a: self.assertRaises( 1019 (OSError, ValueError), func, a 1020 ) 1021 1022 self.assertEqual(b'\x00\x00\x00\x00', f('0.0.0.0')) 1023 self.assertEqual(b'\xff\x00\xff\x00', f('255.0.255.0')) 1024 self.assertEqual(b'\xaa\xaa\xaa\xaa', f('170.170.170.170')) 1025 self.assertEqual(b'\x01\x02\x03\x04', f('1.2.3.4')) 1026 self.assertEqual(b'\xff\xff\xff\xff', f('255.255.255.255')) 1027 assertInvalid(f, '0.0.0.') 1028 assertInvalid(f, '300.0.0.0') 1029 assertInvalid(f, 'a.0.0.0') 1030 assertInvalid(f, '1.2.3.4.5') 1031 assertInvalid(f, '::1') 1032 1033 self.assertEqual(b'\x00\x00\x00\x00', g('0.0.0.0')) 1034 self.assertEqual(b'\xff\x00\xff\x00', g('255.0.255.0')) 1035 self.assertEqual(b'\xaa\xaa\xaa\xaa', g('170.170.170.170')) 1036 self.assertEqual(b'\xff\xff\xff\xff', g('255.255.255.255')) 1037 assertInvalid(g, '0.0.0.') 1038 assertInvalid(g, '300.0.0.0') 1039 assertInvalid(g, 'a.0.0.0') 1040 assertInvalid(g, '1.2.3.4.5') 1041 assertInvalid(g, '::1') 1042 1043 @unittest.skipUnless(hasattr(socket, 'inet_pton'), 1044 'test needs socket.inet_pton()') 1045 def testIPv6toString(self): 1046 try: 1047 from socket import inet_pton, AF_INET6, has_ipv6 1048 if not has_ipv6: 1049 self.skipTest('IPv6 not available') 1050 except ImportError: 1051 self.skipTest('could not import needed symbols from socket') 1052 1053 if sys.platform == "win32": 1054 try: 1055 inet_pton(AF_INET6, '::') 1056 except OSError as e: 1057 if e.winerror == 10022: 1058 self.skipTest('IPv6 might not be supported') 1059 1060 f = lambda a: inet_pton(AF_INET6, a) 1061 assertInvalid = lambda a: self.assertRaises( 1062 (OSError, ValueError), f, a 1063 ) 1064 1065 self.assertEqual(b'\x00' * 16, f('::')) 1066 self.assertEqual(b'\x00' * 16, f('0::0')) 1067 self.assertEqual(b'\x00\x01' + b'\x00' * 14, f('1::')) 1068 self.assertEqual( 1069 b'\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae', 1070 f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae') 1071 ) 1072 self.assertEqual( 1073 b'\xad\x42\x0a\xbc' + b'\x00' * 4 + b'\x01\x27\x00\x00\x02\x54\x00\x02', 1074 f('ad42:abc::127:0:254:2') 1075 ) 1076 self.assertEqual(b'\x00\x12\x00\x0a' + b'\x00' * 12, f('12:a::')) 1077 assertInvalid('0x20::') 1078 assertInvalid(':::') 1079 assertInvalid('::0::') 1080 assertInvalid('1::abc::') 1081 assertInvalid('1::abc::def') 1082 assertInvalid('1:2:3:4:5:6:') 1083 assertInvalid('1:2:3:4:5:6') 1084 assertInvalid('1:2:3:4:5:6:7:8:') 1085 assertInvalid('1:2:3:4:5:6:7:8:0') 1086 1087 self.assertEqual(b'\x00' * 12 + b'\xfe\x2a\x17\x40', 1088 f('::254.42.23.64') 1089 ) 1090 self.assertEqual( 1091 b'\x00\x42' + b'\x00' * 8 + b'\xa2\x9b\xfe\x2a\x17\x40', 1092 f('42::a29b:254.42.23.64') 1093 ) 1094 self.assertEqual( 1095 b'\x00\x42\xa8\xb9\x00\x00\x00\x02\xff\xff\xa2\x9b\xfe\x2a\x17\x40', 1096 f('42:a8b9:0:2:ffff:a29b:254.42.23.64') 1097 ) 1098 assertInvalid('255.254.253.252') 1099 assertInvalid('1::260.2.3.0') 1100 assertInvalid('1::0.be.e.0') 1101 assertInvalid('1:2:3:4:5:6:7:1.2.3.4') 1102 assertInvalid('::1.2.3.4:0') 1103 assertInvalid('0.100.200.0:3:4:5:6:7:8') 1104 1105 @unittest.skipUnless(hasattr(socket, 'inet_ntop'), 1106 'test needs socket.inet_ntop()') 1107 def testStringToIPv4(self): 1108 from socket import inet_ntoa as f, inet_ntop, AF_INET 1109 g = lambda a: inet_ntop(AF_INET, a) 1110 assertInvalid = lambda func,a: self.assertRaises( 1111 (OSError, ValueError), func, a 1112 ) 1113 1114 self.assertEqual('1.0.1.0', f(b'\x01\x00\x01\x00')) 1115 self.assertEqual('170.85.170.85', f(b'\xaa\x55\xaa\x55')) 1116 self.assertEqual('255.255.255.255', f(b'\xff\xff\xff\xff')) 1117 self.assertEqual('1.2.3.4', f(b'\x01\x02\x03\x04')) 1118 assertInvalid(f, b'\x00' * 3) 1119 assertInvalid(f, b'\x00' * 5) 1120 assertInvalid(f, b'\x00' * 16) 1121 self.assertEqual('170.85.170.85', f(bytearray(b'\xaa\x55\xaa\x55'))) 1122 1123 self.assertEqual('1.0.1.0', g(b'\x01\x00\x01\x00')) 1124 self.assertEqual('170.85.170.85', g(b'\xaa\x55\xaa\x55')) 1125 self.assertEqual('255.255.255.255', g(b'\xff\xff\xff\xff')) 1126 assertInvalid(g, b'\x00' * 3) 1127 assertInvalid(g, b'\x00' * 5) 1128 assertInvalid(g, b'\x00' * 16) 1129 self.assertEqual('170.85.170.85', g(bytearray(b'\xaa\x55\xaa\x55'))) 1130 1131 @unittest.skipUnless(hasattr(socket, 'inet_ntop'), 1132 'test needs socket.inet_ntop()') 1133 def testStringToIPv6(self): 1134 try: 1135 from socket import inet_ntop, AF_INET6, has_ipv6 1136 if not has_ipv6: 1137 self.skipTest('IPv6 not available') 1138 except ImportError: 1139 self.skipTest('could not import needed symbols from socket') 1140 1141 if sys.platform == "win32": 1142 try: 1143 inet_ntop(AF_INET6, b'\x00' * 16) 1144 except OSError as e: 1145 if e.winerror == 10022: 1146 self.skipTest('IPv6 might not be supported') 1147 1148 f = lambda a: inet_ntop(AF_INET6, a) 1149 assertInvalid = lambda a: self.assertRaises( 1150 (OSError, ValueError), f, a 1151 ) 1152 1153 self.assertEqual('::', f(b'\x00' * 16)) 1154 self.assertEqual('::1', f(b'\x00' * 15 + b'\x01')) 1155 self.assertEqual( 1156 'aef:b01:506:1001:ffff:9997:55:170', 1157 f(b'\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70') 1158 ) 1159 self.assertEqual('::1', f(bytearray(b'\x00' * 15 + b'\x01'))) 1160 1161 assertInvalid(b'\x12' * 15) 1162 assertInvalid(b'\x12' * 17) 1163 assertInvalid(b'\x12' * 4) 1164 1165 # XXX The following don't test module-level functionality... 1166 1167 def testSockName(self): 1168 # Testing getsockname() 1169 port = support.find_unused_port() 1170 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1171 self.addCleanup(sock.close) 1172 sock.bind(("0.0.0.0", port)) 1173 name = sock.getsockname() 1174 # XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate 1175 # it reasonable to get the host's addr in addition to 0.0.0.0. 1176 # At least for eCos. This is required for the S/390 to pass. 1177 try: 1178 my_ip_addr = socket.gethostbyname(socket.gethostname()) 1179 except OSError: 1180 # Probably name lookup wasn't set up right; skip this test 1181 self.skipTest('name lookup failure') 1182 self.assertIn(name[0], ("0.0.0.0", my_ip_addr), '%s invalid' % name[0]) 1183 self.assertEqual(name[1], port) 1184 1185 def testGetSockOpt(self): 1186 # Testing getsockopt() 1187 # We know a socket should start without reuse==0 1188 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1189 self.addCleanup(sock.close) 1190 reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) 1191 self.assertFalse(reuse != 0, "initial mode is reuse") 1192 1193 def testSetSockOpt(self): 1194 # Testing setsockopt() 1195 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1196 self.addCleanup(sock.close) 1197 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 1198 reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) 1199 self.assertFalse(reuse == 0, "failed to set reuse mode") 1200 1201 def testSendAfterClose(self): 1202 # testing send() after close() with timeout 1203 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1204 sock.settimeout(1) 1205 sock.close() 1206 self.assertRaises(OSError, sock.send, b"spam") 1207 1208 def testCloseException(self): 1209 sock = socket.socket() 1210 socket.socket(fileno=sock.fileno()).close() 1211 try: 1212 sock.close() 1213 except OSError as err: 1214 # Winsock apparently raises ENOTSOCK 1215 self.assertIn(err.errno, (errno.EBADF, errno.ENOTSOCK)) 1216 else: 1217 self.fail("close() should raise EBADF/ENOTSOCK") 1218 1219 def testNewAttributes(self): 1220 # testing .family, .type and .protocol 1221 1222 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1223 self.assertEqual(sock.family, socket.AF_INET) 1224 if hasattr(socket, 'SOCK_CLOEXEC'): 1225 self.assertIn(sock.type, 1226 (socket.SOCK_STREAM | socket.SOCK_CLOEXEC, 1227 socket.SOCK_STREAM)) 1228 else: 1229 self.assertEqual(sock.type, socket.SOCK_STREAM) 1230 self.assertEqual(sock.proto, 0) 1231 sock.close() 1232 1233 def test_getsockaddrarg(self): 1234 sock = socket.socket() 1235 self.addCleanup(sock.close) 1236 port = support.find_unused_port() 1237 big_port = port + 65536 1238 neg_port = port - 65536 1239 self.assertRaises(OverflowError, sock.bind, (HOST, big_port)) 1240 self.assertRaises(OverflowError, sock.bind, (HOST, neg_port)) 1241 # Since find_unused_port() is inherently subject to race conditions, we 1242 # call it a couple times if necessary. 1243 for i in itertools.count(): 1244 port = support.find_unused_port() 1245 try: 1246 sock.bind((HOST, port)) 1247 except OSError as e: 1248 if e.errno != errno.EADDRINUSE or i == 5: 1249 raise 1250 else: 1251 break 1252 1253 @unittest.skipUnless(os.name == "nt", "Windows specific") 1254 def test_sock_ioctl(self): 1255 self.assertTrue(hasattr(socket.socket, 'ioctl')) 1256 self.assertTrue(hasattr(socket, 'SIO_RCVALL')) 1257 self.assertTrue(hasattr(socket, 'RCVALL_ON')) 1258 self.assertTrue(hasattr(socket, 'RCVALL_OFF')) 1259 self.assertTrue(hasattr(socket, 'SIO_KEEPALIVE_VALS')) 1260 s = socket.socket() 1261 self.addCleanup(s.close) 1262 self.assertRaises(ValueError, s.ioctl, -1, None) 1263 s.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 100, 100)) 1264 1265 @unittest.skipUnless(os.name == "nt", "Windows specific") 1266 @unittest.skipUnless(hasattr(socket, 'SIO_LOOPBACK_FAST_PATH'), 1267 'Loopback fast path support required for this test') 1268 def test_sio_loopback_fast_path(self): 1269 s = socket.socket() 1270 self.addCleanup(s.close) 1271 try: 1272 s.ioctl(socket.SIO_LOOPBACK_FAST_PATH, True) 1273 except OSError as exc: 1274 WSAEOPNOTSUPP = 10045 1275 if exc.winerror == WSAEOPNOTSUPP: 1276 self.skipTest("SIO_LOOPBACK_FAST_PATH is defined but " 1277 "doesn't implemented in this Windows version") 1278 raise 1279 self.assertRaises(TypeError, s.ioctl, socket.SIO_LOOPBACK_FAST_PATH, None) 1280 1281 def testGetaddrinfo(self): 1282 try: 1283 socket.getaddrinfo('localhost', 80) 1284 except socket.gaierror as err: 1285 if err.errno == socket.EAI_SERVICE: 1286 # see http://bugs.python.org/issue1282647 1287 self.skipTest("buggy libc version") 1288 raise 1289 # len of every sequence is supposed to be == 5 1290 for info in socket.getaddrinfo(HOST, None): 1291 self.assertEqual(len(info), 5) 1292 # host can be a domain name, a string representation of an 1293 # IPv4/v6 address or None 1294 socket.getaddrinfo('localhost', 80) 1295 socket.getaddrinfo('127.0.0.1', 80) 1296 socket.getaddrinfo(None, 80) 1297 if support.IPV6_ENABLED: 1298 socket.getaddrinfo('::1', 80) 1299 # port can be a string service name such as "http", a numeric 1300 # port number or None 1301 socket.getaddrinfo(HOST, "http") 1302 socket.getaddrinfo(HOST, 80) 1303 socket.getaddrinfo(HOST, None) 1304 # test family and socktype filters 1305 infos = socket.getaddrinfo(HOST, 80, socket.AF_INET, socket.SOCK_STREAM) 1306 for family, type, _, _, _ in infos: 1307 self.assertEqual(family, socket.AF_INET) 1308 self.assertEqual(str(family), 'AddressFamily.AF_INET') 1309 self.assertEqual(type, socket.SOCK_STREAM) 1310 self.assertEqual(str(type), 'SocketKind.SOCK_STREAM') 1311 infos = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM) 1312 for _, socktype, _, _, _ in infos: 1313 self.assertEqual(socktype, socket.SOCK_STREAM) 1314 # test proto and flags arguments 1315 socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP) 1316 socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE) 1317 # a server willing to support both IPv4 and IPv6 will 1318 # usually do this 1319 socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, 1320 socket.AI_PASSIVE) 1321 # test keyword arguments 1322 a = socket.getaddrinfo(HOST, None) 1323 b = socket.getaddrinfo(host=HOST, port=None) 1324 self.assertEqual(a, b) 1325 a = socket.getaddrinfo(HOST, None, socket.AF_INET) 1326 b = socket.getaddrinfo(HOST, None, family=socket.AF_INET) 1327 self.assertEqual(a, b) 1328 a = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM) 1329 b = socket.getaddrinfo(HOST, None, type=socket.SOCK_STREAM) 1330 self.assertEqual(a, b) 1331 a = socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP) 1332 b = socket.getaddrinfo(HOST, None, proto=socket.SOL_TCP) 1333 self.assertEqual(a, b) 1334 a = socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE) 1335 b = socket.getaddrinfo(HOST, None, flags=socket.AI_PASSIVE) 1336 self.assertEqual(a, b) 1337 a = socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, 1338 socket.AI_PASSIVE) 1339 b = socket.getaddrinfo(host=None, port=0, family=socket.AF_UNSPEC, 1340 type=socket.SOCK_STREAM, proto=0, 1341 flags=socket.AI_PASSIVE) 1342 self.assertEqual(a, b) 1343 # Issue #6697. 1344 self.assertRaises(UnicodeEncodeError, socket.getaddrinfo, 'localhost', '\uD800') 1345 1346 # Issue 17269: test workaround for OS X platform bug segfault 1347 if hasattr(socket, 'AI_NUMERICSERV'): 1348 try: 1349 # The arguments here are undefined and the call may succeed 1350 # or fail. All we care here is that it doesn't segfault. 1351 socket.getaddrinfo("localhost", None, 0, 0, 0, 1352 socket.AI_NUMERICSERV) 1353 except socket.gaierror: 1354 pass 1355 1356 def test_getnameinfo(self): 1357 # only IP addresses are allowed 1358 self.assertRaises(OSError, socket.getnameinfo, ('mail.python.org',0), 0) 1359 1360 @unittest.skipUnless(support.is_resource_enabled('network'), 1361 'network is not enabled') 1362 def test_idna(self): 1363 # Check for internet access before running test 1364 # (issue #12804, issue #25138). 1365 with support.transient_internet('python.org'): 1366 socket.gethostbyname('python.org') 1367 1368 # these should all be successful 1369 domain = 'испытание.pythontest.net' 1370 socket.gethostbyname(domain) 1371 socket.gethostbyname_ex(domain) 1372 socket.getaddrinfo(domain,0,socket.AF_UNSPEC,socket.SOCK_STREAM) 1373 # this may not work if the forward lookup chooses the IPv6 address, as that doesn't 1374 # have a reverse entry yet 1375 # socket.gethostbyaddr('испытание.python.org') 1376 1377 def check_sendall_interrupted(self, with_timeout): 1378 # socketpair() is not strictly required, but it makes things easier. 1379 if not hasattr(signal, 'alarm') or not hasattr(socket, 'socketpair'): 1380 self.skipTest("signal.alarm and socket.socketpair required for this test") 1381 # Our signal handlers clobber the C errno by calling a math function 1382 # with an invalid domain value. 1383 def ok_handler(*args): 1384 self.assertRaises(ValueError, math.acosh, 0) 1385 def raising_handler(*args): 1386 self.assertRaises(ValueError, math.acosh, 0) 1387 1 // 0 1388 c, s = socket.socketpair() 1389 old_alarm = signal.signal(signal.SIGALRM, raising_handler) 1390 try: 1391 if with_timeout: 1392 # Just above the one second minimum for signal.alarm 1393 c.settimeout(1.5) 1394 with self.assertRaises(ZeroDivisionError): 1395 signal.alarm(1) 1396 c.sendall(b"x" * support.SOCK_MAX_SIZE) 1397 if with_timeout: 1398 signal.signal(signal.SIGALRM, ok_handler) 1399 signal.alarm(1) 1400 self.assertRaises(socket.timeout, c.sendall, 1401 b"x" * support.SOCK_MAX_SIZE) 1402 finally: 1403 signal.alarm(0) 1404 signal.signal(signal.SIGALRM, old_alarm) 1405 c.close() 1406 s.close() 1407 1408 def test_sendall_interrupted(self): 1409 self.check_sendall_interrupted(False) 1410 1411 def test_sendall_interrupted_with_timeout(self): 1412 self.check_sendall_interrupted(True) 1413 1414 def test_dealloc_warn(self): 1415 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1416 r = repr(sock) 1417 with self.assertWarns(ResourceWarning) as cm: 1418 sock = None 1419 support.gc_collect() 1420 self.assertIn(r, str(cm.warning.args[0])) 1421 # An open socket file object gets dereferenced after the socket 1422 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1423 f = sock.makefile('rb') 1424 r = repr(sock) 1425 sock = None 1426 support.gc_collect() 1427 with self.assertWarns(ResourceWarning): 1428 f = None 1429 support.gc_collect() 1430 1431 def test_name_closed_socketio(self): 1432 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: 1433 fp = sock.makefile("rb") 1434 fp.close() 1435 self.assertEqual(repr(fp), "<_io.BufferedReader name=-1>") 1436 1437 def test_unusable_closed_socketio(self): 1438 with socket.socket() as sock: 1439 fp = sock.makefile("rb", buffering=0) 1440 self.assertTrue(fp.readable()) 1441 self.assertFalse(fp.writable()) 1442 self.assertFalse(fp.seekable()) 1443 fp.close() 1444 self.assertRaises(ValueError, fp.readable) 1445 self.assertRaises(ValueError, fp.writable) 1446 self.assertRaises(ValueError, fp.seekable) 1447 1448 def test_makefile_mode(self): 1449 for mode in 'r', 'rb', 'rw', 'w', 'wb': 1450 with self.subTest(mode=mode): 1451 with socket.socket() as sock: 1452 with sock.makefile(mode) as fp: 1453 self.assertEqual(fp.mode, mode) 1454 1455 def test_makefile_invalid_mode(self): 1456 for mode in 'rt', 'x', '+', 'a': 1457 with self.subTest(mode=mode): 1458 with socket.socket() as sock: 1459 with self.assertRaisesRegex(ValueError, 'invalid mode'): 1460 sock.makefile(mode) 1461 1462 def test_pickle(self): 1463 sock = socket.socket() 1464 with sock: 1465 for protocol in range(pickle.HIGHEST_PROTOCOL + 1): 1466 self.assertRaises(TypeError, pickle.dumps, sock, protocol) 1467 for protocol in range(pickle.HIGHEST_PROTOCOL + 1): 1468 family = pickle.loads(pickle.dumps(socket.AF_INET, protocol)) 1469 self.assertEqual(family, socket.AF_INET) 1470 type = pickle.loads(pickle.dumps(socket.SOCK_STREAM, protocol)) 1471 self.assertEqual(type, socket.SOCK_STREAM) 1472 1473 def test_listen_backlog(self): 1474 for backlog in 0, -1: 1475 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv: 1476 srv.bind((HOST, 0)) 1477 srv.listen(backlog) 1478 1479 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv: 1480 srv.bind((HOST, 0)) 1481 srv.listen() 1482 1483 @support.cpython_only 1484 def test_listen_backlog_overflow(self): 1485 # Issue 15989 1486 import _testcapi 1487 srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1488 srv.bind((HOST, 0)) 1489 self.assertRaises(OverflowError, srv.listen, _testcapi.INT_MAX + 1) 1490 srv.close() 1491 1492 @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') 1493 def test_flowinfo(self): 1494 self.assertRaises(OverflowError, socket.getnameinfo, 1495 (support.HOSTv6, 0, 0xffffffff), 0) 1496 with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as s: 1497 self.assertRaises(OverflowError, s.bind, (support.HOSTv6, 0, -10)) 1498 1499 def test_str_for_enums(self): 1500 # Make sure that the AF_* and SOCK_* constants have enum-like string 1501 # reprs. 1502 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: 1503 self.assertEqual(str(s.family), 'AddressFamily.AF_INET') 1504 self.assertEqual(str(s.type), 'SocketKind.SOCK_STREAM') 1505 1506 @unittest.skipIf(os.name == 'nt', 'Will not work on Windows') 1507 def test_uknown_socket_family_repr(self): 1508 # Test that when created with a family that's not one of the known 1509 # AF_*/SOCK_* constants, socket.family just returns the number. 1510 # 1511 # To do this we fool socket.socket into believing it already has an 1512 # open fd because on this path it doesn't actually verify the family and 1513 # type and populates the socket object. 1514 # 1515 # On Windows this trick won't work, so the test is skipped. 1516 fd, path = tempfile.mkstemp() 1517 self.addCleanup(os.unlink, path) 1518 with socket.socket(family=42424, type=13331, fileno=fd) as s: 1519 self.assertEqual(s.family, 42424) 1520 self.assertEqual(s.type, 13331) 1521 1522 @unittest.skipUnless(hasattr(os, 'sendfile'), 'test needs os.sendfile()') 1523 def test__sendfile_use_sendfile(self): 1524 class File: 1525 def __init__(self, fd): 1526 self.fd = fd 1527 1528 def fileno(self): 1529 return self.fd 1530 with socket.socket() as sock: 1531 fd = os.open(os.curdir, os.O_RDONLY) 1532 os.close(fd) 1533 with self.assertRaises(socket._GiveupOnSendfile): 1534 sock._sendfile_use_sendfile(File(fd)) 1535 with self.assertRaises(OverflowError): 1536 sock._sendfile_use_sendfile(File(2**1000)) 1537 with self.assertRaises(TypeError): 1538 sock._sendfile_use_sendfile(File(None)) 1539 1540 1541@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.') 1542class BasicCANTest(unittest.TestCase): 1543 1544 def testCrucialConstants(self): 1545 socket.AF_CAN 1546 socket.PF_CAN 1547 socket.CAN_RAW 1548 1549 @unittest.skipUnless(hasattr(socket, "CAN_BCM"), 1550 'socket.CAN_BCM required for this test.') 1551 def testBCMConstants(self): 1552 socket.CAN_BCM 1553 1554 # opcodes 1555 socket.CAN_BCM_TX_SETUP # create (cyclic) transmission task 1556 socket.CAN_BCM_TX_DELETE # remove (cyclic) transmission task 1557 socket.CAN_BCM_TX_READ # read properties of (cyclic) transmission task 1558 socket.CAN_BCM_TX_SEND # send one CAN frame 1559 socket.CAN_BCM_RX_SETUP # create RX content filter subscription 1560 socket.CAN_BCM_RX_DELETE # remove RX content filter subscription 1561 socket.CAN_BCM_RX_READ # read properties of RX content filter subscription 1562 socket.CAN_BCM_TX_STATUS # reply to TX_READ request 1563 socket.CAN_BCM_TX_EXPIRED # notification on performed transmissions (count=0) 1564 socket.CAN_BCM_RX_STATUS # reply to RX_READ request 1565 socket.CAN_BCM_RX_TIMEOUT # cyclic message is absent 1566 socket.CAN_BCM_RX_CHANGED # updated CAN frame (detected content change) 1567 1568 def testCreateSocket(self): 1569 with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: 1570 pass 1571 1572 @unittest.skipUnless(hasattr(socket, "CAN_BCM"), 1573 'socket.CAN_BCM required for this test.') 1574 def testCreateBCMSocket(self): 1575 with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_BCM) as s: 1576 pass 1577 1578 def testBindAny(self): 1579 with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: 1580 s.bind(('', )) 1581 1582 def testTooLongInterfaceName(self): 1583 # most systems limit IFNAMSIZ to 16, take 1024 to be sure 1584 with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: 1585 self.assertRaisesRegex(OSError, 'interface name too long', 1586 s.bind, ('x' * 1024,)) 1587 1588 @unittest.skipUnless(hasattr(socket, "CAN_RAW_LOOPBACK"), 1589 'socket.CAN_RAW_LOOPBACK required for this test.') 1590 def testLoopback(self): 1591 with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: 1592 for loopback in (0, 1): 1593 s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_LOOPBACK, 1594 loopback) 1595 self.assertEqual(loopback, 1596 s.getsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_LOOPBACK)) 1597 1598 @unittest.skipUnless(hasattr(socket, "CAN_RAW_FILTER"), 1599 'socket.CAN_RAW_FILTER required for this test.') 1600 def testFilter(self): 1601 can_id, can_mask = 0x200, 0x700 1602 can_filter = struct.pack("=II", can_id, can_mask) 1603 with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: 1604 s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, can_filter) 1605 self.assertEqual(can_filter, 1606 s.getsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, 8)) 1607 s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, bytearray(can_filter)) 1608 1609 1610@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.') 1611@unittest.skipUnless(thread, 'Threading required for this test.') 1612class CANTest(ThreadedCANSocketTest): 1613 1614 def __init__(self, methodName='runTest'): 1615 ThreadedCANSocketTest.__init__(self, methodName=methodName) 1616 1617 @classmethod 1618 def build_can_frame(cls, can_id, data): 1619 """Build a CAN frame.""" 1620 can_dlc = len(data) 1621 data = data.ljust(8, b'\x00') 1622 return struct.pack(cls.can_frame_fmt, can_id, can_dlc, data) 1623 1624 @classmethod 1625 def dissect_can_frame(cls, frame): 1626 """Dissect a CAN frame.""" 1627 can_id, can_dlc, data = struct.unpack(cls.can_frame_fmt, frame) 1628 return (can_id, can_dlc, data[:can_dlc]) 1629 1630 def testSendFrame(self): 1631 cf, addr = self.s.recvfrom(self.bufsize) 1632 self.assertEqual(self.cf, cf) 1633 self.assertEqual(addr[0], self.interface) 1634 self.assertEqual(addr[1], socket.AF_CAN) 1635 1636 def _testSendFrame(self): 1637 self.cf = self.build_can_frame(0x00, b'\x01\x02\x03\x04\x05') 1638 self.cli.send(self.cf) 1639 1640 def testSendMaxFrame(self): 1641 cf, addr = self.s.recvfrom(self.bufsize) 1642 self.assertEqual(self.cf, cf) 1643 1644 def _testSendMaxFrame(self): 1645 self.cf = self.build_can_frame(0x00, b'\x07' * 8) 1646 self.cli.send(self.cf) 1647 1648 def testSendMultiFrames(self): 1649 cf, addr = self.s.recvfrom(self.bufsize) 1650 self.assertEqual(self.cf1, cf) 1651 1652 cf, addr = self.s.recvfrom(self.bufsize) 1653 self.assertEqual(self.cf2, cf) 1654 1655 def _testSendMultiFrames(self): 1656 self.cf1 = self.build_can_frame(0x07, b'\x44\x33\x22\x11') 1657 self.cli.send(self.cf1) 1658 1659 self.cf2 = self.build_can_frame(0x12, b'\x99\x22\x33') 1660 self.cli.send(self.cf2) 1661 1662 @unittest.skipUnless(hasattr(socket, "CAN_BCM"), 1663 'socket.CAN_BCM required for this test.') 1664 def _testBCM(self): 1665 cf, addr = self.cli.recvfrom(self.bufsize) 1666 self.assertEqual(self.cf, cf) 1667 can_id, can_dlc, data = self.dissect_can_frame(cf) 1668 self.assertEqual(self.can_id, can_id) 1669 self.assertEqual(self.data, data) 1670 1671 @unittest.skipUnless(hasattr(socket, "CAN_BCM"), 1672 'socket.CAN_BCM required for this test.') 1673 def testBCM(self): 1674 bcm = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_BCM) 1675 self.addCleanup(bcm.close) 1676 bcm.connect((self.interface,)) 1677 self.can_id = 0x123 1678 self.data = bytes([0xc0, 0xff, 0xee]) 1679 self.cf = self.build_can_frame(self.can_id, self.data) 1680 opcode = socket.CAN_BCM_TX_SEND 1681 flags = 0 1682 count = 0 1683 ival1_seconds = ival1_usec = ival2_seconds = ival2_usec = 0 1684 bcm_can_id = 0x0222 1685 nframes = 1 1686 assert len(self.cf) == 16 1687 header = struct.pack(self.bcm_cmd_msg_fmt, 1688 opcode, 1689 flags, 1690 count, 1691 ival1_seconds, 1692 ival1_usec, 1693 ival2_seconds, 1694 ival2_usec, 1695 bcm_can_id, 1696 nframes, 1697 ) 1698 header_plus_frame = header + self.cf 1699 bytes_sent = bcm.send(header_plus_frame) 1700 self.assertEqual(bytes_sent, len(header_plus_frame)) 1701 1702 1703@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.') 1704class BasicRDSTest(unittest.TestCase): 1705 1706 def testCrucialConstants(self): 1707 socket.AF_RDS 1708 socket.PF_RDS 1709 1710 def testCreateSocket(self): 1711 with socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) as s: 1712 pass 1713 1714 def testSocketBufferSize(self): 1715 bufsize = 16384 1716 with socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) as s: 1717 s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, bufsize) 1718 s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, bufsize) 1719 1720 1721@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.') 1722@unittest.skipUnless(thread, 'Threading required for this test.') 1723class RDSTest(ThreadedRDSSocketTest): 1724 1725 def __init__(self, methodName='runTest'): 1726 ThreadedRDSSocketTest.__init__(self, methodName=methodName) 1727 1728 def setUp(self): 1729 super().setUp() 1730 self.evt = threading.Event() 1731 1732 def testSendAndRecv(self): 1733 data, addr = self.serv.recvfrom(self.bufsize) 1734 self.assertEqual(self.data, data) 1735 self.assertEqual(self.cli_addr, addr) 1736 1737 def _testSendAndRecv(self): 1738 self.data = b'spam' 1739 self.cli.sendto(self.data, 0, (HOST, self.port)) 1740 1741 def testPeek(self): 1742 data, addr = self.serv.recvfrom(self.bufsize, socket.MSG_PEEK) 1743 self.assertEqual(self.data, data) 1744 data, addr = self.serv.recvfrom(self.bufsize) 1745 self.assertEqual(self.data, data) 1746 1747 def _testPeek(self): 1748 self.data = b'spam' 1749 self.cli.sendto(self.data, 0, (HOST, self.port)) 1750 1751 @requireAttrs(socket.socket, 'recvmsg') 1752 def testSendAndRecvMsg(self): 1753 data, ancdata, msg_flags, addr = self.serv.recvmsg(self.bufsize) 1754 self.assertEqual(self.data, data) 1755 1756 @requireAttrs(socket.socket, 'sendmsg') 1757 def _testSendAndRecvMsg(self): 1758 self.data = b'hello ' * 10 1759 self.cli.sendmsg([self.data], (), 0, (HOST, self.port)) 1760 1761 def testSendAndRecvMulti(self): 1762 data, addr = self.serv.recvfrom(self.bufsize) 1763 self.assertEqual(self.data1, data) 1764 1765 data, addr = self.serv.recvfrom(self.bufsize) 1766 self.assertEqual(self.data2, data) 1767 1768 def _testSendAndRecvMulti(self): 1769 self.data1 = b'bacon' 1770 self.cli.sendto(self.data1, 0, (HOST, self.port)) 1771 1772 self.data2 = b'egg' 1773 self.cli.sendto(self.data2, 0, (HOST, self.port)) 1774 1775 def testSelect(self): 1776 r, w, x = select.select([self.serv], [], [], 3.0) 1777 self.assertIn(self.serv, r) 1778 data, addr = self.serv.recvfrom(self.bufsize) 1779 self.assertEqual(self.data, data) 1780 1781 def _testSelect(self): 1782 self.data = b'select' 1783 self.cli.sendto(self.data, 0, (HOST, self.port)) 1784 1785 1786@unittest.skipUnless(thread, 'Threading required for this test.') 1787class BasicTCPTest(SocketConnectedTest): 1788 1789 def __init__(self, methodName='runTest'): 1790 SocketConnectedTest.__init__(self, methodName=methodName) 1791 1792 def testRecv(self): 1793 # Testing large receive over TCP 1794 msg = self.cli_conn.recv(1024) 1795 self.assertEqual(msg, MSG) 1796 1797 def _testRecv(self): 1798 self.serv_conn.send(MSG) 1799 1800 def testOverFlowRecv(self): 1801 # Testing receive in chunks over TCP 1802 seg1 = self.cli_conn.recv(len(MSG) - 3) 1803 seg2 = self.cli_conn.recv(1024) 1804 msg = seg1 + seg2 1805 self.assertEqual(msg, MSG) 1806 1807 def _testOverFlowRecv(self): 1808 self.serv_conn.send(MSG) 1809 1810 def testRecvFrom(self): 1811 # Testing large recvfrom() over TCP 1812 msg, addr = self.cli_conn.recvfrom(1024) 1813 self.assertEqual(msg, MSG) 1814 1815 def _testRecvFrom(self): 1816 self.serv_conn.send(MSG) 1817 1818 def testOverFlowRecvFrom(self): 1819 # Testing recvfrom() in chunks over TCP 1820 seg1, addr = self.cli_conn.recvfrom(len(MSG)-3) 1821 seg2, addr = self.cli_conn.recvfrom(1024) 1822 msg = seg1 + seg2 1823 self.assertEqual(msg, MSG) 1824 1825 def _testOverFlowRecvFrom(self): 1826 self.serv_conn.send(MSG) 1827 1828 def testSendAll(self): 1829 # Testing sendall() with a 2048 byte string over TCP 1830 msg = b'' 1831 while 1: 1832 read = self.cli_conn.recv(1024) 1833 if not read: 1834 break 1835 msg += read 1836 self.assertEqual(msg, b'f' * 2048) 1837 1838 def _testSendAll(self): 1839 big_chunk = b'f' * 2048 1840 self.serv_conn.sendall(big_chunk) 1841 1842 def testFromFd(self): 1843 # Testing fromfd() 1844 fd = self.cli_conn.fileno() 1845 sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM) 1846 self.addCleanup(sock.close) 1847 self.assertIsInstance(sock, socket.socket) 1848 msg = sock.recv(1024) 1849 self.assertEqual(msg, MSG) 1850 1851 def _testFromFd(self): 1852 self.serv_conn.send(MSG) 1853 1854 def testDup(self): 1855 # Testing dup() 1856 sock = self.cli_conn.dup() 1857 self.addCleanup(sock.close) 1858 msg = sock.recv(1024) 1859 self.assertEqual(msg, MSG) 1860 1861 def _testDup(self): 1862 self.serv_conn.send(MSG) 1863 1864 def testShutdown(self): 1865 # Testing shutdown() 1866 msg = self.cli_conn.recv(1024) 1867 self.assertEqual(msg, MSG) 1868 # wait for _testShutdown to finish: on OS X, when the server 1869 # closes the connection the client also becomes disconnected, 1870 # and the client's shutdown call will fail. (Issue #4397.) 1871 self.done.wait() 1872 1873 def _testShutdown(self): 1874 self.serv_conn.send(MSG) 1875 self.serv_conn.shutdown(2) 1876 1877 testShutdown_overflow = support.cpython_only(testShutdown) 1878 1879 @support.cpython_only 1880 def _testShutdown_overflow(self): 1881 import _testcapi 1882 self.serv_conn.send(MSG) 1883 # Issue 15989 1884 self.assertRaises(OverflowError, self.serv_conn.shutdown, 1885 _testcapi.INT_MAX + 1) 1886 self.assertRaises(OverflowError, self.serv_conn.shutdown, 1887 2 + (_testcapi.UINT_MAX + 1)) 1888 self.serv_conn.shutdown(2) 1889 1890 def testDetach(self): 1891 # Testing detach() 1892 fileno = self.cli_conn.fileno() 1893 f = self.cli_conn.detach() 1894 self.assertEqual(f, fileno) 1895 # cli_conn cannot be used anymore... 1896 self.assertTrue(self.cli_conn._closed) 1897 self.assertRaises(OSError, self.cli_conn.recv, 1024) 1898 self.cli_conn.close() 1899 # ...but we can create another socket using the (still open) 1900 # file descriptor 1901 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=f) 1902 self.addCleanup(sock.close) 1903 msg = sock.recv(1024) 1904 self.assertEqual(msg, MSG) 1905 1906 def _testDetach(self): 1907 self.serv_conn.send(MSG) 1908 1909@unittest.skipUnless(thread, 'Threading required for this test.') 1910class BasicUDPTest(ThreadedUDPSocketTest): 1911 1912 def __init__(self, methodName='runTest'): 1913 ThreadedUDPSocketTest.__init__(self, methodName=methodName) 1914 1915 def testSendtoAndRecv(self): 1916 # Testing sendto() and Recv() over UDP 1917 msg = self.serv.recv(len(MSG)) 1918 self.assertEqual(msg, MSG) 1919 1920 def _testSendtoAndRecv(self): 1921 self.cli.sendto(MSG, 0, (HOST, self.port)) 1922 1923 def testRecvFrom(self): 1924 # Testing recvfrom() over UDP 1925 msg, addr = self.serv.recvfrom(len(MSG)) 1926 self.assertEqual(msg, MSG) 1927 1928 def _testRecvFrom(self): 1929 self.cli.sendto(MSG, 0, (HOST, self.port)) 1930 1931 def testRecvFromNegative(self): 1932 # Negative lengths passed to recvfrom should give ValueError. 1933 self.assertRaises(ValueError, self.serv.recvfrom, -1) 1934 1935 def _testRecvFromNegative(self): 1936 self.cli.sendto(MSG, 0, (HOST, self.port)) 1937 1938# Tests for the sendmsg()/recvmsg() interface. Where possible, the 1939# same test code is used with different families and types of socket 1940# (e.g. stream, datagram), and tests using recvmsg() are repeated 1941# using recvmsg_into(). 1942# 1943# The generic test classes such as SendmsgTests and 1944# RecvmsgGenericTests inherit from SendrecvmsgBase and expect to be 1945# supplied with sockets cli_sock and serv_sock representing the 1946# client's and the server's end of the connection respectively, and 1947# attributes cli_addr and serv_addr holding their (numeric where 1948# appropriate) addresses. 1949# 1950# The final concrete test classes combine these with subclasses of 1951# SocketTestBase which set up client and server sockets of a specific 1952# type, and with subclasses of SendrecvmsgBase such as 1953# SendrecvmsgDgramBase and SendrecvmsgConnectedBase which map these 1954# sockets to cli_sock and serv_sock and override the methods and 1955# attributes of SendrecvmsgBase to fill in destination addresses if 1956# needed when sending, check for specific flags in msg_flags, etc. 1957# 1958# RecvmsgIntoMixin provides a version of doRecvmsg() implemented using 1959# recvmsg_into(). 1960 1961# XXX: like the other datagram (UDP) tests in this module, the code 1962# here assumes that datagram delivery on the local machine will be 1963# reliable. 1964 1965class SendrecvmsgBase(ThreadSafeCleanupTestCase): 1966 # Base class for sendmsg()/recvmsg() tests. 1967 1968 # Time in seconds to wait before considering a test failed, or 1969 # None for no timeout. Not all tests actually set a timeout. 1970 fail_timeout = 3.0 1971 1972 def setUp(self): 1973 self.misc_event = threading.Event() 1974 super().setUp() 1975 1976 def sendToServer(self, msg): 1977 # Send msg to the server. 1978 return self.cli_sock.send(msg) 1979 1980 # Tuple of alternative default arguments for sendmsg() when called 1981 # via sendmsgToServer() (e.g. to include a destination address). 1982 sendmsg_to_server_defaults = () 1983 1984 def sendmsgToServer(self, *args): 1985 # Call sendmsg() on self.cli_sock with the given arguments, 1986 # filling in any arguments which are not supplied with the 1987 # corresponding items of self.sendmsg_to_server_defaults, if 1988 # any. 1989 return self.cli_sock.sendmsg( 1990 *(args + self.sendmsg_to_server_defaults[len(args):])) 1991 1992 def doRecvmsg(self, sock, bufsize, *args): 1993 # Call recvmsg() on sock with given arguments and return its 1994 # result. Should be used for tests which can use either 1995 # recvmsg() or recvmsg_into() - RecvmsgIntoMixin overrides 1996 # this method with one which emulates it using recvmsg_into(), 1997 # thus allowing the same test to be used for both methods. 1998 result = sock.recvmsg(bufsize, *args) 1999 self.registerRecvmsgResult(result) 2000 return result 2001 2002 def registerRecvmsgResult(self, result): 2003 # Called by doRecvmsg() with the return value of recvmsg() or 2004 # recvmsg_into(). Can be overridden to arrange cleanup based 2005 # on the returned ancillary data, for instance. 2006 pass 2007 2008 def checkRecvmsgAddress(self, addr1, addr2): 2009 # Called to compare the received address with the address of 2010 # the peer. 2011 self.assertEqual(addr1, addr2) 2012 2013 # Flags that are normally unset in msg_flags 2014 msg_flags_common_unset = 0 2015 for name in ("MSG_CTRUNC", "MSG_OOB"): 2016 msg_flags_common_unset |= getattr(socket, name, 0) 2017 2018 # Flags that are normally set 2019 msg_flags_common_set = 0 2020 2021 # Flags set when a complete record has been received (e.g. MSG_EOR 2022 # for SCTP) 2023 msg_flags_eor_indicator = 0 2024 2025 # Flags set when a complete record has not been received 2026 # (e.g. MSG_TRUNC for datagram sockets) 2027 msg_flags_non_eor_indicator = 0 2028 2029 def checkFlags(self, flags, eor=None, checkset=0, checkunset=0, ignore=0): 2030 # Method to check the value of msg_flags returned by recvmsg[_into](). 2031 # 2032 # Checks that all bits in msg_flags_common_set attribute are 2033 # set in "flags" and all bits in msg_flags_common_unset are 2034 # unset. 2035 # 2036 # The "eor" argument specifies whether the flags should 2037 # indicate that a full record (or datagram) has been received. 2038 # If "eor" is None, no checks are done; otherwise, checks 2039 # that: 2040 # 2041 # * if "eor" is true, all bits in msg_flags_eor_indicator are 2042 # set and all bits in msg_flags_non_eor_indicator are unset 2043 # 2044 # * if "eor" is false, all bits in msg_flags_non_eor_indicator 2045 # are set and all bits in msg_flags_eor_indicator are unset 2046 # 2047 # If "checkset" and/or "checkunset" are supplied, they require 2048 # the given bits to be set or unset respectively, overriding 2049 # what the attributes require for those bits. 2050 # 2051 # If any bits are set in "ignore", they will not be checked, 2052 # regardless of the other inputs. 2053 # 2054 # Will raise Exception if the inputs require a bit to be both 2055 # set and unset, and it is not ignored. 2056 2057 defaultset = self.msg_flags_common_set 2058 defaultunset = self.msg_flags_common_unset 2059 2060 if eor: 2061 defaultset |= self.msg_flags_eor_indicator 2062 defaultunset |= self.msg_flags_non_eor_indicator 2063 elif eor is not None: 2064 defaultset |= self.msg_flags_non_eor_indicator 2065 defaultunset |= self.msg_flags_eor_indicator 2066 2067 # Function arguments override defaults 2068 defaultset &= ~checkunset 2069 defaultunset &= ~checkset 2070 2071 # Merge arguments with remaining defaults, and check for conflicts 2072 checkset |= defaultset 2073 checkunset |= defaultunset 2074 inboth = checkset & checkunset & ~ignore 2075 if inboth: 2076 raise Exception("contradictory set, unset requirements for flags " 2077 "{0:#x}".format(inboth)) 2078 2079 # Compare with given msg_flags value 2080 mask = (checkset | checkunset) & ~ignore 2081 self.assertEqual(flags & mask, checkset & mask) 2082 2083 2084class RecvmsgIntoMixin(SendrecvmsgBase): 2085 # Mixin to implement doRecvmsg() using recvmsg_into(). 2086 2087 def doRecvmsg(self, sock, bufsize, *args): 2088 buf = bytearray(bufsize) 2089 result = sock.recvmsg_into([buf], *args) 2090 self.registerRecvmsgResult(result) 2091 self.assertGreaterEqual(result[0], 0) 2092 self.assertLessEqual(result[0], bufsize) 2093 return (bytes(buf[:result[0]]),) + result[1:] 2094 2095 2096class SendrecvmsgDgramFlagsBase(SendrecvmsgBase): 2097 # Defines flags to be checked in msg_flags for datagram sockets. 2098 2099 @property 2100 def msg_flags_non_eor_indicator(self): 2101 return super().msg_flags_non_eor_indicator | socket.MSG_TRUNC 2102 2103 2104class SendrecvmsgSCTPFlagsBase(SendrecvmsgBase): 2105 # Defines flags to be checked in msg_flags for SCTP sockets. 2106 2107 @property 2108 def msg_flags_eor_indicator(self): 2109 return super().msg_flags_eor_indicator | socket.MSG_EOR 2110 2111 2112class SendrecvmsgConnectionlessBase(SendrecvmsgBase): 2113 # Base class for tests on connectionless-mode sockets. Users must 2114 # supply sockets on attributes cli and serv to be mapped to 2115 # cli_sock and serv_sock respectively. 2116 2117 @property 2118 def serv_sock(self): 2119 return self.serv 2120 2121 @property 2122 def cli_sock(self): 2123 return self.cli 2124 2125 @property 2126 def sendmsg_to_server_defaults(self): 2127 return ([], [], 0, self.serv_addr) 2128 2129 def sendToServer(self, msg): 2130 return self.cli_sock.sendto(msg, self.serv_addr) 2131 2132 2133class SendrecvmsgConnectedBase(SendrecvmsgBase): 2134 # Base class for tests on connected sockets. Users must supply 2135 # sockets on attributes serv_conn and cli_conn (representing the 2136 # connections *to* the server and the client), to be mapped to 2137 # cli_sock and serv_sock respectively. 2138 2139 @property 2140 def serv_sock(self): 2141 return self.cli_conn 2142 2143 @property 2144 def cli_sock(self): 2145 return self.serv_conn 2146 2147 def checkRecvmsgAddress(self, addr1, addr2): 2148 # Address is currently "unspecified" for a connected socket, 2149 # so we don't examine it 2150 pass 2151 2152 2153class SendrecvmsgServerTimeoutBase(SendrecvmsgBase): 2154 # Base class to set a timeout on server's socket. 2155 2156 def setUp(self): 2157 super().setUp() 2158 self.serv_sock.settimeout(self.fail_timeout) 2159 2160 2161class SendmsgTests(SendrecvmsgServerTimeoutBase): 2162 # Tests for sendmsg() which can use any socket type and do not 2163 # involve recvmsg() or recvmsg_into(). 2164 2165 def testSendmsg(self): 2166 # Send a simple message with sendmsg(). 2167 self.assertEqual(self.serv_sock.recv(len(MSG)), MSG) 2168 2169 def _testSendmsg(self): 2170 self.assertEqual(self.sendmsgToServer([MSG]), len(MSG)) 2171 2172 def testSendmsgDataGenerator(self): 2173 # Send from buffer obtained from a generator (not a sequence). 2174 self.assertEqual(self.serv_sock.recv(len(MSG)), MSG) 2175 2176 def _testSendmsgDataGenerator(self): 2177 self.assertEqual(self.sendmsgToServer((o for o in [MSG])), 2178 len(MSG)) 2179 2180 def testSendmsgAncillaryGenerator(self): 2181 # Gather (empty) ancillary data from a generator. 2182 self.assertEqual(self.serv_sock.recv(len(MSG)), MSG) 2183 2184 def _testSendmsgAncillaryGenerator(self): 2185 self.assertEqual(self.sendmsgToServer([MSG], (o for o in [])), 2186 len(MSG)) 2187 2188 def testSendmsgArray(self): 2189 # Send data from an array instead of the usual bytes object. 2190 self.assertEqual(self.serv_sock.recv(len(MSG)), MSG) 2191 2192 def _testSendmsgArray(self): 2193 self.assertEqual(self.sendmsgToServer([array.array("B", MSG)]), 2194 len(MSG)) 2195 2196 def testSendmsgGather(self): 2197 # Send message data from more than one buffer (gather write). 2198 self.assertEqual(self.serv_sock.recv(len(MSG)), MSG) 2199 2200 def _testSendmsgGather(self): 2201 self.assertEqual(self.sendmsgToServer([MSG[:3], MSG[3:]]), len(MSG)) 2202 2203 def testSendmsgBadArgs(self): 2204 # Check that sendmsg() rejects invalid arguments. 2205 self.assertEqual(self.serv_sock.recv(1000), b"done") 2206 2207 def _testSendmsgBadArgs(self): 2208 self.assertRaises(TypeError, self.cli_sock.sendmsg) 2209 self.assertRaises(TypeError, self.sendmsgToServer, 2210 b"not in an iterable") 2211 self.assertRaises(TypeError, self.sendmsgToServer, 2212 object()) 2213 self.assertRaises(TypeError, self.sendmsgToServer, 2214 [object()]) 2215 self.assertRaises(TypeError, self.sendmsgToServer, 2216 [MSG, object()]) 2217 self.assertRaises(TypeError, self.sendmsgToServer, 2218 [MSG], object()) 2219 self.assertRaises(TypeError, self.sendmsgToServer, 2220 [MSG], [], object()) 2221 self.assertRaises(TypeError, self.sendmsgToServer, 2222 [MSG], [], 0, object()) 2223 self.sendToServer(b"done") 2224 2225 def testSendmsgBadCmsg(self): 2226 # Check that invalid ancillary data items are rejected. 2227 self.assertEqual(self.serv_sock.recv(1000), b"done") 2228 2229 def _testSendmsgBadCmsg(self): 2230 self.assertRaises(TypeError, self.sendmsgToServer, 2231 [MSG], [object()]) 2232 self.assertRaises(TypeError, self.sendmsgToServer, 2233 [MSG], [(object(), 0, b"data")]) 2234 self.assertRaises(TypeError, self.sendmsgToServer, 2235 [MSG], [(0, object(), b"data")]) 2236 self.assertRaises(TypeError, self.sendmsgToServer, 2237 [MSG], [(0, 0, object())]) 2238 self.assertRaises(TypeError, self.sendmsgToServer, 2239 [MSG], [(0, 0)]) 2240 self.assertRaises(TypeError, self.sendmsgToServer, 2241 [MSG], [(0, 0, b"data", 42)]) 2242 self.sendToServer(b"done") 2243 2244 @requireAttrs(socket, "CMSG_SPACE") 2245 def testSendmsgBadMultiCmsg(self): 2246 # Check that invalid ancillary data items are rejected when 2247 # more than one item is present. 2248 self.assertEqual(self.serv_sock.recv(1000), b"done") 2249 2250 @testSendmsgBadMultiCmsg.client_skip 2251 def _testSendmsgBadMultiCmsg(self): 2252 self.assertRaises(TypeError, self.sendmsgToServer, 2253 [MSG], [0, 0, b""]) 2254 self.assertRaises(TypeError, self.sendmsgToServer, 2255 [MSG], [(0, 0, b""), object()]) 2256 self.sendToServer(b"done") 2257 2258 def testSendmsgExcessCmsgReject(self): 2259 # Check that sendmsg() rejects excess ancillary data items 2260 # when the number that can be sent is limited. 2261 self.assertEqual(self.serv_sock.recv(1000), b"done") 2262 2263 def _testSendmsgExcessCmsgReject(self): 2264 if not hasattr(socket, "CMSG_SPACE"): 2265 # Can only send one item 2266 with self.assertRaises(OSError) as cm: 2267 self.sendmsgToServer([MSG], [(0, 0, b""), (0, 0, b"")]) 2268 self.assertIsNone(cm.exception.errno) 2269 self.sendToServer(b"done") 2270 2271 def testSendmsgAfterClose(self): 2272 # Check that sendmsg() fails on a closed socket. 2273 pass 2274 2275 def _testSendmsgAfterClose(self): 2276 self.cli_sock.close() 2277 self.assertRaises(OSError, self.sendmsgToServer, [MSG]) 2278 2279 2280class SendmsgStreamTests(SendmsgTests): 2281 # Tests for sendmsg() which require a stream socket and do not 2282 # involve recvmsg() or recvmsg_into(). 2283 2284 def testSendmsgExplicitNoneAddr(self): 2285 # Check that peer address can be specified as None. 2286 self.assertEqual(self.serv_sock.recv(len(MSG)), MSG) 2287 2288 def _testSendmsgExplicitNoneAddr(self): 2289 self.assertEqual(self.sendmsgToServer([MSG], [], 0, None), len(MSG)) 2290 2291 def testSendmsgTimeout(self): 2292 # Check that timeout works with sendmsg(). 2293 self.assertEqual(self.serv_sock.recv(512), b"a"*512) 2294 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 2295 2296 def _testSendmsgTimeout(self): 2297 try: 2298 self.cli_sock.settimeout(0.03) 2299 try: 2300 while True: 2301 self.sendmsgToServer([b"a"*512]) 2302 except socket.timeout: 2303 pass 2304 except OSError as exc: 2305 if exc.errno != errno.ENOMEM: 2306 raise 2307 # bpo-33937 the test randomly fails on Travis CI with 2308 # "OSError: [Errno 12] Cannot allocate memory" 2309 else: 2310 self.fail("socket.timeout not raised") 2311 finally: 2312 self.misc_event.set() 2313 2314 # XXX: would be nice to have more tests for sendmsg flags argument. 2315 2316 # Linux supports MSG_DONTWAIT when sending, but in general, it 2317 # only works when receiving. Could add other platforms if they 2318 # support it too. 2319 @skipWithClientIf(sys.platform not in {"linux"}, 2320 "MSG_DONTWAIT not known to work on this platform when " 2321 "sending") 2322 def testSendmsgDontWait(self): 2323 # Check that MSG_DONTWAIT in flags causes non-blocking behaviour. 2324 self.assertEqual(self.serv_sock.recv(512), b"a"*512) 2325 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 2326 2327 @testSendmsgDontWait.client_skip 2328 def _testSendmsgDontWait(self): 2329 try: 2330 with self.assertRaises(OSError) as cm: 2331 while True: 2332 self.sendmsgToServer([b"a"*512], [], socket.MSG_DONTWAIT) 2333 # bpo-33937: catch also ENOMEM, the test randomly fails on Travis CI 2334 # with "OSError: [Errno 12] Cannot allocate memory" 2335 self.assertIn(cm.exception.errno, 2336 (errno.EAGAIN, errno.EWOULDBLOCK, errno.ENOMEM)) 2337 finally: 2338 self.misc_event.set() 2339 2340 2341class SendmsgConnectionlessTests(SendmsgTests): 2342 # Tests for sendmsg() which require a connectionless-mode 2343 # (e.g. datagram) socket, and do not involve recvmsg() or 2344 # recvmsg_into(). 2345 2346 def testSendmsgNoDestAddr(self): 2347 # Check that sendmsg() fails when no destination address is 2348 # given for unconnected socket. 2349 pass 2350 2351 def _testSendmsgNoDestAddr(self): 2352 self.assertRaises(OSError, self.cli_sock.sendmsg, 2353 [MSG]) 2354 self.assertRaises(OSError, self.cli_sock.sendmsg, 2355 [MSG], [], 0, None) 2356 2357 2358class RecvmsgGenericTests(SendrecvmsgBase): 2359 # Tests for recvmsg() which can also be emulated using 2360 # recvmsg_into(), and can use any socket type. 2361 2362 def testRecvmsg(self): 2363 # Receive a simple message with recvmsg[_into](). 2364 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, len(MSG)) 2365 self.assertEqual(msg, MSG) 2366 self.checkRecvmsgAddress(addr, self.cli_addr) 2367 self.assertEqual(ancdata, []) 2368 self.checkFlags(flags, eor=True) 2369 2370 def _testRecvmsg(self): 2371 self.sendToServer(MSG) 2372 2373 def testRecvmsgExplicitDefaults(self): 2374 # Test recvmsg[_into]() with default arguments provided explicitly. 2375 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 2376 len(MSG), 0, 0) 2377 self.assertEqual(msg, MSG) 2378 self.checkRecvmsgAddress(addr, self.cli_addr) 2379 self.assertEqual(ancdata, []) 2380 self.checkFlags(flags, eor=True) 2381 2382 def _testRecvmsgExplicitDefaults(self): 2383 self.sendToServer(MSG) 2384 2385 def testRecvmsgShorter(self): 2386 # Receive a message smaller than buffer. 2387 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 2388 len(MSG) + 42) 2389 self.assertEqual(msg, MSG) 2390 self.checkRecvmsgAddress(addr, self.cli_addr) 2391 self.assertEqual(ancdata, []) 2392 self.checkFlags(flags, eor=True) 2393 2394 def _testRecvmsgShorter(self): 2395 self.sendToServer(MSG) 2396 2397 # FreeBSD < 8 doesn't always set the MSG_TRUNC flag when a truncated 2398 # datagram is received (issue #13001). 2399 @support.requires_freebsd_version(8) 2400 def testRecvmsgTrunc(self): 2401 # Receive part of message, check for truncation indicators. 2402 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 2403 len(MSG) - 3) 2404 self.assertEqual(msg, MSG[:-3]) 2405 self.checkRecvmsgAddress(addr, self.cli_addr) 2406 self.assertEqual(ancdata, []) 2407 self.checkFlags(flags, eor=False) 2408 2409 @support.requires_freebsd_version(8) 2410 def _testRecvmsgTrunc(self): 2411 self.sendToServer(MSG) 2412 2413 def testRecvmsgShortAncillaryBuf(self): 2414 # Test ancillary data buffer too small to hold any ancillary data. 2415 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 2416 len(MSG), 1) 2417 self.assertEqual(msg, MSG) 2418 self.checkRecvmsgAddress(addr, self.cli_addr) 2419 self.assertEqual(ancdata, []) 2420 self.checkFlags(flags, eor=True) 2421 2422 def _testRecvmsgShortAncillaryBuf(self): 2423 self.sendToServer(MSG) 2424 2425 def testRecvmsgLongAncillaryBuf(self): 2426 # Test large ancillary data buffer. 2427 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 2428 len(MSG), 10240) 2429 self.assertEqual(msg, MSG) 2430 self.checkRecvmsgAddress(addr, self.cli_addr) 2431 self.assertEqual(ancdata, []) 2432 self.checkFlags(flags, eor=True) 2433 2434 def _testRecvmsgLongAncillaryBuf(self): 2435 self.sendToServer(MSG) 2436 2437 def testRecvmsgAfterClose(self): 2438 # Check that recvmsg[_into]() fails on a closed socket. 2439 self.serv_sock.close() 2440 self.assertRaises(OSError, self.doRecvmsg, self.serv_sock, 1024) 2441 2442 def _testRecvmsgAfterClose(self): 2443 pass 2444 2445 def testRecvmsgTimeout(self): 2446 # Check that timeout works. 2447 try: 2448 self.serv_sock.settimeout(0.03) 2449 self.assertRaises(socket.timeout, 2450 self.doRecvmsg, self.serv_sock, len(MSG)) 2451 finally: 2452 self.misc_event.set() 2453 2454 def _testRecvmsgTimeout(self): 2455 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 2456 2457 @requireAttrs(socket, "MSG_PEEK") 2458 def testRecvmsgPeek(self): 2459 # Check that MSG_PEEK in flags enables examination of pending 2460 # data without consuming it. 2461 2462 # Receive part of data with MSG_PEEK. 2463 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 2464 len(MSG) - 3, 0, 2465 socket.MSG_PEEK) 2466 self.assertEqual(msg, MSG[:-3]) 2467 self.checkRecvmsgAddress(addr, self.cli_addr) 2468 self.assertEqual(ancdata, []) 2469 # Ignoring MSG_TRUNC here (so this test is the same for stream 2470 # and datagram sockets). Some wording in POSIX seems to 2471 # suggest that it needn't be set when peeking, but that may 2472 # just be a slip. 2473 self.checkFlags(flags, eor=False, 2474 ignore=getattr(socket, "MSG_TRUNC", 0)) 2475 2476 # Receive all data with MSG_PEEK. 2477 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 2478 len(MSG), 0, 2479 socket.MSG_PEEK) 2480 self.assertEqual(msg, MSG) 2481 self.checkRecvmsgAddress(addr, self.cli_addr) 2482 self.assertEqual(ancdata, []) 2483 self.checkFlags(flags, eor=True) 2484 2485 # Check that the same data can still be received normally. 2486 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, len(MSG)) 2487 self.assertEqual(msg, MSG) 2488 self.checkRecvmsgAddress(addr, self.cli_addr) 2489 self.assertEqual(ancdata, []) 2490 self.checkFlags(flags, eor=True) 2491 2492 @testRecvmsgPeek.client_skip 2493 def _testRecvmsgPeek(self): 2494 self.sendToServer(MSG) 2495 2496 @requireAttrs(socket.socket, "sendmsg") 2497 def testRecvmsgFromSendmsg(self): 2498 # Test receiving with recvmsg[_into]() when message is sent 2499 # using sendmsg(). 2500 self.serv_sock.settimeout(self.fail_timeout) 2501 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, len(MSG)) 2502 self.assertEqual(msg, MSG) 2503 self.checkRecvmsgAddress(addr, self.cli_addr) 2504 self.assertEqual(ancdata, []) 2505 self.checkFlags(flags, eor=True) 2506 2507 @testRecvmsgFromSendmsg.client_skip 2508 def _testRecvmsgFromSendmsg(self): 2509 self.assertEqual(self.sendmsgToServer([MSG[:3], MSG[3:]]), len(MSG)) 2510 2511 2512class RecvmsgGenericStreamTests(RecvmsgGenericTests): 2513 # Tests which require a stream socket and can use either recvmsg() 2514 # or recvmsg_into(). 2515 2516 def testRecvmsgEOF(self): 2517 # Receive end-of-stream indicator (b"", peer socket closed). 2518 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 1024) 2519 self.assertEqual(msg, b"") 2520 self.checkRecvmsgAddress(addr, self.cli_addr) 2521 self.assertEqual(ancdata, []) 2522 self.checkFlags(flags, eor=None) # Might not have end-of-record marker 2523 2524 def _testRecvmsgEOF(self): 2525 self.cli_sock.close() 2526 2527 def testRecvmsgOverflow(self): 2528 # Receive a message in more than one chunk. 2529 seg1, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 2530 len(MSG) - 3) 2531 self.checkRecvmsgAddress(addr, self.cli_addr) 2532 self.assertEqual(ancdata, []) 2533 self.checkFlags(flags, eor=False) 2534 2535 seg2, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 1024) 2536 self.checkRecvmsgAddress(addr, self.cli_addr) 2537 self.assertEqual(ancdata, []) 2538 self.checkFlags(flags, eor=True) 2539 2540 msg = seg1 + seg2 2541 self.assertEqual(msg, MSG) 2542 2543 def _testRecvmsgOverflow(self): 2544 self.sendToServer(MSG) 2545 2546 2547class RecvmsgTests(RecvmsgGenericTests): 2548 # Tests for recvmsg() which can use any socket type. 2549 2550 def testRecvmsgBadArgs(self): 2551 # Check that recvmsg() rejects invalid arguments. 2552 self.assertRaises(TypeError, self.serv_sock.recvmsg) 2553 self.assertRaises(ValueError, self.serv_sock.recvmsg, 2554 -1, 0, 0) 2555 self.assertRaises(ValueError, self.serv_sock.recvmsg, 2556 len(MSG), -1, 0) 2557 self.assertRaises(TypeError, self.serv_sock.recvmsg, 2558 [bytearray(10)], 0, 0) 2559 self.assertRaises(TypeError, self.serv_sock.recvmsg, 2560 object(), 0, 0) 2561 self.assertRaises(TypeError, self.serv_sock.recvmsg, 2562 len(MSG), object(), 0) 2563 self.assertRaises(TypeError, self.serv_sock.recvmsg, 2564 len(MSG), 0, object()) 2565 2566 msg, ancdata, flags, addr = self.serv_sock.recvmsg(len(MSG), 0, 0) 2567 self.assertEqual(msg, MSG) 2568 self.checkRecvmsgAddress(addr, self.cli_addr) 2569 self.assertEqual(ancdata, []) 2570 self.checkFlags(flags, eor=True) 2571 2572 def _testRecvmsgBadArgs(self): 2573 self.sendToServer(MSG) 2574 2575 2576class RecvmsgIntoTests(RecvmsgIntoMixin, RecvmsgGenericTests): 2577 # Tests for recvmsg_into() which can use any socket type. 2578 2579 def testRecvmsgIntoBadArgs(self): 2580 # Check that recvmsg_into() rejects invalid arguments. 2581 buf = bytearray(len(MSG)) 2582 self.assertRaises(TypeError, self.serv_sock.recvmsg_into) 2583 self.assertRaises(TypeError, self.serv_sock.recvmsg_into, 2584 len(MSG), 0, 0) 2585 self.assertRaises(TypeError, self.serv_sock.recvmsg_into, 2586 buf, 0, 0) 2587 self.assertRaises(TypeError, self.serv_sock.recvmsg_into, 2588 [object()], 0, 0) 2589 self.assertRaises(TypeError, self.serv_sock.recvmsg_into, 2590 [b"I'm not writable"], 0, 0) 2591 self.assertRaises(TypeError, self.serv_sock.recvmsg_into, 2592 [buf, object()], 0, 0) 2593 self.assertRaises(ValueError, self.serv_sock.recvmsg_into, 2594 [buf], -1, 0) 2595 self.assertRaises(TypeError, self.serv_sock.recvmsg_into, 2596 [buf], object(), 0) 2597 self.assertRaises(TypeError, self.serv_sock.recvmsg_into, 2598 [buf], 0, object()) 2599 2600 nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into([buf], 0, 0) 2601 self.assertEqual(nbytes, len(MSG)) 2602 self.assertEqual(buf, bytearray(MSG)) 2603 self.checkRecvmsgAddress(addr, self.cli_addr) 2604 self.assertEqual(ancdata, []) 2605 self.checkFlags(flags, eor=True) 2606 2607 def _testRecvmsgIntoBadArgs(self): 2608 self.sendToServer(MSG) 2609 2610 def testRecvmsgIntoGenerator(self): 2611 # Receive into buffer obtained from a generator (not a sequence). 2612 buf = bytearray(len(MSG)) 2613 nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into( 2614 (o for o in [buf])) 2615 self.assertEqual(nbytes, len(MSG)) 2616 self.assertEqual(buf, bytearray(MSG)) 2617 self.checkRecvmsgAddress(addr, self.cli_addr) 2618 self.assertEqual(ancdata, []) 2619 self.checkFlags(flags, eor=True) 2620 2621 def _testRecvmsgIntoGenerator(self): 2622 self.sendToServer(MSG) 2623 2624 def testRecvmsgIntoArray(self): 2625 # Receive into an array rather than the usual bytearray. 2626 buf = array.array("B", [0] * len(MSG)) 2627 nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into([buf]) 2628 self.assertEqual(nbytes, len(MSG)) 2629 self.assertEqual(buf.tobytes(), MSG) 2630 self.checkRecvmsgAddress(addr, self.cli_addr) 2631 self.assertEqual(ancdata, []) 2632 self.checkFlags(flags, eor=True) 2633 2634 def _testRecvmsgIntoArray(self): 2635 self.sendToServer(MSG) 2636 2637 def testRecvmsgIntoScatter(self): 2638 # Receive into multiple buffers (scatter write). 2639 b1 = bytearray(b"----") 2640 b2 = bytearray(b"0123456789") 2641 b3 = bytearray(b"--------------") 2642 nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into( 2643 [b1, memoryview(b2)[2:9], b3]) 2644 self.assertEqual(nbytes, len(b"Mary had a little lamb")) 2645 self.assertEqual(b1, bytearray(b"Mary")) 2646 self.assertEqual(b2, bytearray(b"01 had a 9")) 2647 self.assertEqual(b3, bytearray(b"little lamb---")) 2648 self.checkRecvmsgAddress(addr, self.cli_addr) 2649 self.assertEqual(ancdata, []) 2650 self.checkFlags(flags, eor=True) 2651 2652 def _testRecvmsgIntoScatter(self): 2653 self.sendToServer(b"Mary had a little lamb") 2654 2655 2656class CmsgMacroTests(unittest.TestCase): 2657 # Test the functions CMSG_LEN() and CMSG_SPACE(). Tests 2658 # assumptions used by sendmsg() and recvmsg[_into](), which share 2659 # code with these functions. 2660 2661 # Match the definition in socketmodule.c 2662 try: 2663 import _testcapi 2664 except ImportError: 2665 socklen_t_limit = 0x7fffffff 2666 else: 2667 socklen_t_limit = min(0x7fffffff, _testcapi.INT_MAX) 2668 2669 @requireAttrs(socket, "CMSG_LEN") 2670 def testCMSG_LEN(self): 2671 # Test CMSG_LEN() with various valid and invalid values, 2672 # checking the assumptions used by recvmsg() and sendmsg(). 2673 toobig = self.socklen_t_limit - socket.CMSG_LEN(0) + 1 2674 values = list(range(257)) + list(range(toobig - 257, toobig)) 2675 2676 # struct cmsghdr has at least three members, two of which are ints 2677 self.assertGreater(socket.CMSG_LEN(0), array.array("i").itemsize * 2) 2678 for n in values: 2679 ret = socket.CMSG_LEN(n) 2680 # This is how recvmsg() calculates the data size 2681 self.assertEqual(ret - socket.CMSG_LEN(0), n) 2682 self.assertLessEqual(ret, self.socklen_t_limit) 2683 2684 self.assertRaises(OverflowError, socket.CMSG_LEN, -1) 2685 # sendmsg() shares code with these functions, and requires 2686 # that it reject values over the limit. 2687 self.assertRaises(OverflowError, socket.CMSG_LEN, toobig) 2688 self.assertRaises(OverflowError, socket.CMSG_LEN, sys.maxsize) 2689 2690 @requireAttrs(socket, "CMSG_SPACE") 2691 def testCMSG_SPACE(self): 2692 # Test CMSG_SPACE() with various valid and invalid values, 2693 # checking the assumptions used by sendmsg(). 2694 toobig = self.socklen_t_limit - socket.CMSG_SPACE(1) + 1 2695 values = list(range(257)) + list(range(toobig - 257, toobig)) 2696 2697 last = socket.CMSG_SPACE(0) 2698 # struct cmsghdr has at least three members, two of which are ints 2699 self.assertGreater(last, array.array("i").itemsize * 2) 2700 for n in values: 2701 ret = socket.CMSG_SPACE(n) 2702 self.assertGreaterEqual(ret, last) 2703 self.assertGreaterEqual(ret, socket.CMSG_LEN(n)) 2704 self.assertGreaterEqual(ret, n + socket.CMSG_LEN(0)) 2705 self.assertLessEqual(ret, self.socklen_t_limit) 2706 last = ret 2707 2708 self.assertRaises(OverflowError, socket.CMSG_SPACE, -1) 2709 # sendmsg() shares code with these functions, and requires 2710 # that it reject values over the limit. 2711 self.assertRaises(OverflowError, socket.CMSG_SPACE, toobig) 2712 self.assertRaises(OverflowError, socket.CMSG_SPACE, sys.maxsize) 2713 2714 2715class SCMRightsTest(SendrecvmsgServerTimeoutBase): 2716 # Tests for file descriptor passing on Unix-domain sockets. 2717 2718 # Invalid file descriptor value that's unlikely to evaluate to a 2719 # real FD even if one of its bytes is replaced with a different 2720 # value (which shouldn't actually happen). 2721 badfd = -0x5555 2722 2723 def newFDs(self, n): 2724 # Return a list of n file descriptors for newly-created files 2725 # containing their list indices as ASCII numbers. 2726 fds = [] 2727 for i in range(n): 2728 fd, path = tempfile.mkstemp() 2729 self.addCleanup(os.unlink, path) 2730 self.addCleanup(os.close, fd) 2731 os.write(fd, str(i).encode()) 2732 fds.append(fd) 2733 return fds 2734 2735 def checkFDs(self, fds): 2736 # Check that the file descriptors in the given list contain 2737 # their correct list indices as ASCII numbers. 2738 for n, fd in enumerate(fds): 2739 os.lseek(fd, 0, os.SEEK_SET) 2740 self.assertEqual(os.read(fd, 1024), str(n).encode()) 2741 2742 def registerRecvmsgResult(self, result): 2743 self.addCleanup(self.closeRecvmsgFDs, result) 2744 2745 def closeRecvmsgFDs(self, recvmsg_result): 2746 # Close all file descriptors specified in the ancillary data 2747 # of the given return value from recvmsg() or recvmsg_into(). 2748 for cmsg_level, cmsg_type, cmsg_data in recvmsg_result[1]: 2749 if (cmsg_level == socket.SOL_SOCKET and 2750 cmsg_type == socket.SCM_RIGHTS): 2751 fds = array.array("i") 2752 fds.frombytes(cmsg_data[: 2753 len(cmsg_data) - (len(cmsg_data) % fds.itemsize)]) 2754 for fd in fds: 2755 os.close(fd) 2756 2757 def createAndSendFDs(self, n): 2758 # Send n new file descriptors created by newFDs() to the 2759 # server, with the constant MSG as the non-ancillary data. 2760 self.assertEqual( 2761 self.sendmsgToServer([MSG], 2762 [(socket.SOL_SOCKET, 2763 socket.SCM_RIGHTS, 2764 array.array("i", self.newFDs(n)))]), 2765 len(MSG)) 2766 2767 def checkRecvmsgFDs(self, numfds, result, maxcmsgs=1, ignoreflags=0): 2768 # Check that constant MSG was received with numfds file 2769 # descriptors in a maximum of maxcmsgs control messages (which 2770 # must contain only complete integers). By default, check 2771 # that MSG_CTRUNC is unset, but ignore any flags in 2772 # ignoreflags. 2773 msg, ancdata, flags, addr = result 2774 self.assertEqual(msg, MSG) 2775 self.checkRecvmsgAddress(addr, self.cli_addr) 2776 self.checkFlags(flags, eor=True, checkunset=socket.MSG_CTRUNC, 2777 ignore=ignoreflags) 2778 2779 self.assertIsInstance(ancdata, list) 2780 self.assertLessEqual(len(ancdata), maxcmsgs) 2781 fds = array.array("i") 2782 for item in ancdata: 2783 self.assertIsInstance(item, tuple) 2784 cmsg_level, cmsg_type, cmsg_data = item 2785 self.assertEqual(cmsg_level, socket.SOL_SOCKET) 2786 self.assertEqual(cmsg_type, socket.SCM_RIGHTS) 2787 self.assertIsInstance(cmsg_data, bytes) 2788 self.assertEqual(len(cmsg_data) % SIZEOF_INT, 0) 2789 fds.frombytes(cmsg_data) 2790 2791 self.assertEqual(len(fds), numfds) 2792 self.checkFDs(fds) 2793 2794 def testFDPassSimple(self): 2795 # Pass a single FD (array read from bytes object). 2796 self.checkRecvmsgFDs(1, self.doRecvmsg(self.serv_sock, 2797 len(MSG), 10240)) 2798 2799 def _testFDPassSimple(self): 2800 self.assertEqual( 2801 self.sendmsgToServer( 2802 [MSG], 2803 [(socket.SOL_SOCKET, 2804 socket.SCM_RIGHTS, 2805 array.array("i", self.newFDs(1)).tobytes())]), 2806 len(MSG)) 2807 2808 def testMultipleFDPass(self): 2809 # Pass multiple FDs in a single array. 2810 self.checkRecvmsgFDs(4, self.doRecvmsg(self.serv_sock, 2811 len(MSG), 10240)) 2812 2813 def _testMultipleFDPass(self): 2814 self.createAndSendFDs(4) 2815 2816 @requireAttrs(socket, "CMSG_SPACE") 2817 def testFDPassCMSG_SPACE(self): 2818 # Test using CMSG_SPACE() to calculate ancillary buffer size. 2819 self.checkRecvmsgFDs( 2820 4, self.doRecvmsg(self.serv_sock, len(MSG), 2821 socket.CMSG_SPACE(4 * SIZEOF_INT))) 2822 2823 @testFDPassCMSG_SPACE.client_skip 2824 def _testFDPassCMSG_SPACE(self): 2825 self.createAndSendFDs(4) 2826 2827 def testFDPassCMSG_LEN(self): 2828 # Test using CMSG_LEN() to calculate ancillary buffer size. 2829 self.checkRecvmsgFDs(1, 2830 self.doRecvmsg(self.serv_sock, len(MSG), 2831 socket.CMSG_LEN(4 * SIZEOF_INT)), 2832 # RFC 3542 says implementations may set 2833 # MSG_CTRUNC if there isn't enough space 2834 # for trailing padding. 2835 ignoreflags=socket.MSG_CTRUNC) 2836 2837 def _testFDPassCMSG_LEN(self): 2838 self.createAndSendFDs(1) 2839 2840 @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958") 2841 @unittest.skipIf(sys.platform.startswith("aix"), "skipping, see issue #22397") 2842 @requireAttrs(socket, "CMSG_SPACE") 2843 def testFDPassSeparate(self): 2844 # Pass two FDs in two separate arrays. Arrays may be combined 2845 # into a single control message by the OS. 2846 self.checkRecvmsgFDs(2, 2847 self.doRecvmsg(self.serv_sock, len(MSG), 10240), 2848 maxcmsgs=2) 2849 2850 @testFDPassSeparate.client_skip 2851 @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958") 2852 @unittest.skipIf(sys.platform.startswith("aix"), "skipping, see issue #22397") 2853 def _testFDPassSeparate(self): 2854 fd0, fd1 = self.newFDs(2) 2855 self.assertEqual( 2856 self.sendmsgToServer([MSG], [(socket.SOL_SOCKET, 2857 socket.SCM_RIGHTS, 2858 array.array("i", [fd0])), 2859 (socket.SOL_SOCKET, 2860 socket.SCM_RIGHTS, 2861 array.array("i", [fd1]))]), 2862 len(MSG)) 2863 2864 @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958") 2865 @unittest.skipIf(sys.platform.startswith("aix"), "skipping, see issue #22397") 2866 @requireAttrs(socket, "CMSG_SPACE") 2867 def testFDPassSeparateMinSpace(self): 2868 # Pass two FDs in two separate arrays, receiving them into the 2869 # minimum space for two arrays. 2870 num_fds = 2 2871 self.checkRecvmsgFDs(num_fds, 2872 self.doRecvmsg(self.serv_sock, len(MSG), 2873 socket.CMSG_SPACE(SIZEOF_INT) + 2874 socket.CMSG_LEN(SIZEOF_INT * num_fds)), 2875 maxcmsgs=2, ignoreflags=socket.MSG_CTRUNC) 2876 2877 @testFDPassSeparateMinSpace.client_skip 2878 @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958") 2879 @unittest.skipIf(sys.platform.startswith("aix"), "skipping, see issue #22397") 2880 def _testFDPassSeparateMinSpace(self): 2881 fd0, fd1 = self.newFDs(2) 2882 self.assertEqual( 2883 self.sendmsgToServer([MSG], [(socket.SOL_SOCKET, 2884 socket.SCM_RIGHTS, 2885 array.array("i", [fd0])), 2886 (socket.SOL_SOCKET, 2887 socket.SCM_RIGHTS, 2888 array.array("i", [fd1]))]), 2889 len(MSG)) 2890 2891 def sendAncillaryIfPossible(self, msg, ancdata): 2892 # Try to send msg and ancdata to server, but if the system 2893 # call fails, just send msg with no ancillary data. 2894 try: 2895 nbytes = self.sendmsgToServer([msg], ancdata) 2896 except OSError as e: 2897 # Check that it was the system call that failed 2898 self.assertIsInstance(e.errno, int) 2899 nbytes = self.sendmsgToServer([msg]) 2900 self.assertEqual(nbytes, len(msg)) 2901 2902 @unittest.skipIf(sys.platform == "darwin", "see issue #24725") 2903 def testFDPassEmpty(self): 2904 # Try to pass an empty FD array. Can receive either no array 2905 # or an empty array. 2906 self.checkRecvmsgFDs(0, self.doRecvmsg(self.serv_sock, 2907 len(MSG), 10240), 2908 ignoreflags=socket.MSG_CTRUNC) 2909 2910 def _testFDPassEmpty(self): 2911 self.sendAncillaryIfPossible(MSG, [(socket.SOL_SOCKET, 2912 socket.SCM_RIGHTS, 2913 b"")]) 2914 2915 def testFDPassPartialInt(self): 2916 # Try to pass a truncated FD array. 2917 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 2918 len(MSG), 10240) 2919 self.assertEqual(msg, MSG) 2920 self.checkRecvmsgAddress(addr, self.cli_addr) 2921 self.checkFlags(flags, eor=True, ignore=socket.MSG_CTRUNC) 2922 self.assertLessEqual(len(ancdata), 1) 2923 for cmsg_level, cmsg_type, cmsg_data in ancdata: 2924 self.assertEqual(cmsg_level, socket.SOL_SOCKET) 2925 self.assertEqual(cmsg_type, socket.SCM_RIGHTS) 2926 self.assertLess(len(cmsg_data), SIZEOF_INT) 2927 2928 def _testFDPassPartialInt(self): 2929 self.sendAncillaryIfPossible( 2930 MSG, 2931 [(socket.SOL_SOCKET, 2932 socket.SCM_RIGHTS, 2933 array.array("i", [self.badfd]).tobytes()[:-1])]) 2934 2935 @requireAttrs(socket, "CMSG_SPACE") 2936 def testFDPassPartialIntInMiddle(self): 2937 # Try to pass two FD arrays, the first of which is truncated. 2938 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 2939 len(MSG), 10240) 2940 self.assertEqual(msg, MSG) 2941 self.checkRecvmsgAddress(addr, self.cli_addr) 2942 self.checkFlags(flags, eor=True, ignore=socket.MSG_CTRUNC) 2943 self.assertLessEqual(len(ancdata), 2) 2944 fds = array.array("i") 2945 # Arrays may have been combined in a single control message 2946 for cmsg_level, cmsg_type, cmsg_data in ancdata: 2947 self.assertEqual(cmsg_level, socket.SOL_SOCKET) 2948 self.assertEqual(cmsg_type, socket.SCM_RIGHTS) 2949 fds.frombytes(cmsg_data[: 2950 len(cmsg_data) - (len(cmsg_data) % fds.itemsize)]) 2951 self.assertLessEqual(len(fds), 2) 2952 self.checkFDs(fds) 2953 2954 @testFDPassPartialIntInMiddle.client_skip 2955 def _testFDPassPartialIntInMiddle(self): 2956 fd0, fd1 = self.newFDs(2) 2957 self.sendAncillaryIfPossible( 2958 MSG, 2959 [(socket.SOL_SOCKET, 2960 socket.SCM_RIGHTS, 2961 array.array("i", [fd0, self.badfd]).tobytes()[:-1]), 2962 (socket.SOL_SOCKET, 2963 socket.SCM_RIGHTS, 2964 array.array("i", [fd1]))]) 2965 2966 def checkTruncatedHeader(self, result, ignoreflags=0): 2967 # Check that no ancillary data items are returned when data is 2968 # truncated inside the cmsghdr structure. 2969 msg, ancdata, flags, addr = result 2970 self.assertEqual(msg, MSG) 2971 self.checkRecvmsgAddress(addr, self.cli_addr) 2972 self.assertEqual(ancdata, []) 2973 self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC, 2974 ignore=ignoreflags) 2975 2976 def testCmsgTruncNoBufSize(self): 2977 # Check that no ancillary data is received when no buffer size 2978 # is specified. 2979 self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG)), 2980 # BSD seems to set MSG_CTRUNC only 2981 # if an item has been partially 2982 # received. 2983 ignoreflags=socket.MSG_CTRUNC) 2984 2985 def _testCmsgTruncNoBufSize(self): 2986 self.createAndSendFDs(1) 2987 2988 def testCmsgTrunc0(self): 2989 # Check that no ancillary data is received when buffer size is 0. 2990 self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG), 0), 2991 ignoreflags=socket.MSG_CTRUNC) 2992 2993 def _testCmsgTrunc0(self): 2994 self.createAndSendFDs(1) 2995 2996 # Check that no ancillary data is returned for various non-zero 2997 # (but still too small) buffer sizes. 2998 2999 def testCmsgTrunc1(self): 3000 self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG), 1)) 3001 3002 def _testCmsgTrunc1(self): 3003 self.createAndSendFDs(1) 3004 3005 def testCmsgTrunc2Int(self): 3006 # The cmsghdr structure has at least three members, two of 3007 # which are ints, so we still shouldn't see any ancillary 3008 # data. 3009 self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG), 3010 SIZEOF_INT * 2)) 3011 3012 def _testCmsgTrunc2Int(self): 3013 self.createAndSendFDs(1) 3014 3015 def testCmsgTruncLen0Minus1(self): 3016 self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG), 3017 socket.CMSG_LEN(0) - 1)) 3018 3019 def _testCmsgTruncLen0Minus1(self): 3020 self.createAndSendFDs(1) 3021 3022 # The following tests try to truncate the control message in the 3023 # middle of the FD array. 3024 3025 def checkTruncatedArray(self, ancbuf, maxdata, mindata=0): 3026 # Check that file descriptor data is truncated to between 3027 # mindata and maxdata bytes when received with buffer size 3028 # ancbuf, and that any complete file descriptor numbers are 3029 # valid. 3030 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3031 len(MSG), ancbuf) 3032 self.assertEqual(msg, MSG) 3033 self.checkRecvmsgAddress(addr, self.cli_addr) 3034 self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC) 3035 3036 if mindata == 0 and ancdata == []: 3037 return 3038 self.assertEqual(len(ancdata), 1) 3039 cmsg_level, cmsg_type, cmsg_data = ancdata[0] 3040 self.assertEqual(cmsg_level, socket.SOL_SOCKET) 3041 self.assertEqual(cmsg_type, socket.SCM_RIGHTS) 3042 self.assertGreaterEqual(len(cmsg_data), mindata) 3043 self.assertLessEqual(len(cmsg_data), maxdata) 3044 fds = array.array("i") 3045 fds.frombytes(cmsg_data[: 3046 len(cmsg_data) - (len(cmsg_data) % fds.itemsize)]) 3047 self.checkFDs(fds) 3048 3049 def testCmsgTruncLen0(self): 3050 self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(0), maxdata=0) 3051 3052 def _testCmsgTruncLen0(self): 3053 self.createAndSendFDs(1) 3054 3055 def testCmsgTruncLen0Plus1(self): 3056 self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(0) + 1, maxdata=1) 3057 3058 def _testCmsgTruncLen0Plus1(self): 3059 self.createAndSendFDs(2) 3060 3061 def testCmsgTruncLen1(self): 3062 self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(SIZEOF_INT), 3063 maxdata=SIZEOF_INT) 3064 3065 def _testCmsgTruncLen1(self): 3066 self.createAndSendFDs(2) 3067 3068 def testCmsgTruncLen2Minus1(self): 3069 self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(2 * SIZEOF_INT) - 1, 3070 maxdata=(2 * SIZEOF_INT) - 1) 3071 3072 def _testCmsgTruncLen2Minus1(self): 3073 self.createAndSendFDs(2) 3074 3075 3076class RFC3542AncillaryTest(SendrecvmsgServerTimeoutBase): 3077 # Test sendmsg() and recvmsg[_into]() using the ancillary data 3078 # features of the RFC 3542 Advanced Sockets API for IPv6. 3079 # Currently we can only handle certain data items (e.g. traffic 3080 # class, hop limit, MTU discovery and fragmentation settings) 3081 # without resorting to unportable means such as the struct module, 3082 # but the tests here are aimed at testing the ancillary data 3083 # handling in sendmsg() and recvmsg() rather than the IPv6 API 3084 # itself. 3085 3086 # Test value to use when setting hop limit of packet 3087 hop_limit = 2 3088 3089 # Test value to use when setting traffic class of packet. 3090 # -1 means "use kernel default". 3091 traffic_class = -1 3092 3093 def ancillaryMapping(self, ancdata): 3094 # Given ancillary data list ancdata, return a mapping from 3095 # pairs (cmsg_level, cmsg_type) to corresponding cmsg_data. 3096 # Check that no (level, type) pair appears more than once. 3097 d = {} 3098 for cmsg_level, cmsg_type, cmsg_data in ancdata: 3099 self.assertNotIn((cmsg_level, cmsg_type), d) 3100 d[(cmsg_level, cmsg_type)] = cmsg_data 3101 return d 3102 3103 def checkHopLimit(self, ancbufsize, maxhop=255, ignoreflags=0): 3104 # Receive hop limit into ancbufsize bytes of ancillary data 3105 # space. Check that data is MSG, ancillary data is not 3106 # truncated (but ignore any flags in ignoreflags), and hop 3107 # limit is between 0 and maxhop inclusive. 3108 self.serv_sock.setsockopt(socket.IPPROTO_IPV6, 3109 socket.IPV6_RECVHOPLIMIT, 1) 3110 self.misc_event.set() 3111 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3112 len(MSG), ancbufsize) 3113 3114 self.assertEqual(msg, MSG) 3115 self.checkRecvmsgAddress(addr, self.cli_addr) 3116 self.checkFlags(flags, eor=True, checkunset=socket.MSG_CTRUNC, 3117 ignore=ignoreflags) 3118 3119 self.assertEqual(len(ancdata), 1) 3120 self.assertIsInstance(ancdata[0], tuple) 3121 cmsg_level, cmsg_type, cmsg_data = ancdata[0] 3122 self.assertEqual(cmsg_level, socket.IPPROTO_IPV6) 3123 self.assertEqual(cmsg_type, socket.IPV6_HOPLIMIT) 3124 self.assertIsInstance(cmsg_data, bytes) 3125 self.assertEqual(len(cmsg_data), SIZEOF_INT) 3126 a = array.array("i") 3127 a.frombytes(cmsg_data) 3128 self.assertGreaterEqual(a[0], 0) 3129 self.assertLessEqual(a[0], maxhop) 3130 3131 @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") 3132 def testRecvHopLimit(self): 3133 # Test receiving the packet hop limit as ancillary data. 3134 self.checkHopLimit(ancbufsize=10240) 3135 3136 @testRecvHopLimit.client_skip 3137 def _testRecvHopLimit(self): 3138 # Need to wait until server has asked to receive ancillary 3139 # data, as implementations are not required to buffer it 3140 # otherwise. 3141 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3142 self.sendToServer(MSG) 3143 3144 @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") 3145 def testRecvHopLimitCMSG_SPACE(self): 3146 # Test receiving hop limit, using CMSG_SPACE to calculate buffer size. 3147 self.checkHopLimit(ancbufsize=socket.CMSG_SPACE(SIZEOF_INT)) 3148 3149 @testRecvHopLimitCMSG_SPACE.client_skip 3150 def _testRecvHopLimitCMSG_SPACE(self): 3151 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3152 self.sendToServer(MSG) 3153 3154 # Could test receiving into buffer sized using CMSG_LEN, but RFC 3155 # 3542 says portable applications must provide space for trailing 3156 # padding. Implementations may set MSG_CTRUNC if there isn't 3157 # enough space for the padding. 3158 3159 @requireAttrs(socket.socket, "sendmsg") 3160 @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") 3161 def testSetHopLimit(self): 3162 # Test setting hop limit on outgoing packet and receiving it 3163 # at the other end. 3164 self.checkHopLimit(ancbufsize=10240, maxhop=self.hop_limit) 3165 3166 @testSetHopLimit.client_skip 3167 def _testSetHopLimit(self): 3168 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3169 self.assertEqual( 3170 self.sendmsgToServer([MSG], 3171 [(socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT, 3172 array.array("i", [self.hop_limit]))]), 3173 len(MSG)) 3174 3175 def checkTrafficClassAndHopLimit(self, ancbufsize, maxhop=255, 3176 ignoreflags=0): 3177 # Receive traffic class and hop limit into ancbufsize bytes of 3178 # ancillary data space. Check that data is MSG, ancillary 3179 # data is not truncated (but ignore any flags in ignoreflags), 3180 # and traffic class and hop limit are in range (hop limit no 3181 # more than maxhop). 3182 self.serv_sock.setsockopt(socket.IPPROTO_IPV6, 3183 socket.IPV6_RECVHOPLIMIT, 1) 3184 self.serv_sock.setsockopt(socket.IPPROTO_IPV6, 3185 socket.IPV6_RECVTCLASS, 1) 3186 self.misc_event.set() 3187 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3188 len(MSG), ancbufsize) 3189 3190 self.assertEqual(msg, MSG) 3191 self.checkRecvmsgAddress(addr, self.cli_addr) 3192 self.checkFlags(flags, eor=True, checkunset=socket.MSG_CTRUNC, 3193 ignore=ignoreflags) 3194 self.assertEqual(len(ancdata), 2) 3195 ancmap = self.ancillaryMapping(ancdata) 3196 3197 tcdata = ancmap[(socket.IPPROTO_IPV6, socket.IPV6_TCLASS)] 3198 self.assertEqual(len(tcdata), SIZEOF_INT) 3199 a = array.array("i") 3200 a.frombytes(tcdata) 3201 self.assertGreaterEqual(a[0], 0) 3202 self.assertLessEqual(a[0], 255) 3203 3204 hldata = ancmap[(socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT)] 3205 self.assertEqual(len(hldata), SIZEOF_INT) 3206 a = array.array("i") 3207 a.frombytes(hldata) 3208 self.assertGreaterEqual(a[0], 0) 3209 self.assertLessEqual(a[0], maxhop) 3210 3211 @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", 3212 "IPV6_RECVTCLASS", "IPV6_TCLASS") 3213 def testRecvTrafficClassAndHopLimit(self): 3214 # Test receiving traffic class and hop limit as ancillary data. 3215 self.checkTrafficClassAndHopLimit(ancbufsize=10240) 3216 3217 @testRecvTrafficClassAndHopLimit.client_skip 3218 def _testRecvTrafficClassAndHopLimit(self): 3219 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3220 self.sendToServer(MSG) 3221 3222 @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", 3223 "IPV6_RECVTCLASS", "IPV6_TCLASS") 3224 def testRecvTrafficClassAndHopLimitCMSG_SPACE(self): 3225 # Test receiving traffic class and hop limit, using 3226 # CMSG_SPACE() to calculate buffer size. 3227 self.checkTrafficClassAndHopLimit( 3228 ancbufsize=socket.CMSG_SPACE(SIZEOF_INT) * 2) 3229 3230 @testRecvTrafficClassAndHopLimitCMSG_SPACE.client_skip 3231 def _testRecvTrafficClassAndHopLimitCMSG_SPACE(self): 3232 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3233 self.sendToServer(MSG) 3234 3235 @requireAttrs(socket.socket, "sendmsg") 3236 @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", 3237 "IPV6_RECVTCLASS", "IPV6_TCLASS") 3238 def testSetTrafficClassAndHopLimit(self): 3239 # Test setting traffic class and hop limit on outgoing packet, 3240 # and receiving them at the other end. 3241 self.checkTrafficClassAndHopLimit(ancbufsize=10240, 3242 maxhop=self.hop_limit) 3243 3244 @testSetTrafficClassAndHopLimit.client_skip 3245 def _testSetTrafficClassAndHopLimit(self): 3246 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3247 self.assertEqual( 3248 self.sendmsgToServer([MSG], 3249 [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS, 3250 array.array("i", [self.traffic_class])), 3251 (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT, 3252 array.array("i", [self.hop_limit]))]), 3253 len(MSG)) 3254 3255 @requireAttrs(socket.socket, "sendmsg") 3256 @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", 3257 "IPV6_RECVTCLASS", "IPV6_TCLASS") 3258 def testOddCmsgSize(self): 3259 # Try to send ancillary data with first item one byte too 3260 # long. Fall back to sending with correct size if this fails, 3261 # and check that second item was handled correctly. 3262 self.checkTrafficClassAndHopLimit(ancbufsize=10240, 3263 maxhop=self.hop_limit) 3264 3265 @testOddCmsgSize.client_skip 3266 def _testOddCmsgSize(self): 3267 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3268 try: 3269 nbytes = self.sendmsgToServer( 3270 [MSG], 3271 [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS, 3272 array.array("i", [self.traffic_class]).tobytes() + b"\x00"), 3273 (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT, 3274 array.array("i", [self.hop_limit]))]) 3275 except OSError as e: 3276 self.assertIsInstance(e.errno, int) 3277 nbytes = self.sendmsgToServer( 3278 [MSG], 3279 [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS, 3280 array.array("i", [self.traffic_class])), 3281 (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT, 3282 array.array("i", [self.hop_limit]))]) 3283 self.assertEqual(nbytes, len(MSG)) 3284 3285 # Tests for proper handling of truncated ancillary data 3286 3287 def checkHopLimitTruncatedHeader(self, ancbufsize, ignoreflags=0): 3288 # Receive hop limit into ancbufsize bytes of ancillary data 3289 # space, which should be too small to contain the ancillary 3290 # data header (if ancbufsize is None, pass no second argument 3291 # to recvmsg()). Check that data is MSG, MSG_CTRUNC is set 3292 # (unless included in ignoreflags), and no ancillary data is 3293 # returned. 3294 self.serv_sock.setsockopt(socket.IPPROTO_IPV6, 3295 socket.IPV6_RECVHOPLIMIT, 1) 3296 self.misc_event.set() 3297 args = () if ancbufsize is None else (ancbufsize,) 3298 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3299 len(MSG), *args) 3300 3301 self.assertEqual(msg, MSG) 3302 self.checkRecvmsgAddress(addr, self.cli_addr) 3303 self.assertEqual(ancdata, []) 3304 self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC, 3305 ignore=ignoreflags) 3306 3307 @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") 3308 def testCmsgTruncNoBufSize(self): 3309 # Check that no ancillary data is received when no ancillary 3310 # buffer size is provided. 3311 self.checkHopLimitTruncatedHeader(ancbufsize=None, 3312 # BSD seems to set 3313 # MSG_CTRUNC only if an item 3314 # has been partially 3315 # received. 3316 ignoreflags=socket.MSG_CTRUNC) 3317 3318 @testCmsgTruncNoBufSize.client_skip 3319 def _testCmsgTruncNoBufSize(self): 3320 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3321 self.sendToServer(MSG) 3322 3323 @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") 3324 def testSingleCmsgTrunc0(self): 3325 # Check that no ancillary data is received when ancillary 3326 # buffer size is zero. 3327 self.checkHopLimitTruncatedHeader(ancbufsize=0, 3328 ignoreflags=socket.MSG_CTRUNC) 3329 3330 @testSingleCmsgTrunc0.client_skip 3331 def _testSingleCmsgTrunc0(self): 3332 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3333 self.sendToServer(MSG) 3334 3335 # Check that no ancillary data is returned for various non-zero 3336 # (but still too small) buffer sizes. 3337 3338 @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") 3339 def testSingleCmsgTrunc1(self): 3340 self.checkHopLimitTruncatedHeader(ancbufsize=1) 3341 3342 @testSingleCmsgTrunc1.client_skip 3343 def _testSingleCmsgTrunc1(self): 3344 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3345 self.sendToServer(MSG) 3346 3347 @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") 3348 def testSingleCmsgTrunc2Int(self): 3349 self.checkHopLimitTruncatedHeader(ancbufsize=2 * SIZEOF_INT) 3350 3351 @testSingleCmsgTrunc2Int.client_skip 3352 def _testSingleCmsgTrunc2Int(self): 3353 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3354 self.sendToServer(MSG) 3355 3356 @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") 3357 def testSingleCmsgTruncLen0Minus1(self): 3358 self.checkHopLimitTruncatedHeader(ancbufsize=socket.CMSG_LEN(0) - 1) 3359 3360 @testSingleCmsgTruncLen0Minus1.client_skip 3361 def _testSingleCmsgTruncLen0Minus1(self): 3362 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3363 self.sendToServer(MSG) 3364 3365 @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") 3366 def testSingleCmsgTruncInData(self): 3367 # Test truncation of a control message inside its associated 3368 # data. The message may be returned with its data truncated, 3369 # or not returned at all. 3370 self.serv_sock.setsockopt(socket.IPPROTO_IPV6, 3371 socket.IPV6_RECVHOPLIMIT, 1) 3372 self.misc_event.set() 3373 msg, ancdata, flags, addr = self.doRecvmsg( 3374 self.serv_sock, len(MSG), socket.CMSG_LEN(SIZEOF_INT) - 1) 3375 3376 self.assertEqual(msg, MSG) 3377 self.checkRecvmsgAddress(addr, self.cli_addr) 3378 self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC) 3379 3380 self.assertLessEqual(len(ancdata), 1) 3381 if ancdata: 3382 cmsg_level, cmsg_type, cmsg_data = ancdata[0] 3383 self.assertEqual(cmsg_level, socket.IPPROTO_IPV6) 3384 self.assertEqual(cmsg_type, socket.IPV6_HOPLIMIT) 3385 self.assertLess(len(cmsg_data), SIZEOF_INT) 3386 3387 @testSingleCmsgTruncInData.client_skip 3388 def _testSingleCmsgTruncInData(self): 3389 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3390 self.sendToServer(MSG) 3391 3392 def checkTruncatedSecondHeader(self, ancbufsize, ignoreflags=0): 3393 # Receive traffic class and hop limit into ancbufsize bytes of 3394 # ancillary data space, which should be large enough to 3395 # contain the first item, but too small to contain the header 3396 # of the second. Check that data is MSG, MSG_CTRUNC is set 3397 # (unless included in ignoreflags), and only one ancillary 3398 # data item is returned. 3399 self.serv_sock.setsockopt(socket.IPPROTO_IPV6, 3400 socket.IPV6_RECVHOPLIMIT, 1) 3401 self.serv_sock.setsockopt(socket.IPPROTO_IPV6, 3402 socket.IPV6_RECVTCLASS, 1) 3403 self.misc_event.set() 3404 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3405 len(MSG), ancbufsize) 3406 3407 self.assertEqual(msg, MSG) 3408 self.checkRecvmsgAddress(addr, self.cli_addr) 3409 self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC, 3410 ignore=ignoreflags) 3411 3412 self.assertEqual(len(ancdata), 1) 3413 cmsg_level, cmsg_type, cmsg_data = ancdata[0] 3414 self.assertEqual(cmsg_level, socket.IPPROTO_IPV6) 3415 self.assertIn(cmsg_type, {socket.IPV6_TCLASS, socket.IPV6_HOPLIMIT}) 3416 self.assertEqual(len(cmsg_data), SIZEOF_INT) 3417 a = array.array("i") 3418 a.frombytes(cmsg_data) 3419 self.assertGreaterEqual(a[0], 0) 3420 self.assertLessEqual(a[0], 255) 3421 3422 # Try the above test with various buffer sizes. 3423 3424 @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", 3425 "IPV6_RECVTCLASS", "IPV6_TCLASS") 3426 def testSecondCmsgTrunc0(self): 3427 self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT), 3428 ignoreflags=socket.MSG_CTRUNC) 3429 3430 @testSecondCmsgTrunc0.client_skip 3431 def _testSecondCmsgTrunc0(self): 3432 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3433 self.sendToServer(MSG) 3434 3435 @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", 3436 "IPV6_RECVTCLASS", "IPV6_TCLASS") 3437 def testSecondCmsgTrunc1(self): 3438 self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) + 1) 3439 3440 @testSecondCmsgTrunc1.client_skip 3441 def _testSecondCmsgTrunc1(self): 3442 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3443 self.sendToServer(MSG) 3444 3445 @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", 3446 "IPV6_RECVTCLASS", "IPV6_TCLASS") 3447 def testSecondCmsgTrunc2Int(self): 3448 self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) + 3449 2 * SIZEOF_INT) 3450 3451 @testSecondCmsgTrunc2Int.client_skip 3452 def _testSecondCmsgTrunc2Int(self): 3453 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3454 self.sendToServer(MSG) 3455 3456 @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", 3457 "IPV6_RECVTCLASS", "IPV6_TCLASS") 3458 def testSecondCmsgTruncLen0Minus1(self): 3459 self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) + 3460 socket.CMSG_LEN(0) - 1) 3461 3462 @testSecondCmsgTruncLen0Minus1.client_skip 3463 def _testSecondCmsgTruncLen0Minus1(self): 3464 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3465 self.sendToServer(MSG) 3466 3467 @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", 3468 "IPV6_RECVTCLASS", "IPV6_TCLASS") 3469 def testSecomdCmsgTruncInData(self): 3470 # Test truncation of the second of two control messages inside 3471 # its associated data. 3472 self.serv_sock.setsockopt(socket.IPPROTO_IPV6, 3473 socket.IPV6_RECVHOPLIMIT, 1) 3474 self.serv_sock.setsockopt(socket.IPPROTO_IPV6, 3475 socket.IPV6_RECVTCLASS, 1) 3476 self.misc_event.set() 3477 msg, ancdata, flags, addr = self.doRecvmsg( 3478 self.serv_sock, len(MSG), 3479 socket.CMSG_SPACE(SIZEOF_INT) + socket.CMSG_LEN(SIZEOF_INT) - 1) 3480 3481 self.assertEqual(msg, MSG) 3482 self.checkRecvmsgAddress(addr, self.cli_addr) 3483 self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC) 3484 3485 cmsg_types = {socket.IPV6_TCLASS, socket.IPV6_HOPLIMIT} 3486 3487 cmsg_level, cmsg_type, cmsg_data = ancdata.pop(0) 3488 self.assertEqual(cmsg_level, socket.IPPROTO_IPV6) 3489 cmsg_types.remove(cmsg_type) 3490 self.assertEqual(len(cmsg_data), SIZEOF_INT) 3491 a = array.array("i") 3492 a.frombytes(cmsg_data) 3493 self.assertGreaterEqual(a[0], 0) 3494 self.assertLessEqual(a[0], 255) 3495 3496 if ancdata: 3497 cmsg_level, cmsg_type, cmsg_data = ancdata.pop(0) 3498 self.assertEqual(cmsg_level, socket.IPPROTO_IPV6) 3499 cmsg_types.remove(cmsg_type) 3500 self.assertLess(len(cmsg_data), SIZEOF_INT) 3501 3502 self.assertEqual(ancdata, []) 3503 3504 @testSecomdCmsgTruncInData.client_skip 3505 def _testSecomdCmsgTruncInData(self): 3506 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3507 self.sendToServer(MSG) 3508 3509 3510# Derive concrete test classes for different socket types. 3511 3512class SendrecvmsgUDPTestBase(SendrecvmsgDgramFlagsBase, 3513 SendrecvmsgConnectionlessBase, 3514 ThreadedSocketTestMixin, UDPTestBase): 3515 pass 3516 3517@requireAttrs(socket.socket, "sendmsg") 3518@unittest.skipUnless(thread, 'Threading required for this test.') 3519class SendmsgUDPTest(SendmsgConnectionlessTests, SendrecvmsgUDPTestBase): 3520 pass 3521 3522@requireAttrs(socket.socket, "recvmsg") 3523@unittest.skipUnless(thread, 'Threading required for this test.') 3524class RecvmsgUDPTest(RecvmsgTests, SendrecvmsgUDPTestBase): 3525 pass 3526 3527@requireAttrs(socket.socket, "recvmsg_into") 3528@unittest.skipUnless(thread, 'Threading required for this test.') 3529class RecvmsgIntoUDPTest(RecvmsgIntoTests, SendrecvmsgUDPTestBase): 3530 pass 3531 3532 3533class SendrecvmsgUDP6TestBase(SendrecvmsgDgramFlagsBase, 3534 SendrecvmsgConnectionlessBase, 3535 ThreadedSocketTestMixin, UDP6TestBase): 3536 3537 def checkRecvmsgAddress(self, addr1, addr2): 3538 # Called to compare the received address with the address of 3539 # the peer, ignoring scope ID 3540 self.assertEqual(addr1[:-1], addr2[:-1]) 3541 3542@requireAttrs(socket.socket, "sendmsg") 3543@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') 3544@requireSocket("AF_INET6", "SOCK_DGRAM") 3545@unittest.skipUnless(thread, 'Threading required for this test.') 3546class SendmsgUDP6Test(SendmsgConnectionlessTests, SendrecvmsgUDP6TestBase): 3547 pass 3548 3549@requireAttrs(socket.socket, "recvmsg") 3550@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') 3551@requireSocket("AF_INET6", "SOCK_DGRAM") 3552@unittest.skipUnless(thread, 'Threading required for this test.') 3553class RecvmsgUDP6Test(RecvmsgTests, SendrecvmsgUDP6TestBase): 3554 pass 3555 3556@requireAttrs(socket.socket, "recvmsg_into") 3557@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') 3558@requireSocket("AF_INET6", "SOCK_DGRAM") 3559@unittest.skipUnless(thread, 'Threading required for this test.') 3560class RecvmsgIntoUDP6Test(RecvmsgIntoTests, SendrecvmsgUDP6TestBase): 3561 pass 3562 3563@requireAttrs(socket.socket, "recvmsg") 3564@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') 3565@requireAttrs(socket, "IPPROTO_IPV6") 3566@requireSocket("AF_INET6", "SOCK_DGRAM") 3567@unittest.skipUnless(thread, 'Threading required for this test.') 3568class RecvmsgRFC3542AncillaryUDP6Test(RFC3542AncillaryTest, 3569 SendrecvmsgUDP6TestBase): 3570 pass 3571 3572@requireAttrs(socket.socket, "recvmsg_into") 3573@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') 3574@requireAttrs(socket, "IPPROTO_IPV6") 3575@requireSocket("AF_INET6", "SOCK_DGRAM") 3576@unittest.skipUnless(thread, 'Threading required for this test.') 3577class RecvmsgIntoRFC3542AncillaryUDP6Test(RecvmsgIntoMixin, 3578 RFC3542AncillaryTest, 3579 SendrecvmsgUDP6TestBase): 3580 pass 3581 3582 3583class SendrecvmsgTCPTestBase(SendrecvmsgConnectedBase, 3584 ConnectedStreamTestMixin, TCPTestBase): 3585 pass 3586 3587@requireAttrs(socket.socket, "sendmsg") 3588@unittest.skipUnless(thread, 'Threading required for this test.') 3589class SendmsgTCPTest(SendmsgStreamTests, SendrecvmsgTCPTestBase): 3590 pass 3591 3592@requireAttrs(socket.socket, "recvmsg") 3593@unittest.skipUnless(thread, 'Threading required for this test.') 3594class RecvmsgTCPTest(RecvmsgTests, RecvmsgGenericStreamTests, 3595 SendrecvmsgTCPTestBase): 3596 pass 3597 3598@requireAttrs(socket.socket, "recvmsg_into") 3599@unittest.skipUnless(thread, 'Threading required for this test.') 3600class RecvmsgIntoTCPTest(RecvmsgIntoTests, RecvmsgGenericStreamTests, 3601 SendrecvmsgTCPTestBase): 3602 pass 3603 3604 3605class SendrecvmsgSCTPStreamTestBase(SendrecvmsgSCTPFlagsBase, 3606 SendrecvmsgConnectedBase, 3607 ConnectedStreamTestMixin, SCTPStreamBase): 3608 pass 3609 3610@requireAttrs(socket.socket, "sendmsg") 3611@requireSocket("AF_INET", "SOCK_STREAM", "IPPROTO_SCTP") 3612@unittest.skipUnless(thread, 'Threading required for this test.') 3613class SendmsgSCTPStreamTest(SendmsgStreamTests, SendrecvmsgSCTPStreamTestBase): 3614 pass 3615 3616@requireAttrs(socket.socket, "recvmsg") 3617@requireSocket("AF_INET", "SOCK_STREAM", "IPPROTO_SCTP") 3618@unittest.skipUnless(thread, 'Threading required for this test.') 3619class RecvmsgSCTPStreamTest(RecvmsgTests, RecvmsgGenericStreamTests, 3620 SendrecvmsgSCTPStreamTestBase): 3621 3622 def testRecvmsgEOF(self): 3623 try: 3624 super(RecvmsgSCTPStreamTest, self).testRecvmsgEOF() 3625 except OSError as e: 3626 if e.errno != errno.ENOTCONN: 3627 raise 3628 self.skipTest("sporadic ENOTCONN (kernel issue?) - see issue #13876") 3629 3630@requireAttrs(socket.socket, "recvmsg_into") 3631@requireSocket("AF_INET", "SOCK_STREAM", "IPPROTO_SCTP") 3632@unittest.skipUnless(thread, 'Threading required for this test.') 3633class RecvmsgIntoSCTPStreamTest(RecvmsgIntoTests, RecvmsgGenericStreamTests, 3634 SendrecvmsgSCTPStreamTestBase): 3635 3636 def testRecvmsgEOF(self): 3637 try: 3638 super(RecvmsgIntoSCTPStreamTest, self).testRecvmsgEOF() 3639 except OSError as e: 3640 if e.errno != errno.ENOTCONN: 3641 raise 3642 self.skipTest("sporadic ENOTCONN (kernel issue?) - see issue #13876") 3643 3644 3645class SendrecvmsgUnixStreamTestBase(SendrecvmsgConnectedBase, 3646 ConnectedStreamTestMixin, UnixStreamBase): 3647 pass 3648 3649@requireAttrs(socket.socket, "sendmsg") 3650@requireAttrs(socket, "AF_UNIX") 3651@unittest.skipUnless(thread, 'Threading required for this test.') 3652class SendmsgUnixStreamTest(SendmsgStreamTests, SendrecvmsgUnixStreamTestBase): 3653 pass 3654 3655@requireAttrs(socket.socket, "recvmsg") 3656@requireAttrs(socket, "AF_UNIX") 3657@unittest.skipUnless(thread, 'Threading required for this test.') 3658class RecvmsgUnixStreamTest(RecvmsgTests, RecvmsgGenericStreamTests, 3659 SendrecvmsgUnixStreamTestBase): 3660 pass 3661 3662@requireAttrs(socket.socket, "recvmsg_into") 3663@requireAttrs(socket, "AF_UNIX") 3664@unittest.skipUnless(thread, 'Threading required for this test.') 3665class RecvmsgIntoUnixStreamTest(RecvmsgIntoTests, RecvmsgGenericStreamTests, 3666 SendrecvmsgUnixStreamTestBase): 3667 pass 3668 3669@requireAttrs(socket.socket, "sendmsg", "recvmsg") 3670@requireAttrs(socket, "AF_UNIX", "SOL_SOCKET", "SCM_RIGHTS") 3671@unittest.skipUnless(thread, 'Threading required for this test.') 3672class RecvmsgSCMRightsStreamTest(SCMRightsTest, SendrecvmsgUnixStreamTestBase): 3673 pass 3674 3675@requireAttrs(socket.socket, "sendmsg", "recvmsg_into") 3676@requireAttrs(socket, "AF_UNIX", "SOL_SOCKET", "SCM_RIGHTS") 3677@unittest.skipUnless(thread, 'Threading required for this test.') 3678class RecvmsgIntoSCMRightsStreamTest(RecvmsgIntoMixin, SCMRightsTest, 3679 SendrecvmsgUnixStreamTestBase): 3680 pass 3681 3682 3683# Test interrupting the interruptible send/receive methods with a 3684# signal when a timeout is set. These tests avoid having multiple 3685# threads alive during the test so that the OS cannot deliver the 3686# signal to the wrong one. 3687 3688class InterruptedTimeoutBase(unittest.TestCase): 3689 # Base class for interrupted send/receive tests. Installs an 3690 # empty handler for SIGALRM and removes it on teardown, along with 3691 # any scheduled alarms. 3692 3693 def setUp(self): 3694 super().setUp() 3695 orig_alrm_handler = signal.signal(signal.SIGALRM, 3696 lambda signum, frame: 1 / 0) 3697 self.addCleanup(signal.signal, signal.SIGALRM, orig_alrm_handler) 3698 3699 # Timeout for socket operations 3700 timeout = 4.0 3701 3702 # Provide setAlarm() method to schedule delivery of SIGALRM after 3703 # given number of seconds, or cancel it if zero, and an 3704 # appropriate time value to use. Use setitimer() if available. 3705 if hasattr(signal, "setitimer"): 3706 alarm_time = 0.05 3707 3708 def setAlarm(self, seconds): 3709 signal.setitimer(signal.ITIMER_REAL, seconds) 3710 else: 3711 # Old systems may deliver the alarm up to one second early 3712 alarm_time = 2 3713 3714 def setAlarm(self, seconds): 3715 signal.alarm(seconds) 3716 3717 3718# Require siginterrupt() in order to ensure that system calls are 3719# interrupted by default. 3720@requireAttrs(signal, "siginterrupt") 3721@unittest.skipUnless(hasattr(signal, "alarm") or hasattr(signal, "setitimer"), 3722 "Don't have signal.alarm or signal.setitimer") 3723class InterruptedRecvTimeoutTest(InterruptedTimeoutBase, UDPTestBase): 3724 # Test interrupting the recv*() methods with signals when a 3725 # timeout is set. 3726 3727 def setUp(self): 3728 super().setUp() 3729 self.serv.settimeout(self.timeout) 3730 3731 def checkInterruptedRecv(self, func, *args, **kwargs): 3732 # Check that func(*args, **kwargs) raises 3733 # errno of EINTR when interrupted by a signal. 3734 try: 3735 self.setAlarm(self.alarm_time) 3736 with self.assertRaises(ZeroDivisionError) as cm: 3737 func(*args, **kwargs) 3738 finally: 3739 self.setAlarm(0) 3740 3741 def testInterruptedRecvTimeout(self): 3742 self.checkInterruptedRecv(self.serv.recv, 1024) 3743 3744 def testInterruptedRecvIntoTimeout(self): 3745 self.checkInterruptedRecv(self.serv.recv_into, bytearray(1024)) 3746 3747 def testInterruptedRecvfromTimeout(self): 3748 self.checkInterruptedRecv(self.serv.recvfrom, 1024) 3749 3750 def testInterruptedRecvfromIntoTimeout(self): 3751 self.checkInterruptedRecv(self.serv.recvfrom_into, bytearray(1024)) 3752 3753 @requireAttrs(socket.socket, "recvmsg") 3754 def testInterruptedRecvmsgTimeout(self): 3755 self.checkInterruptedRecv(self.serv.recvmsg, 1024) 3756 3757 @requireAttrs(socket.socket, "recvmsg_into") 3758 def testInterruptedRecvmsgIntoTimeout(self): 3759 self.checkInterruptedRecv(self.serv.recvmsg_into, [bytearray(1024)]) 3760 3761 3762# Require siginterrupt() in order to ensure that system calls are 3763# interrupted by default. 3764@requireAttrs(signal, "siginterrupt") 3765@unittest.skipUnless(hasattr(signal, "alarm") or hasattr(signal, "setitimer"), 3766 "Don't have signal.alarm or signal.setitimer") 3767@unittest.skipUnless(thread, 'Threading required for this test.') 3768class InterruptedSendTimeoutTest(InterruptedTimeoutBase, 3769 ThreadSafeCleanupTestCase, 3770 SocketListeningTestMixin, TCPTestBase): 3771 # Test interrupting the interruptible send*() methods with signals 3772 # when a timeout is set. 3773 3774 def setUp(self): 3775 super().setUp() 3776 self.serv_conn = self.newSocket() 3777 self.addCleanup(self.serv_conn.close) 3778 # Use a thread to complete the connection, but wait for it to 3779 # terminate before running the test, so that there is only one 3780 # thread to accept the signal. 3781 cli_thread = threading.Thread(target=self.doConnect) 3782 cli_thread.start() 3783 self.cli_conn, addr = self.serv.accept() 3784 self.addCleanup(self.cli_conn.close) 3785 cli_thread.join() 3786 self.serv_conn.settimeout(self.timeout) 3787 3788 def doConnect(self): 3789 self.serv_conn.connect(self.serv_addr) 3790 3791 def checkInterruptedSend(self, func, *args, **kwargs): 3792 # Check that func(*args, **kwargs), run in a loop, raises 3793 # OSError with an errno of EINTR when interrupted by a 3794 # signal. 3795 try: 3796 with self.assertRaises(ZeroDivisionError) as cm: 3797 while True: 3798 self.setAlarm(self.alarm_time) 3799 func(*args, **kwargs) 3800 finally: 3801 self.setAlarm(0) 3802 3803 # Issue #12958: The following tests have problems on OS X prior to 10.7 3804 @support.requires_mac_ver(10, 7) 3805 def testInterruptedSendTimeout(self): 3806 self.checkInterruptedSend(self.serv_conn.send, b"a"*512) 3807 3808 @support.requires_mac_ver(10, 7) 3809 def testInterruptedSendtoTimeout(self): 3810 # Passing an actual address here as Python's wrapper for 3811 # sendto() doesn't allow passing a zero-length one; POSIX 3812 # requires that the address is ignored since the socket is 3813 # connection-mode, however. 3814 self.checkInterruptedSend(self.serv_conn.sendto, b"a"*512, 3815 self.serv_addr) 3816 3817 @support.requires_mac_ver(10, 7) 3818 @requireAttrs(socket.socket, "sendmsg") 3819 def testInterruptedSendmsgTimeout(self): 3820 self.checkInterruptedSend(self.serv_conn.sendmsg, [b"a"*512]) 3821 3822 3823@unittest.skipUnless(thread, 'Threading required for this test.') 3824class TCPCloserTest(ThreadedTCPSocketTest): 3825 3826 def testClose(self): 3827 conn, addr = self.serv.accept() 3828 conn.close() 3829 3830 sd = self.cli 3831 read, write, err = select.select([sd], [], [], 1.0) 3832 self.assertEqual(read, [sd]) 3833 self.assertEqual(sd.recv(1), b'') 3834 3835 # Calling close() many times should be safe. 3836 conn.close() 3837 conn.close() 3838 3839 def _testClose(self): 3840 self.cli.connect((HOST, self.port)) 3841 time.sleep(1.0) 3842 3843@unittest.skipUnless(thread, 'Threading required for this test.') 3844class BasicSocketPairTest(SocketPairTest): 3845 3846 def __init__(self, methodName='runTest'): 3847 SocketPairTest.__init__(self, methodName=methodName) 3848 3849 def _check_defaults(self, sock): 3850 self.assertIsInstance(sock, socket.socket) 3851 if hasattr(socket, 'AF_UNIX'): 3852 self.assertEqual(sock.family, socket.AF_UNIX) 3853 else: 3854 self.assertEqual(sock.family, socket.AF_INET) 3855 self.assertEqual(sock.type, socket.SOCK_STREAM) 3856 self.assertEqual(sock.proto, 0) 3857 3858 def _testDefaults(self): 3859 self._check_defaults(self.cli) 3860 3861 def testDefaults(self): 3862 self._check_defaults(self.serv) 3863 3864 def testRecv(self): 3865 msg = self.serv.recv(1024) 3866 self.assertEqual(msg, MSG) 3867 3868 def _testRecv(self): 3869 self.cli.send(MSG) 3870 3871 def testSend(self): 3872 self.serv.send(MSG) 3873 3874 def _testSend(self): 3875 msg = self.cli.recv(1024) 3876 self.assertEqual(msg, MSG) 3877 3878@unittest.skipUnless(thread, 'Threading required for this test.') 3879class NonBlockingTCPTests(ThreadedTCPSocketTest): 3880 3881 def __init__(self, methodName='runTest'): 3882 self.event = threading.Event() 3883 ThreadedTCPSocketTest.__init__(self, methodName=methodName) 3884 3885 def testSetBlocking(self): 3886 # Testing whether set blocking works 3887 self.serv.setblocking(True) 3888 self.assertIsNone(self.serv.gettimeout()) 3889 self.serv.setblocking(False) 3890 self.assertEqual(self.serv.gettimeout(), 0.0) 3891 start = time.time() 3892 try: 3893 self.serv.accept() 3894 except OSError: 3895 pass 3896 end = time.time() 3897 self.assertTrue((end - start) < 1.0, "Error setting non-blocking mode.") 3898 3899 def _testSetBlocking(self): 3900 pass 3901 3902 @support.cpython_only 3903 def testSetBlocking_overflow(self): 3904 # Issue 15989 3905 import _testcapi 3906 if _testcapi.UINT_MAX >= _testcapi.ULONG_MAX: 3907 self.skipTest('needs UINT_MAX < ULONG_MAX') 3908 self.serv.setblocking(False) 3909 self.assertEqual(self.serv.gettimeout(), 0.0) 3910 self.serv.setblocking(_testcapi.UINT_MAX + 1) 3911 self.assertIsNone(self.serv.gettimeout()) 3912 3913 _testSetBlocking_overflow = support.cpython_only(_testSetBlocking) 3914 3915 @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'), 3916 'test needs socket.SOCK_NONBLOCK') 3917 @support.requires_linux_version(2, 6, 28) 3918 def testInitNonBlocking(self): 3919 # reinit server socket 3920 self.serv.close() 3921 self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM | 3922 socket.SOCK_NONBLOCK) 3923 self.port = support.bind_port(self.serv) 3924 self.serv.listen() 3925 # actual testing 3926 start = time.time() 3927 try: 3928 self.serv.accept() 3929 except OSError: 3930 pass 3931 end = time.time() 3932 self.assertTrue((end - start) < 1.0, "Error creating with non-blocking mode.") 3933 3934 def _testInitNonBlocking(self): 3935 pass 3936 3937 def testInheritFlags(self): 3938 # Issue #7995: when calling accept() on a listening socket with a 3939 # timeout, the resulting socket should not be non-blocking. 3940 self.serv.settimeout(10) 3941 try: 3942 conn, addr = self.serv.accept() 3943 message = conn.recv(len(MSG)) 3944 finally: 3945 conn.close() 3946 self.serv.settimeout(None) 3947 3948 def _testInheritFlags(self): 3949 time.sleep(0.1) 3950 self.cli.connect((HOST, self.port)) 3951 time.sleep(0.5) 3952 self.cli.send(MSG) 3953 3954 def testAccept(self): 3955 # Testing non-blocking accept 3956 self.serv.setblocking(0) 3957 3958 # connect() didn't start: non-blocking accept() fails 3959 with self.assertRaises(BlockingIOError): 3960 conn, addr = self.serv.accept() 3961 3962 self.event.set() 3963 3964 read, write, err = select.select([self.serv], [], [], MAIN_TIMEOUT) 3965 if self.serv not in read: 3966 self.fail("Error trying to do accept after select.") 3967 3968 # connect() completed: non-blocking accept() doesn't block 3969 conn, addr = self.serv.accept() 3970 self.addCleanup(conn.close) 3971 self.assertIsNone(conn.gettimeout()) 3972 3973 def _testAccept(self): 3974 # don't connect before event is set to check 3975 # that non-blocking accept() raises BlockingIOError 3976 self.event.wait() 3977 3978 self.cli.connect((HOST, self.port)) 3979 3980 def testConnect(self): 3981 # Testing non-blocking connect 3982 conn, addr = self.serv.accept() 3983 conn.close() 3984 3985 def _testConnect(self): 3986 self.cli.settimeout(10) 3987 self.cli.connect((HOST, self.port)) 3988 3989 def testRecv(self): 3990 # Testing non-blocking recv 3991 conn, addr = self.serv.accept() 3992 self.addCleanup(conn.close) 3993 conn.setblocking(0) 3994 3995 # the server didn't send data yet: non-blocking recv() fails 3996 with self.assertRaises(BlockingIOError): 3997 msg = conn.recv(len(MSG)) 3998 3999 self.event.set() 4000 4001 read, write, err = select.select([conn], [], [], MAIN_TIMEOUT) 4002 if conn not in read: 4003 self.fail("Error during select call to non-blocking socket.") 4004 4005 # the server sent data yet: non-blocking recv() doesn't block 4006 msg = conn.recv(len(MSG)) 4007 self.assertEqual(msg, MSG) 4008 4009 def _testRecv(self): 4010 self.cli.connect((HOST, self.port)) 4011 4012 # don't send anything before event is set to check 4013 # that non-blocking recv() raises BlockingIOError 4014 self.event.wait() 4015 4016 # send data: recv() will no longer block 4017 self.cli.sendall(MSG) 4018 4019@unittest.skipUnless(thread, 'Threading required for this test.') 4020class FileObjectClassTestCase(SocketConnectedTest): 4021 """Unit tests for the object returned by socket.makefile() 4022 4023 self.read_file is the io object returned by makefile() on 4024 the client connection. You can read from this file to 4025 get output from the server. 4026 4027 self.write_file is the io object returned by makefile() on the 4028 server connection. You can write to this file to send output 4029 to the client. 4030 """ 4031 4032 bufsize = -1 # Use default buffer size 4033 encoding = 'utf-8' 4034 errors = 'strict' 4035 newline = None 4036 4037 read_mode = 'rb' 4038 read_msg = MSG 4039 write_mode = 'wb' 4040 write_msg = MSG 4041 4042 def __init__(self, methodName='runTest'): 4043 SocketConnectedTest.__init__(self, methodName=methodName) 4044 4045 def setUp(self): 4046 self.evt1, self.evt2, self.serv_finished, self.cli_finished = [ 4047 threading.Event() for i in range(4)] 4048 SocketConnectedTest.setUp(self) 4049 self.read_file = self.cli_conn.makefile( 4050 self.read_mode, self.bufsize, 4051 encoding = self.encoding, 4052 errors = self.errors, 4053 newline = self.newline) 4054 4055 def tearDown(self): 4056 self.serv_finished.set() 4057 self.read_file.close() 4058 self.assertTrue(self.read_file.closed) 4059 self.read_file = None 4060 SocketConnectedTest.tearDown(self) 4061 4062 def clientSetUp(self): 4063 SocketConnectedTest.clientSetUp(self) 4064 self.write_file = self.serv_conn.makefile( 4065 self.write_mode, self.bufsize, 4066 encoding = self.encoding, 4067 errors = self.errors, 4068 newline = self.newline) 4069 4070 def clientTearDown(self): 4071 self.cli_finished.set() 4072 self.write_file.close() 4073 self.assertTrue(self.write_file.closed) 4074 self.write_file = None 4075 SocketConnectedTest.clientTearDown(self) 4076 4077 def testReadAfterTimeout(self): 4078 # Issue #7322: A file object must disallow further reads 4079 # after a timeout has occurred. 4080 self.cli_conn.settimeout(1) 4081 self.read_file.read(3) 4082 # First read raises a timeout 4083 self.assertRaises(socket.timeout, self.read_file.read, 1) 4084 # Second read is disallowed 4085 with self.assertRaises(OSError) as ctx: 4086 self.read_file.read(1) 4087 self.assertIn("cannot read from timed out object", str(ctx.exception)) 4088 4089 def _testReadAfterTimeout(self): 4090 self.write_file.write(self.write_msg[0:3]) 4091 self.write_file.flush() 4092 self.serv_finished.wait() 4093 4094 def testSmallRead(self): 4095 # Performing small file read test 4096 first_seg = self.read_file.read(len(self.read_msg)-3) 4097 second_seg = self.read_file.read(3) 4098 msg = first_seg + second_seg 4099 self.assertEqual(msg, self.read_msg) 4100 4101 def _testSmallRead(self): 4102 self.write_file.write(self.write_msg) 4103 self.write_file.flush() 4104 4105 def testFullRead(self): 4106 # read until EOF 4107 msg = self.read_file.read() 4108 self.assertEqual(msg, self.read_msg) 4109 4110 def _testFullRead(self): 4111 self.write_file.write(self.write_msg) 4112 self.write_file.close() 4113 4114 def testUnbufferedRead(self): 4115 # Performing unbuffered file read test 4116 buf = type(self.read_msg)() 4117 while 1: 4118 char = self.read_file.read(1) 4119 if not char: 4120 break 4121 buf += char 4122 self.assertEqual(buf, self.read_msg) 4123 4124 def _testUnbufferedRead(self): 4125 self.write_file.write(self.write_msg) 4126 self.write_file.flush() 4127 4128 def testReadline(self): 4129 # Performing file readline test 4130 line = self.read_file.readline() 4131 self.assertEqual(line, self.read_msg) 4132 4133 def _testReadline(self): 4134 self.write_file.write(self.write_msg) 4135 self.write_file.flush() 4136 4137 def testCloseAfterMakefile(self): 4138 # The file returned by makefile should keep the socket open. 4139 self.cli_conn.close() 4140 # read until EOF 4141 msg = self.read_file.read() 4142 self.assertEqual(msg, self.read_msg) 4143 4144 def _testCloseAfterMakefile(self): 4145 self.write_file.write(self.write_msg) 4146 self.write_file.flush() 4147 4148 def testMakefileAfterMakefileClose(self): 4149 self.read_file.close() 4150 msg = self.cli_conn.recv(len(MSG)) 4151 if isinstance(self.read_msg, str): 4152 msg = msg.decode() 4153 self.assertEqual(msg, self.read_msg) 4154 4155 def _testMakefileAfterMakefileClose(self): 4156 self.write_file.write(self.write_msg) 4157 self.write_file.flush() 4158 4159 def testClosedAttr(self): 4160 self.assertTrue(not self.read_file.closed) 4161 4162 def _testClosedAttr(self): 4163 self.assertTrue(not self.write_file.closed) 4164 4165 def testAttributes(self): 4166 self.assertEqual(self.read_file.mode, self.read_mode) 4167 self.assertEqual(self.read_file.name, self.cli_conn.fileno()) 4168 4169 def _testAttributes(self): 4170 self.assertEqual(self.write_file.mode, self.write_mode) 4171 self.assertEqual(self.write_file.name, self.serv_conn.fileno()) 4172 4173 def testRealClose(self): 4174 self.read_file.close() 4175 self.assertRaises(ValueError, self.read_file.fileno) 4176 self.cli_conn.close() 4177 self.assertRaises(OSError, self.cli_conn.getsockname) 4178 4179 def _testRealClose(self): 4180 pass 4181 4182 4183class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase): 4184 4185 """Repeat the tests from FileObjectClassTestCase with bufsize==0. 4186 4187 In this case (and in this case only), it should be possible to 4188 create a file object, read a line from it, create another file 4189 object, read another line from it, without loss of data in the 4190 first file object's buffer. Note that http.client relies on this 4191 when reading multiple requests from the same socket.""" 4192 4193 bufsize = 0 # Use unbuffered mode 4194 4195 def testUnbufferedReadline(self): 4196 # Read a line, create a new file object, read another line with it 4197 line = self.read_file.readline() # first line 4198 self.assertEqual(line, b"A. " + self.write_msg) # first line 4199 self.read_file = self.cli_conn.makefile('rb', 0) 4200 line = self.read_file.readline() # second line 4201 self.assertEqual(line, b"B. " + self.write_msg) # second line 4202 4203 def _testUnbufferedReadline(self): 4204 self.write_file.write(b"A. " + self.write_msg) 4205 self.write_file.write(b"B. " + self.write_msg) 4206 self.write_file.flush() 4207 4208 def testMakefileClose(self): 4209 # The file returned by makefile should keep the socket open... 4210 self.cli_conn.close() 4211 msg = self.cli_conn.recv(1024) 4212 self.assertEqual(msg, self.read_msg) 4213 # ...until the file is itself closed 4214 self.read_file.close() 4215 self.assertRaises(OSError, self.cli_conn.recv, 1024) 4216 4217 def _testMakefileClose(self): 4218 self.write_file.write(self.write_msg) 4219 self.write_file.flush() 4220 4221 @support.refcount_test 4222 def testMakefileCloseSocketDestroy(self): 4223 refcount_before = sys.getrefcount(self.cli_conn) 4224 self.read_file.close() 4225 refcount_after = sys.getrefcount(self.cli_conn) 4226 self.assertEqual(refcount_before - 1, refcount_after) 4227 4228 def _testMakefileCloseSocketDestroy(self): 4229 pass 4230 4231 # Non-blocking ops 4232 # NOTE: to set `read_file` as non-blocking, we must call 4233 # `cli_conn.setblocking` and vice-versa (see setUp / clientSetUp). 4234 4235 def testSmallReadNonBlocking(self): 4236 self.cli_conn.setblocking(False) 4237 self.assertEqual(self.read_file.readinto(bytearray(10)), None) 4238 self.assertEqual(self.read_file.read(len(self.read_msg) - 3), None) 4239 self.evt1.set() 4240 self.evt2.wait(1.0) 4241 first_seg = self.read_file.read(len(self.read_msg) - 3) 4242 if first_seg is None: 4243 # Data not arrived (can happen under Windows), wait a bit 4244 time.sleep(0.5) 4245 first_seg = self.read_file.read(len(self.read_msg) - 3) 4246 buf = bytearray(10) 4247 n = self.read_file.readinto(buf) 4248 self.assertEqual(n, 3) 4249 msg = first_seg + buf[:n] 4250 self.assertEqual(msg, self.read_msg) 4251 self.assertEqual(self.read_file.readinto(bytearray(16)), None) 4252 self.assertEqual(self.read_file.read(1), None) 4253 4254 def _testSmallReadNonBlocking(self): 4255 self.evt1.wait(1.0) 4256 self.write_file.write(self.write_msg) 4257 self.write_file.flush() 4258 self.evt2.set() 4259 # Avoid closing the socket before the server test has finished, 4260 # otherwise system recv() will return 0 instead of EWOULDBLOCK. 4261 self.serv_finished.wait(5.0) 4262 4263 def testWriteNonBlocking(self): 4264 self.cli_finished.wait(5.0) 4265 # The client thread can't skip directly - the SkipTest exception 4266 # would appear as a failure. 4267 if self.serv_skipped: 4268 self.skipTest(self.serv_skipped) 4269 4270 def _testWriteNonBlocking(self): 4271 self.serv_skipped = None 4272 self.serv_conn.setblocking(False) 4273 # Try to saturate the socket buffer pipe with repeated large writes. 4274 BIG = b"x" * support.SOCK_MAX_SIZE 4275 LIMIT = 10 4276 # The first write() succeeds since a chunk of data can be buffered 4277 n = self.write_file.write(BIG) 4278 self.assertGreater(n, 0) 4279 for i in range(LIMIT): 4280 n = self.write_file.write(BIG) 4281 if n is None: 4282 # Succeeded 4283 break 4284 self.assertGreater(n, 0) 4285 else: 4286 # Let us know that this test didn't manage to establish 4287 # the expected conditions. This is not a failure in itself but, 4288 # if it happens repeatedly, the test should be fixed. 4289 self.serv_skipped = "failed to saturate the socket buffer" 4290 4291 4292class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase): 4293 4294 bufsize = 1 # Default-buffered for reading; line-buffered for writing 4295 4296 4297class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase): 4298 4299 bufsize = 2 # Exercise the buffering code 4300 4301 4302class UnicodeReadFileObjectClassTestCase(FileObjectClassTestCase): 4303 """Tests for socket.makefile() in text mode (rather than binary)""" 4304 4305 read_mode = 'r' 4306 read_msg = MSG.decode('utf-8') 4307 write_mode = 'wb' 4308 write_msg = MSG 4309 newline = '' 4310 4311 4312class UnicodeWriteFileObjectClassTestCase(FileObjectClassTestCase): 4313 """Tests for socket.makefile() in text mode (rather than binary)""" 4314 4315 read_mode = 'rb' 4316 read_msg = MSG 4317 write_mode = 'w' 4318 write_msg = MSG.decode('utf-8') 4319 newline = '' 4320 4321 4322class UnicodeReadWriteFileObjectClassTestCase(FileObjectClassTestCase): 4323 """Tests for socket.makefile() in text mode (rather than binary)""" 4324 4325 read_mode = 'r' 4326 read_msg = MSG.decode('utf-8') 4327 write_mode = 'w' 4328 write_msg = MSG.decode('utf-8') 4329 newline = '' 4330 4331 4332class NetworkConnectionTest(object): 4333 """Prove network connection.""" 4334 4335 def clientSetUp(self): 4336 # We're inherited below by BasicTCPTest2, which also inherits 4337 # BasicTCPTest, which defines self.port referenced below. 4338 self.cli = socket.create_connection((HOST, self.port)) 4339 self.serv_conn = self.cli 4340 4341class BasicTCPTest2(NetworkConnectionTest, BasicTCPTest): 4342 """Tests that NetworkConnection does not break existing TCP functionality. 4343 """ 4344 4345class NetworkConnectionNoServer(unittest.TestCase): 4346 4347 class MockSocket(socket.socket): 4348 def connect(self, *args): 4349 raise socket.timeout('timed out') 4350 4351 @contextlib.contextmanager 4352 def mocked_socket_module(self): 4353 """Return a socket which times out on connect""" 4354 old_socket = socket.socket 4355 socket.socket = self.MockSocket 4356 try: 4357 yield 4358 finally: 4359 socket.socket = old_socket 4360 4361 def test_connect(self): 4362 port = support.find_unused_port() 4363 cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 4364 self.addCleanup(cli.close) 4365 with self.assertRaises(OSError) as cm: 4366 cli.connect((HOST, port)) 4367 self.assertEqual(cm.exception.errno, errno.ECONNREFUSED) 4368 4369 def test_create_connection(self): 4370 # Issue #9792: errors raised by create_connection() should have 4371 # a proper errno attribute. 4372 port = support.find_unused_port() 4373 with self.assertRaises(OSError) as cm: 4374 socket.create_connection((HOST, port)) 4375 4376 # Issue #16257: create_connection() calls getaddrinfo() against 4377 # 'localhost'. This may result in an IPV6 addr being returned 4378 # as well as an IPV4 one: 4379 # >>> socket.getaddrinfo('localhost', port, 0, SOCK_STREAM) 4380 # >>> [(2, 2, 0, '', ('127.0.0.1', 41230)), 4381 # (26, 2, 0, '', ('::1', 41230, 0, 0))] 4382 # 4383 # create_connection() enumerates through all the addresses returned 4384 # and if it doesn't successfully bind to any of them, it propagates 4385 # the last exception it encountered. 4386 # 4387 # On Solaris, ENETUNREACH is returned in this circumstance instead 4388 # of ECONNREFUSED. So, if that errno exists, add it to our list of 4389 # expected errnos. 4390 expected_errnos = [ errno.ECONNREFUSED, ] 4391 if hasattr(errno, 'ENETUNREACH'): 4392 expected_errnos.append(errno.ENETUNREACH) 4393 if hasattr(errno, 'EADDRNOTAVAIL'): 4394 # bpo-31910: socket.create_connection() fails randomly 4395 # with EADDRNOTAVAIL on Travis CI 4396 expected_errnos.append(errno.EADDRNOTAVAIL) 4397 4398 self.assertIn(cm.exception.errno, expected_errnos) 4399 4400 def test_create_connection_timeout(self): 4401 # Issue #9792: create_connection() should not recast timeout errors 4402 # as generic socket errors. 4403 with self.mocked_socket_module(): 4404 with self.assertRaises(socket.timeout): 4405 socket.create_connection((HOST, 1234)) 4406 4407 4408@unittest.skipUnless(thread, 'Threading required for this test.') 4409class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest): 4410 4411 def __init__(self, methodName='runTest'): 4412 SocketTCPTest.__init__(self, methodName=methodName) 4413 ThreadableTest.__init__(self) 4414 4415 def clientSetUp(self): 4416 self.source_port = support.find_unused_port() 4417 4418 def clientTearDown(self): 4419 self.cli.close() 4420 self.cli = None 4421 ThreadableTest.clientTearDown(self) 4422 4423 def _justAccept(self): 4424 conn, addr = self.serv.accept() 4425 conn.close() 4426 4427 testFamily = _justAccept 4428 def _testFamily(self): 4429 self.cli = socket.create_connection((HOST, self.port), timeout=30) 4430 self.addCleanup(self.cli.close) 4431 self.assertEqual(self.cli.family, 2) 4432 4433 testSourceAddress = _justAccept 4434 def _testSourceAddress(self): 4435 self.cli = socket.create_connection((HOST, self.port), timeout=30, 4436 source_address=('', self.source_port)) 4437 self.addCleanup(self.cli.close) 4438 self.assertEqual(self.cli.getsockname()[1], self.source_port) 4439 # The port number being used is sufficient to show that the bind() 4440 # call happened. 4441 4442 testTimeoutDefault = _justAccept 4443 def _testTimeoutDefault(self): 4444 # passing no explicit timeout uses socket's global default 4445 self.assertTrue(socket.getdefaulttimeout() is None) 4446 socket.setdefaulttimeout(42) 4447 try: 4448 self.cli = socket.create_connection((HOST, self.port)) 4449 self.addCleanup(self.cli.close) 4450 finally: 4451 socket.setdefaulttimeout(None) 4452 self.assertEqual(self.cli.gettimeout(), 42) 4453 4454 testTimeoutNone = _justAccept 4455 def _testTimeoutNone(self): 4456 # None timeout means the same as sock.settimeout(None) 4457 self.assertTrue(socket.getdefaulttimeout() is None) 4458 socket.setdefaulttimeout(30) 4459 try: 4460 self.cli = socket.create_connection((HOST, self.port), timeout=None) 4461 self.addCleanup(self.cli.close) 4462 finally: 4463 socket.setdefaulttimeout(None) 4464 self.assertEqual(self.cli.gettimeout(), None) 4465 4466 testTimeoutValueNamed = _justAccept 4467 def _testTimeoutValueNamed(self): 4468 self.cli = socket.create_connection((HOST, self.port), timeout=30) 4469 self.assertEqual(self.cli.gettimeout(), 30) 4470 4471 testTimeoutValueNonamed = _justAccept 4472 def _testTimeoutValueNonamed(self): 4473 self.cli = socket.create_connection((HOST, self.port), 30) 4474 self.addCleanup(self.cli.close) 4475 self.assertEqual(self.cli.gettimeout(), 30) 4476 4477@unittest.skipUnless(thread, 'Threading required for this test.') 4478class NetworkConnectionBehaviourTest(SocketTCPTest, ThreadableTest): 4479 4480 def __init__(self, methodName='runTest'): 4481 SocketTCPTest.__init__(self, methodName=methodName) 4482 ThreadableTest.__init__(self) 4483 4484 def clientSetUp(self): 4485 pass 4486 4487 def clientTearDown(self): 4488 self.cli.close() 4489 self.cli = None 4490 ThreadableTest.clientTearDown(self) 4491 4492 def testInsideTimeout(self): 4493 conn, addr = self.serv.accept() 4494 self.addCleanup(conn.close) 4495 time.sleep(3) 4496 conn.send(b"done!") 4497 testOutsideTimeout = testInsideTimeout 4498 4499 def _testInsideTimeout(self): 4500 self.cli = sock = socket.create_connection((HOST, self.port)) 4501 data = sock.recv(5) 4502 self.assertEqual(data, b"done!") 4503 4504 def _testOutsideTimeout(self): 4505 self.cli = sock = socket.create_connection((HOST, self.port), timeout=1) 4506 self.assertRaises(socket.timeout, lambda: sock.recv(5)) 4507 4508 4509class TCPTimeoutTest(SocketTCPTest): 4510 4511 def testTCPTimeout(self): 4512 def raise_timeout(*args, **kwargs): 4513 self.serv.settimeout(1.0) 4514 self.serv.accept() 4515 self.assertRaises(socket.timeout, raise_timeout, 4516 "Error generating a timeout exception (TCP)") 4517 4518 def testTimeoutZero(self): 4519 ok = False 4520 try: 4521 self.serv.settimeout(0.0) 4522 foo = self.serv.accept() 4523 except socket.timeout: 4524 self.fail("caught timeout instead of error (TCP)") 4525 except OSError: 4526 ok = True 4527 except: 4528 self.fail("caught unexpected exception (TCP)") 4529 if not ok: 4530 self.fail("accept() returned success when we did not expect it") 4531 4532 @unittest.skipUnless(hasattr(signal, 'alarm'), 4533 'test needs signal.alarm()') 4534 def testInterruptedTimeout(self): 4535 # XXX I don't know how to do this test on MSWindows or any other 4536 # plaform that doesn't support signal.alarm() or os.kill(), though 4537 # the bug should have existed on all platforms. 4538 self.serv.settimeout(5.0) # must be longer than alarm 4539 class Alarm(Exception): 4540 pass 4541 def alarm_handler(signal, frame): 4542 raise Alarm 4543 old_alarm = signal.signal(signal.SIGALRM, alarm_handler) 4544 try: 4545 try: 4546 signal.alarm(2) # POSIX allows alarm to be up to 1 second early 4547 foo = self.serv.accept() 4548 except socket.timeout: 4549 self.fail("caught timeout instead of Alarm") 4550 except Alarm: 4551 pass 4552 except: 4553 self.fail("caught other exception instead of Alarm:" 4554 " %s(%s):\n%s" % 4555 (sys.exc_info()[:2] + (traceback.format_exc(),))) 4556 else: 4557 self.fail("nothing caught") 4558 finally: 4559 signal.alarm(0) # shut off alarm 4560 except Alarm: 4561 self.fail("got Alarm in wrong place") 4562 finally: 4563 # no alarm can be pending. Safe to restore old handler. 4564 signal.signal(signal.SIGALRM, old_alarm) 4565 4566class UDPTimeoutTest(SocketUDPTest): 4567 4568 def testUDPTimeout(self): 4569 def raise_timeout(*args, **kwargs): 4570 self.serv.settimeout(1.0) 4571 self.serv.recv(1024) 4572 self.assertRaises(socket.timeout, raise_timeout, 4573 "Error generating a timeout exception (UDP)") 4574 4575 def testTimeoutZero(self): 4576 ok = False 4577 try: 4578 self.serv.settimeout(0.0) 4579 foo = self.serv.recv(1024) 4580 except socket.timeout: 4581 self.fail("caught timeout instead of error (UDP)") 4582 except OSError: 4583 ok = True 4584 except: 4585 self.fail("caught unexpected exception (UDP)") 4586 if not ok: 4587 self.fail("recv() returned success when we did not expect it") 4588 4589class TestExceptions(unittest.TestCase): 4590 4591 def testExceptionTree(self): 4592 self.assertTrue(issubclass(OSError, Exception)) 4593 self.assertTrue(issubclass(socket.herror, OSError)) 4594 self.assertTrue(issubclass(socket.gaierror, OSError)) 4595 self.assertTrue(issubclass(socket.timeout, OSError)) 4596 4597 def test_setblocking_invalidfd(self): 4598 # Regression test for issue #28471 4599 4600 sock0 = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) 4601 sock = socket.socket( 4602 socket.AF_INET, socket.SOCK_STREAM, 0, sock0.fileno()) 4603 sock0.close() 4604 self.addCleanup(sock.detach) 4605 4606 with self.assertRaises(OSError): 4607 sock.setblocking(False) 4608 4609 4610@unittest.skipUnless(sys.platform == 'linux', 'Linux specific test') 4611class TestLinuxAbstractNamespace(unittest.TestCase): 4612 4613 UNIX_PATH_MAX = 108 4614 4615 def testLinuxAbstractNamespace(self): 4616 address = b"\x00python-test-hello\x00\xff" 4617 with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s1: 4618 s1.bind(address) 4619 s1.listen() 4620 with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s2: 4621 s2.connect(s1.getsockname()) 4622 with s1.accept()[0] as s3: 4623 self.assertEqual(s1.getsockname(), address) 4624 self.assertEqual(s2.getpeername(), address) 4625 4626 def testMaxName(self): 4627 address = b"\x00" + b"h" * (self.UNIX_PATH_MAX - 1) 4628 with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s: 4629 s.bind(address) 4630 self.assertEqual(s.getsockname(), address) 4631 4632 def testNameOverflow(self): 4633 address = "\x00" + "h" * self.UNIX_PATH_MAX 4634 with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s: 4635 self.assertRaises(OSError, s.bind, address) 4636 4637 def testStrName(self): 4638 # Check that an abstract name can be passed as a string. 4639 s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 4640 try: 4641 s.bind("\x00python\x00test\x00") 4642 self.assertEqual(s.getsockname(), b"\x00python\x00test\x00") 4643 finally: 4644 s.close() 4645 4646 def testBytearrayName(self): 4647 # Check that an abstract name can be passed as a bytearray. 4648 with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s: 4649 s.bind(bytearray(b"\x00python\x00test\x00")) 4650 self.assertEqual(s.getsockname(), b"\x00python\x00test\x00") 4651 4652@unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'test needs socket.AF_UNIX') 4653class TestUnixDomain(unittest.TestCase): 4654 4655 def setUp(self): 4656 self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 4657 4658 def tearDown(self): 4659 self.sock.close() 4660 4661 def encoded(self, path): 4662 # Return the given path encoded in the file system encoding, 4663 # or skip the test if this is not possible. 4664 try: 4665 return os.fsencode(path) 4666 except UnicodeEncodeError: 4667 self.skipTest( 4668 "Pathname {0!a} cannot be represented in file " 4669 "system encoding {1!r}".format( 4670 path, sys.getfilesystemencoding())) 4671 4672 def bind(self, sock, path): 4673 # Bind the socket 4674 try: 4675 support.bind_unix_socket(sock, path) 4676 except OSError as e: 4677 if str(e) == "AF_UNIX path too long": 4678 self.skipTest( 4679 "Pathname {0!a} is too long to serve as an AF_UNIX path" 4680 .format(path)) 4681 else: 4682 raise 4683 4684 def testUnbound(self): 4685 # Issue #30205 4686 self.assertIn(self.sock.getsockname(), ('', None)) 4687 4688 def testStrAddr(self): 4689 # Test binding to and retrieving a normal string pathname. 4690 path = os.path.abspath(support.TESTFN) 4691 self.bind(self.sock, path) 4692 self.addCleanup(support.unlink, path) 4693 self.assertEqual(self.sock.getsockname(), path) 4694 4695 def testBytesAddr(self): 4696 # Test binding to a bytes pathname. 4697 path = os.path.abspath(support.TESTFN) 4698 self.bind(self.sock, self.encoded(path)) 4699 self.addCleanup(support.unlink, path) 4700 self.assertEqual(self.sock.getsockname(), path) 4701 4702 def testSurrogateescapeBind(self): 4703 # Test binding to a valid non-ASCII pathname, with the 4704 # non-ASCII bytes supplied using surrogateescape encoding. 4705 path = os.path.abspath(support.TESTFN_UNICODE) 4706 b = self.encoded(path) 4707 self.bind(self.sock, b.decode("ascii", "surrogateescape")) 4708 self.addCleanup(support.unlink, path) 4709 self.assertEqual(self.sock.getsockname(), path) 4710 4711 def testUnencodableAddr(self): 4712 # Test binding to a pathname that cannot be encoded in the 4713 # file system encoding. 4714 if support.TESTFN_UNENCODABLE is None: 4715 self.skipTest("No unencodable filename available") 4716 path = os.path.abspath(support.TESTFN_UNENCODABLE) 4717 self.bind(self.sock, path) 4718 self.addCleanup(support.unlink, path) 4719 self.assertEqual(self.sock.getsockname(), path) 4720 4721@unittest.skipUnless(thread, 'Threading required for this test.') 4722class BufferIOTest(SocketConnectedTest): 4723 """ 4724 Test the buffer versions of socket.recv() and socket.send(). 4725 """ 4726 def __init__(self, methodName='runTest'): 4727 SocketConnectedTest.__init__(self, methodName=methodName) 4728 4729 def testRecvIntoArray(self): 4730 buf = array.array("B", [0] * len(MSG)) 4731 nbytes = self.cli_conn.recv_into(buf) 4732 self.assertEqual(nbytes, len(MSG)) 4733 buf = buf.tobytes() 4734 msg = buf[:len(MSG)] 4735 self.assertEqual(msg, MSG) 4736 4737 def _testRecvIntoArray(self): 4738 buf = bytes(MSG) 4739 self.serv_conn.send(buf) 4740 4741 def testRecvIntoBytearray(self): 4742 buf = bytearray(1024) 4743 nbytes = self.cli_conn.recv_into(buf) 4744 self.assertEqual(nbytes, len(MSG)) 4745 msg = buf[:len(MSG)] 4746 self.assertEqual(msg, MSG) 4747 4748 _testRecvIntoBytearray = _testRecvIntoArray 4749 4750 def testRecvIntoMemoryview(self): 4751 buf = bytearray(1024) 4752 nbytes = self.cli_conn.recv_into(memoryview(buf)) 4753 self.assertEqual(nbytes, len(MSG)) 4754 msg = buf[:len(MSG)] 4755 self.assertEqual(msg, MSG) 4756 4757 _testRecvIntoMemoryview = _testRecvIntoArray 4758 4759 def testRecvFromIntoArray(self): 4760 buf = array.array("B", [0] * len(MSG)) 4761 nbytes, addr = self.cli_conn.recvfrom_into(buf) 4762 self.assertEqual(nbytes, len(MSG)) 4763 buf = buf.tobytes() 4764 msg = buf[:len(MSG)] 4765 self.assertEqual(msg, MSG) 4766 4767 def _testRecvFromIntoArray(self): 4768 buf = bytes(MSG) 4769 self.serv_conn.send(buf) 4770 4771 def testRecvFromIntoBytearray(self): 4772 buf = bytearray(1024) 4773 nbytes, addr = self.cli_conn.recvfrom_into(buf) 4774 self.assertEqual(nbytes, len(MSG)) 4775 msg = buf[:len(MSG)] 4776 self.assertEqual(msg, MSG) 4777 4778 _testRecvFromIntoBytearray = _testRecvFromIntoArray 4779 4780 def testRecvFromIntoMemoryview(self): 4781 buf = bytearray(1024) 4782 nbytes, addr = self.cli_conn.recvfrom_into(memoryview(buf)) 4783 self.assertEqual(nbytes, len(MSG)) 4784 msg = buf[:len(MSG)] 4785 self.assertEqual(msg, MSG) 4786 4787 _testRecvFromIntoMemoryview = _testRecvFromIntoArray 4788 4789 def testRecvFromIntoSmallBuffer(self): 4790 # See issue #20246. 4791 buf = bytearray(8) 4792 self.assertRaises(ValueError, self.cli_conn.recvfrom_into, buf, 1024) 4793 4794 def _testRecvFromIntoSmallBuffer(self): 4795 self.serv_conn.send(MSG) 4796 4797 def testRecvFromIntoEmptyBuffer(self): 4798 buf = bytearray() 4799 self.cli_conn.recvfrom_into(buf) 4800 self.cli_conn.recvfrom_into(buf, 0) 4801 4802 _testRecvFromIntoEmptyBuffer = _testRecvFromIntoArray 4803 4804 4805TIPC_STYPE = 2000 4806TIPC_LOWER = 200 4807TIPC_UPPER = 210 4808 4809def isTipcAvailable(): 4810 """Check if the TIPC module is loaded 4811 4812 The TIPC module is not loaded automatically on Ubuntu and probably 4813 other Linux distros. 4814 """ 4815 if not hasattr(socket, "AF_TIPC"): 4816 return False 4817 try: 4818 f = open("/proc/modules") 4819 except (FileNotFoundError, IsADirectoryError, PermissionError): 4820 # It's ok if the file does not exist, is a directory or if we 4821 # have not the permission to read it. 4822 return False 4823 with f: 4824 for line in f: 4825 if line.startswith("tipc "): 4826 return True 4827 return False 4828 4829@unittest.skipUnless(isTipcAvailable(), 4830 "TIPC module is not loaded, please 'sudo modprobe tipc'") 4831class TIPCTest(unittest.TestCase): 4832 def testRDM(self): 4833 srv = socket.socket(socket.AF_TIPC, socket.SOCK_RDM) 4834 cli = socket.socket(socket.AF_TIPC, socket.SOCK_RDM) 4835 self.addCleanup(srv.close) 4836 self.addCleanup(cli.close) 4837 4838 srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 4839 srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE, 4840 TIPC_LOWER, TIPC_UPPER) 4841 srv.bind(srvaddr) 4842 4843 sendaddr = (socket.TIPC_ADDR_NAME, TIPC_STYPE, 4844 TIPC_LOWER + int((TIPC_UPPER - TIPC_LOWER) / 2), 0) 4845 cli.sendto(MSG, sendaddr) 4846 4847 msg, recvaddr = srv.recvfrom(1024) 4848 4849 self.assertEqual(cli.getsockname(), recvaddr) 4850 self.assertEqual(msg, MSG) 4851 4852 4853@unittest.skipUnless(isTipcAvailable(), 4854 "TIPC module is not loaded, please 'sudo modprobe tipc'") 4855class TIPCThreadableTest(unittest.TestCase, ThreadableTest): 4856 def __init__(self, methodName = 'runTest'): 4857 unittest.TestCase.__init__(self, methodName = methodName) 4858 ThreadableTest.__init__(self) 4859 4860 def setUp(self): 4861 self.srv = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM) 4862 self.addCleanup(self.srv.close) 4863 self.srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 4864 srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE, 4865 TIPC_LOWER, TIPC_UPPER) 4866 self.srv.bind(srvaddr) 4867 self.srv.listen() 4868 self.serverExplicitReady() 4869 self.conn, self.connaddr = self.srv.accept() 4870 self.addCleanup(self.conn.close) 4871 4872 def clientSetUp(self): 4873 # There is a hittable race between serverExplicitReady() and the 4874 # accept() call; sleep a little while to avoid it, otherwise 4875 # we could get an exception 4876 time.sleep(0.1) 4877 self.cli = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM) 4878 self.addCleanup(self.cli.close) 4879 addr = (socket.TIPC_ADDR_NAME, TIPC_STYPE, 4880 TIPC_LOWER + int((TIPC_UPPER - TIPC_LOWER) / 2), 0) 4881 self.cli.connect(addr) 4882 self.cliaddr = self.cli.getsockname() 4883 4884 def testStream(self): 4885 msg = self.conn.recv(1024) 4886 self.assertEqual(msg, MSG) 4887 self.assertEqual(self.cliaddr, self.connaddr) 4888 4889 def _testStream(self): 4890 self.cli.send(MSG) 4891 self.cli.close() 4892 4893 4894@unittest.skipUnless(thread, 'Threading required for this test.') 4895class ContextManagersTest(ThreadedTCPSocketTest): 4896 4897 def _testSocketClass(self): 4898 # base test 4899 with socket.socket() as sock: 4900 self.assertFalse(sock._closed) 4901 self.assertTrue(sock._closed) 4902 # close inside with block 4903 with socket.socket() as sock: 4904 sock.close() 4905 self.assertTrue(sock._closed) 4906 # exception inside with block 4907 with socket.socket() as sock: 4908 self.assertRaises(OSError, sock.sendall, b'foo') 4909 self.assertTrue(sock._closed) 4910 4911 def testCreateConnectionBase(self): 4912 conn, addr = self.serv.accept() 4913 self.addCleanup(conn.close) 4914 data = conn.recv(1024) 4915 conn.sendall(data) 4916 4917 def _testCreateConnectionBase(self): 4918 address = self.serv.getsockname() 4919 with socket.create_connection(address) as sock: 4920 self.assertFalse(sock._closed) 4921 sock.sendall(b'foo') 4922 self.assertEqual(sock.recv(1024), b'foo') 4923 self.assertTrue(sock._closed) 4924 4925 def testCreateConnectionClose(self): 4926 conn, addr = self.serv.accept() 4927 self.addCleanup(conn.close) 4928 data = conn.recv(1024) 4929 conn.sendall(data) 4930 4931 def _testCreateConnectionClose(self): 4932 address = self.serv.getsockname() 4933 with socket.create_connection(address) as sock: 4934 sock.close() 4935 self.assertTrue(sock._closed) 4936 self.assertRaises(OSError, sock.sendall, b'foo') 4937 4938 4939class InheritanceTest(unittest.TestCase): 4940 @unittest.skipUnless(hasattr(socket, "SOCK_CLOEXEC"), 4941 "SOCK_CLOEXEC not defined") 4942 @support.requires_linux_version(2, 6, 28) 4943 def test_SOCK_CLOEXEC(self): 4944 with socket.socket(socket.AF_INET, 4945 socket.SOCK_STREAM | socket.SOCK_CLOEXEC) as s: 4946 self.assertTrue(s.type & socket.SOCK_CLOEXEC) 4947 self.assertFalse(s.get_inheritable()) 4948 4949 def test_default_inheritable(self): 4950 sock = socket.socket() 4951 with sock: 4952 self.assertEqual(sock.get_inheritable(), False) 4953 4954 def test_dup(self): 4955 sock = socket.socket() 4956 with sock: 4957 newsock = sock.dup() 4958 sock.close() 4959 with newsock: 4960 self.assertEqual(newsock.get_inheritable(), False) 4961 4962 def test_set_inheritable(self): 4963 sock = socket.socket() 4964 with sock: 4965 sock.set_inheritable(True) 4966 self.assertEqual(sock.get_inheritable(), True) 4967 4968 sock.set_inheritable(False) 4969 self.assertEqual(sock.get_inheritable(), False) 4970 4971 @unittest.skipIf(fcntl is None, "need fcntl") 4972 def test_get_inheritable_cloexec(self): 4973 sock = socket.socket() 4974 with sock: 4975 fd = sock.fileno() 4976 self.assertEqual(sock.get_inheritable(), False) 4977 4978 # clear FD_CLOEXEC flag 4979 flags = fcntl.fcntl(fd, fcntl.F_GETFD) 4980 flags &= ~fcntl.FD_CLOEXEC 4981 fcntl.fcntl(fd, fcntl.F_SETFD, flags) 4982 4983 self.assertEqual(sock.get_inheritable(), True) 4984 4985 @unittest.skipIf(fcntl is None, "need fcntl") 4986 def test_set_inheritable_cloexec(self): 4987 sock = socket.socket() 4988 with sock: 4989 fd = sock.fileno() 4990 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC, 4991 fcntl.FD_CLOEXEC) 4992 4993 sock.set_inheritable(True) 4994 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC, 4995 0) 4996 4997 4998 @unittest.skipUnless(hasattr(socket, "socketpair"), 4999 "need socket.socketpair()") 5000 def test_socketpair(self): 5001 s1, s2 = socket.socketpair() 5002 self.addCleanup(s1.close) 5003 self.addCleanup(s2.close) 5004 self.assertEqual(s1.get_inheritable(), False) 5005 self.assertEqual(s2.get_inheritable(), False) 5006 5007 5008@unittest.skipUnless(hasattr(socket, "SOCK_NONBLOCK"), 5009 "SOCK_NONBLOCK not defined") 5010class NonblockConstantTest(unittest.TestCase): 5011 def checkNonblock(self, s, nonblock=True, timeout=0.0): 5012 if nonblock: 5013 self.assertTrue(s.type & socket.SOCK_NONBLOCK) 5014 self.assertEqual(s.gettimeout(), timeout) 5015 else: 5016 self.assertFalse(s.type & socket.SOCK_NONBLOCK) 5017 self.assertEqual(s.gettimeout(), None) 5018 5019 @support.requires_linux_version(2, 6, 28) 5020 def test_SOCK_NONBLOCK(self): 5021 # a lot of it seems silly and redundant, but I wanted to test that 5022 # changing back and forth worked ok 5023 with socket.socket(socket.AF_INET, 5024 socket.SOCK_STREAM | socket.SOCK_NONBLOCK) as s: 5025 self.checkNonblock(s) 5026 s.setblocking(1) 5027 self.checkNonblock(s, False) 5028 s.setblocking(0) 5029 self.checkNonblock(s) 5030 s.settimeout(None) 5031 self.checkNonblock(s, False) 5032 s.settimeout(2.0) 5033 self.checkNonblock(s, timeout=2.0) 5034 s.setblocking(1) 5035 self.checkNonblock(s, False) 5036 # defaulttimeout 5037 t = socket.getdefaulttimeout() 5038 socket.setdefaulttimeout(0.0) 5039 with socket.socket() as s: 5040 self.checkNonblock(s) 5041 socket.setdefaulttimeout(None) 5042 with socket.socket() as s: 5043 self.checkNonblock(s, False) 5044 socket.setdefaulttimeout(2.0) 5045 with socket.socket() as s: 5046 self.checkNonblock(s, timeout=2.0) 5047 socket.setdefaulttimeout(None) 5048 with socket.socket() as s: 5049 self.checkNonblock(s, False) 5050 socket.setdefaulttimeout(t) 5051 5052 5053@unittest.skipUnless(os.name == "nt", "Windows specific") 5054@unittest.skipUnless(multiprocessing, "need multiprocessing") 5055class TestSocketSharing(SocketTCPTest): 5056 # This must be classmethod and not staticmethod or multiprocessing 5057 # won't be able to bootstrap it. 5058 @classmethod 5059 def remoteProcessServer(cls, q): 5060 # Recreate socket from shared data 5061 sdata = q.get() 5062 message = q.get() 5063 5064 s = socket.fromshare(sdata) 5065 s2, c = s.accept() 5066 5067 # Send the message 5068 s2.sendall(message) 5069 s2.close() 5070 s.close() 5071 5072 def testShare(self): 5073 # Transfer the listening server socket to another process 5074 # and service it from there. 5075 5076 # Create process: 5077 q = multiprocessing.Queue() 5078 p = multiprocessing.Process(target=self.remoteProcessServer, args=(q,)) 5079 p.start() 5080 5081 # Get the shared socket data 5082 data = self.serv.share(p.pid) 5083 5084 # Pass the shared socket to the other process 5085 addr = self.serv.getsockname() 5086 self.serv.close() 5087 q.put(data) 5088 5089 # The data that the server will send us 5090 message = b"slapmahfro" 5091 q.put(message) 5092 5093 # Connect 5094 s = socket.create_connection(addr) 5095 # listen for the data 5096 m = [] 5097 while True: 5098 data = s.recv(100) 5099 if not data: 5100 break 5101 m.append(data) 5102 s.close() 5103 received = b"".join(m) 5104 self.assertEqual(received, message) 5105 p.join() 5106 5107 def testShareLength(self): 5108 data = self.serv.share(os.getpid()) 5109 self.assertRaises(ValueError, socket.fromshare, data[:-1]) 5110 self.assertRaises(ValueError, socket.fromshare, data+b"foo") 5111 5112 def compareSockets(self, org, other): 5113 # socket sharing is expected to work only for blocking socket 5114 # since the internal python timeout value isn't transferred. 5115 self.assertEqual(org.gettimeout(), None) 5116 self.assertEqual(org.gettimeout(), other.gettimeout()) 5117 5118 self.assertEqual(org.family, other.family) 5119 self.assertEqual(org.type, other.type) 5120 # If the user specified "0" for proto, then 5121 # internally windows will have picked the correct value. 5122 # Python introspection on the socket however will still return 5123 # 0. For the shared socket, the python value is recreated 5124 # from the actual value, so it may not compare correctly. 5125 if org.proto != 0: 5126 self.assertEqual(org.proto, other.proto) 5127 5128 def testShareLocal(self): 5129 data = self.serv.share(os.getpid()) 5130 s = socket.fromshare(data) 5131 try: 5132 self.compareSockets(self.serv, s) 5133 finally: 5134 s.close() 5135 5136 def testTypes(self): 5137 families = [socket.AF_INET, socket.AF_INET6] 5138 types = [socket.SOCK_STREAM, socket.SOCK_DGRAM] 5139 for f in families: 5140 for t in types: 5141 try: 5142 source = socket.socket(f, t) 5143 except OSError: 5144 continue # This combination is not supported 5145 try: 5146 data = source.share(os.getpid()) 5147 shared = socket.fromshare(data) 5148 try: 5149 self.compareSockets(source, shared) 5150 finally: 5151 shared.close() 5152 finally: 5153 source.close() 5154 5155 5156@unittest.skipUnless(thread, 'Threading required for this test.') 5157class SendfileUsingSendTest(ThreadedTCPSocketTest): 5158 """ 5159 Test the send() implementation of socket.sendfile(). 5160 """ 5161 5162 FILESIZE = (10 * 1024 * 1024) # 10MB 5163 BUFSIZE = 8192 5164 FILEDATA = b"" 5165 TIMEOUT = 2 5166 5167 @classmethod 5168 def setUpClass(cls): 5169 def chunks(total, step): 5170 assert total >= step 5171 while total > step: 5172 yield step 5173 total -= step 5174 if total: 5175 yield total 5176 5177 chunk = b"".join([random.choice(string.ascii_letters).encode() 5178 for i in range(cls.BUFSIZE)]) 5179 with open(support.TESTFN, 'wb') as f: 5180 for csize in chunks(cls.FILESIZE, cls.BUFSIZE): 5181 f.write(chunk) 5182 with open(support.TESTFN, 'rb') as f: 5183 cls.FILEDATA = f.read() 5184 assert len(cls.FILEDATA) == cls.FILESIZE 5185 5186 @classmethod 5187 def tearDownClass(cls): 5188 support.unlink(support.TESTFN) 5189 5190 def accept_conn(self): 5191 self.serv.settimeout(self.TIMEOUT) 5192 conn, addr = self.serv.accept() 5193 conn.settimeout(self.TIMEOUT) 5194 self.addCleanup(conn.close) 5195 return conn 5196 5197 def recv_data(self, conn): 5198 received = [] 5199 while True: 5200 chunk = conn.recv(self.BUFSIZE) 5201 if not chunk: 5202 break 5203 received.append(chunk) 5204 return b''.join(received) 5205 5206 def meth_from_sock(self, sock): 5207 # Depending on the mixin class being run return either send() 5208 # or sendfile() method implementation. 5209 return getattr(sock, "_sendfile_use_send") 5210 5211 # regular file 5212 5213 def _testRegularFile(self): 5214 address = self.serv.getsockname() 5215 file = open(support.TESTFN, 'rb') 5216 with socket.create_connection(address) as sock, file as file: 5217 meth = self.meth_from_sock(sock) 5218 sent = meth(file) 5219 self.assertEqual(sent, self.FILESIZE) 5220 self.assertEqual(file.tell(), self.FILESIZE) 5221 5222 def testRegularFile(self): 5223 conn = self.accept_conn() 5224 data = self.recv_data(conn) 5225 self.assertEqual(len(data), self.FILESIZE) 5226 self.assertEqual(data, self.FILEDATA) 5227 5228 # non regular file 5229 5230 def _testNonRegularFile(self): 5231 address = self.serv.getsockname() 5232 file = io.BytesIO(self.FILEDATA) 5233 with socket.create_connection(address) as sock, file as file: 5234 sent = sock.sendfile(file) 5235 self.assertEqual(sent, self.FILESIZE) 5236 self.assertEqual(file.tell(), self.FILESIZE) 5237 self.assertRaises(socket._GiveupOnSendfile, 5238 sock._sendfile_use_sendfile, file) 5239 5240 def testNonRegularFile(self): 5241 conn = self.accept_conn() 5242 data = self.recv_data(conn) 5243 self.assertEqual(len(data), self.FILESIZE) 5244 self.assertEqual(data, self.FILEDATA) 5245 5246 # empty file 5247 5248 def _testEmptyFileSend(self): 5249 address = self.serv.getsockname() 5250 filename = support.TESTFN + "2" 5251 with open(filename, 'wb'): 5252 self.addCleanup(support.unlink, filename) 5253 file = open(filename, 'rb') 5254 with socket.create_connection(address) as sock, file as file: 5255 meth = self.meth_from_sock(sock) 5256 sent = meth(file) 5257 self.assertEqual(sent, 0) 5258 self.assertEqual(file.tell(), 0) 5259 5260 def testEmptyFileSend(self): 5261 conn = self.accept_conn() 5262 data = self.recv_data(conn) 5263 self.assertEqual(data, b"") 5264 5265 # offset 5266 5267 def _testOffset(self): 5268 address = self.serv.getsockname() 5269 file = open(support.TESTFN, 'rb') 5270 with socket.create_connection(address) as sock, file as file: 5271 meth = self.meth_from_sock(sock) 5272 sent = meth(file, offset=5000) 5273 self.assertEqual(sent, self.FILESIZE - 5000) 5274 self.assertEqual(file.tell(), self.FILESIZE) 5275 5276 def testOffset(self): 5277 conn = self.accept_conn() 5278 data = self.recv_data(conn) 5279 self.assertEqual(len(data), self.FILESIZE - 5000) 5280 self.assertEqual(data, self.FILEDATA[5000:]) 5281 5282 # count 5283 5284 def _testCount(self): 5285 address = self.serv.getsockname() 5286 file = open(support.TESTFN, 'rb') 5287 with socket.create_connection(address, timeout=2) as sock, file as file: 5288 count = 5000007 5289 meth = self.meth_from_sock(sock) 5290 sent = meth(file, count=count) 5291 self.assertEqual(sent, count) 5292 self.assertEqual(file.tell(), count) 5293 5294 def testCount(self): 5295 count = 5000007 5296 conn = self.accept_conn() 5297 data = self.recv_data(conn) 5298 self.assertEqual(len(data), count) 5299 self.assertEqual(data, self.FILEDATA[:count]) 5300 5301 # count small 5302 5303 def _testCountSmall(self): 5304 address = self.serv.getsockname() 5305 file = open(support.TESTFN, 'rb') 5306 with socket.create_connection(address, timeout=2) as sock, file as file: 5307 count = 1 5308 meth = self.meth_from_sock(sock) 5309 sent = meth(file, count=count) 5310 self.assertEqual(sent, count) 5311 self.assertEqual(file.tell(), count) 5312 5313 def testCountSmall(self): 5314 count = 1 5315 conn = self.accept_conn() 5316 data = self.recv_data(conn) 5317 self.assertEqual(len(data), count) 5318 self.assertEqual(data, self.FILEDATA[:count]) 5319 5320 # count + offset 5321 5322 def _testCountWithOffset(self): 5323 address = self.serv.getsockname() 5324 file = open(support.TESTFN, 'rb') 5325 with socket.create_connection(address, timeout=2) as sock, file as file: 5326 count = 100007 5327 meth = self.meth_from_sock(sock) 5328 sent = meth(file, offset=2007, count=count) 5329 self.assertEqual(sent, count) 5330 self.assertEqual(file.tell(), count + 2007) 5331 5332 def testCountWithOffset(self): 5333 count = 100007 5334 conn = self.accept_conn() 5335 data = self.recv_data(conn) 5336 self.assertEqual(len(data), count) 5337 self.assertEqual(data, self.FILEDATA[2007:count+2007]) 5338 5339 # non blocking sockets are not supposed to work 5340 5341 def _testNonBlocking(self): 5342 address = self.serv.getsockname() 5343 file = open(support.TESTFN, 'rb') 5344 with socket.create_connection(address) as sock, file as file: 5345 sock.setblocking(False) 5346 meth = self.meth_from_sock(sock) 5347 self.assertRaises(ValueError, meth, file) 5348 self.assertRaises(ValueError, sock.sendfile, file) 5349 5350 def testNonBlocking(self): 5351 conn = self.accept_conn() 5352 if conn.recv(8192): 5353 self.fail('was not supposed to receive any data') 5354 5355 # timeout (non-triggered) 5356 5357 def _testWithTimeout(self): 5358 address = self.serv.getsockname() 5359 file = open(support.TESTFN, 'rb') 5360 with socket.create_connection(address, timeout=2) as sock, file as file: 5361 meth = self.meth_from_sock(sock) 5362 sent = meth(file) 5363 self.assertEqual(sent, self.FILESIZE) 5364 5365 def testWithTimeout(self): 5366 conn = self.accept_conn() 5367 data = self.recv_data(conn) 5368 self.assertEqual(len(data), self.FILESIZE) 5369 self.assertEqual(data, self.FILEDATA) 5370 5371 # timeout (triggered) 5372 5373 def _testWithTimeoutTriggeredSend(self): 5374 address = self.serv.getsockname() 5375 with open(support.TESTFN, 'rb') as file: 5376 with socket.create_connection(address, timeout=0.01) as sock: 5377 meth = self.meth_from_sock(sock) 5378 self.assertRaises(socket.timeout, meth, file) 5379 5380 def testWithTimeoutTriggeredSend(self): 5381 conn = self.accept_conn() 5382 conn.recv(88192) 5383 5384 # errors 5385 5386 def _test_errors(self): 5387 pass 5388 5389 def test_errors(self): 5390 with open(support.TESTFN, 'rb') as file: 5391 with socket.socket(type=socket.SOCK_DGRAM) as s: 5392 meth = self.meth_from_sock(s) 5393 self.assertRaisesRegex( 5394 ValueError, "SOCK_STREAM", meth, file) 5395 with open(support.TESTFN, 'rt') as file: 5396 with socket.socket() as s: 5397 meth = self.meth_from_sock(s) 5398 self.assertRaisesRegex( 5399 ValueError, "binary mode", meth, file) 5400 with open(support.TESTFN, 'rb') as file: 5401 with socket.socket() as s: 5402 meth = self.meth_from_sock(s) 5403 self.assertRaisesRegex(TypeError, "positive integer", 5404 meth, file, count='2') 5405 self.assertRaisesRegex(TypeError, "positive integer", 5406 meth, file, count=0.1) 5407 self.assertRaisesRegex(ValueError, "positive integer", 5408 meth, file, count=0) 5409 self.assertRaisesRegex(ValueError, "positive integer", 5410 meth, file, count=-1) 5411 5412 5413@unittest.skipUnless(thread, 'Threading required for this test.') 5414@unittest.skipUnless(hasattr(os, "sendfile"), 5415 'os.sendfile() required for this test.') 5416class SendfileUsingSendfileTest(SendfileUsingSendTest): 5417 """ 5418 Test the sendfile() implementation of socket.sendfile(). 5419 """ 5420 def meth_from_sock(self, sock): 5421 return getattr(sock, "_sendfile_use_sendfile") 5422 5423 5424@unittest.skipUnless(HAVE_SOCKET_ALG, 'AF_ALG required') 5425class LinuxKernelCryptoAPI(unittest.TestCase): 5426 # tests for AF_ALG 5427 def create_alg(self, typ, name): 5428 sock = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0) 5429 try: 5430 sock.bind((typ, name)) 5431 except FileNotFoundError as e: 5432 # type / algorithm is not available 5433 sock.close() 5434 raise unittest.SkipTest(str(e), typ, name) 5435 else: 5436 return sock 5437 5438 # bpo-31705: On kernel older than 4.5, sendto() failed with ENOKEY, 5439 # at least on ppc64le architecture 5440 @support.requires_linux_version(4, 5) 5441 def test_sha256(self): 5442 expected = bytes.fromhex("ba7816bf8f01cfea414140de5dae2223b00361a396" 5443 "177a9cb410ff61f20015ad") 5444 with self.create_alg('hash', 'sha256') as algo: 5445 op, _ = algo.accept() 5446 with op: 5447 op.sendall(b"abc") 5448 self.assertEqual(op.recv(512), expected) 5449 5450 op, _ = algo.accept() 5451 with op: 5452 op.send(b'a', socket.MSG_MORE) 5453 op.send(b'b', socket.MSG_MORE) 5454 op.send(b'c', socket.MSG_MORE) 5455 op.send(b'') 5456 self.assertEqual(op.recv(512), expected) 5457 5458 def test_hmac_sha1(self): 5459 expected = bytes.fromhex("effcdf6ae5eb2fa2d27416d5f184df9c259a7c79") 5460 with self.create_alg('hash', 'hmac(sha1)') as algo: 5461 algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, b"Jefe") 5462 op, _ = algo.accept() 5463 with op: 5464 op.sendall(b"what do ya want for nothing?") 5465 self.assertEqual(op.recv(512), expected) 5466 5467 # Although it should work with 3.19 and newer the test blocks on 5468 # Ubuntu 15.10 with Kernel 4.2.0-19. 5469 @support.requires_linux_version(4, 3) 5470 def test_aes_cbc(self): 5471 key = bytes.fromhex('06a9214036b8a15b512e03d534120006') 5472 iv = bytes.fromhex('3dafba429d9eb430b422da802c9fac41') 5473 msg = b"Single block msg" 5474 ciphertext = bytes.fromhex('e353779c1079aeb82708942dbe77181a') 5475 msglen = len(msg) 5476 with self.create_alg('skcipher', 'cbc(aes)') as algo: 5477 algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, key) 5478 op, _ = algo.accept() 5479 with op: 5480 op.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, iv=iv, 5481 flags=socket.MSG_MORE) 5482 op.sendall(msg) 5483 self.assertEqual(op.recv(msglen), ciphertext) 5484 5485 op, _ = algo.accept() 5486 with op: 5487 op.sendmsg_afalg([ciphertext], 5488 op=socket.ALG_OP_DECRYPT, iv=iv) 5489 self.assertEqual(op.recv(msglen), msg) 5490 5491 # long message 5492 multiplier = 1024 5493 longmsg = [msg] * multiplier 5494 op, _ = algo.accept() 5495 with op: 5496 op.sendmsg_afalg(longmsg, 5497 op=socket.ALG_OP_ENCRYPT, iv=iv) 5498 enc = op.recv(msglen * multiplier) 5499 self.assertEqual(len(enc), msglen * multiplier) 5500 self.assertEqual(enc[:msglen], ciphertext) 5501 5502 op, _ = algo.accept() 5503 with op: 5504 op.sendmsg_afalg([enc], 5505 op=socket.ALG_OP_DECRYPT, iv=iv) 5506 dec = op.recv(msglen * multiplier) 5507 self.assertEqual(len(dec), msglen * multiplier) 5508 self.assertEqual(dec, msg * multiplier) 5509 5510 @support.requires_linux_version(4, 9) # see issue29324 5511 def test_aead_aes_gcm(self): 5512 key = bytes.fromhex('c939cc13397c1d37de6ae0e1cb7c423c') 5513 iv = bytes.fromhex('b3d8cc017cbb89b39e0f67e2') 5514 plain = bytes.fromhex('c3b3c41f113a31b73d9a5cd432103069') 5515 assoc = bytes.fromhex('24825602bd12a984e0092d3e448eda5f') 5516 expected_ct = bytes.fromhex('93fe7d9e9bfd10348a5606e5cafa7354') 5517 expected_tag = bytes.fromhex('0032a1dc85f1c9786925a2e71d8272dd') 5518 5519 taglen = len(expected_tag) 5520 assoclen = len(assoc) 5521 5522 with self.create_alg('aead', 'gcm(aes)') as algo: 5523 algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, key) 5524 algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_AEAD_AUTHSIZE, 5525 None, taglen) 5526 5527 # send assoc, plain and tag buffer in separate steps 5528 op, _ = algo.accept() 5529 with op: 5530 op.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, iv=iv, 5531 assoclen=assoclen, flags=socket.MSG_MORE) 5532 op.sendall(assoc, socket.MSG_MORE) 5533 op.sendall(plain) 5534 res = op.recv(assoclen + len(plain) + taglen) 5535 self.assertEqual(expected_ct, res[assoclen:-taglen]) 5536 self.assertEqual(expected_tag, res[-taglen:]) 5537 5538 # now with msg 5539 op, _ = algo.accept() 5540 with op: 5541 msg = assoc + plain 5542 op.sendmsg_afalg([msg], op=socket.ALG_OP_ENCRYPT, iv=iv, 5543 assoclen=assoclen) 5544 res = op.recv(assoclen + len(plain) + taglen) 5545 self.assertEqual(expected_ct, res[assoclen:-taglen]) 5546 self.assertEqual(expected_tag, res[-taglen:]) 5547 5548 # create anc data manually 5549 pack_uint32 = struct.Struct('I').pack 5550 op, _ = algo.accept() 5551 with op: 5552 msg = assoc + plain 5553 op.sendmsg( 5554 [msg], 5555 ([socket.SOL_ALG, socket.ALG_SET_OP, pack_uint32(socket.ALG_OP_ENCRYPT)], 5556 [socket.SOL_ALG, socket.ALG_SET_IV, pack_uint32(len(iv)) + iv], 5557 [socket.SOL_ALG, socket.ALG_SET_AEAD_ASSOCLEN, pack_uint32(assoclen)], 5558 ) 5559 ) 5560 res = op.recv(len(msg) + taglen) 5561 self.assertEqual(expected_ct, res[assoclen:-taglen]) 5562 self.assertEqual(expected_tag, res[-taglen:]) 5563 5564 # decrypt and verify 5565 op, _ = algo.accept() 5566 with op: 5567 msg = assoc + expected_ct + expected_tag 5568 op.sendmsg_afalg([msg], op=socket.ALG_OP_DECRYPT, iv=iv, 5569 assoclen=assoclen) 5570 res = op.recv(len(msg) - taglen) 5571 self.assertEqual(plain, res[assoclen:]) 5572 5573 @support.requires_linux_version(4, 3) # see test_aes_cbc 5574 def test_drbg_pr_sha256(self): 5575 # deterministic random bit generator, prediction resistance, sha256 5576 with self.create_alg('rng', 'drbg_pr_sha256') as algo: 5577 extra_seed = os.urandom(32) 5578 algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, extra_seed) 5579 op, _ = algo.accept() 5580 with op: 5581 rn = op.recv(32) 5582 self.assertEqual(len(rn), 32) 5583 5584 def test_sendmsg_afalg_args(self): 5585 sock = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0) 5586 with sock: 5587 with self.assertRaises(TypeError): 5588 sock.sendmsg_afalg() 5589 5590 with self.assertRaises(TypeError): 5591 sock.sendmsg_afalg(op=None) 5592 5593 with self.assertRaises(TypeError): 5594 sock.sendmsg_afalg(1) 5595 5596 with self.assertRaises(TypeError): 5597 sock.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, assoclen=None) 5598 5599 with self.assertRaises(TypeError): 5600 sock.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, assoclen=-1) 5601 5602 def test_length_restriction(self): 5603 # bpo-35050, off-by-one error in length check 5604 sock = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0) 5605 self.addCleanup(sock.close) 5606 5607 # salg_type[14] 5608 with self.assertRaises(FileNotFoundError): 5609 sock.bind(("t" * 13, "name")) 5610 with self.assertRaisesRegex(ValueError, "type too long"): 5611 sock.bind(("t" * 14, "name")) 5612 5613 # salg_name[64] 5614 with self.assertRaises(FileNotFoundError): 5615 sock.bind(("type", "n" * 63)) 5616 with self.assertRaisesRegex(ValueError, "name too long"): 5617 sock.bind(("type", "n" * 64)) 5618 5619 5620@unittest.skipUnless(sys.platform.startswith("win"), "requires Windows") 5621class TestMSWindowsTCPFlags(unittest.TestCase): 5622 knownTCPFlags = { 5623 # available since long time ago 5624 'TCP_MAXSEG', 5625 'TCP_NODELAY', 5626 # available starting with Windows 10 1607 5627 'TCP_FASTOPEN', 5628 # available starting with Windows 10 1703 5629 'TCP_KEEPCNT', 5630 } 5631 5632 def test_new_tcp_flags(self): 5633 provided = [s for s in dir(socket) if s.startswith('TCP')] 5634 unknown = [s for s in provided if s not in self.knownTCPFlags] 5635 5636 self.assertEqual([], unknown, 5637 "New TCP flags were discovered. See bpo-32394 for more information") 5638 5639def test_main(): 5640 tests = [GeneralModuleTests, BasicTCPTest, TCPCloserTest, TCPTimeoutTest, 5641 TestExceptions, BufferIOTest, BasicTCPTest2, BasicUDPTest, UDPTimeoutTest ] 5642 5643 tests.extend([ 5644 NonBlockingTCPTests, 5645 FileObjectClassTestCase, 5646 UnbufferedFileObjectClassTestCase, 5647 LineBufferedFileObjectClassTestCase, 5648 SmallBufferedFileObjectClassTestCase, 5649 UnicodeReadFileObjectClassTestCase, 5650 UnicodeWriteFileObjectClassTestCase, 5651 UnicodeReadWriteFileObjectClassTestCase, 5652 NetworkConnectionNoServer, 5653 NetworkConnectionAttributesTest, 5654 NetworkConnectionBehaviourTest, 5655 ContextManagersTest, 5656 InheritanceTest, 5657 NonblockConstantTest 5658 ]) 5659 tests.append(BasicSocketPairTest) 5660 tests.append(TestUnixDomain) 5661 tests.append(TestLinuxAbstractNamespace) 5662 tests.extend([TIPCTest, TIPCThreadableTest]) 5663 tests.extend([BasicCANTest, CANTest]) 5664 tests.extend([BasicRDSTest, RDSTest]) 5665 tests.append(LinuxKernelCryptoAPI) 5666 tests.extend([ 5667 CmsgMacroTests, 5668 SendmsgUDPTest, 5669 RecvmsgUDPTest, 5670 RecvmsgIntoUDPTest, 5671 SendmsgUDP6Test, 5672 RecvmsgUDP6Test, 5673 RecvmsgRFC3542AncillaryUDP6Test, 5674 RecvmsgIntoRFC3542AncillaryUDP6Test, 5675 RecvmsgIntoUDP6Test, 5676 SendmsgTCPTest, 5677 RecvmsgTCPTest, 5678 RecvmsgIntoTCPTest, 5679 SendmsgSCTPStreamTest, 5680 RecvmsgSCTPStreamTest, 5681 RecvmsgIntoSCTPStreamTest, 5682 SendmsgUnixStreamTest, 5683 RecvmsgUnixStreamTest, 5684 RecvmsgIntoUnixStreamTest, 5685 RecvmsgSCMRightsStreamTest, 5686 RecvmsgIntoSCMRightsStreamTest, 5687 # These are slow when setitimer() is not available 5688 InterruptedRecvTimeoutTest, 5689 InterruptedSendTimeoutTest, 5690 TestSocketSharing, 5691 SendfileUsingSendTest, 5692 SendfileUsingSendfileTest, 5693 ]) 5694 tests.append(TestMSWindowsTCPFlags) 5695 5696 thread_info = support.threading_setup() 5697 support.run_unittest(*tests) 5698 support.threading_cleanup(*thread_info) 5699 5700if __name__ == "__main__": 5701 test_main() 5702