1# Copyright (c) 2009 AG Projects
2# Author: Denis Bilenko
3#
4# Permission is hereby granted, free of charge, to any person obtaining a copy
5# of this software and associated documentation files (the "Software"), to deal
6# in the Software without restriction, including without limitation the rights
7# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8# copies of the Software, and to permit persons to whom the Software is
9# furnished to do so, subject to the following conditions:
10#
11# The above copyright notice and this permission notice shall be included in
12# all copies or substantial portions of the Software.
13#
14# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20# THE SOFTWARE.
21
22import re
23import time
24import unittest
25
26import gevent.testing as greentest
27import gevent.testing.timing
28
29import gevent
30from gevent import socket
31from gevent.hub import Waiter, get_hub
32from gevent._compat import NativeStrIO
33from gevent._compat import get_this_psutil_process
34
35DELAY = 0.1
36
37
38class TestCloseSocketWhilePolling(greentest.TestCase):
39
40    def test(self):
41        sock = socket.socket()
42        self._close_on_teardown(sock)
43        t = get_hub().loop.timer(0)
44        t.start(sock.close)
45        with self.assertRaises(socket.error):
46            try:
47                sock.connect(('python.org', 81))
48            finally:
49                t.close()
50
51        gevent.sleep(0)
52
53
54class TestExceptionInMainloop(greentest.TestCase):
55
56    def test_sleep(self):
57        # even if there was an error in the mainloop, the hub should continue to work
58        start = time.time()
59        gevent.sleep(DELAY)
60        delay = time.time() - start
61
62        delay_range = DELAY * 0.9
63        self.assertTimeWithinRange(delay, DELAY - delay_range, DELAY + delay_range)
64
65        error = greentest.ExpectedException('TestExceptionInMainloop.test_sleep/fail')
66
67        def fail():
68            raise error
69
70        with get_hub().loop.timer(0.001) as t:
71            t.start(fail)
72
73            self.expect_one_error()
74
75            start = time.time()
76            gevent.sleep(DELAY)
77            delay = time.time() - start
78
79            self.assert_error(value=error)
80            self.assertTimeWithinRange(delay, DELAY - delay_range, DELAY + delay_range)
81
82
83
84class TestSleep(gevent.testing.timing.AbstractGenericWaitTestCase):
85
86    def wait(self, timeout):
87        gevent.sleep(timeout)
88
89    def test_simple(self):
90        gevent.sleep(0)
91
92
93class TestWaiterGet(gevent.testing.timing.AbstractGenericWaitTestCase):
94
95    def setUp(self):
96        super(TestWaiterGet, self).setUp()
97        self.waiter = Waiter()
98
99    def wait(self, timeout):
100        with get_hub().loop.timer(timeout) as evt:
101            evt.start(self.waiter.switch, None)
102            return self.waiter.get()
103
104
105class TestWaiter(greentest.TestCase):
106
107    def test(self):
108        waiter = Waiter()
109        self.assertEqual(str(waiter), '<Waiter greenlet=None>')
110        waiter.switch(25)
111        self.assertEqual(str(waiter), '<Waiter greenlet=None value=25>')
112        self.assertEqual(waiter.get(), 25)
113
114        waiter = Waiter()
115        waiter.throw(ZeroDivisionError)
116        assert re.match('^<Waiter greenlet=None exc_info=.*ZeroDivisionError.*$', str(waiter)), str(waiter)
117        self.assertRaises(ZeroDivisionError, waiter.get)
118
119        waiter = Waiter()
120        g = gevent.spawn(waiter.get)
121        g.name = 'AName'
122        gevent.sleep(0)
123        str_waiter = str(waiter)
124        self.assertTrue(str_waiter.startswith('<Waiter greenlet=<Greenlet "AName'),
125                        str_waiter)
126
127        g.kill()
128
129
130@greentest.skipOnCI("Racy on CI")
131class TestPeriodicMonitoringThread(greentest.TestCase):
132
133    def _reset_hub(self):
134        hub = get_hub()
135        try:
136            del hub.exception_stream
137        except AttributeError:
138            pass
139        if hub._threadpool is not None:
140            hub.threadpool.join()
141            hub.threadpool.kill()
142            del hub.threadpool
143
144
145    def setUp(self):
146        super(TestPeriodicMonitoringThread, self).setUp()
147        self.monitor_thread = gevent.config.monitor_thread
148        gevent.config.monitor_thread = True
149        from gevent.monkey import get_original
150        self.lock = get_original('threading', 'Lock')()
151        self.monitor_fired = 0
152        self.monitored_hubs = set()
153        self._reset_hub()
154
155    def tearDown(self):
156        hub = get_hub()
157        if not self.monitor_thread and hub.periodic_monitoring_thread:
158            # If it was true, nothing to do. If it was false, tear things down.
159            hub.periodic_monitoring_thread.kill()
160            hub.periodic_monitoring_thread = None
161        gevent.config.monitor_thread = self.monitor_thread
162        self.monitored_hubs = None
163        self._reset_hub()
164        super(TestPeriodicMonitoringThread, self).tearDown()
165
166    def _monitor(self, hub):
167        with self.lock:
168            self.monitor_fired += 1
169            if self.monitored_hubs is not None:
170                self.monitored_hubs.add(hub)
171
172    def test_config(self):
173        self.assertEqual(0.1, gevent.config.max_blocking_time)
174
175    def _run_monitoring_threads(self, monitor, kill=True):
176        self.assertTrue(monitor.should_run)
177        from threading import Condition
178        cond = Condition()
179        cond.acquire()
180
181        def monitor_cond(_hub):
182            cond.acquire()
183            cond.notify_all()
184            cond.release()
185            if kill:
186                # Only run once. Especially helpful on PyPy, where
187                # formatting stacks is expensive.
188                monitor.kill()
189
190        monitor.add_monitoring_function(monitor_cond, 0.01)
191
192        cond.wait()
193        cond.release()
194        monitor.add_monitoring_function(monitor_cond, None)
195
196    @greentest.ignores_leakcheck
197    def test_kill_removes_trace(self):
198        from greenlet import gettrace
199        hub = get_hub()
200        hub.start_periodic_monitoring_thread()
201        self.assertIsNotNone(gettrace())
202        hub.periodic_monitoring_thread.kill()
203        self.assertIsNone(gettrace())
204
205    @greentest.ignores_leakcheck
206    def test_blocking_this_thread(self):
207        hub = get_hub()
208        stream = hub.exception_stream = NativeStrIO()
209        monitor = hub.start_periodic_monitoring_thread()
210        self.assertIsNotNone(monitor)
211
212        basic_monitor_func_count = 1
213        if get_this_psutil_process() is not None:
214            # psutil is installed
215            basic_monitor_func_count += 1
216
217        self.assertEqual(basic_monitor_func_count,
218                         len(monitor.monitoring_functions()))
219        monitor.add_monitoring_function(self._monitor, 0.1)
220        self.assertEqual(basic_monitor_func_count + 1,
221                         len(monitor.monitoring_functions()))
222        self.assertEqual(self._monitor, monitor.monitoring_functions()[-1].function)
223        self.assertEqual(0.1, monitor.monitoring_functions()[-1].period)
224
225        # We must make sure we have switched greenlets at least once,
226        # otherwise we can't detect a failure.
227        gevent.sleep(hub.loop.approx_timer_resolution)
228        assert hub.exception_stream is stream
229        try:
230            time.sleep(0.3) # Thrice the default
231            self._run_monitoring_threads(monitor)
232        finally:
233            monitor.add_monitoring_function(self._monitor, None)
234            self.assertEqual(basic_monitor_func_count,
235                             len(monitor._monitoring_functions))
236            assert hub.exception_stream is stream
237            monitor.kill()
238            del hub.exception_stream
239
240
241        self.assertGreaterEqual(self.monitor_fired, 1)
242        data = stream.getvalue()
243        self.assertIn('appears to be blocked', data)
244        self.assertIn('PeriodicMonitoringThread', data)
245
246    def _prep_worker_thread(self):
247        hub = get_hub()
248        threadpool = hub.threadpool
249
250        worker_hub = threadpool.apply(get_hub)
251        assert hub is not worker_hub
252        stream = NativeStrIO()
253
254        # It does not have a monitoring thread yet
255        self.assertIsNone(worker_hub.periodic_monitoring_thread)
256        # So switch to it and give it one by letting it run.
257        # XXX: Python 3.10 appears to have made some changes in the memory model.
258        # Specifically, reading values from the background that are set in the
259        # background hub *from this thread* is flaky. It takes them awhile to show up.
260        # Really, that's correct and expected from a standard C point of view, as we
261        # don't insert any memory barriers or things like that. It just always used to
262        # work in the past. So now, rather than read them directly, we need to read them
263        # from the background thread itself. The same, apparently, goes for
264        # writing.
265        # Need to figure out what exactly the change was.
266        def task():
267            get_hub().exception_stream = stream
268            gevent.sleep(0.01)
269            mon = get_hub().periodic_monitoring_thread
270            mon.add_monitoring_function(self._monitor, 0.1)
271            return mon
272        worker_monitor = threadpool.apply(task)
273        self.assertIsNotNone(worker_monitor)
274
275        return worker_hub, stream, worker_monitor
276
277    @greentest.ignores_leakcheck
278    def test_blocking_threadpool_thread_task_queue(self):
279        # A threadpool thread spends much of its time
280        # blocked on the native Lock object. Unless we take
281        # care, if that thread had created a hub, it will constantly
282        # be reported as blocked.
283
284        worker_hub, stream, worker_monitor = self._prep_worker_thread()
285
286        # Now wait until the monitoring threads have run.
287        self._run_monitoring_threads(worker_monitor)
288        worker_monitor.kill()
289
290        # We did run the monitor in the worker thread, but it
291        # did NOT report itself blocked by the worker thread sitting there.
292        with self.lock:
293            self.assertIn(worker_hub, self.monitored_hubs)
294            self.assertEqual(stream.getvalue(), '')
295
296    @greentest.ignores_leakcheck
297    def test_blocking_threadpool_thread_one_greenlet(self):
298        # If the background threadpool thread has no other greenlets to run
299        # and never switches, then even if it has a hub
300        # we don't report it blocking. The threadpool is *meant* to run
301        # tasks that block.
302
303        hub = get_hub()
304        threadpool = hub.threadpool
305        worker_hub, stream, worker_monitor = self._prep_worker_thread()
306
307        task = threadpool.spawn(time.sleep, 0.3)
308        # Now wait until the monitoring threads have run.
309        self._run_monitoring_threads(worker_monitor)
310        # and be sure the task ran
311        task.get()
312        worker_monitor.kill()
313
314        # We did run the monitor in the worker thread, but it
315        # did NOT report itself blocked by the worker thread
316        with self.lock:
317            self.assertIn(worker_hub, self.monitored_hubs)
318            self.assertEqual(stream.getvalue(), '')
319
320
321    @greentest.ignores_leakcheck
322    def test_blocking_threadpool_thread_multi_greenlet(self):
323        # If the background threadpool thread ever switches
324        # greenlets, monitoring goes into affect.
325
326        hub = get_hub()
327        threadpool = hub.threadpool
328        worker_hub, stream, worker_monitor = self._prep_worker_thread()
329
330        def task():
331            g = gevent.spawn(time.sleep, 0.7)
332            g.join()
333
334        task = threadpool.spawn(task)
335        # Now wait until the monitoring threads have run.
336        self._run_monitoring_threads(worker_monitor, kill=False)
337        # and be sure the task ran
338        task.get()
339        worker_monitor.kill()
340
341        # We did run the monitor in the worker thread, and it
342        # DID report itself blocked by the worker thread
343        self.assertIn(worker_hub, self.monitored_hubs)
344        data = stream.getvalue()
345        self.assertIn('appears to be blocked', data)
346        self.assertIn('PeriodicMonitoringThread', data)
347
348
349class TestLoopInterface(unittest.TestCase):
350
351    def test_implemensts_ILoop(self):
352        from gevent.testing import verify
353        from gevent._interfaces import ILoop
354
355        loop = get_hub().loop
356
357        verify.verifyObject(ILoop, loop)
358
359    def test_callback_implements_ICallback(self):
360        from gevent.testing import verify
361        from gevent._interfaces import ICallback
362
363        loop = get_hub().loop
364
365        cb = loop.run_callback(lambda: None)
366        verify.verifyObject(ICallback, cb)
367
368    def test_callback_ts_implements_ICallback(self):
369        from gevent.testing import verify
370        from gevent._interfaces import ICallback
371
372        loop = get_hub().loop
373
374        cb = loop.run_callback_threadsafe(lambda: None)
375        verify.verifyObject(ICallback, cb)
376
377
378class TestHandleError(unittest.TestCase):
379
380    def tearDown(self):
381        try:
382            del get_hub().handle_error
383        except AttributeError:
384            pass
385
386    def test_exception_in_custom_handle_error_does_not_crash(self):
387
388        def bad_handle_error(*args):
389            raise AttributeError
390
391        get_hub().handle_error = bad_handle_error
392
393        class MyException(Exception):
394            pass
395
396        def raises():
397            raise MyException
398
399        with self.assertRaises(MyException):
400            gevent.spawn(raises).get()
401
402
403if __name__ == '__main__':
404    greentest.main()
405