1# Copyright (c) 2009 AG Projects 2# Author: Denis Bilenko 3# 4# Permission is hereby granted, free of charge, to any person obtaining a copy 5# of this software and associated documentation files (the "Software"), to deal 6# in the Software without restriction, including without limitation the rights 7# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 8# copies of the Software, and to permit persons to whom the Software is 9# furnished to do so, subject to the following conditions: 10# 11# The above copyright notice and this permission notice shall be included in 12# all copies or substantial portions of the Software. 13# 14# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 15# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 16# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 17# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 18# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 19# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 20# THE SOFTWARE. 21 22import re 23import time 24import unittest 25 26import gevent.testing as greentest 27import gevent.testing.timing 28 29import gevent 30from gevent import socket 31from gevent.hub import Waiter, get_hub 32from gevent._compat import NativeStrIO 33from gevent._compat import get_this_psutil_process 34 35DELAY = 0.1 36 37 38class TestCloseSocketWhilePolling(greentest.TestCase): 39 40 def test(self): 41 sock = socket.socket() 42 self._close_on_teardown(sock) 43 t = get_hub().loop.timer(0) 44 t.start(sock.close) 45 with self.assertRaises(socket.error): 46 try: 47 sock.connect(('python.org', 81)) 48 finally: 49 t.close() 50 51 gevent.sleep(0) 52 53 54class TestExceptionInMainloop(greentest.TestCase): 55 56 def test_sleep(self): 57 # even if there was an error in the mainloop, the hub should continue to work 58 start = time.time() 59 gevent.sleep(DELAY) 60 delay = time.time() - start 61 62 delay_range = DELAY * 0.9 63 self.assertTimeWithinRange(delay, DELAY - delay_range, DELAY + delay_range) 64 65 error = greentest.ExpectedException('TestExceptionInMainloop.test_sleep/fail') 66 67 def fail(): 68 raise error 69 70 with get_hub().loop.timer(0.001) as t: 71 t.start(fail) 72 73 self.expect_one_error() 74 75 start = time.time() 76 gevent.sleep(DELAY) 77 delay = time.time() - start 78 79 self.assert_error(value=error) 80 self.assertTimeWithinRange(delay, DELAY - delay_range, DELAY + delay_range) 81 82 83 84class TestSleep(gevent.testing.timing.AbstractGenericWaitTestCase): 85 86 def wait(self, timeout): 87 gevent.sleep(timeout) 88 89 def test_simple(self): 90 gevent.sleep(0) 91 92 93class TestWaiterGet(gevent.testing.timing.AbstractGenericWaitTestCase): 94 95 def setUp(self): 96 super(TestWaiterGet, self).setUp() 97 self.waiter = Waiter() 98 99 def wait(self, timeout): 100 with get_hub().loop.timer(timeout) as evt: 101 evt.start(self.waiter.switch, None) 102 return self.waiter.get() 103 104 105class TestWaiter(greentest.TestCase): 106 107 def test(self): 108 waiter = Waiter() 109 self.assertEqual(str(waiter), '<Waiter greenlet=None>') 110 waiter.switch(25) 111 self.assertEqual(str(waiter), '<Waiter greenlet=None value=25>') 112 self.assertEqual(waiter.get(), 25) 113 114 waiter = Waiter() 115 waiter.throw(ZeroDivisionError) 116 assert re.match('^<Waiter greenlet=None exc_info=.*ZeroDivisionError.*$', str(waiter)), str(waiter) 117 self.assertRaises(ZeroDivisionError, waiter.get) 118 119 waiter = Waiter() 120 g = gevent.spawn(waiter.get) 121 g.name = 'AName' 122 gevent.sleep(0) 123 str_waiter = str(waiter) 124 self.assertTrue(str_waiter.startswith('<Waiter greenlet=<Greenlet "AName'), 125 str_waiter) 126 127 g.kill() 128 129 130@greentest.skipOnCI("Racy on CI") 131class TestPeriodicMonitoringThread(greentest.TestCase): 132 133 def _reset_hub(self): 134 hub = get_hub() 135 try: 136 del hub.exception_stream 137 except AttributeError: 138 pass 139 if hub._threadpool is not None: 140 hub.threadpool.join() 141 hub.threadpool.kill() 142 del hub.threadpool 143 144 145 def setUp(self): 146 super(TestPeriodicMonitoringThread, self).setUp() 147 self.monitor_thread = gevent.config.monitor_thread 148 gevent.config.monitor_thread = True 149 from gevent.monkey import get_original 150 self.lock = get_original('threading', 'Lock')() 151 self.monitor_fired = 0 152 self.monitored_hubs = set() 153 self._reset_hub() 154 155 def tearDown(self): 156 hub = get_hub() 157 if not self.monitor_thread and hub.periodic_monitoring_thread: 158 # If it was true, nothing to do. If it was false, tear things down. 159 hub.periodic_monitoring_thread.kill() 160 hub.periodic_monitoring_thread = None 161 gevent.config.monitor_thread = self.monitor_thread 162 self.monitored_hubs = None 163 self._reset_hub() 164 super(TestPeriodicMonitoringThread, self).tearDown() 165 166 def _monitor(self, hub): 167 with self.lock: 168 self.monitor_fired += 1 169 if self.monitored_hubs is not None: 170 self.monitored_hubs.add(hub) 171 172 def test_config(self): 173 self.assertEqual(0.1, gevent.config.max_blocking_time) 174 175 def _run_monitoring_threads(self, monitor, kill=True): 176 self.assertTrue(monitor.should_run) 177 from threading import Condition 178 cond = Condition() 179 cond.acquire() 180 181 def monitor_cond(_hub): 182 cond.acquire() 183 cond.notify_all() 184 cond.release() 185 if kill: 186 # Only run once. Especially helpful on PyPy, where 187 # formatting stacks is expensive. 188 monitor.kill() 189 190 monitor.add_monitoring_function(monitor_cond, 0.01) 191 192 cond.wait() 193 cond.release() 194 monitor.add_monitoring_function(monitor_cond, None) 195 196 @greentest.ignores_leakcheck 197 def test_kill_removes_trace(self): 198 from greenlet import gettrace 199 hub = get_hub() 200 hub.start_periodic_monitoring_thread() 201 self.assertIsNotNone(gettrace()) 202 hub.periodic_monitoring_thread.kill() 203 self.assertIsNone(gettrace()) 204 205 @greentest.ignores_leakcheck 206 def test_blocking_this_thread(self): 207 hub = get_hub() 208 stream = hub.exception_stream = NativeStrIO() 209 monitor = hub.start_periodic_monitoring_thread() 210 self.assertIsNotNone(monitor) 211 212 basic_monitor_func_count = 1 213 if get_this_psutil_process() is not None: 214 # psutil is installed 215 basic_monitor_func_count += 1 216 217 self.assertEqual(basic_monitor_func_count, 218 len(monitor.monitoring_functions())) 219 monitor.add_monitoring_function(self._monitor, 0.1) 220 self.assertEqual(basic_monitor_func_count + 1, 221 len(monitor.monitoring_functions())) 222 self.assertEqual(self._monitor, monitor.monitoring_functions()[-1].function) 223 self.assertEqual(0.1, monitor.monitoring_functions()[-1].period) 224 225 # We must make sure we have switched greenlets at least once, 226 # otherwise we can't detect a failure. 227 gevent.sleep(hub.loop.approx_timer_resolution) 228 assert hub.exception_stream is stream 229 try: 230 time.sleep(0.3) # Thrice the default 231 self._run_monitoring_threads(monitor) 232 finally: 233 monitor.add_monitoring_function(self._monitor, None) 234 self.assertEqual(basic_monitor_func_count, 235 len(monitor._monitoring_functions)) 236 assert hub.exception_stream is stream 237 monitor.kill() 238 del hub.exception_stream 239 240 241 self.assertGreaterEqual(self.monitor_fired, 1) 242 data = stream.getvalue() 243 self.assertIn('appears to be blocked', data) 244 self.assertIn('PeriodicMonitoringThread', data) 245 246 def _prep_worker_thread(self): 247 hub = get_hub() 248 threadpool = hub.threadpool 249 250 worker_hub = threadpool.apply(get_hub) 251 assert hub is not worker_hub 252 stream = NativeStrIO() 253 254 # It does not have a monitoring thread yet 255 self.assertIsNone(worker_hub.periodic_monitoring_thread) 256 # So switch to it and give it one by letting it run. 257 # XXX: Python 3.10 appears to have made some changes in the memory model. 258 # Specifically, reading values from the background that are set in the 259 # background hub *from this thread* is flaky. It takes them awhile to show up. 260 # Really, that's correct and expected from a standard C point of view, as we 261 # don't insert any memory barriers or things like that. It just always used to 262 # work in the past. So now, rather than read them directly, we need to read them 263 # from the background thread itself. The same, apparently, goes for 264 # writing. 265 # Need to figure out what exactly the change was. 266 def task(): 267 get_hub().exception_stream = stream 268 gevent.sleep(0.01) 269 mon = get_hub().periodic_monitoring_thread 270 mon.add_monitoring_function(self._monitor, 0.1) 271 return mon 272 worker_monitor = threadpool.apply(task) 273 self.assertIsNotNone(worker_monitor) 274 275 return worker_hub, stream, worker_monitor 276 277 @greentest.ignores_leakcheck 278 def test_blocking_threadpool_thread_task_queue(self): 279 # A threadpool thread spends much of its time 280 # blocked on the native Lock object. Unless we take 281 # care, if that thread had created a hub, it will constantly 282 # be reported as blocked. 283 284 worker_hub, stream, worker_monitor = self._prep_worker_thread() 285 286 # Now wait until the monitoring threads have run. 287 self._run_monitoring_threads(worker_monitor) 288 worker_monitor.kill() 289 290 # We did run the monitor in the worker thread, but it 291 # did NOT report itself blocked by the worker thread sitting there. 292 with self.lock: 293 self.assertIn(worker_hub, self.monitored_hubs) 294 self.assertEqual(stream.getvalue(), '') 295 296 @greentest.ignores_leakcheck 297 def test_blocking_threadpool_thread_one_greenlet(self): 298 # If the background threadpool thread has no other greenlets to run 299 # and never switches, then even if it has a hub 300 # we don't report it blocking. The threadpool is *meant* to run 301 # tasks that block. 302 303 hub = get_hub() 304 threadpool = hub.threadpool 305 worker_hub, stream, worker_monitor = self._prep_worker_thread() 306 307 task = threadpool.spawn(time.sleep, 0.3) 308 # Now wait until the monitoring threads have run. 309 self._run_monitoring_threads(worker_monitor) 310 # and be sure the task ran 311 task.get() 312 worker_monitor.kill() 313 314 # We did run the monitor in the worker thread, but it 315 # did NOT report itself blocked by the worker thread 316 with self.lock: 317 self.assertIn(worker_hub, self.monitored_hubs) 318 self.assertEqual(stream.getvalue(), '') 319 320 321 @greentest.ignores_leakcheck 322 def test_blocking_threadpool_thread_multi_greenlet(self): 323 # If the background threadpool thread ever switches 324 # greenlets, monitoring goes into affect. 325 326 hub = get_hub() 327 threadpool = hub.threadpool 328 worker_hub, stream, worker_monitor = self._prep_worker_thread() 329 330 def task(): 331 g = gevent.spawn(time.sleep, 0.7) 332 g.join() 333 334 task = threadpool.spawn(task) 335 # Now wait until the monitoring threads have run. 336 self._run_monitoring_threads(worker_monitor, kill=False) 337 # and be sure the task ran 338 task.get() 339 worker_monitor.kill() 340 341 # We did run the monitor in the worker thread, and it 342 # DID report itself blocked by the worker thread 343 self.assertIn(worker_hub, self.monitored_hubs) 344 data = stream.getvalue() 345 self.assertIn('appears to be blocked', data) 346 self.assertIn('PeriodicMonitoringThread', data) 347 348 349class TestLoopInterface(unittest.TestCase): 350 351 def test_implemensts_ILoop(self): 352 from gevent.testing import verify 353 from gevent._interfaces import ILoop 354 355 loop = get_hub().loop 356 357 verify.verifyObject(ILoop, loop) 358 359 def test_callback_implements_ICallback(self): 360 from gevent.testing import verify 361 from gevent._interfaces import ICallback 362 363 loop = get_hub().loop 364 365 cb = loop.run_callback(lambda: None) 366 verify.verifyObject(ICallback, cb) 367 368 def test_callback_ts_implements_ICallback(self): 369 from gevent.testing import verify 370 from gevent._interfaces import ICallback 371 372 loop = get_hub().loop 373 374 cb = loop.run_callback_threadsafe(lambda: None) 375 verify.verifyObject(ICallback, cb) 376 377 378class TestHandleError(unittest.TestCase): 379 380 def tearDown(self): 381 try: 382 del get_hub().handle_error 383 except AttributeError: 384 pass 385 386 def test_exception_in_custom_handle_error_does_not_crash(self): 387 388 def bad_handle_error(*args): 389 raise AttributeError 390 391 get_hub().handle_error = bad_handle_error 392 393 class MyException(Exception): 394 pass 395 396 def raises(): 397 raise MyException 398 399 with self.assertRaises(MyException): 400 gevent.spawn(raises).get() 401 402 403if __name__ == '__main__': 404 greentest.main() 405