1import multiprocessing
2import threading
3import sys
4
5from multiprocessing.managers import BaseManager
6
7import pytest
8
9Stash = pytest.importorskip("wptserve.stash").Stash
10
@pytest.fixture()
def add_cleanup():
    """Provide a registrar for callbacks to run once the test has finished.

    Yields a function that accepts a zero-argument callable. At teardown the
    registered callables are invoked in registration order. Every callable is
    attempted even when an earlier one raises, so a single failing cleanup
    cannot leave later resources (such as a spawned process) alive. The first
    exception raised, if any, is re-raised after all callables have run.
    """
    fns = []

    def add(fn):
        fns.append(fn)

    yield add

    first_error = None
    for fn in fns:
        try:
            fn()
        except Exception as error:
            # Remember the failure but keep going: the remaining callbacks
            # release independent resources and must still be given a chance.
            if first_error is None:
                first_error = error

    if first_error is not None:
        raise first_error
22
23
def run(process_queue, request_lock, response_lock):
    """Construct two Stash instances on separate threads and report on each.

    The second thread is started only after `request_lock` has been released
    by another party (in this file, the manager's request handler), i.e.
    while the first thread's Stash constructor is still waiting for a
    response. For each thread, a boolean stating whether `stash.lock` was
    None after construction is placed on `process_queue`.
    """

    def construct_stash(results):
        instance = Stash("/", ("localhost", 4543), b"some key")

        # The `lock` property of a Stash instance should always be set
        # immediately following initialization. The reported values are
        # asserted in the active test.
        results.put(instance.lock is None)

    results = multiprocessing.Queue()
    first, second = (
        threading.Thread(target=construct_stash, args=(results,))
        for _ in range(2)
    )

    # Take both locks up front so that the request handler, rather than this
    # function, decides when the next `request_lock.acquire()` may return.
    request_lock.acquire()
    response_lock.acquire()
    first.start()

    # Blocks until `request_lock` is released elsewhere.
    request_lock.acquire()

    # At this moment, the `first` thread is waiting for a proxied object.
    # Start a second thread in order to inspect the behavior of the Stash
    # constructor at this moment.
    second.start()

    # Allow the `first` thread to proceed
    response_lock.release()

    # Wait for both threads to complete and relay their state to the test
    for _ in range(2):
        process_queue.put(results.get())
60
61
class SlowLock(BaseManager):
    """Manager reserved for test_delayed_lock.

    That test registers request handlers on this class, which modifies the
    class itself, so it must not be shared with other tests. It is defined
    at module level because multiprocessing needs it to be a global.
    """
66
67
# Any start method other than "fork" ("spawn", and also "forkserver") has to
# pickle the manager's registry, which here contains local closures, so the
# test is only expected to pass when processes are forked.
@pytest.mark.xfail(sys.platform == "win32" or
                   multiprocessing.get_start_method() != "fork",
                   reason="https://github.com/web-platform-tests/wpt/issues/16938")
def test_delayed_lock(add_cleanup):
    """Ensure that delays in proxied Lock retrieval do not interfere with
    initialization in parallel threads."""

    request_lock = multiprocessing.Lock()
    response_lock = multiprocessing.Lock()

    queue = multiprocessing.Queue()

    def mutex_lock_request():
        """This request handler allows the caller to delay execution of a
        thread which has requested a proxied representation of the `lock`
        property, simulating a "slow" interprocess communication channel."""

        # Signal that a request has arrived, then wait until the caller
        # permits the response to be sent.
        request_lock.release()
        response_lock.acquire()
        return threading.Lock()

    SlowLock.register("get_dict", callable=lambda: {})
    SlowLock.register("Lock", callable=mutex_lock_request)

    slowlock = SlowLock(("localhost", 4543), b"some key")
    slowlock.start()
    add_cleanup(lambda: slowlock.shutdown())

    parallel = multiprocessing.Process(target=run,
                                       args=(queue, request_lock, response_lock))
    parallel.start()
    add_cleanup(lambda: parallel.terminate())

    # `run` reports `stash.lock is None` once per thread, so two False values
    # mean that both Stash instances ended up with a lock.
    assert [queue.get(), queue.get()] == [False, False], (
        "both instances had valid locks")
103
104
class SlowDict(BaseManager):
    """Manager reserved for test_delayed_dict.

    That test registers request handlers on this class, which modifies the
    class itself, so it must not be shared with other tests. It is defined
    at module level because multiprocessing needs it to be a global.
    """
109
110
# Any start method other than "fork" ("spawn", and also "forkserver") has to
# pickle the manager's registry, which here contains local closures, so the
# test is only expected to pass when processes are forked.
@pytest.mark.xfail(sys.platform == "win32" or
                   multiprocessing.get_start_method() != "fork",
                   reason="https://github.com/web-platform-tests/wpt/issues/16938")
def test_delayed_dict(add_cleanup):
    """Ensure that delays in proxied `dict` retrieval do not interfere with
    initialization in parallel threads."""

    request_lock = multiprocessing.Lock()
    response_lock = multiprocessing.Lock()

    queue = multiprocessing.Queue()

    def mutex_dict_request():
        """This request handler allows the caller to delay execution of a
        thread which has requested a proxied representation of the `get_dict`
        property, simulating a "slow" interprocess communication channel."""
        # Signal that a request has arrived, then wait until the caller
        # permits the response to be sent.
        request_lock.release()
        response_lock.acquire()
        return {}

    SlowDict.register("get_dict", callable=mutex_dict_request)
    SlowDict.register("Lock", callable=lambda: threading.Lock())

    slowdict = SlowDict(("localhost", 4543), b"some key")
    slowdict.start()
    add_cleanup(lambda: slowdict.shutdown())

    parallel = multiprocessing.Process(target=run,
                                       args=(queue, request_lock, response_lock))
    parallel.start()
    add_cleanup(lambda: parallel.terminate())

    # `run` reports `stash.lock is None` once per thread, so two False values
    # mean that both Stash instances ended up with a lock.
    assert [queue.get(), queue.get()] == [False, False], (
        "both instances had valid locks")
147