1import asyncore
2import unittest
3import select
4import os
5import socket
6import sys
7import time
8import errno
9import struct
10import threading
11
12from test import support
13from test.support import socket_helper
14from io import BytesIO
15
16if support.PGO:
17    raise unittest.SkipTest("test is not helpful for PGO")
18
19
20HAS_UNIX_SOCKETS = hasattr(socket, 'AF_UNIX')
21
22class dummysocket:
23    def __init__(self):
24        self.closed = False
25
26    def close(self):
27        self.closed = True
28
29    def fileno(self):
30        return 42
31
32class dummychannel:
33    def __init__(self):
34        self.socket = dummysocket()
35
36    def close(self):
37        self.socket.close()
38
39class exitingdummy:
40    def __init__(self):
41        pass
42
43    def handle_read_event(self):
44        raise asyncore.ExitNow()
45
46    handle_write_event = handle_read_event
47    handle_close = handle_read_event
48    handle_expt_event = handle_read_event
49
50class crashingdummy:
51    def __init__(self):
52        self.error_handled = False
53
54    def handle_read_event(self):
55        raise Exception()
56
57    handle_write_event = handle_read_event
58    handle_close = handle_read_event
59    handle_expt_event = handle_read_event
60
61    def handle_error(self):
62        self.error_handled = True
63
64# used when testing senders; just collects what it gets until newline is sent
65def capture_server(evt, buf, serv):
66    try:
67        serv.listen()
68        conn, addr = serv.accept()
69    except socket.timeout:
70        pass
71    else:
72        n = 200
73        start = time.monotonic()
74        while n > 0 and time.monotonic() - start < 3.0:
75            r, w, e = select.select([conn], [], [], 0.1)
76            if r:
77                n -= 1
78                data = conn.recv(10)
79                # keep everything except for the newline terminator
80                buf.write(data.replace(b'\n', b''))
81                if b'\n' in data:
82                    break
83            time.sleep(0.01)
84
85        conn.close()
86    finally:
87        serv.close()
88        evt.set()
89
90def bind_af_aware(sock, addr):
91    """Helper function to bind a socket according to its family."""
92    if HAS_UNIX_SOCKETS and sock.family == socket.AF_UNIX:
93        # Make sure the path doesn't exist.
94        support.unlink(addr)
95        socket_helper.bind_unix_socket(sock, addr)
96    else:
97        sock.bind(addr)
98
99
100class HelperFunctionTests(unittest.TestCase):
101    def test_readwriteexc(self):
102        # Check exception handling behavior of read, write and _exception
103
104        # check that ExitNow exceptions in the object handler method
105        # bubbles all the way up through asyncore read/write/_exception calls
106        tr1 = exitingdummy()
107        self.assertRaises(asyncore.ExitNow, asyncore.read, tr1)
108        self.assertRaises(asyncore.ExitNow, asyncore.write, tr1)
109        self.assertRaises(asyncore.ExitNow, asyncore._exception, tr1)
110
111        # check that an exception other than ExitNow in the object handler
112        # method causes the handle_error method to get called
113        tr2 = crashingdummy()
114        asyncore.read(tr2)
115        self.assertEqual(tr2.error_handled, True)
116
117        tr2 = crashingdummy()
118        asyncore.write(tr2)
119        self.assertEqual(tr2.error_handled, True)
120
121        tr2 = crashingdummy()
122        asyncore._exception(tr2)
123        self.assertEqual(tr2.error_handled, True)
124
125    # asyncore.readwrite uses constants in the select module that
126    # are not present in Windows systems (see this thread:
127    # http://mail.python.org/pipermail/python-list/2001-October/109973.html)
128    # These constants should be present as long as poll is available
129
130    @unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
131    def test_readwrite(self):
132        # Check that correct methods are called by readwrite()
133
134        attributes = ('read', 'expt', 'write', 'closed', 'error_handled')
135
136        expected = (
137            (select.POLLIN, 'read'),
138            (select.POLLPRI, 'expt'),
139            (select.POLLOUT, 'write'),
140            (select.POLLERR, 'closed'),
141            (select.POLLHUP, 'closed'),
142            (select.POLLNVAL, 'closed'),
143            )
144
145        class testobj:
146            def __init__(self):
147                self.read = False
148                self.write = False
149                self.closed = False
150                self.expt = False
151                self.error_handled = False
152
153            def handle_read_event(self):
154                self.read = True
155
156            def handle_write_event(self):
157                self.write = True
158
159            def handle_close(self):
160                self.closed = True
161
162            def handle_expt_event(self):
163                self.expt = True
164
165            def handle_error(self):
166                self.error_handled = True
167
168        for flag, expectedattr in expected:
169            tobj = testobj()
170            self.assertEqual(getattr(tobj, expectedattr), False)
171            asyncore.readwrite(tobj, flag)
172
173            # Only the attribute modified by the routine we expect to be
174            # called should be True.
175            for attr in attributes:
176                self.assertEqual(getattr(tobj, attr), attr==expectedattr)
177
178            # check that ExitNow exceptions in the object handler method
179            # bubbles all the way up through asyncore readwrite call
180            tr1 = exitingdummy()
181            self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag)
182
183            # check that an exception other than ExitNow in the object handler
184            # method causes the handle_error method to get called
185            tr2 = crashingdummy()
186            self.assertEqual(tr2.error_handled, False)
187            asyncore.readwrite(tr2, flag)
188            self.assertEqual(tr2.error_handled, True)
189
190    def test_closeall(self):
191        self.closeall_check(False)
192
193    def test_closeall_default(self):
194        self.closeall_check(True)
195
196    def closeall_check(self, usedefault):
197        # Check that close_all() closes everything in a given map
198
199        l = []
200        testmap = {}
201        for i in range(10):
202            c = dummychannel()
203            l.append(c)
204            self.assertEqual(c.socket.closed, False)
205            testmap[i] = c
206
207        if usedefault:
208            socketmap = asyncore.socket_map
209            try:
210                asyncore.socket_map = testmap
211                asyncore.close_all()
212            finally:
213                testmap, asyncore.socket_map = asyncore.socket_map, socketmap
214        else:
215            asyncore.close_all(testmap)
216
217        self.assertEqual(len(testmap), 0)
218
219        for c in l:
220            self.assertEqual(c.socket.closed, True)
221
222    def test_compact_traceback(self):
223        try:
224            raise Exception("I don't like spam!")
225        except:
226            real_t, real_v, real_tb = sys.exc_info()
227            r = asyncore.compact_traceback()
228        else:
229            self.fail("Expected exception")
230
231        (f, function, line), t, v, info = r
232        self.assertEqual(os.path.split(f)[-1], 'test_asyncore.py')
233        self.assertEqual(function, 'test_compact_traceback')
234        self.assertEqual(t, real_t)
235        self.assertEqual(v, real_v)
236        self.assertEqual(info, '[%s|%s|%s]' % (f, function, line))
237
238
239class DispatcherTests(unittest.TestCase):
240    def setUp(self):
241        pass
242
243    def tearDown(self):
244        asyncore.close_all()
245
246    def test_basic(self):
247        d = asyncore.dispatcher()
248        self.assertEqual(d.readable(), True)
249        self.assertEqual(d.writable(), True)
250
251    def test_repr(self):
252        d = asyncore.dispatcher()
253        self.assertEqual(repr(d), '<asyncore.dispatcher at %#x>' % id(d))
254
255    def test_log(self):
256        d = asyncore.dispatcher()
257
258        # capture output of dispatcher.log() (to stderr)
259        l1 = "Lovely spam! Wonderful spam!"
260        l2 = "I don't like spam!"
261        with support.captured_stderr() as stderr:
262            d.log(l1)
263            d.log(l2)
264
265        lines = stderr.getvalue().splitlines()
266        self.assertEqual(lines, ['log: %s' % l1, 'log: %s' % l2])
267
268    def test_log_info(self):
269        d = asyncore.dispatcher()
270
271        # capture output of dispatcher.log_info() (to stdout via print)
272        l1 = "Have you got anything without spam?"
273        l2 = "Why can't she have egg bacon spam and sausage?"
274        l3 = "THAT'S got spam in it!"
275        with support.captured_stdout() as stdout:
276            d.log_info(l1, 'EGGS')
277            d.log_info(l2)
278            d.log_info(l3, 'SPAM')
279
280        lines = stdout.getvalue().splitlines()
281        expected = ['EGGS: %s' % l1, 'info: %s' % l2, 'SPAM: %s' % l3]
282        self.assertEqual(lines, expected)
283
284    def test_unhandled(self):
285        d = asyncore.dispatcher()
286        d.ignore_log_types = ()
287
288        # capture output of dispatcher.log_info() (to stdout via print)
289        with support.captured_stdout() as stdout:
290            d.handle_expt()
291            d.handle_read()
292            d.handle_write()
293            d.handle_connect()
294
295        lines = stdout.getvalue().splitlines()
296        expected = ['warning: unhandled incoming priority event',
297                    'warning: unhandled read event',
298                    'warning: unhandled write event',
299                    'warning: unhandled connect event']
300        self.assertEqual(lines, expected)
301
302    def test_strerror(self):
303        # refers to bug #8573
304        err = asyncore._strerror(errno.EPERM)
305        if hasattr(os, 'strerror'):
306            self.assertEqual(err, os.strerror(errno.EPERM))
307        err = asyncore._strerror(-1)
308        self.assertTrue(err != "")
309
310
311class dispatcherwithsend_noread(asyncore.dispatcher_with_send):
312    def readable(self):
313        return False
314
315    def handle_connect(self):
316        pass
317
318
319class DispatcherWithSendTests(unittest.TestCase):
320    def setUp(self):
321        pass
322
323    def tearDown(self):
324        asyncore.close_all()
325
326    @support.reap_threads
327    def test_send(self):
328        evt = threading.Event()
329        sock = socket.socket()
330        sock.settimeout(3)
331        port = socket_helper.bind_port(sock)
332
333        cap = BytesIO()
334        args = (evt, cap, sock)
335        t = threading.Thread(target=capture_server, args=args)
336        t.start()
337        try:
338            # wait a little longer for the server to initialize (it sometimes
339            # refuses connections on slow machines without this wait)
340            time.sleep(0.2)
341
342            data = b"Suppose there isn't a 16-ton weight?"
343            d = dispatcherwithsend_noread()
344            d.create_socket()
345            d.connect((socket_helper.HOST, port))
346
347            # give time for socket to connect
348            time.sleep(0.1)
349
350            d.send(data)
351            d.send(data)
352            d.send(b'\n')
353
354            n = 1000
355            while d.out_buffer and n > 0:
356                asyncore.poll()
357                n -= 1
358
359            evt.wait()
360
361            self.assertEqual(cap.getvalue(), data*2)
362        finally:
363            support.join_thread(t)
364
365
366@unittest.skipUnless(hasattr(asyncore, 'file_wrapper'),
367                     'asyncore.file_wrapper required')
368class FileWrapperTest(unittest.TestCase):
369    def setUp(self):
370        self.d = b"It's not dead, it's sleeping!"
371        with open(support.TESTFN, 'wb') as file:
372            file.write(self.d)
373
374    def tearDown(self):
375        support.unlink(support.TESTFN)
376
377    def test_recv(self):
378        fd = os.open(support.TESTFN, os.O_RDONLY)
379        w = asyncore.file_wrapper(fd)
380        os.close(fd)
381
382        self.assertNotEqual(w.fd, fd)
383        self.assertNotEqual(w.fileno(), fd)
384        self.assertEqual(w.recv(13), b"It's not dead")
385        self.assertEqual(w.read(6), b", it's")
386        w.close()
387        self.assertRaises(OSError, w.read, 1)
388
389    def test_send(self):
390        d1 = b"Come again?"
391        d2 = b"I want to buy some cheese."
392        fd = os.open(support.TESTFN, os.O_WRONLY | os.O_APPEND)
393        w = asyncore.file_wrapper(fd)
394        os.close(fd)
395
396        w.write(d1)
397        w.send(d2)
398        w.close()
399        with open(support.TESTFN, 'rb') as file:
400            self.assertEqual(file.read(), self.d + d1 + d2)
401
402    @unittest.skipUnless(hasattr(asyncore, 'file_dispatcher'),
403                         'asyncore.file_dispatcher required')
404    def test_dispatcher(self):
405        fd = os.open(support.TESTFN, os.O_RDONLY)
406        data = []
407        class FileDispatcher(asyncore.file_dispatcher):
408            def handle_read(self):
409                data.append(self.recv(29))
410        s = FileDispatcher(fd)
411        os.close(fd)
412        asyncore.loop(timeout=0.01, use_poll=True, count=2)
413        self.assertEqual(b"".join(data), self.d)
414
415    def test_resource_warning(self):
416        # Issue #11453
417        fd = os.open(support.TESTFN, os.O_RDONLY)
418        f = asyncore.file_wrapper(fd)
419
420        os.close(fd)
421        with support.check_warnings(('', ResourceWarning)):
422            f = None
423            support.gc_collect()
424
425    def test_close_twice(self):
426        fd = os.open(support.TESTFN, os.O_RDONLY)
427        f = asyncore.file_wrapper(fd)
428        os.close(fd)
429
430        os.close(f.fd)  # file_wrapper dupped fd
431        with self.assertRaises(OSError):
432            f.close()
433
434        self.assertEqual(f.fd, -1)
435        # calling close twice should not fail
436        f.close()
437
438
439class BaseTestHandler(asyncore.dispatcher):
440
441    def __init__(self, sock=None):
442        asyncore.dispatcher.__init__(self, sock)
443        self.flag = False
444
445    def handle_accept(self):
446        raise Exception("handle_accept not supposed to be called")
447
448    def handle_accepted(self):
449        raise Exception("handle_accepted not supposed to be called")
450
451    def handle_connect(self):
452        raise Exception("handle_connect not supposed to be called")
453
454    def handle_expt(self):
455        raise Exception("handle_expt not supposed to be called")
456
457    def handle_close(self):
458        raise Exception("handle_close not supposed to be called")
459
460    def handle_error(self):
461        raise
462
463
464class BaseServer(asyncore.dispatcher):
465    """A server which listens on an address and dispatches the
466    connection to a handler.
467    """
468
469    def __init__(self, family, addr, handler=BaseTestHandler):
470        asyncore.dispatcher.__init__(self)
471        self.create_socket(family)
472        self.set_reuse_addr()
473        bind_af_aware(self.socket, addr)
474        self.listen(5)
475        self.handler = handler
476
477    @property
478    def address(self):
479        return self.socket.getsockname()
480
481    def handle_accepted(self, sock, addr):
482        self.handler(sock)
483
484    def handle_error(self):
485        raise
486
487
488class BaseClient(BaseTestHandler):
489
490    def __init__(self, family, address):
491        BaseTestHandler.__init__(self)
492        self.create_socket(family)
493        self.connect(address)
494
495    def handle_connect(self):
496        pass
497
498
499class BaseTestAPI:
500
501    def tearDown(self):
502        asyncore.close_all(ignore_all=True)
503
504    def loop_waiting_for_flag(self, instance, timeout=5):
505        timeout = float(timeout) / 100
506        count = 100
507        while asyncore.socket_map and count > 0:
508            asyncore.loop(timeout=0.01, count=1, use_poll=self.use_poll)
509            if instance.flag:
510                return
511            count -= 1
512            time.sleep(timeout)
513        self.fail("flag not set")
514
515    def test_handle_connect(self):
516        # make sure handle_connect is called on connect()
517
518        class TestClient(BaseClient):
519            def handle_connect(self):
520                self.flag = True
521
522        server = BaseServer(self.family, self.addr)
523        client = TestClient(self.family, server.address)
524        self.loop_waiting_for_flag(client)
525
526    def test_handle_accept(self):
527        # make sure handle_accept() is called when a client connects
528
529        class TestListener(BaseTestHandler):
530
531            def __init__(self, family, addr):
532                BaseTestHandler.__init__(self)
533                self.create_socket(family)
534                bind_af_aware(self.socket, addr)
535                self.listen(5)
536                self.address = self.socket.getsockname()
537
538            def handle_accept(self):
539                self.flag = True
540
541        server = TestListener(self.family, self.addr)
542        client = BaseClient(self.family, server.address)
543        self.loop_waiting_for_flag(server)
544
545    def test_handle_accepted(self):
546        # make sure handle_accepted() is called when a client connects
547
548        class TestListener(BaseTestHandler):
549
550            def __init__(self, family, addr):
551                BaseTestHandler.__init__(self)
552                self.create_socket(family)
553                bind_af_aware(self.socket, addr)
554                self.listen(5)
555                self.address = self.socket.getsockname()
556
557            def handle_accept(self):
558                asyncore.dispatcher.handle_accept(self)
559
560            def handle_accepted(self, sock, addr):
561                sock.close()
562                self.flag = True
563
564        server = TestListener(self.family, self.addr)
565        client = BaseClient(self.family, server.address)
566        self.loop_waiting_for_flag(server)
567
568
569    def test_handle_read(self):
570        # make sure handle_read is called on data received
571
572        class TestClient(BaseClient):
573            def handle_read(self):
574                self.flag = True
575
576        class TestHandler(BaseTestHandler):
577            def __init__(self, conn):
578                BaseTestHandler.__init__(self, conn)
579                self.send(b'x' * 1024)
580
581        server = BaseServer(self.family, self.addr, TestHandler)
582        client = TestClient(self.family, server.address)
583        self.loop_waiting_for_flag(client)
584
585    def test_handle_write(self):
586        # make sure handle_write is called
587
588        class TestClient(BaseClient):
589            def handle_write(self):
590                self.flag = True
591
592        server = BaseServer(self.family, self.addr)
593        client = TestClient(self.family, server.address)
594        self.loop_waiting_for_flag(client)
595
596    def test_handle_close(self):
597        # make sure handle_close is called when the other end closes
598        # the connection
599
600        class TestClient(BaseClient):
601
602            def handle_read(self):
603                # in order to make handle_close be called we are supposed
604                # to make at least one recv() call
605                self.recv(1024)
606
607            def handle_close(self):
608                self.flag = True
609                self.close()
610
611        class TestHandler(BaseTestHandler):
612            def __init__(self, conn):
613                BaseTestHandler.__init__(self, conn)
614                self.close()
615
616        server = BaseServer(self.family, self.addr, TestHandler)
617        client = TestClient(self.family, server.address)
618        self.loop_waiting_for_flag(client)
619
620    def test_handle_close_after_conn_broken(self):
621        # Check that ECONNRESET/EPIPE is correctly handled (issues #5661 and
622        # #11265).
623
624        data = b'\0' * 128
625
626        class TestClient(BaseClient):
627
628            def handle_write(self):
629                self.send(data)
630
631            def handle_close(self):
632                self.flag = True
633                self.close()
634
635            def handle_expt(self):
636                self.flag = True
637                self.close()
638
639        class TestHandler(BaseTestHandler):
640
641            def handle_read(self):
642                self.recv(len(data))
643                self.close()
644
645            def writable(self):
646                return False
647
648        server = BaseServer(self.family, self.addr, TestHandler)
649        client = TestClient(self.family, server.address)
650        self.loop_waiting_for_flag(client)
651
652    @unittest.skipIf(sys.platform.startswith("sunos"),
653                     "OOB support is broken on Solaris")
654    def test_handle_expt(self):
655        # Make sure handle_expt is called on OOB data received.
656        # Note: this might fail on some platforms as OOB data is
657        # tenuously supported and rarely used.
658        if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX:
659            self.skipTest("Not applicable to AF_UNIX sockets.")
660
661        if sys.platform == "darwin" and self.use_poll:
662            self.skipTest("poll may fail on macOS; see issue #28087")
663
664        class TestClient(BaseClient):
665            def handle_expt(self):
666                self.socket.recv(1024, socket.MSG_OOB)
667                self.flag = True
668
669        class TestHandler(BaseTestHandler):
670            def __init__(self, conn):
671                BaseTestHandler.__init__(self, conn)
672                self.socket.send(bytes(chr(244), 'latin-1'), socket.MSG_OOB)
673
674        server = BaseServer(self.family, self.addr, TestHandler)
675        client = TestClient(self.family, server.address)
676        self.loop_waiting_for_flag(client)
677
678    def test_handle_error(self):
679
680        class TestClient(BaseClient):
681            def handle_write(self):
682                1.0 / 0
683            def handle_error(self):
684                self.flag = True
685                try:
686                    raise
687                except ZeroDivisionError:
688                    pass
689                else:
690                    raise Exception("exception not raised")
691
692        server = BaseServer(self.family, self.addr)
693        client = TestClient(self.family, server.address)
694        self.loop_waiting_for_flag(client)
695
696    def test_connection_attributes(self):
697        server = BaseServer(self.family, self.addr)
698        client = BaseClient(self.family, server.address)
699
700        # we start disconnected
701        self.assertFalse(server.connected)
702        self.assertTrue(server.accepting)
703        # this can't be taken for granted across all platforms
704        #self.assertFalse(client.connected)
705        self.assertFalse(client.accepting)
706
707        # execute some loops so that client connects to server
708        asyncore.loop(timeout=0.01, use_poll=self.use_poll, count=100)
709        self.assertFalse(server.connected)
710        self.assertTrue(server.accepting)
711        self.assertTrue(client.connected)
712        self.assertFalse(client.accepting)
713
714        # disconnect the client
715        client.close()
716        self.assertFalse(server.connected)
717        self.assertTrue(server.accepting)
718        self.assertFalse(client.connected)
719        self.assertFalse(client.accepting)
720
721        # stop serving
722        server.close()
723        self.assertFalse(server.connected)
724        self.assertFalse(server.accepting)
725
726    def test_create_socket(self):
727        s = asyncore.dispatcher()
728        s.create_socket(self.family)
729        self.assertEqual(s.socket.type, socket.SOCK_STREAM)
730        self.assertEqual(s.socket.family, self.family)
731        self.assertEqual(s.socket.gettimeout(), 0)
732        self.assertFalse(s.socket.get_inheritable())
733
734    def test_bind(self):
735        if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX:
736            self.skipTest("Not applicable to AF_UNIX sockets.")
737        s1 = asyncore.dispatcher()
738        s1.create_socket(self.family)
739        s1.bind(self.addr)
740        s1.listen(5)
741        port = s1.socket.getsockname()[1]
742
743        s2 = asyncore.dispatcher()
744        s2.create_socket(self.family)
745        # EADDRINUSE indicates the socket was correctly bound
746        self.assertRaises(OSError, s2.bind, (self.addr[0], port))
747
748    def test_set_reuse_addr(self):
749        if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX:
750            self.skipTest("Not applicable to AF_UNIX sockets.")
751
752        with socket.socket(self.family) as sock:
753            try:
754                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
755            except OSError:
756                unittest.skip("SO_REUSEADDR not supported on this platform")
757            else:
758                # if SO_REUSEADDR succeeded for sock we expect asyncore
759                # to do the same
760                s = asyncore.dispatcher(socket.socket(self.family))
761                self.assertFalse(s.socket.getsockopt(socket.SOL_SOCKET,
762                                                     socket.SO_REUSEADDR))
763                s.socket.close()
764                s.create_socket(self.family)
765                s.set_reuse_addr()
766                self.assertTrue(s.socket.getsockopt(socket.SOL_SOCKET,
767                                                     socket.SO_REUSEADDR))
768
769    @support.reap_threads
770    def test_quick_connect(self):
771        # see: http://bugs.python.org/issue10340
772        if self.family not in (socket.AF_INET, getattr(socket, "AF_INET6", object())):
773            self.skipTest("test specific to AF_INET and AF_INET6")
774
775        server = BaseServer(self.family, self.addr)
776        # run the thread 500 ms: the socket should be connected in 200 ms
777        t = threading.Thread(target=lambda: asyncore.loop(timeout=0.1,
778                                                          count=5))
779        t.start()
780        try:
781            with socket.socket(self.family, socket.SOCK_STREAM) as s:
782                s.settimeout(.2)
783                s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
784                             struct.pack('ii', 1, 0))
785
786                try:
787                    s.connect(server.address)
788                except OSError:
789                    pass
790        finally:
791            support.join_thread(t)
792
793class TestAPI_UseIPv4Sockets(BaseTestAPI):
794    family = socket.AF_INET
795    addr = (socket_helper.HOST, 0)
796
797@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 support required')
798class TestAPI_UseIPv6Sockets(BaseTestAPI):
799    family = socket.AF_INET6
800    addr = (socket_helper.HOSTv6, 0)
801
802@unittest.skipUnless(HAS_UNIX_SOCKETS, 'Unix sockets required')
803class TestAPI_UseUnixSockets(BaseTestAPI):
804    if HAS_UNIX_SOCKETS:
805        family = socket.AF_UNIX
806    addr = support.TESTFN
807
808    def tearDown(self):
809        support.unlink(self.addr)
810        BaseTestAPI.tearDown(self)
811
812class TestAPI_UseIPv4Select(TestAPI_UseIPv4Sockets, unittest.TestCase):
813    use_poll = False
814
815@unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
816class TestAPI_UseIPv4Poll(TestAPI_UseIPv4Sockets, unittest.TestCase):
817    use_poll = True
818
819class TestAPI_UseIPv6Select(TestAPI_UseIPv6Sockets, unittest.TestCase):
820    use_poll = False
821
822@unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
823class TestAPI_UseIPv6Poll(TestAPI_UseIPv6Sockets, unittest.TestCase):
824    use_poll = True
825
826class TestAPI_UseUnixSocketsSelect(TestAPI_UseUnixSockets, unittest.TestCase):
827    use_poll = False
828
829@unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
830class TestAPI_UseUnixSocketsPoll(TestAPI_UseUnixSockets, unittest.TestCase):
831    use_poll = True
832
833if __name__ == "__main__":
834    unittest.main()
835