1 /// Full runtime loom tests. These are heavy tests and take significant time to
2 /// run on CI.
3 ///
4 /// Use `LOOM_MAX_PREEMPTIONS=1` to do a "quick" run as a smoke test.
5 ///
/// In order to speed up the CI runs, the tests are divided into `group_*` modules.
7 use crate::future::poll_fn;
8 use crate::runtime::tests::loom_oneshot as oneshot;
9 use crate::runtime::{self, Runtime};
10 use crate::{spawn, task};
11 use tokio_test::assert_ok;
12 
13 use loom::sync::atomic::{AtomicBool, AtomicUsize};
14 use loom::sync::Arc;
15 
16 use pin_project_lite::pin_project;
17 use std::future::Future;
18 use std::pin::Pin;
19 use std::sync::atomic::Ordering::{Relaxed, SeqCst};
20 use std::task::{Context, Poll};
21 
22 mod atomic_take {
23     use loom::sync::atomic::AtomicBool;
24     use std::mem::MaybeUninit;
25     use std::sync::atomic::Ordering::SeqCst;
26 
27     pub(super) struct AtomicTake<T> {
28         inner: MaybeUninit<T>,
29         taken: AtomicBool,
30     }
31 
32     impl<T> AtomicTake<T> {
new(value: T) -> Self33         pub(super) fn new(value: T) -> Self {
34             Self {
35                 inner: MaybeUninit::new(value),
36                 taken: AtomicBool::new(false),
37             }
38         }
39 
take(&self) -> Option<T>40         pub(super) fn take(&self) -> Option<T> {
41             // safety: Only one thread will see the boolean change from false
42             // to true, so that thread is able to take the value.
43             match self.taken.fetch_or(true, SeqCst) {
44                 false => unsafe { Some(std::ptr::read(self.inner.as_ptr())) },
45                 true => None,
46             }
47         }
48     }
49 
50     impl<T> Drop for AtomicTake<T> {
drop(&mut self)51         fn drop(&mut self) {
52             drop(self.take());
53         }
54     }
55 }
56 
57 #[derive(Clone)]
58 struct AtomicOneshot<T> {
59     value: std::sync::Arc<atomic_take::AtomicTake<oneshot::Sender<T>>>,
60 }
61 impl<T> AtomicOneshot<T> {
new(sender: oneshot::Sender<T>) -> Self62     fn new(sender: oneshot::Sender<T>) -> Self {
63         Self {
64             value: std::sync::Arc::new(atomic_take::AtomicTake::new(sender)),
65         }
66     }
67 
assert_send(&self, value: T)68     fn assert_send(&self, value: T) {
69         self.value.take().unwrap().send(value);
70     }
71 }
72 
73 /// Tests are divided into groups to make the runs faster on CI.
74 mod group_a {
75     use super::*;
76 
77     #[test]
racy_shutdown()78     fn racy_shutdown() {
79         loom::model(|| {
80             let pool = mk_pool(1);
81 
82             // here's the case we want to exercise:
83             //
84             // a worker that still has tasks in its local queue gets sent to the blocking pool (due to
85             // block_in_place). the blocking pool is shut down, so drops the worker. the worker's
86             // shutdown method never gets run.
87             //
88             // we do this by spawning two tasks on one worker, the first of which does block_in_place,
89             // and then immediately drop the pool.
90 
91             pool.spawn(track(async {
92                 crate::task::block_in_place(|| {});
93             }));
94             pool.spawn(track(async {}));
95             drop(pool);
96         });
97     }
98 
99     #[test]
pool_multi_spawn()100     fn pool_multi_spawn() {
101         loom::model(|| {
102             let pool = mk_pool(2);
103             let c1 = Arc::new(AtomicUsize::new(0));
104 
105             let (tx, rx) = oneshot::channel();
106             let tx1 = AtomicOneshot::new(tx);
107 
108             // Spawn a task
109             let c2 = c1.clone();
110             let tx2 = tx1.clone();
111             pool.spawn(track(async move {
112                 spawn(track(async move {
113                     if 1 == c1.fetch_add(1, Relaxed) {
114                         tx1.assert_send(());
115                     }
116                 }));
117             }));
118 
119             // Spawn a second task
120             pool.spawn(track(async move {
121                 spawn(track(async move {
122                     if 1 == c2.fetch_add(1, Relaxed) {
123                         tx2.assert_send(());
124                     }
125                 }));
126             }));
127 
128             rx.recv();
129         });
130     }
131 
only_blocking_inner(first_pending: bool)132     fn only_blocking_inner(first_pending: bool) {
133         loom::model(move || {
134             let pool = mk_pool(1);
135             let (block_tx, block_rx) = oneshot::channel();
136 
137             pool.spawn(track(async move {
138                 crate::task::block_in_place(move || {
139                     block_tx.send(());
140                 });
141                 if first_pending {
142                     task::yield_now().await
143                 }
144             }));
145 
146             block_rx.recv();
147             drop(pool);
148         });
149     }
150 
151     #[test]
only_blocking_without_pending()152     fn only_blocking_without_pending() {
153         only_blocking_inner(false)
154     }
155 
156     #[test]
only_blocking_with_pending()157     fn only_blocking_with_pending() {
158         only_blocking_inner(true)
159     }
160 }
161 
162 mod group_b {
163     use super::*;
164 
blocking_and_regular_inner(first_pending: bool)165     fn blocking_and_regular_inner(first_pending: bool) {
166         const NUM: usize = 3;
167         loom::model(move || {
168             let pool = mk_pool(1);
169             let cnt = Arc::new(AtomicUsize::new(0));
170 
171             let (block_tx, block_rx) = oneshot::channel();
172             let (done_tx, done_rx) = oneshot::channel();
173             let done_tx = AtomicOneshot::new(done_tx);
174 
175             pool.spawn(track(async move {
176                 crate::task::block_in_place(move || {
177                     block_tx.send(());
178                 });
179                 if first_pending {
180                     task::yield_now().await
181                 }
182             }));
183 
184             for _ in 0..NUM {
185                 let cnt = cnt.clone();
186                 let done_tx = done_tx.clone();
187 
188                 pool.spawn(track(async move {
189                     if NUM == cnt.fetch_add(1, Relaxed) + 1 {
190                         done_tx.assert_send(());
191                     }
192                 }));
193             }
194 
195             done_rx.recv();
196             block_rx.recv();
197 
198             drop(pool);
199         });
200     }
201 
202     #[test]
blocking_and_regular()203     fn blocking_and_regular() {
204         blocking_and_regular_inner(false);
205     }
206 
207     #[test]
blocking_and_regular_with_pending()208     fn blocking_and_regular_with_pending() {
209         blocking_and_regular_inner(true);
210     }
211 
212     #[test]
join_output()213     fn join_output() {
214         loom::model(|| {
215             let rt = mk_pool(1);
216 
217             rt.block_on(async {
218                 let t = crate::spawn(track(async { "hello" }));
219 
220                 let out = assert_ok!(t.await);
221                 assert_eq!("hello", out.into_inner());
222             });
223         });
224     }
225 
226     #[test]
poll_drop_handle_then_drop()227     fn poll_drop_handle_then_drop() {
228         loom::model(|| {
229             let rt = mk_pool(1);
230 
231             rt.block_on(async move {
232                 let mut t = crate::spawn(track(async { "hello" }));
233 
234                 poll_fn(|cx| {
235                     let _ = Pin::new(&mut t).poll(cx);
236                     Poll::Ready(())
237                 })
238                 .await;
239             });
240         })
241     }
242 
243     #[test]
complete_block_on_under_load()244     fn complete_block_on_under_load() {
245         loom::model(|| {
246             let pool = mk_pool(1);
247 
248             pool.block_on(async {
249                 // Trigger a re-schedule
250                 crate::spawn(track(async {
251                     for _ in 0..2 {
252                         task::yield_now().await;
253                     }
254                 }));
255 
256                 gated2(true).await
257             });
258         });
259     }
260 
261     #[test]
shutdown_with_notification()262     fn shutdown_with_notification() {
263         use crate::sync::oneshot;
264 
265         loom::model(|| {
266             let rt = mk_pool(2);
267             let (done_tx, done_rx) = oneshot::channel::<()>();
268 
269             rt.spawn(track(async move {
270                 let (tx, rx) = oneshot::channel::<()>();
271 
272                 crate::spawn(async move {
273                     crate::task::spawn_blocking(move || {
274                         let _ = tx.send(());
275                     });
276 
277                     let _ = done_rx.await;
278                 });
279 
280                 let _ = rx.await;
281 
282                 let _ = done_tx.send(());
283             }));
284         });
285     }
286 }
287 
288 mod group_c {
289     use super::*;
290 
291     #[test]
pool_shutdown()292     fn pool_shutdown() {
293         loom::model(|| {
294             let pool = mk_pool(2);
295 
296             pool.spawn(track(async move {
297                 gated2(true).await;
298             }));
299 
300             pool.spawn(track(async move {
301                 gated2(false).await;
302             }));
303 
304             drop(pool);
305         });
306     }
307 }
308 
309 mod group_d {
310     use super::*;
311 
312     #[test]
pool_multi_notify()313     fn pool_multi_notify() {
314         loom::model(|| {
315             let pool = mk_pool(2);
316 
317             let c1 = Arc::new(AtomicUsize::new(0));
318 
319             let (done_tx, done_rx) = oneshot::channel();
320             let done_tx1 = AtomicOneshot::new(done_tx);
321             let done_tx2 = done_tx1.clone();
322 
323             // Spawn a task
324             let c2 = c1.clone();
325             pool.spawn(track(async move {
326                 gated().await;
327                 gated().await;
328 
329                 if 1 == c1.fetch_add(1, Relaxed) {
330                     done_tx1.assert_send(());
331                 }
332             }));
333 
334             // Spawn a second task
335             pool.spawn(track(async move {
336                 gated().await;
337                 gated().await;
338 
339                 if 1 == c2.fetch_add(1, Relaxed) {
340                     done_tx2.assert_send(());
341                 }
342             }));
343 
344             done_rx.recv();
345         });
346     }
347 }
348 
mk_pool(num_threads: usize) -> Runtime349 fn mk_pool(num_threads: usize) -> Runtime {
350     runtime::Builder::new_multi_thread()
351         .worker_threads(num_threads)
352         .build()
353         .unwrap()
354 }
355 
gated() -> impl Future<Output = &'static str>356 fn gated() -> impl Future<Output = &'static str> {
357     gated2(false)
358 }
359 
gated2(thread: bool) -> impl Future<Output = &'static str>360 fn gated2(thread: bool) -> impl Future<Output = &'static str> {
361     use loom::thread;
362     use std::sync::Arc;
363 
364     let gate = Arc::new(AtomicBool::new(false));
365     let mut fired = false;
366 
367     poll_fn(move |cx| {
368         if !fired {
369             let gate = gate.clone();
370             let waker = cx.waker().clone();
371 
372             if thread {
373                 thread::spawn(move || {
374                     gate.store(true, SeqCst);
375                     waker.wake_by_ref();
376                 });
377             } else {
378                 spawn(track(async move {
379                     gate.store(true, SeqCst);
380                     waker.wake_by_ref();
381                 }));
382             }
383 
384             fired = true;
385 
386             return Poll::Pending;
387         }
388 
389         if gate.load(SeqCst) {
390             Poll::Ready("hello world")
391         } else {
392             Poll::Pending
393         }
394     })
395 }
396 
track<T: Future>(f: T) -> Track<T>397 fn track<T: Future>(f: T) -> Track<T> {
398     Track {
399         inner: f,
400         arc: Arc::new(()),
401     }
402 }
403 
404 pin_project! {
405     struct Track<T> {
406         #[pin]
407         inner: T,
408         // Arc is used to hook into loom's leak tracking.
409         arc: Arc<()>,
410     }
411 }
412 
413 impl<T> Track<T> {
into_inner(self) -> T414     fn into_inner(self) -> T {
415         self.inner
416     }
417 }
418 
419 impl<T: Future> Future for Track<T> {
420     type Output = Track<T::Output>;
421 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>422     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
423         let me = self.project();
424 
425         Poll::Ready(Track {
426             inner: ready!(me.inner.poll(cx)),
427             arc: me.arc.clone(),
428         })
429     }
430 }
431