"""Tests for the asyncore module.

The module exercises asyncore's helper functions (read/write/readwrite,
close_all, compact_traceback), the dispatcher classes, the file wrapper and
the dispatcher API over several socket families, using both select() and
poll() where available.
"""

import asyncore
import unittest
import select
import os
import socket
import sys
import time
import errno
import struct
import threading

from test import support
from test.support import socket_helper
from io import BytesIO

if support.PGO:
    raise unittest.SkipTest("test is not helpful for PGO")


HAS_UNIX_SOCKETS = hasattr(socket, 'AF_UNIX')


class dummysocket:
    """Socket stand-in that only records whether close() was called."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True

    def fileno(self):
        # Arbitrary fixed descriptor number; never used for real I/O here.
        return 42


class dummychannel:
    """Channel stand-in owning a dummysocket; close() closes that socket."""

    def __init__(self):
        self.socket = dummysocket()

    def close(self):
        self.socket.close()


class exitingdummy:
    """Handler object whose every event handler raises asyncore.ExitNow."""

    def __init__(self):
        pass

    def handle_read_event(self):
        raise asyncore.ExitNow()

    handle_write_event = handle_read_event
    handle_close = handle_read_event
    handle_expt_event = handle_read_event


class crashingdummy:
    """Handler object whose every event handler raises a plain Exception.

    handle_error() records that it was invoked in ``error_handled``.
    """

    def __init__(self):
        self.error_handled = False

    def handle_read_event(self):
        raise Exception()

    handle_write_event = handle_read_event
    handle_close = handle_read_event
    handle_expt_event = handle_read_event

    def handle_error(self):
        self.error_handled = True


# used when testing senders; just collects what it gets until newline is sent
def capture_server(evt, buf, serv):
    """Accept one connection on *serv* and copy what it sends into *buf*.

    Reading stops when a newline is received, after 200 successful reads,
    or after roughly three seconds, whichever comes first.  Newlines are
    not written to *buf*.  *serv* is always closed and *evt* is always set
    before returning, even if accept() times out.
    """
    try:
        serv.listen()
        conn, addr = serv.accept()
    except socket.timeout:
        pass
    else:
        n = 200
        start = time.monotonic()
        while n > 0 and time.monotonic() - start < 3.0:
            r, w, e = select.select([conn], [], [], 0.1)
            if r:
                n -= 1
                data = conn.recv(10)
                # keep everything except for the newline terminator
                buf.write(data.replace(b'\n', b''))
                if b'\n' in data:
                    break
            time.sleep(0.01)

        conn.close()
    finally:
        serv.close()
        evt.set()


def bind_af_aware(sock, addr):
    """Helper function to bind a socket according to its family."""
    if HAS_UNIX_SOCKETS and sock.family == socket.AF_UNIX:
        # Make sure the path doesn't exist.
        support.unlink(addr)
        socket_helper.bind_unix_socket(sock, addr)
    else:
        sock.bind(addr)


class HelperFunctionTests(unittest.TestCase):
    """Tests for asyncore's module-level helper functions."""

    def test_readwriteexc(self):
        # Check exception handling behavior of read, write and _exception

        # check that ExitNow exceptions in the object handler method
        # bubble all the way up through asyncore read/write/_exception calls
        tr1 = exitingdummy()
        self.assertRaises(asyncore.ExitNow, asyncore.read, tr1)
        self.assertRaises(asyncore.ExitNow, asyncore.write, tr1)
        self.assertRaises(asyncore.ExitNow, asyncore._exception, tr1)

        # check that an exception other than ExitNow in the object handler
        # method causes the handle_error method to get called
        tr2 = crashingdummy()
        asyncore.read(tr2)
        self.assertEqual(tr2.error_handled, True)

        tr2 = crashingdummy()
        asyncore.write(tr2)
        self.assertEqual(tr2.error_handled, True)

        tr2 = crashingdummy()
        asyncore._exception(tr2)
        self.assertEqual(tr2.error_handled, True)

    # asyncore.readwrite uses constants in the select module that
    # are not present in Windows systems (see this thread:
    # http://mail.python.org/pipermail/python-list/2001-October/109973.html)
    # These constants should be present as long as poll is available

    @unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
    def test_readwrite(self):
        # Check that correct methods are called by readwrite()

        attributes = ('read', 'expt', 'write', 'closed', 'error_handled')

        expected = (
            (select.POLLIN, 'read'),
            (select.POLLPRI, 'expt'),
            (select.POLLOUT, 'write'),
            (select.POLLERR, 'closed'),
            (select.POLLHUP, 'closed'),
            (select.POLLNVAL, 'closed'),
            )

        class testobj:
            def __init__(self):
                self.read = False
                self.write = False
                self.closed = False
                self.expt = False
                self.error_handled = False

            def handle_read_event(self):
                self.read = True

            def handle_write_event(self):
                self.write = True

            def handle_close(self):
                self.closed = True

            def handle_expt_event(self):
                self.expt = True

            def handle_error(self):
                self.error_handled = True

        for flag, expectedattr in expected:
            tobj = testobj()
            self.assertEqual(getattr(tobj, expectedattr), False)
            asyncore.readwrite(tobj, flag)

            # Only the attribute modified by the routine we expect to be
            # called should be True.
            for attr in attributes:
                self.assertEqual(getattr(tobj, attr), attr == expectedattr)

            # check that ExitNow exceptions in the object handler method
            # bubble all the way up through asyncore readwrite call
            tr1 = exitingdummy()
            self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag)

            # check that an exception other than ExitNow in the object handler
            # method causes the handle_error method to get called
            tr2 = crashingdummy()
            self.assertEqual(tr2.error_handled, False)
            asyncore.readwrite(tr2, flag)
            self.assertEqual(tr2.error_handled, True)

    def test_closeall(self):
        self.closeall_check(False)

    def test_closeall_default(self):
        self.closeall_check(True)

    def closeall_check(self, usedefault):
        # Check that close_all() closes everything in a given map

        channels = []
        testmap = {}
        for i in range(10):
            c = dummychannel()
            channels.append(c)
            self.assertEqual(c.socket.closed, False)
            testmap[i] = c

        if usedefault:
            # Temporarily install testmap as the module-wide socket map so
            # that close_all() without arguments operates on it.
            socketmap = asyncore.socket_map
            try:
                asyncore.socket_map = testmap
                asyncore.close_all()
            finally:
                testmap, asyncore.socket_map = asyncore.socket_map, socketmap
        else:
            asyncore.close_all(testmap)

        self.assertEqual(len(testmap), 0)

        for c in channels:
            self.assertEqual(c.socket.closed, True)

    def test_compact_traceback(self):
        try:
            raise Exception("I don't like spam!")
        except Exception:
            real_t, real_v, real_tb = sys.exc_info()
            r = asyncore.compact_traceback()
        else:
            self.fail("Expected exception")

        (f, function, line), t, v, info = r
        self.assertEqual(os.path.split(f)[-1], 'test_asyncore.py')
        self.assertEqual(function, 'test_compact_traceback')
        self.assertEqual(t, real_t)
        self.assertEqual(v, real_v)
        self.assertEqual(info, '[%s|%s|%s]' % (f, function, line))


class DispatcherTests(unittest.TestCase):
    """Tests for asyncore.dispatcher that do not need a connection."""

    def setUp(self):
        pass

    def tearDown(self):
        asyncore.close_all()

    def test_basic(self):
        d = asyncore.dispatcher()
        self.assertEqual(d.readable(), True)
        self.assertEqual(d.writable(), True)

    def test_repr(self):
        d = asyncore.dispatcher()
        self.assertEqual(repr(d), '<asyncore.dispatcher at %#x>' % id(d))

    def test_log(self):
        d = asyncore.dispatcher()

        # capture output of dispatcher.log() (to stderr)
        l1 = "Lovely spam! Wonderful spam!"
        l2 = "I don't like spam!"
        with support.captured_stderr() as stderr:
            d.log(l1)
            d.log(l2)

        lines = stderr.getvalue().splitlines()
        self.assertEqual(lines, ['log: %s' % l1, 'log: %s' % l2])

    def test_log_info(self):
        d = asyncore.dispatcher()

        # capture output of dispatcher.log_info() (to stdout via print)
        l1 = "Have you got anything without spam?"
        l2 = "Why can't she have egg bacon spam and sausage?"
        l3 = "THAT'S got spam in it!"
        with support.captured_stdout() as stdout:
            d.log_info(l1, 'EGGS')
            d.log_info(l2)
            d.log_info(l3, 'SPAM')

        lines = stdout.getvalue().splitlines()
        expected = ['EGGS: %s' % l1, 'info: %s' % l2, 'SPAM: %s' % l3]
        self.assertEqual(lines, expected)

    def test_unhandled(self):
        d = asyncore.dispatcher()
        d.ignore_log_types = ()

        # capture output of dispatcher.log_info() (to stdout via print)
        with support.captured_stdout() as stdout:
            d.handle_expt()
            d.handle_read()
            d.handle_write()
            d.handle_connect()

        lines = stdout.getvalue().splitlines()
        expected = ['warning: unhandled incoming priority event',
                    'warning: unhandled read event',
                    'warning: unhandled write event',
                    'warning: unhandled connect event']
        self.assertEqual(lines, expected)

    def test_strerror(self):
        # refers to bug #8573
        err = asyncore._strerror(errno.EPERM)
        if hasattr(os, 'strerror'):
            self.assertEqual(err, os.strerror(errno.EPERM))
        err = asyncore._strerror(-1)
        self.assertNotEqual(err, "")


class dispatcherwithsend_noread(asyncore.dispatcher_with_send):
    """dispatcher_with_send that never reads and ignores connect events."""

    def readable(self):
        return False

    def handle_connect(self):
        pass


class DispatcherWithSendTests(unittest.TestCase):
    """Tests for asyncore.dispatcher_with_send."""

    def setUp(self):
        pass

    def tearDown(self):
        asyncore.close_all()

    @support.reap_threads
    def test_send(self):
        evt = threading.Event()
        sock = socket.socket()
        sock.settimeout(3)
        port = socket_helper.bind_port(sock)

        cap = BytesIO()
        args = (evt, cap, sock)
        t = threading.Thread(target=capture_server, args=args)
        t.start()
        try:
            # wait a little longer for the server to initialize (it sometimes
            # refuses connections on slow machines without this wait)
            time.sleep(0.2)

            data = b"Suppose there isn't a 16-ton weight?"
            d = dispatcherwithsend_noread()
            d.create_socket()
            d.connect((socket_helper.HOST, port))

            # give time for socket to connect
            time.sleep(0.1)

            d.send(data)
            d.send(data)
            d.send(b'\n')

            # Pump the event loop until the outgoing buffer is drained,
            # giving up after a bounded number of iterations.
            n = 1000
            while d.out_buffer and n > 0:
                asyncore.poll()
                n -= 1

            evt.wait()

            self.assertEqual(cap.getvalue(), data*2)
        finally:
            support.join_thread(t)


@unittest.skipUnless(hasattr(asyncore, 'file_wrapper'),
                     'asyncore.file_wrapper required')
class FileWrapperTest(unittest.TestCase):
    """Tests for asyncore.file_wrapper and asyncore.file_dispatcher."""

    def setUp(self):
        self.d = b"It's not dead, it's sleeping!"
        with open(support.TESTFN, 'wb') as file:
            file.write(self.d)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def test_recv(self):
        fd = os.open(support.TESTFN, os.O_RDONLY)
        w = asyncore.file_wrapper(fd)
        os.close(fd)

        self.assertNotEqual(w.fd, fd)
        self.assertNotEqual(w.fileno(), fd)
        self.assertEqual(w.recv(13), b"It's not dead")
        self.assertEqual(w.read(6), b", it's")
        w.close()
        self.assertRaises(OSError, w.read, 1)

    def test_send(self):
        d1 = b"Come again?"
        d2 = b"I want to buy some cheese."
        fd = os.open(support.TESTFN, os.O_WRONLY | os.O_APPEND)
        w = asyncore.file_wrapper(fd)
        os.close(fd)

        w.write(d1)
        w.send(d2)
        w.close()
        with open(support.TESTFN, 'rb') as file:
            self.assertEqual(file.read(), self.d + d1 + d2)

    @unittest.skipUnless(hasattr(asyncore, 'file_dispatcher'),
                         'asyncore.file_dispatcher required')
    def test_dispatcher(self):
        fd = os.open(support.TESTFN, os.O_RDONLY)
        data = []

        class FileDispatcher(asyncore.file_dispatcher):
            def handle_read(self):
                data.append(self.recv(29))

        s = FileDispatcher(fd)
        os.close(fd)
        asyncore.loop(timeout=0.01, use_poll=True, count=2)
        self.assertEqual(b"".join(data), self.d)

    def test_resource_warning(self):
        # Issue #11453
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = asyncore.file_wrapper(fd)

        os.close(fd)
        with support.check_warnings(('', ResourceWarning)):
            # Drop the only reference without closing the wrapper.
            f = None
            support.gc_collect()

    def test_close_twice(self):
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = asyncore.file_wrapper(fd)
        os.close(fd)

        os.close(f.fd)  # file_wrapper dupped fd
        with self.assertRaises(OSError):
            f.close()

        self.assertEqual(f.fd, -1)
        # calling close twice should not fail
        f.close()


class BaseTestHandler(asyncore.dispatcher):
    """Dispatcher whose handlers raise unless a test overrides them.

    ``flag`` starts out False; tests set it to True from the handler they
    expect to be invoked.  handle_error() re-raises the active exception
    so that failures are not swallowed by asyncore.
    """

    def __init__(self, sock=None):
        asyncore.dispatcher.__init__(self, sock)
        self.flag = False

    def handle_accept(self):
        raise Exception("handle_accept not supposed to be called")

    def handle_accepted(self):
        raise Exception("handle_accepted not supposed to be called")

    def handle_connect(self):
        raise Exception("handle_connect not supposed to be called")

    def handle_expt(self):
        raise Exception("handle_expt not supposed to be called")

    def handle_close(self):
        raise Exception("handle_close not supposed to be called")

    def handle_error(self):
        raise


class BaseServer(asyncore.dispatcher):
    """A server which listens on an address and dispatches the
    connection to a handler.
    """

    def __init__(self, family, addr, handler=BaseTestHandler):
        asyncore.dispatcher.__init__(self)
        self.create_socket(family)
        self.set_reuse_addr()
        bind_af_aware(self.socket, addr)
        self.listen(5)
        self.handler = handler

    @property
    def address(self):
        return self.socket.getsockname()

    def handle_accepted(self, sock, addr):
        self.handler(sock)

    def handle_error(self):
        raise


class BaseClient(BaseTestHandler):
    """Test handler that connects to *address* on construction."""

    def __init__(self, family, address):
        BaseTestHandler.__init__(self)
        self.create_socket(family)
        self.connect(address)

    def handle_connect(self):
        pass


class BaseTestAPI:
    """Mixin holding the dispatcher API tests.

    The tests read ``self.family``, ``self.addr`` and ``self.use_poll``;
    the concrete classes at the end of this module supply them.
    """

    def tearDown(self):
        asyncore.close_all(ignore_all=True)

    def loop_waiting_for_flag(self, instance, timeout=5):
        # Run the asyncore loop one iteration at a time, up to 100 times,
        # sleeping timeout/100 seconds between iterations, until
        # instance.flag becomes true.  Fail the test otherwise.
        timeout = float(timeout) / 100
        count = 100
        while asyncore.socket_map and count > 0:
            asyncore.loop(timeout=0.01, count=1, use_poll=self.use_poll)
            if instance.flag:
                return
            count -= 1
            time.sleep(timeout)
        self.fail("flag not set")

    def test_handle_connect(self):
        # make sure handle_connect is called on connect()

        class TestClient(BaseClient):
            def handle_connect(self):
                self.flag = True

        server = BaseServer(self.family, self.addr)
        client = TestClient(self.family, server.address)
        self.loop_waiting_for_flag(client)

    def test_handle_accept(self):
        # make sure handle_accept() is called when a client connects

        class TestListener(BaseTestHandler):

            def __init__(self, family, addr):
                BaseTestHandler.__init__(self)
                self.create_socket(family)
                bind_af_aware(self.socket, addr)
                self.listen(5)
                self.address = self.socket.getsockname()

            def handle_accept(self):
                self.flag = True

        server = TestListener(self.family, self.addr)
        client = BaseClient(self.family, server.address)
        self.loop_waiting_for_flag(server)

    def test_handle_accepted(self):
        # make sure handle_accepted() is called when a client connects

        class TestListener(BaseTestHandler):

            def __init__(self, family, addr):
                BaseTestHandler.__init__(self)
                self.create_socket(family)
                bind_af_aware(self.socket, addr)
                self.listen(5)
                self.address = self.socket.getsockname()

            def handle_accept(self):
                asyncore.dispatcher.handle_accept(self)

            def handle_accepted(self, sock, addr):
                sock.close()
                self.flag = True

        server = TestListener(self.family, self.addr)
        client = BaseClient(self.family, server.address)
        self.loop_waiting_for_flag(server)

    def test_handle_read(self):
        # make sure handle_read is called on data received

        class TestClient(BaseClient):
            def handle_read(self):
                self.flag = True

        class TestHandler(BaseTestHandler):
            def __init__(self, conn):
                BaseTestHandler.__init__(self, conn)
                self.send(b'x' * 1024)

        server = BaseServer(self.family, self.addr, TestHandler)
        client = TestClient(self.family, server.address)
        self.loop_waiting_for_flag(client)

    def test_handle_write(self):
        # make sure handle_write is called

        class TestClient(BaseClient):
            def handle_write(self):
                self.flag = True

        server = BaseServer(self.family, self.addr)
        client = TestClient(self.family, server.address)
        self.loop_waiting_for_flag(client)

    def test_handle_close(self):
        # make sure handle_close is called when the other end closes
        # the connection

        class TestClient(BaseClient):

            def handle_read(self):
                # in order to make handle_close be called we are supposed
                # to make at least one recv() call
                self.recv(1024)

            def handle_close(self):
                self.flag = True
                self.close()

        class TestHandler(BaseTestHandler):
            def __init__(self, conn):
                BaseTestHandler.__init__(self, conn)
                self.close()

        server = BaseServer(self.family, self.addr, TestHandler)
        client = TestClient(self.family, server.address)
        self.loop_waiting_for_flag(client)

    def test_handle_close_after_conn_broken(self):
        # Check that ECONNRESET/EPIPE is correctly handled (issues #5661 and
        # #11265).

        data = b'\0' * 128

        class TestClient(BaseClient):

            def handle_write(self):
                self.send(data)

            def handle_close(self):
                self.flag = True
                self.close()

            def handle_expt(self):
                self.flag = True
                self.close()

        class TestHandler(BaseTestHandler):

            def handle_read(self):
                self.recv(len(data))
                self.close()

            def writable(self):
                return False

        server = BaseServer(self.family, self.addr, TestHandler)
        client = TestClient(self.family, server.address)
        self.loop_waiting_for_flag(client)

    @unittest.skipIf(sys.platform.startswith("sunos"),
                     "OOB support is broken on Solaris")
    def test_handle_expt(self):
        # Make sure handle_expt is called on OOB data received.
        # Note: this might fail on some platforms as OOB data is
        # tenuously supported and rarely used.
        if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX:
            self.skipTest("Not applicable to AF_UNIX sockets.")

        if sys.platform == "darwin" and self.use_poll:
            self.skipTest("poll may fail on macOS; see issue #28087")

        class TestClient(BaseClient):
            def handle_expt(self):
                self.socket.recv(1024, socket.MSG_OOB)
                self.flag = True

        class TestHandler(BaseTestHandler):
            def __init__(self, conn):
                BaseTestHandler.__init__(self, conn)
                self.socket.send(bytes(chr(244), 'latin-1'), socket.MSG_OOB)

        server = BaseServer(self.family, self.addr, TestHandler)
        client = TestClient(self.family, server.address)
        self.loop_waiting_for_flag(client)

    def test_handle_error(self):

        class TestClient(BaseClient):
            def handle_write(self):
                1.0 / 0

            def handle_error(self):
                self.flag = True
                # Re-raise the active exception to verify that it is the
                # ZeroDivisionError raised by handle_write().
                try:
                    raise
                except ZeroDivisionError:
                    pass
                else:
                    raise Exception("exception not raised")

        server = BaseServer(self.family, self.addr)
        client = TestClient(self.family, server.address)
        self.loop_waiting_for_flag(client)

    def test_connection_attributes(self):
        server = BaseServer(self.family, self.addr)
        client = BaseClient(self.family, server.address)

        # we start disconnected
        self.assertFalse(server.connected)
        self.assertTrue(server.accepting)
        # this can't be taken for granted across all platforms
        #self.assertFalse(client.connected)
        self.assertFalse(client.accepting)

        # execute some loops so that client connects to server
        asyncore.loop(timeout=0.01, use_poll=self.use_poll, count=100)
        self.assertFalse(server.connected)
        self.assertTrue(server.accepting)
        self.assertTrue(client.connected)
        self.assertFalse(client.accepting)

        # disconnect the client
        client.close()
        self.assertFalse(server.connected)
        self.assertTrue(server.accepting)
        self.assertFalse(client.connected)
        self.assertFalse(client.accepting)

        # stop serving
        server.close()
        self.assertFalse(server.connected)
        self.assertFalse(server.accepting)

    def test_create_socket(self):
        s = asyncore.dispatcher()
        s.create_socket(self.family)
        self.assertEqual(s.socket.type, socket.SOCK_STREAM)
        self.assertEqual(s.socket.family, self.family)
        self.assertEqual(s.socket.gettimeout(), 0)
        self.assertFalse(s.socket.get_inheritable())

    def test_bind(self):
        if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX:
            self.skipTest("Not applicable to AF_UNIX sockets.")
        s1 = asyncore.dispatcher()
        s1.create_socket(self.family)
        s1.bind(self.addr)
        s1.listen(5)
        port = s1.socket.getsockname()[1]

        s2 = asyncore.dispatcher()
        s2.create_socket(self.family)
        # EADDRINUSE indicates the socket was correctly bound
        self.assertRaises(OSError, s2.bind, (self.addr[0], port))

    def test_set_reuse_addr(self):
        if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX:
            self.skipTest("Not applicable to AF_UNIX sockets.")

        with socket.socket(self.family) as sock:
            try:
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            except OSError:
                # skipTest() raises SkipTest so the test is reported as
                # skipped; calling unittest.skip() here would only build a
                # decorator and let the test pass silently.
                self.skipTest("SO_REUSEADDR not supported on this platform")
            else:
                # if SO_REUSEADDR succeeded for sock we expect asyncore
                # to do the same
                s = asyncore.dispatcher(socket.socket(self.family))
                self.assertFalse(s.socket.getsockopt(socket.SOL_SOCKET,
                                                     socket.SO_REUSEADDR))
                s.socket.close()
                s.create_socket(self.family)
                s.set_reuse_addr()
                self.assertTrue(s.socket.getsockopt(socket.SOL_SOCKET,
                                                    socket.SO_REUSEADDR))

    @support.reap_threads
    def test_quick_connect(self):
        # see: http://bugs.python.org/issue10340
        if self.family not in (socket.AF_INET,
                               getattr(socket, "AF_INET6", object())):
            self.skipTest("test specific to AF_INET and AF_INET6")

        server = BaseServer(self.family, self.addr)
        # run the thread 500 ms: the socket should be connected in 200 ms
        t = threading.Thread(target=lambda: asyncore.loop(timeout=0.1,
                                                          count=5))
        t.start()
        try:
            with socket.socket(self.family, socket.SOCK_STREAM) as s:
                s.settimeout(.2)
                s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
                             struct.pack('ii', 1, 0))

                try:
                    s.connect(server.address)
                except OSError:
                    pass
        finally:
            support.join_thread(t)


class TestAPI_UseIPv4Sockets(BaseTestAPI):
    family = socket.AF_INET
    addr = (socket_helper.HOST, 0)


@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 support required')
class TestAPI_UseIPv6Sockets(BaseTestAPI):
    family = socket.AF_INET6
    addr = (socket_helper.HOSTv6, 0)


@unittest.skipUnless(HAS_UNIX_SOCKETS, 'Unix sockets required')
class TestAPI_UseUnixSockets(BaseTestAPI):
    if HAS_UNIX_SOCKETS:
        family = socket.AF_UNIX
    addr = support.TESTFN

    def tearDown(self):
        support.unlink(self.addr)
        BaseTestAPI.tearDown(self)


class TestAPI_UseIPv4Select(TestAPI_UseIPv4Sockets, unittest.TestCase):
    use_poll = False


@unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
class TestAPI_UseIPv4Poll(TestAPI_UseIPv4Sockets, unittest.TestCase):
    use_poll = True


class TestAPI_UseIPv6Select(TestAPI_UseIPv6Sockets, unittest.TestCase):
    use_poll = False


@unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
class TestAPI_UseIPv6Poll(TestAPI_UseIPv6Sockets, unittest.TestCase):
    use_poll = True


class TestAPI_UseUnixSocketsSelect(TestAPI_UseUnixSockets, unittest.TestCase):
    use_poll = False


@unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
class TestAPI_UseUnixSocketsPoll(TestAPI_UseUnixSockets, unittest.TestCase):
    use_poll = True


if __name__ == "__main__":
    unittest.main()