# Test case for the select.poll() function
2
3import os
4import subprocess
5import random
6import select
7import threading
8import time
9import unittest
10from test.support import cpython_only
11from test.support import threading_helper
12from test.support.os_helper import TESTFN
13
14
# Skip the whole module on platforms whose select module has no poll().
if not hasattr(select, "poll"):
    raise unittest.SkipTest("select.poll not defined")
19
20
def find_ready_matching(ready, flag):
    """Return the descriptors in *ready* whose event mask includes *flag*.

    *ready* is an iterable of ``(fd, eventmask)`` pairs, such as the list
    returned by ``poll.poll()``.  *flag* is a bitmask that is tested against
    each event mask with ``&``.  The result is a new list holding the
    matching descriptors in the order they appear in *ready*.
    """
    return [fd for fd, mode in ready if mode & flag]
27
class PollTests(unittest.TestCase):
    """Functional and error-path tests for select.poll() objects."""

    def test_poll1(self):
        # Basic functional test of poll object
        # Create a bunch of pipes and test that poll works with them.

        p = select.poll()

        NUM_PIPES = 12
        MSG = b" This is a test."
        MSG_LEN = len(MSG)
        writers = []
        r2w = {}

        # Descriptors that are still open.  Anything left in here is closed
        # by the cleanup below, so a failure part-way through the loop does
        # not leak pipe ends into the rest of the test run.
        open_fds = set()

        def close_leftovers():
            while open_fds:
                os.close(open_fds.pop())
        self.addCleanup(close_leftovers)

        for _ in range(NUM_PIPES):
            rd, wr = os.pipe()
            open_fds.update((rd, wr))
            # Register the read end without an explicit event mask first so
            # that modify() is exercised as well.
            p.register(rd)
            p.modify(rd, select.POLLIN)
            p.register(wr, select.POLLOUT)
            writers.append(wr)
            r2w[rd] = wr

        bufs = []

        while writers:
            ready = p.poll()
            ready_writers = find_ready_matching(ready, select.POLLOUT)
            if not ready_writers:
                raise RuntimeError("no pipes ready for writing")
            wr = random.choice(ready_writers)
            os.write(wr, MSG)

            ready = p.poll()
            ready_readers = find_ready_matching(ready, select.POLLIN)
            if not ready_readers:
                raise RuntimeError("no pipes ready for reading")
            rd = random.choice(ready_readers)
            buf = os.read(rd, MSG_LEN)
            self.assertEqual(len(buf), MSG_LEN)
            bufs.append(buf)

            # This pipe is finished: drop both ends from the poll set and
            # close them.
            wr = r2w[rd]
            p.unregister(wr)
            p.unregister(rd)
            for fd in (wr, rd):
                # Forget the descriptor before closing it so the cleanup
                # never attempts a second close on the same number.
                open_fds.discard(fd)
                os.close(fd)
            writers.remove(wr)

        self.assertEqual(bufs, [MSG] * NUM_PIPES)

    def test_poll_unit_tests(self):
        # returns NVAL for invalid file descriptor
        FD, w = os.pipe()
        os.close(FD)
        os.close(w)
        p = select.poll()
        p.register(FD)
        r = p.poll()
        self.assertEqual(r[0], (FD, select.POLLNVAL))

        with open(TESTFN, 'w') as f:
            # The scratch file exists from here on; make sure it is removed
            # even if one of the assertions below fails.
            self.addCleanup(os.unlink, TESTFN)
            fd = f.fileno()
            p = select.poll()
            # register() accepts an object with a fileno() method.
            p.register(f)
            r = p.poll()
            self.assertEqual(r[0][0], fd)
        # The file is closed now, so its descriptor must be reported invalid.
        r = p.poll()
        self.assertEqual(r[0], (fd, select.POLLNVAL))

        # type error for invalid arguments
        p = select.poll()
        self.assertRaises(TypeError, p.register, p)
        self.assertRaises(TypeError, p.unregister, p)

        # can't unregister non-existent object
        p = select.poll()
        self.assertRaises(KeyError, p.unregister, 3)

        # Test error cases
        pollster = select.poll()

        class Nope:
            # Has no fileno() method at all.
            pass

        class Almost:
            # Has a fileno() method, but it does not return an integer.
            def fileno(self):
                return 'fileno'

        self.assertRaises(TypeError, pollster.register, Nope(), 0)
        self.assertRaises(TypeError, pollster.register, Almost(), 0)

    # Another test case for poll().  This is copied from the test case for
    # select(), modified to use poll() instead.

    def test_poll2(self):
        # The shell loop prints one line per second; the command is a fixed
        # string, so running it through the shell involves no external input.
        cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
        proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                                bufsize=0)
        # Drive the Popen context manager by hand so that its exit handler
        # runs as a test cleanup, whatever happens below.
        proc.__enter__()
        self.addCleanup(proc.__exit__, None, None, None)
        p = proc.stdout
        pollster = select.poll()
        pollster.register(p, select.POLLIN)
        for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10:
            fdlist = pollster.poll(tout)
            if not fdlist:
                # Timed out with nothing ready; try the next timeout.
                continue
            fd, flags = fdlist[0]
            if flags & select.POLLHUP:
                line = p.readline()
                if line != b"":
                    self.fail('error: pipe seems to be closed, but still returns data')
            elif flags & select.POLLIN:
                line = p.readline()
                if not line:
                    # End of file: the child has finished writing.
                    break
                self.assertEqual(line, b'testing...\n')
            else:
                self.fail('Unexpected return value from select.poll: %s' % fdlist)

    def test_poll3(self):
        # test int overflow
        pollster = select.poll()
        pollster.register(1)

        self.assertRaises(OverflowError, pollster.poll, 1 << 64)

        # NOTE(review): presumably a canary for an error left pending by the
        # overflowing call above, which would surface on the next operation;
        # confirm before removing.
        x = 2 + 3
        if x != 5:
            self.fail('Overflow must have occurred')

        # Issues #15989, #17919
        self.assertRaises(ValueError, pollster.register, 0, -1)
        self.assertRaises(OverflowError, pollster.register, 0, 1 << 64)
        self.assertRaises(ValueError, pollster.modify, 1, -1)
        self.assertRaises(OverflowError, pollster.modify, 1, 1 << 64)

    @cpython_only
    def test_poll_c_limits(self):
        # Values one past the C type limits must be rejected with
        # OverflowError.
        from _testcapi import USHRT_MAX, INT_MAX, UINT_MAX
        pollster = select.poll()
        pollster.register(1)

        # Issues #15989, #17919
        self.assertRaises(OverflowError, pollster.register, 0, USHRT_MAX + 1)
        self.assertRaises(OverflowError, pollster.modify, 1, USHRT_MAX + 1)
        self.assertRaises(OverflowError, pollster.poll, INT_MAX + 1)
        self.assertRaises(OverflowError, pollster.poll, UINT_MAX + 1)

    @threading_helper.reap_threads
    def test_threaded_poll(self):
        r, w = os.pipe()
        self.addCleanup(os.close, r)
        self.addCleanup(os.close, w)
        rfds = []
        for _ in range(10):
            fd = os.dup(r)
            self.addCleanup(os.close, fd)
            rfds.append(fd)
        pollster = select.poll()
        for fd in rfds:
            pollster.register(fd, select.POLLIN)

        # Block in poll() on a background thread while this thread modifies
        # the same poll object.
        t = threading.Thread(target=pollster.poll)
        t.start()
        try:
            time.sleep(0.5)
            # trigger ufds array reallocation
            for fd in rfds:
                pollster.unregister(fd)
            pollster.register(w, select.POLLOUT)
            # A second poll() while the thread is still inside poll() is
            # expected to be refused.
            self.assertRaises(RuntimeError, pollster.poll)
        finally:
            # and make the call to poll() from the thread return
            os.write(w, b'spam')
            t.join()

    @threading_helper.reap_threads
    def test_poll_blocks_with_negative_ms(self):
        # None and every negative timeout must block until an event arrives.
        for timeout_ms in [None, -1000, -1, -1.0, -0.1, -1e-100]:
            # Create two file descriptors. This will be used to unlock
            # the blocking call to poll.poll inside the thread
            r, w = os.pipe()
            try:
                pollster = select.poll()
                pollster.register(r, select.POLLIN)

                poll_thread = threading.Thread(target=pollster.poll,
                                               args=(timeout_ms,))
                poll_thread.start()
                try:
                    poll_thread.join(timeout=0.1)
                    self.assertTrue(
                        poll_thread.is_alive(),
                        'poll(%r) returned instead of blocking'
                        % (timeout_ms,))
                finally:
                    # Write to the pipe so pollster.poll unblocks and the
                    # thread ends, even when the assertion above failed.
                    os.write(w, b'spam')
                    poll_thread.join()
                self.assertFalse(poll_thread.is_alive())
            finally:
                os.close(r)
                os.close(w)
230
231
# Run the poll() tests when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()
234