1import asyncore
2import unittest
3import select
4import os
5import socket
6import sys
7import time
8import errno
9import struct
10import threading
11
12from test import support
13from io import BytesIO
14
15if support.PGO:
16    raise unittest.SkipTest("test is not helpful for PGO")
17
18
19TIMEOUT = 3
20HAS_UNIX_SOCKETS = hasattr(socket, 'AF_UNIX')
21
22class dummysocket:
23    def __init__(self):
24        self.closed = False
25
26    def close(self):
27        self.closed = True
28
29    def fileno(self):
30        return 42
31
32class dummychannel:
33    def __init__(self):
34        self.socket = dummysocket()
35
36    def close(self):
37        self.socket.close()
38
39class exitingdummy:
40    def __init__(self):
41        pass
42
43    def handle_read_event(self):
44        raise asyncore.ExitNow()
45
46    handle_write_event = handle_read_event
47    handle_close = handle_read_event
48    handle_expt_event = handle_read_event
49
50class crashingdummy:
51    def __init__(self):
52        self.error_handled = False
53
54    def handle_read_event(self):
55        raise Exception()
56
57    handle_write_event = handle_read_event
58    handle_close = handle_read_event
59    handle_expt_event = handle_read_event
60
61    def handle_error(self):
62        self.error_handled = True
63
64# used when testing senders; just collects what it gets until newline is sent
65def capture_server(evt, buf, serv):
66    try:
67        serv.listen()
68        conn, addr = serv.accept()
69    except socket.timeout:
70        pass
71    else:
72        n = 200
73        start = time.monotonic()
74        while n > 0 and time.monotonic() - start < 3.0:
75            r, w, e = select.select([conn], [], [], 0.1)
76            if r:
77                n -= 1
78                data = conn.recv(10)
79                # keep everything except for the newline terminator
80                buf.write(data.replace(b'\n', b''))
81                if b'\n' in data:
82                    break
83            time.sleep(0.01)
84
85        conn.close()
86    finally:
87        serv.close()
88        evt.set()
89
90def bind_af_aware(sock, addr):
91    """Helper function to bind a socket according to its family."""
92    if HAS_UNIX_SOCKETS and sock.family == socket.AF_UNIX:
93        # Make sure the path doesn't exist.
94        support.unlink(addr)
95        support.bind_unix_socket(sock, addr)
96    else:
97        sock.bind(addr)
98
99
100class HelperFunctionTests(unittest.TestCase):
101    def test_readwriteexc(self):
102        # Check exception handling behavior of read, write and _exception
103
104        # check that ExitNow exceptions in the object handler method
105        # bubbles all the way up through asyncore read/write/_exception calls
106        tr1 = exitingdummy()
107        self.assertRaises(asyncore.ExitNow, asyncore.read, tr1)
108        self.assertRaises(asyncore.ExitNow, asyncore.write, tr1)
109        self.assertRaises(asyncore.ExitNow, asyncore._exception, tr1)
110
111        # check that an exception other than ExitNow in the object handler
112        # method causes the handle_error method to get called
113        tr2 = crashingdummy()
114        asyncore.read(tr2)
115        self.assertEqual(tr2.error_handled, True)
116
117        tr2 = crashingdummy()
118        asyncore.write(tr2)
119        self.assertEqual(tr2.error_handled, True)
120
121        tr2 = crashingdummy()
122        asyncore._exception(tr2)
123        self.assertEqual(tr2.error_handled, True)
124
125    # asyncore.readwrite uses constants in the select module that
126    # are not present in Windows systems (see this thread:
127    # http://mail.python.org/pipermail/python-list/2001-October/109973.html)
128    # These constants should be present as long as poll is available
129
130    @unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
131    def test_readwrite(self):
132        # Check that correct methods are called by readwrite()
133
134        attributes = ('read', 'expt', 'write', 'closed', 'error_handled')
135
136        expected = (
137            (select.POLLIN, 'read'),
138            (select.POLLPRI, 'expt'),
139            (select.POLLOUT, 'write'),
140            (select.POLLERR, 'closed'),
141            (select.POLLHUP, 'closed'),
142            (select.POLLNVAL, 'closed'),
143            )
144
145        class testobj:
146            def __init__(self):
147                self.read = False
148                self.write = False
149                self.closed = False
150                self.expt = False
151                self.error_handled = False
152
153            def handle_read_event(self):
154                self.read = True
155
156            def handle_write_event(self):
157                self.write = True
158
159            def handle_close(self):
160                self.closed = True
161
162            def handle_expt_event(self):
163                self.expt = True
164
165            def handle_error(self):
166                self.error_handled = True
167
168        for flag, expectedattr in expected:
169            tobj = testobj()
170            self.assertEqual(getattr(tobj, expectedattr), False)
171            asyncore.readwrite(tobj, flag)
172
173            # Only the attribute modified by the routine we expect to be
174            # called should be True.
175            for attr in attributes:
176                self.assertEqual(getattr(tobj, attr), attr==expectedattr)
177
178            # check that ExitNow exceptions in the object handler method
179            # bubbles all the way up through asyncore readwrite call
180            tr1 = exitingdummy()
181            self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag)
182
183            # check that an exception other than ExitNow in the object handler
184            # method causes the handle_error method to get called
185            tr2 = crashingdummy()
186            self.assertEqual(tr2.error_handled, False)
187            asyncore.readwrite(tr2, flag)
188            self.assertEqual(tr2.error_handled, True)
189
190    def test_closeall(self):
191        self.closeall_check(False)
192
193    def test_closeall_default(self):
194        self.closeall_check(True)
195
196    def closeall_check(self, usedefault):
197        # Check that close_all() closes everything in a given map
198
199        l = []
200        testmap = {}
201        for i in range(10):
202            c = dummychannel()
203            l.append(c)
204            self.assertEqual(c.socket.closed, False)
205            testmap[i] = c
206
207        if usedefault:
208            socketmap = asyncore.socket_map
209            try:
210                asyncore.socket_map = testmap
211                asyncore.close_all()
212            finally:
213                testmap, asyncore.socket_map = asyncore.socket_map, socketmap
214        else:
215            asyncore.close_all(testmap)
216
217        self.assertEqual(len(testmap), 0)
218
219        for c in l:
220            self.assertEqual(c.socket.closed, True)
221
222    def test_compact_traceback(self):
223        try:
224            raise Exception("I don't like spam!")
225        except:
226            real_t, real_v, real_tb = sys.exc_info()
227            r = asyncore.compact_traceback()
228        else:
229            self.fail("Expected exception")
230
231        (f, function, line), t, v, info = r
232        self.assertEqual(os.path.split(f)[-1], 'test_asyncore.py')
233        self.assertEqual(function, 'test_compact_traceback')
234        self.assertEqual(t, real_t)
235        self.assertEqual(v, real_v)
236        self.assertEqual(info, '[%s|%s|%s]' % (f, function, line))
237
238
239class DispatcherTests(unittest.TestCase):
240    def setUp(self):
241        pass
242
243    def tearDown(self):
244        asyncore.close_all()
245
246    def test_basic(self):
247        d = asyncore.dispatcher()
248        self.assertEqual(d.readable(), True)
249        self.assertEqual(d.writable(), True)
250
251    def test_repr(self):
252        d = asyncore.dispatcher()
253        self.assertEqual(repr(d), '<asyncore.dispatcher at %#x>' % id(d))
254
255    def test_log(self):
256        d = asyncore.dispatcher()
257
258        # capture output of dispatcher.log() (to stderr)
259        l1 = "Lovely spam! Wonderful spam!"
260        l2 = "I don't like spam!"
261        with support.captured_stderr() as stderr:
262            d.log(l1)
263            d.log(l2)
264
265        lines = stderr.getvalue().splitlines()
266        self.assertEqual(lines, ['log: %s' % l1, 'log: %s' % l2])
267
268    def test_log_info(self):
269        d = asyncore.dispatcher()
270
271        # capture output of dispatcher.log_info() (to stdout via print)
272        l1 = "Have you got anything without spam?"
273        l2 = "Why can't she have egg bacon spam and sausage?"
274        l3 = "THAT'S got spam in it!"
275        with support.captured_stdout() as stdout:
276            d.log_info(l1, 'EGGS')
277            d.log_info(l2)
278            d.log_info(l3, 'SPAM')
279
280        lines = stdout.getvalue().splitlines()
281        expected = ['EGGS: %s' % l1, 'info: %s' % l2, 'SPAM: %s' % l3]
282        self.assertEqual(lines, expected)
283
284    def test_unhandled(self):
285        d = asyncore.dispatcher()
286        d.ignore_log_types = ()
287
288        # capture output of dispatcher.log_info() (to stdout via print)
289        with support.captured_stdout() as stdout:
290            d.handle_expt()
291            d.handle_read()
292            d.handle_write()
293            d.handle_connect()
294
295        lines = stdout.getvalue().splitlines()
296        expected = ['warning: unhandled incoming priority event',
297                    'warning: unhandled read event',
298                    'warning: unhandled write event',
299                    'warning: unhandled connect event']
300        self.assertEqual(lines, expected)
301
302    def test_strerror(self):
303        # refers to bug #8573
304        err = asyncore._strerror(errno.EPERM)
305        if hasattr(os, 'strerror'):
306            self.assertEqual(err, os.strerror(errno.EPERM))
307        err = asyncore._strerror(-1)
308        self.assertTrue(err != "")
309
310
311class dispatcherwithsend_noread(asyncore.dispatcher_with_send):
312    def readable(self):
313        return False
314
315    def handle_connect(self):
316        pass
317
318
319class DispatcherWithSendTests(unittest.TestCase):
320    def setUp(self):
321        pass
322
323    def tearDown(self):
324        asyncore.close_all()
325
326    @support.reap_threads
327    def test_send(self):
328        evt = threading.Event()
329        sock = socket.socket()
330        sock.settimeout(3)
331        port = support.bind_port(sock)
332
333        cap = BytesIO()
334        args = (evt, cap, sock)
335        t = threading.Thread(target=capture_server, args=args)
336        t.start()
337        try:
338            # wait a little longer for the server to initialize (it sometimes
339            # refuses connections on slow machines without this wait)
340            time.sleep(0.2)
341
342            data = b"Suppose there isn't a 16-ton weight?"
343            d = dispatcherwithsend_noread()
344            d.create_socket()
345            d.connect((support.HOST, port))
346
347            # give time for socket to connect
348            time.sleep(0.1)
349
350            d.send(data)
351            d.send(data)
352            d.send(b'\n')
353
354            n = 1000
355            while d.out_buffer and n > 0:
356                asyncore.poll()
357                n -= 1
358
359            evt.wait()
360
361            self.assertEqual(cap.getvalue(), data*2)
362        finally:
363            support.join_thread(t, timeout=TIMEOUT)
364
365
366@unittest.skipUnless(hasattr(asyncore, 'file_wrapper'),
367                     'asyncore.file_wrapper required')
368class FileWrapperTest(unittest.TestCase):
369    def setUp(self):
370        self.d = b"It's not dead, it's sleeping!"
371        with open(support.TESTFN, 'wb') as file:
372            file.write(self.d)
373
374    def tearDown(self):
375        support.unlink(support.TESTFN)
376
377    def test_recv(self):
378        fd = os.open(support.TESTFN, os.O_RDONLY)
379        w = asyncore.file_wrapper(fd)
380        os.close(fd)
381
382        self.assertNotEqual(w.fd, fd)
383        self.assertNotEqual(w.fileno(), fd)
384        self.assertEqual(w.recv(13), b"It's not dead")
385        self.assertEqual(w.read(6), b", it's")
386        w.close()
387        self.assertRaises(OSError, w.read, 1)
388
389    def test_send(self):
390        d1 = b"Come again?"
391        d2 = b"I want to buy some cheese."
392        fd = os.open(support.TESTFN, os.O_WRONLY | os.O_APPEND)
393        w = asyncore.file_wrapper(fd)
394        os.close(fd)
395
396        w.write(d1)
397        w.send(d2)
398        w.close()
399        with open(support.TESTFN, 'rb') as file:
400            self.assertEqual(file.read(), self.d + d1 + d2)
401
402    @unittest.skipUnless(hasattr(asyncore, 'file_dispatcher'),
403                         'asyncore.file_dispatcher required')
404    def test_dispatcher(self):
405        fd = os.open(support.TESTFN, os.O_RDONLY)
406        data = []
407        class FileDispatcher(asyncore.file_dispatcher):
408            def handle_read(self):
409                data.append(self.recv(29))
410        s = FileDispatcher(fd)
411        os.close(fd)
412        asyncore.loop(timeout=0.01, use_poll=True, count=2)
413        self.assertEqual(b"".join(data), self.d)
414
415    def test_resource_warning(self):
416        # Issue #11453
417        fd = os.open(support.TESTFN, os.O_RDONLY)
418        f = asyncore.file_wrapper(fd)
419
420        os.close(fd)
421        with support.check_warnings(('', ResourceWarning)):
422            f = None
423            support.gc_collect()
424
425    def test_close_twice(self):
426        fd = os.open(support.TESTFN, os.O_RDONLY)
427        f = asyncore.file_wrapper(fd)
428        os.close(fd)
429
430        os.close(f.fd)  # file_wrapper dupped fd
431        with self.assertRaises(OSError):
432            f.close()
433
434        self.assertEqual(f.fd, -1)
435        # calling close twice should not fail
436        f.close()
437
438
439class BaseTestHandler(asyncore.dispatcher):
440
441    def __init__(self, sock=None):
442        asyncore.dispatcher.__init__(self, sock)
443        self.flag = False
444
445    def handle_accept(self):
446        raise Exception("handle_accept not supposed to be called")
447
448    def handle_accepted(self):
449        raise Exception("handle_accepted not supposed to be called")
450
451    def handle_connect(self):
452        raise Exception("handle_connect not supposed to be called")
453
454    def handle_expt(self):
455        raise Exception("handle_expt not supposed to be called")
456
457    def handle_close(self):
458        raise Exception("handle_close not supposed to be called")
459
460    def handle_error(self):
461        raise
462
463
464class BaseServer(asyncore.dispatcher):
465    """A server which listens on an address and dispatches the
466    connection to a handler.
467    """
468
469    def __init__(self, family, addr, handler=BaseTestHandler):
470        asyncore.dispatcher.__init__(self)
471        self.create_socket(family)
472        self.set_reuse_addr()
473        bind_af_aware(self.socket, addr)
474        self.listen(5)
475        self.handler = handler
476
477    @property
478    def address(self):
479        return self.socket.getsockname()
480
481    def handle_accepted(self, sock, addr):
482        self.handler(sock)
483
484    def handle_error(self):
485        raise
486
487
488class BaseClient(BaseTestHandler):
489
490    def __init__(self, family, address):
491        BaseTestHandler.__init__(self)
492        self.create_socket(family)
493        self.connect(address)
494
495    def handle_connect(self):
496        pass
497
498
499class BaseTestAPI:
500
501    def tearDown(self):
502        asyncore.close_all(ignore_all=True)
503
504    def loop_waiting_for_flag(self, instance, timeout=5):
505        timeout = float(timeout) / 100
506        count = 100
507        while asyncore.socket_map and count > 0:
508            asyncore.loop(timeout=0.01, count=1, use_poll=self.use_poll)
509            if instance.flag:
510                return
511            count -= 1
512            time.sleep(timeout)
513        self.fail("flag not set")
514
515    def test_handle_connect(self):
516        # make sure handle_connect is called on connect()
517
518        class TestClient(BaseClient):
519            def handle_connect(self):
520                self.flag = True
521
522        server = BaseServer(self.family, self.addr)
523        client = TestClient(self.family, server.address)
524        self.loop_waiting_for_flag(client)
525
526    def test_handle_accept(self):
527        # make sure handle_accept() is called when a client connects
528
529        class TestListener(BaseTestHandler):
530
531            def __init__(self, family, addr):
532                BaseTestHandler.__init__(self)
533                self.create_socket(family)
534                bind_af_aware(self.socket, addr)
535                self.listen(5)
536                self.address = self.socket.getsockname()
537
538            def handle_accept(self):
539                self.flag = True
540
541        server = TestListener(self.family, self.addr)
542        client = BaseClient(self.family, server.address)
543        self.loop_waiting_for_flag(server)
544
545    def test_handle_accepted(self):
546        # make sure handle_accepted() is called when a client connects
547
548        class TestListener(BaseTestHandler):
549
550            def __init__(self, family, addr):
551                BaseTestHandler.__init__(self)
552                self.create_socket(family)
553                bind_af_aware(self.socket, addr)
554                self.listen(5)
555                self.address = self.socket.getsockname()
556
557            def handle_accept(self):
558                asyncore.dispatcher.handle_accept(self)
559
560            def handle_accepted(self, sock, addr):
561                sock.close()
562                self.flag = True
563
564        server = TestListener(self.family, self.addr)
565        client = BaseClient(self.family, server.address)
566        self.loop_waiting_for_flag(server)
567
568
569    def test_handle_read(self):
570        # make sure handle_read is called on data received
571
572        class TestClient(BaseClient):
573            def handle_read(self):
574                self.flag = True
575
576        class TestHandler(BaseTestHandler):
577            def __init__(self, conn):
578                BaseTestHandler.__init__(self, conn)
579                self.send(b'x' * 1024)
580
581        server = BaseServer(self.family, self.addr, TestHandler)
582        client = TestClient(self.family, server.address)
583        self.loop_waiting_for_flag(client)
584
585    def test_handle_write(self):
586        # make sure handle_write is called
587
588        class TestClient(BaseClient):
589            def handle_write(self):
590                self.flag = True
591
592        server = BaseServer(self.family, self.addr)
593        client = TestClient(self.family, server.address)
594        self.loop_waiting_for_flag(client)
595
596    def test_handle_close(self):
597        # make sure handle_close is called when the other end closes
598        # the connection
599
600        class TestClient(BaseClient):
601
602            def handle_read(self):
603                # in order to make handle_close be called we are supposed
604                # to make at least one recv() call
605                self.recv(1024)
606
607            def handle_close(self):
608                self.flag = True
609                self.close()
610
611        class TestHandler(BaseTestHandler):
612            def __init__(self, conn):
613                BaseTestHandler.__init__(self, conn)
614                self.close()
615
616        server = BaseServer(self.family, self.addr, TestHandler)
617        client = TestClient(self.family, server.address)
618        self.loop_waiting_for_flag(client)
619
620    def test_handle_close_after_conn_broken(self):
621        # Check that ECONNRESET/EPIPE is correctly handled (issues #5661 and
622        # #11265).
623
624        data = b'\0' * 128
625
626        class TestClient(BaseClient):
627
628            def handle_write(self):
629                self.send(data)
630
631            def handle_close(self):
632                self.flag = True
633                self.close()
634
635            def handle_expt(self):
636                self.flag = True
637                self.close()
638
639        class TestHandler(BaseTestHandler):
640
641            def handle_read(self):
642                self.recv(len(data))
643                self.close()
644
645            def writable(self):
646                return False
647
648        server = BaseServer(self.family, self.addr, TestHandler)
649        client = TestClient(self.family, server.address)
650        self.loop_waiting_for_flag(client)
651
652    @unittest.skipIf(sys.platform.startswith("sunos"),
653                     "OOB support is broken on Solaris")
654    def test_handle_expt(self):
655        # Make sure handle_expt is called on OOB data received.
656        # Note: this might fail on some platforms as OOB data is
657        # tenuously supported and rarely used.
658        if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX:
659            self.skipTest("Not applicable to AF_UNIX sockets.")
660
661        if sys.platform == "darwin" and self.use_poll:
662            self.skipTest("poll may fail on macOS; see issue #28087")
663
664        class TestClient(BaseClient):
665            def handle_expt(self):
666                self.socket.recv(1024, socket.MSG_OOB)
667                self.flag = True
668
669        class TestHandler(BaseTestHandler):
670            def __init__(self, conn):
671                BaseTestHandler.__init__(self, conn)
672                self.socket.send(bytes(chr(244), 'latin-1'), socket.MSG_OOB)
673
674        server = BaseServer(self.family, self.addr, TestHandler)
675        client = TestClient(self.family, server.address)
676        self.loop_waiting_for_flag(client)
677
678    def test_handle_error(self):
679
680        class TestClient(BaseClient):
681            def handle_write(self):
682                1.0 / 0
683            def handle_error(self):
684                self.flag = True
685                try:
686                    raise
687                except ZeroDivisionError:
688                    pass
689                else:
690                    raise Exception("exception not raised")
691
692        server = BaseServer(self.family, self.addr)
693        client = TestClient(self.family, server.address)
694        self.loop_waiting_for_flag(client)
695
696    def test_connection_attributes(self):
697        server = BaseServer(self.family, self.addr)
698        client = BaseClient(self.family, server.address)
699
700        # we start disconnected
701        self.assertFalse(server.connected)
702        self.assertTrue(server.accepting)
703        # this can't be taken for granted across all platforms
704        #self.assertFalse(client.connected)
705        self.assertFalse(client.accepting)
706
707        # execute some loops so that client connects to server
708        asyncore.loop(timeout=0.01, use_poll=self.use_poll, count=100)
709        self.assertFalse(server.connected)
710        self.assertTrue(server.accepting)
711        self.assertTrue(client.connected)
712        self.assertFalse(client.accepting)
713
714        # disconnect the client
715        client.close()
716        self.assertFalse(server.connected)
717        self.assertTrue(server.accepting)
718        self.assertFalse(client.connected)
719        self.assertFalse(client.accepting)
720
721        # stop serving
722        server.close()
723        self.assertFalse(server.connected)
724        self.assertFalse(server.accepting)
725
726    def test_create_socket(self):
727        s = asyncore.dispatcher()
728        s.create_socket(self.family)
729        self.assertEqual(s.socket.type, socket.SOCK_STREAM)
730        self.assertEqual(s.socket.family, self.family)
731        self.assertEqual(s.socket.gettimeout(), 0)
732        self.assertFalse(s.socket.get_inheritable())
733
734    def test_bind(self):
735        if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX:
736            self.skipTest("Not applicable to AF_UNIX sockets.")
737        s1 = asyncore.dispatcher()
738        s1.create_socket(self.family)
739        s1.bind(self.addr)
740        s1.listen(5)
741        port = s1.socket.getsockname()[1]
742
743        s2 = asyncore.dispatcher()
744        s2.create_socket(self.family)
745        # EADDRINUSE indicates the socket was correctly bound
746        self.assertRaises(OSError, s2.bind, (self.addr[0], port))
747
748    def test_set_reuse_addr(self):
749        if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX:
750            self.skipTest("Not applicable to AF_UNIX sockets.")
751
752        with socket.socket(self.family) as sock:
753            try:
754                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
755            except OSError:
756                unittest.skip("SO_REUSEADDR not supported on this platform")
757            else:
758                # if SO_REUSEADDR succeeded for sock we expect asyncore
759                # to do the same
760                s = asyncore.dispatcher(socket.socket(self.family))
761                self.assertFalse(s.socket.getsockopt(socket.SOL_SOCKET,
762                                                     socket.SO_REUSEADDR))
763                s.socket.close()
764                s.create_socket(self.family)
765                s.set_reuse_addr()
766                self.assertTrue(s.socket.getsockopt(socket.SOL_SOCKET,
767                                                     socket.SO_REUSEADDR))
768
769    @support.reap_threads
770    def test_quick_connect(self):
771        # see: http://bugs.python.org/issue10340
772        if self.family not in (socket.AF_INET, getattr(socket, "AF_INET6", object())):
773            self.skipTest("test specific to AF_INET and AF_INET6")
774
775        server = BaseServer(self.family, self.addr)
776        # run the thread 500 ms: the socket should be connected in 200 ms
777        t = threading.Thread(target=lambda: asyncore.loop(timeout=0.1,
778                                                          count=5))
779        t.start()
780        try:
781            with socket.socket(self.family, socket.SOCK_STREAM) as s:
782                s.settimeout(.2)
783                s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
784                             struct.pack('ii', 1, 0))
785
786                try:
787                    s.connect(server.address)
788                except OSError:
789                    pass
790        finally:
791            support.join_thread(t, timeout=TIMEOUT)
792
793class TestAPI_UseIPv4Sockets(BaseTestAPI):
794    family = socket.AF_INET
795    addr = (support.HOST, 0)
796
797@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 support required')
798class TestAPI_UseIPv6Sockets(BaseTestAPI):
799    family = socket.AF_INET6
800    addr = (support.HOSTv6, 0)
801
802@unittest.skipUnless(HAS_UNIX_SOCKETS, 'Unix sockets required')
803class TestAPI_UseUnixSockets(BaseTestAPI):
804    if HAS_UNIX_SOCKETS:
805        family = socket.AF_UNIX
806    addr = support.TESTFN
807
808    def tearDown(self):
809        support.unlink(self.addr)
810        BaseTestAPI.tearDown(self)
811
812class TestAPI_UseIPv4Select(TestAPI_UseIPv4Sockets, unittest.TestCase):
813    use_poll = False
814
815@unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
816class TestAPI_UseIPv4Poll(TestAPI_UseIPv4Sockets, unittest.TestCase):
817    use_poll = True
818
819class TestAPI_UseIPv6Select(TestAPI_UseIPv6Sockets, unittest.TestCase):
820    use_poll = False
821
822@unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
823class TestAPI_UseIPv6Poll(TestAPI_UseIPv6Sockets, unittest.TestCase):
824    use_poll = True
825
826class TestAPI_UseUnixSocketsSelect(TestAPI_UseUnixSockets, unittest.TestCase):
827    use_poll = False
828
829@unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
830class TestAPI_UseUnixSocketsPoll(TestAPI_UseUnixSockets, unittest.TestCase):
831    use_poll = True
832
833if __name__ == "__main__":
834    unittest.main()
835