import pytest

import weakref

from ..testing import wait_all_tasks_blocked, assert_checkpoints

from .. import _core
from .. import _timeouts
from .._timeouts import sleep_forever, move_on_after
from .._sync import *


async def test_Event():
    """Exercise Event: initial state, waiting once set, and waking waiters."""
    e = Event()
    assert not e.is_set()
    assert e.statistics().tasks_waiting == 0

    e.set()
    assert e.is_set()
    with assert_checkpoints():
        await e.wait()

    e = Event()

    record = []

    async def child():
        record.append("sleeping")
        await e.wait()
        record.append("woken")

    async with _core.open_nursery() as nursery:
        nursery.start_soon(child)
        nursery.start_soon(child)
        await wait_all_tasks_blocked()
        assert record == ["sleeping", "sleeping"]
        assert e.statistics().tasks_waiting == 2
        e.set()
        await wait_all_tasks_blocked()
        assert record == ["sleeping", "sleeping", "woken", "woken"]


async def test_CapacityLimiter():
    """Exercise CapacityLimiter construction, token accounting and fairness."""
    with pytest.raises(TypeError):
        CapacityLimiter(1.0)
    with pytest.raises(ValueError):
        CapacityLimiter(-1)
    c = CapacityLimiter(2)
    repr(c)  # smoke test
    assert c.total_tokens == 2
    assert c.borrowed_tokens == 0
    assert c.available_tokens == 2
    with pytest.raises(RuntimeError):
        c.release()
    assert c.borrowed_tokens == 0
    c.acquire_nowait()
    assert c.borrowed_tokens == 1
    assert c.available_tokens == 1

    stats = c.statistics()
    assert stats.borrowed_tokens == 1
    assert stats.total_tokens == 2
    assert stats.borrowers == [_core.current_task()]
    assert stats.tasks_waiting == 0

    # Can't re-acquire when we already have it
    with pytest.raises(RuntimeError):
        c.acquire_nowait()
    assert c.borrowed_tokens == 1
    with pytest.raises(RuntimeError):
        await c.acquire()
    assert c.borrowed_tokens == 1

    # We can acquire on behalf of someone else though
    with assert_checkpoints():
        await c.acquire_on_behalf_of("someone")

    # But then we've run out of capacity
    assert c.borrowed_tokens == 2
    with pytest.raises(_core.WouldBlock):
        c.acquire_on_behalf_of_nowait("third party")

    assert set(c.statistics().borrowers) == {_core.current_task(), "someone"}

    # Until we release one
    c.release_on_behalf_of(_core.current_task())
    assert c.statistics().borrowers == ["someone"]

    c.release_on_behalf_of("someone")
    assert c.borrowed_tokens == 0
    with assert_checkpoints():
        async with c:
            assert c.borrowed_tokens == 1

    async with _core.open_nursery() as nursery:
        await c.acquire_on_behalf_of("value 1")
        await c.acquire_on_behalf_of("value 2")
        nursery.start_soon(c.acquire_on_behalf_of, "value 3")
        await wait_all_tasks_blocked()
        assert c.borrowed_tokens == 2
        assert c.statistics().tasks_waiting == 1
        c.release_on_behalf_of("value 2")
        # Fairness:
        assert c.borrowed_tokens == 2
        with pytest.raises(_core.WouldBlock):
            c.acquire_nowait()

    c.release_on_behalf_of("value 3")
    c.release_on_behalf_of("value 1")


async def test_CapacityLimiter_inf():
    """Exercise a CapacityLimiter whose total_tokens is math.inf."""
    from math import inf

    c = CapacityLimiter(inf)
    repr(c)  # smoke test
    assert c.total_tokens == inf
    assert c.borrowed_tokens == 0
    assert c.available_tokens == inf
    with pytest.raises(RuntimeError):
        c.release()
    assert c.borrowed_tokens == 0
    c.acquire_nowait()
    assert c.borrowed_tokens == 1
    assert c.available_tokens == inf


async def test_CapacityLimiter_change_total_tokens():
    """Exercise assigning to total_tokens, both invalid values and resizing."""
    c = CapacityLimiter(2)

    with pytest.raises(TypeError):
        c.total_tokens = 1.0

    with pytest.raises(ValueError):
        c.total_tokens = 0

    with pytest.raises(ValueError):
        c.total_tokens = -10

    assert c.total_tokens == 2

    async with _core.open_nursery() as nursery:
        # Let each task reach the limiter before starting the next, so the
        # borrowers / waiters asserted below are in a known order.
        for i in range(5):
            nursery.start_soon(c.acquire_on_behalf_of, i)
            await wait_all_tasks_blocked()
        assert set(c.statistics().borrowers) == {0, 1}
        assert c.statistics().tasks_waiting == 3
        c.total_tokens += 2
        assert set(c.statistics().borrowers) == {0, 1, 2, 3}
        assert c.statistics().tasks_waiting == 1
        c.total_tokens -= 3
        assert c.borrowed_tokens == 4
        assert c.total_tokens == 1
        c.release_on_behalf_of(0)
        c.release_on_behalf_of(1)
        c.release_on_behalf_of(2)
        assert set(c.statistics().borrowers) == {3}
        assert c.statistics().tasks_waiting == 1
        c.release_on_behalf_of(3)
        assert set(c.statistics().borrowers) == {4}
        assert c.statistics().tasks_waiting == 0


# regression test for issue #548
async def test_CapacityLimiter_memleak_548():
    """Check a cancelled acquire() leaves nothing in _pending_borrowers."""
    limiter = CapacityLimiter(total_tokens=1)
    await limiter.acquire()

    async with _core.open_nursery() as n:
        n.start_soon(limiter.acquire)
        await wait_all_tasks_blocked()  # give it a chance to run the task
        n.cancel_scope.cancel()

    # if this is 1, the acquire call (despite being killed) is still there in the task, and will
    # leak memory all the while the limiter is active
    assert len(limiter._pending_borrowers) == 0


async def test_Semaphore():
    """Exercise an unbounded Semaphore: validation, counting and fairness."""
    with pytest.raises(TypeError):
        Semaphore(1.0)
    with pytest.raises(ValueError):
        Semaphore(-1)
    s = Semaphore(1)
    repr(s)  # smoke test
    assert s.value == 1
    assert s.max_value is None
    s.release()
    assert s.value == 2
    assert s.statistics().tasks_waiting == 0
    s.acquire_nowait()
    assert s.value == 1
    with assert_checkpoints():
        await s.acquire()
    assert s.value == 0
    with pytest.raises(_core.WouldBlock):
        s.acquire_nowait()

    s.release()
    assert s.value == 1
    with assert_checkpoints():
        async with s:
            assert s.value == 0
    assert s.value == 1
    s.acquire_nowait()

    record = []

    async def do_acquire(s):
        record.append("started")
        await s.acquire()
        record.append("finished")

    async with _core.open_nursery() as nursery:
        nursery.start_soon(do_acquire, s)
        await wait_all_tasks_blocked()
        assert record == ["started"]
        assert s.value == 0
        s.release()
        # Fairness:
        assert s.value == 0
        with pytest.raises(_core.WouldBlock):
            s.acquire_nowait()
    # Checked after the nursery exits, i.e. once do_acquire has finished.
    assert record == ["started", "finished"]


async def test_Semaphore_bounded():
    """Exercise Semaphore with max_value: validation and over-release."""
    with pytest.raises(TypeError):
        Semaphore(1, max_value=1.0)
    with pytest.raises(ValueError):
        Semaphore(2, max_value=1)
    bs = Semaphore(1, max_value=1)
    assert bs.max_value == 1
    repr(bs)  # smoke test
    with pytest.raises(ValueError):
        bs.release()
    assert bs.value == 1
    bs.acquire_nowait()
    assert bs.value == 0
    bs.release()
    assert bs.value == 1


@pytest.mark.parametrize("lockcls", [Lock, StrictFIFOLock], ids=lambda fn: fn.__name__)
async def test_Lock_and_StrictFIFOLock(lockcls):
    """Exercise the shared Lock / StrictFIFOLock API, including statistics()."""
    l = lockcls()  # noqa
    assert not l.locked()

    # make sure locks can be weakref'ed (gh-331)
    r = weakref.ref(l)
    assert r() is l

    repr(l)  # smoke test
    # make sure repr uses the right name for subclasses
    assert lockcls.__name__ in repr(l)
    with assert_checkpoints():
        async with l:
            assert l.locked()
            repr(l)  # smoke test (repr branches on locked/unlocked)
    assert not l.locked()
    l.acquire_nowait()
    assert l.locked()
    l.release()
    assert not l.locked()
    with assert_checkpoints():
        await l.acquire()
    assert l.locked()
    l.release()
    assert not l.locked()

    l.acquire_nowait()
    with pytest.raises(RuntimeError):
        # Error out if we already own the lock
        l.acquire_nowait()
    l.release()
    with pytest.raises(RuntimeError):
        # Error out if we don't own the lock
        l.release()

    holder_task = None

    async def holder():
        nonlocal holder_task
        holder_task = _core.current_task()
        async with l:
            await sleep_forever()

    async with _core.open_nursery() as nursery:
        assert not l.locked()
        nursery.start_soon(holder)
        await wait_all_tasks_blocked()
        assert l.locked()
        # WouldBlock if someone else holds the lock
        with pytest.raises(_core.WouldBlock):
            l.acquire_nowait()
        # Can't release a lock someone else holds
        with pytest.raises(RuntimeError):
            l.release()

        statistics = l.statistics()
        print(statistics)
        assert statistics.locked
        assert statistics.owner is holder_task
        assert statistics.tasks_waiting == 0

        nursery.start_soon(holder)
        await wait_all_tasks_blocked()
        statistics = l.statistics()
        print(statistics)
        assert statistics.tasks_waiting == 1

        nursery.cancel_scope.cancel()

    statistics = l.statistics()
    assert not statistics.locked
    assert statistics.owner is None
    assert statistics.tasks_waiting == 0


async def test_Condition():
    """Exercise Condition: lock validation, notify/notify_all and cancellation."""
    with pytest.raises(TypeError):
        Condition(Semaphore(1))
    with pytest.raises(TypeError):
        Condition(StrictFIFOLock)
    l = Lock()  # noqa
    c = Condition(l)
    assert not l.locked()
    assert not c.locked()
    with assert_checkpoints():
        await c.acquire()
    assert l.locked()
    assert c.locked()

    c = Condition()
    assert not c.locked()
    c.acquire_nowait()
    assert c.locked()
    with pytest.raises(RuntimeError):
        c.acquire_nowait()
    c.release()

    with pytest.raises(RuntimeError):
        # Can't wait without holding the lock
        await c.wait()
    with pytest.raises(RuntimeError):
        # Can't notify without holding the lock
        c.notify()
    with pytest.raises(RuntimeError):
        # Can't notify without holding the lock
        c.notify_all()

    finished_waiters = set()

    async def waiter(i):
        async with c:
            await c.wait()
        finished_waiters.add(i)

    async with _core.open_nursery() as nursery:
        for i in range(3):
            nursery.start_soon(waiter, i)
            await wait_all_tasks_blocked()
        async with c:
            c.notify()
        assert c.locked()
        await wait_all_tasks_blocked()
        assert finished_waiters == {0}
        async with c:
            c.notify_all()
        await wait_all_tasks_blocked()
        assert finished_waiters == {0, 1, 2}

    finished_waiters = set()
    async with _core.open_nursery() as nursery:
        for i in range(3):
            nursery.start_soon(waiter, i)
            await wait_all_tasks_blocked()
        async with c:
            c.notify(2)
            statistics = c.statistics()
            print(statistics)
            assert statistics.tasks_waiting == 1
            assert statistics.lock_statistics.tasks_waiting == 2
        # exiting the context manager hands off the lock to the first task
        assert c.statistics().lock_statistics.tasks_waiting == 1

        await wait_all_tasks_blocked()
        assert finished_waiters == {0, 1}

        async with c:
            c.notify_all()

    # After being cancelled still hold the lock (!)
    # (Note that c.__aexit__ checks that we hold the lock as well)
    with _core.CancelScope() as scope:
        async with c:
            scope.cancel()
            try:
                await c.wait()
            finally:
                assert c.locked()


from .._sync import async_cm
from .._channel import open_memory_channel

# Three ways of implementing a Lock in terms of a channel. Used to let us put
# the channel through the generic lock tests.


@async_cm
class ChannelLock1:
    """Lock-like object where acquiring sends into a memory channel.

    The constructor pre-fills ``capacity - 1`` items, so one more send
    succeeds immediately and later sends have to wait for a ``release()``
    (which receives one item). Presumably ``async_cm`` supplies the
    ``async with`` support the generic tests rely on -- confirm in ``_sync``.
    """

    def __init__(self, capacity):
        self.s, self.r = open_memory_channel(capacity)
        for _ in range(capacity - 1):
            self.s.send_nowait(None)

    def acquire_nowait(self):
        self.s.send_nowait(None)

    async def acquire(self):
        await self.s.send(None)

    def release(self):
        self.r.receive_nowait()


@async_cm
class ChannelLock2:
    """Lock-like object where acquiring receives from a memory channel.

    The channel starts out holding a single item; ``acquire`` receives it
    and ``release`` sends one back.
    """

    def __init__(self):
        self.s, self.r = open_memory_channel(10)
        self.s.send_nowait(None)

    def acquire_nowait(self):
        self.r.receive_nowait()

    async def acquire(self):
        await self.r.receive()

    def release(self):
        self.s.send_nowait(None)


@async_cm
class ChannelLock3:
    """Lock-like object built on a zero-capacity memory channel plus a flag.

    A task that finds the lock taken waits in ``send``; ``release`` tries to
    receive from such a waiter and only clears the flag when nobody is
    waiting.
    """

    def __init__(self):
        self.s, self.r = open_memory_channel(0)
        # self.acquired is true when one task acquires the lock and
        # only becomes false when it's released and no tasks are
        # waiting to acquire.
        self.acquired = False

    def acquire_nowait(self):
        assert not self.acquired
        self.acquired = True

    async def acquire(self):
        if self.acquired:
            await self.s.send(None)
        else:
            self.acquired = True
            await _core.checkpoint()

    def release(self):
        try:
            self.r.receive_nowait()
        except _core.WouldBlock:
            assert self.acquired
            self.acquired = False


# Zero-argument callables that each build a fresh lock-like object; the
# entries of lock_factory_names below are the matching pytest ids, in the
# same order.
lock_factories = [
    lambda: CapacityLimiter(1),
    lambda: Semaphore(1),
    Lock,
    StrictFIFOLock,
    lambda: ChannelLock1(10),
    lambda: ChannelLock1(1),
    ChannelLock2,
    ChannelLock3,
]
lock_factory_names = [
    "CapacityLimiter(1)",
    "Semaphore(1)",
    "Lock",
    "StrictFIFOLock",
    "ChannelLock1(10)",
    "ChannelLock1(1)",
    "ChannelLock2",
    "ChannelLock3",
]

generic_lock_test = pytest.mark.parametrize(
    "lock_factory", lock_factories, ids=lock_factory_names
)


# Spawn a bunch of workers that take a lock and then yield; make sure that
# only one worker is ever in the critical section at a time.
@generic_lock_test
async def test_generic_lock_exclusion(lock_factory):
    """Check mutual exclusion for every lock-like object in lock_factories."""
    LOOPS = 10
    WORKERS = 5
    in_critical_section = False
    acquires = 0

    async def worker(lock_like):
        nonlocal in_critical_section, acquires
        for _ in range(LOOPS):
            async with lock_like:
                acquires += 1
                assert not in_critical_section
                in_critical_section = True
                await _core.checkpoint()
                await _core.checkpoint()
                assert in_critical_section
                in_critical_section = False

    async with _core.open_nursery() as nursery:
        lock_like = lock_factory()
        for _ in range(WORKERS):
            nursery.start_soon(worker, lock_like)
    assert not in_critical_section
    assert acquires == LOOPS * WORKERS


# Several workers queue on the same lock; make sure they each get it, in
# order.
@generic_lock_test
async def test_generic_lock_fifo_fairness(lock_factory):
    """Check that three looping tasks keep taking the lock in the same order."""
    initial_order = []
    record = []
    LOOPS = 5

    async def loopy(name, lock_like):
        # Record the order each task was initially scheduled in
        initial_order.append(name)
        for _ in range(LOOPS):
            async with lock_like:
                record.append(name)

    lock_like = lock_factory()
    async with _core.open_nursery() as nursery:
        nursery.start_soon(loopy, 1, lock_like)
        nursery.start_soon(loopy, 2, lock_like)
        nursery.start_soon(loopy, 3, lock_like)
    # The first three could be in any order due to scheduling randomness,
    # but after that they should repeat in the same order
    for i in range(LOOPS):
        assert record[3 * i : 3 * (i + 1)] == initial_order


@generic_lock_test
async def test_generic_lock_acquire_nowait_blocks_acquire(lock_factory):
    """Check that a task entering the lock waits while acquire_nowait() holds it."""
    lock_like = lock_factory()

    record = []

    async def lock_taker():
        record.append("started")
        async with lock_like:
            pass
        record.append("finished")

    async with _core.open_nursery() as nursery:
        lock_like.acquire_nowait()
        nursery.start_soon(lock_taker)
        await wait_all_tasks_blocked()
        assert record == ["started"]
        lock_like.release()