1import multiprocessing 2import threading 3import sys 4 5from multiprocessing.managers import BaseManager 6 7import pytest 8 9Stash = pytest.importorskip("wptserve.stash").Stash 10 11@pytest.fixture() 12def add_cleanup(): 13 fns = [] 14 15 def add(fn): 16 fns.append(fn) 17 18 yield add 19 20 for fn in fns: 21 fn() 22 23 24def run(process_queue, request_lock, response_lock): 25 """Create two Stash instances in parallel threads. Use the provided locks 26 to ensure the first thread is actively establishing an interprocess 27 communication channel at the moment the second thread executes.""" 28 29 def target(thread_queue): 30 stash = Stash("/", ("localhost", 4543), b"some key") 31 32 # The `lock` property of the Stash instance should always be set 33 # immediately following initialization. These values are asserted in 34 # the active test. 35 thread_queue.put(stash.lock is None) 36 37 thread_queue = multiprocessing.Queue() 38 first = threading.Thread(target=target, args=(thread_queue,)) 39 second = threading.Thread(target=target, args=(thread_queue,)) 40 41 request_lock.acquire() 42 response_lock.acquire() 43 first.start() 44 45 request_lock.acquire() 46 47 # At this moment, the `first` thread is waiting for a proxied object. 48 # Create a second thread in order to inspect the behavior of the Stash 49 # constructor at this moment. 50 51 second.start() 52 53 # Allow the `first` thread to proceed 54 55 response_lock.release() 56 57 # Wait for both threads to complete and report their stateto the test 58 process_queue.put(thread_queue.get()) 59 process_queue.put(thread_queue.get()) 60 61 62class SlowLock(BaseManager): 63 # This can only be used in test_delayed_lock since that test modifies the 64 # class body, but it has to be a global for multiprocessing 65 pass 66 67 68@pytest.mark.xfail(sys.platform == "win32" or 69 multiprocessing.get_start_method() == "spawn", 70 reason="https://github.com/web-platform-tests/wpt/issues/16938") 71def test_delayed_lock(add_cleanup): 72 """Ensure that delays in proxied Lock retrieval do not interfere with 73 initialization in parallel threads.""" 74 75 request_lock = multiprocessing.Lock() 76 response_lock = multiprocessing.Lock() 77 78 queue = multiprocessing.Queue() 79 80 def mutex_lock_request(): 81 """This request handler allows the caller to delay execution of a 82 thread which has requested a proxied representation of the `lock` 83 property, simulating a "slow" interprocess communication channel.""" 84 85 request_lock.release() 86 response_lock.acquire() 87 return threading.Lock() 88 89 SlowLock.register("get_dict", callable=lambda: {}) 90 SlowLock.register("Lock", callable=mutex_lock_request) 91 92 slowlock = SlowLock(("localhost", 4543), b"some key") 93 slowlock.start() 94 add_cleanup(lambda: slowlock.shutdown()) 95 96 parallel = multiprocessing.Process(target=run, 97 args=(queue, request_lock, response_lock)) 98 parallel.start() 99 add_cleanup(lambda: parallel.terminate()) 100 101 assert [queue.get(), queue.get()] == [False, False], ( 102 "both instances had valid locks") 103 104 105class SlowDict(BaseManager): 106 # This can only be used in test_delayed_dict since that test modifies the 107 # class body, but it has to be a global for multiprocessing 108 pass 109 110 111@pytest.mark.xfail(sys.platform == "win32" or 112 multiprocessing.get_start_method() == "spawn", 113 reason="https://github.com/web-platform-tests/wpt/issues/16938") 114def test_delayed_dict(add_cleanup): 115 """Ensure that delays in proxied `dict` retrieval do not interfere with 116 initialization in parallel threads.""" 117 118 request_lock = multiprocessing.Lock() 119 response_lock = multiprocessing.Lock() 120 121 queue = multiprocessing.Queue() 122 123 # This request handler allows the caller to delay execution of a thread 124 # which has requested a proxied representation of the "get_dict" property. 125 def mutex_dict_request(): 126 """This request handler allows the caller to delay execution of a 127 thread which has requested a proxied representation of the `get_dict` 128 property, simulating a "slow" interprocess communication channel.""" 129 request_lock.release() 130 response_lock.acquire() 131 return {} 132 133 SlowDict.register("get_dict", callable=mutex_dict_request) 134 SlowDict.register("Lock", callable=lambda: threading.Lock()) 135 136 slowdict = SlowDict(("localhost", 4543), b"some key") 137 slowdict.start() 138 add_cleanup(lambda: slowdict.shutdown()) 139 140 parallel = multiprocessing.Process(target=run, 141 args=(queue, request_lock, response_lock)) 142 parallel.start() 143 add_cleanup(lambda: parallel.terminate()) 144 145 assert [queue.get(), queue.get()] == [False, False], ( 146 "both instances had valid locks") 147