1 use crate::runtime::tests::loom_oneshot as oneshot;
2 use crate::runtime::{self, Runtime};
3 use crate::spawn;
4 
5 use loom::sync::atomic::{AtomicBool, AtomicUsize};
6 use loom::sync::{Arc, Mutex};
7 
8 use std::future::Future;
9 use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};
10 
/// Loom model: two tasks are spawned onto a pool built by `mk_pool(2)`, and
/// each of them spawns one nested task.
///
/// Every nested task increments a shared counter. The one that observes a
/// previous value of `1` takes the oneshot sender out of the shared slot and
/// sends on it, which unblocks the `recv` call on the model thread.
#[test]
fn pool_multi_spawn() {
    loom::model(|| {
        let rt = mk_pool(2);
        let counter_a = Arc::new(AtomicUsize::new(0));

        let (signal, done) = oneshot::channel();
        let signal_a = Arc::new(Mutex::new(Some(signal)));

        // Handles owned by the second task
        let counter_b = counter_a.clone();
        let signal_b = signal_a.clone();

        // First task
        rt.spawn(async move {
            spawn(async move {
                let previous = counter_a.fetch_add(1, Relaxed);
                if previous == 1 {
                    // The lock guard lives until the end of this statement,
                    // so the send happens while the mutex is still held.
                    signal_a.lock().unwrap().take().unwrap().send(());
                }
            });
        });

        // Second task
        rt.spawn(async move {
            spawn(async move {
                let previous = counter_b.fetch_add(1, Relaxed);
                if previous == 1 {
                    signal_b.lock().unwrap().take().unwrap().send(());
                }
            });
        });

        done.recv();
    });
}
43 
only_blocking_inner(first_pending: bool)44 fn only_blocking_inner(first_pending: bool) {
45     loom::model(move || {
46         let pool = mk_pool(1);
47         let (block_tx, block_rx) = oneshot::channel();
48 
49         pool.spawn(async move {
50             crate::task::block_in_place(move || {
51                 block_tx.send(());
52             });
53             if first_pending {
54                 yield_once().await
55             }
56         });
57 
58         block_rx.recv();
59         drop(pool);
60     });
61 }
62 
/// Runs `only_blocking_inner` with `first_pending` set to `false`.
#[test]
fn only_blocking() {
    let first_pending = false;
    only_blocking_inner(first_pending);
}
67 
/// Runs `only_blocking_inner` with `first_pending` set to `true`, so the
/// task awaits `yield_once()` after its `block_in_place` call.
#[test]
fn only_blocking_with_pending() {
    let first_pending = true;
    only_blocking_inner(first_pending);
}
72 
blocking_and_regular_inner(first_pending: bool)73 fn blocking_and_regular_inner(first_pending: bool) {
74     const NUM: usize = 3;
75     loom::model(move || {
76         let pool = mk_pool(1);
77         let cnt = Arc::new(AtomicUsize::new(0));
78 
79         let (block_tx, block_rx) = oneshot::channel();
80         let (done_tx, done_rx) = oneshot::channel();
81         let done_tx = Arc::new(Mutex::new(Some(done_tx)));
82 
83         pool.spawn(async move {
84             crate::task::block_in_place(move || {
85                 block_tx.send(());
86             });
87             if first_pending {
88                 yield_once().await
89             }
90         });
91 
92         for _ in 0..NUM {
93             let cnt = cnt.clone();
94             let done_tx = done_tx.clone();
95 
96             pool.spawn(async move {
97                 if NUM == cnt.fetch_add(1, Relaxed) + 1 {
98                     done_tx.lock().unwrap().take().unwrap().send(());
99                 }
100             });
101         }
102 
103         done_rx.recv();
104         block_rx.recv();
105 
106         drop(pool);
107     });
108 }
109 
/// Runs `blocking_and_regular_inner` with `first_pending` set to `false`.
#[test]
fn blocking_and_regular() {
    let first_pending = false;
    blocking_and_regular_inner(first_pending);
}
114 
/// Runs `blocking_and_regular_inner` with `first_pending` set to `true`, so
/// the blocking task awaits `yield_once()` after its `block_in_place` call.
#[test]
fn blocking_and_regular_with_pending() {
    let first_pending = true;
    blocking_and_regular_inner(first_pending);
}
119 
/// Loom model: two tasks on a pool built by `mk_pool(2)` each await `gated()`
/// twice and then increment a shared counter.
///
/// The task that observes a previous counter value of `1` takes the oneshot
/// sender out of the shared slot and sends on it, which unblocks the `recv`
/// call on the model thread.
#[test]
fn pool_multi_notify() {
    loom::model(|| {
        let rt = mk_pool(2);

        let hits_a = Arc::new(AtomicUsize::new(0));

        let (finished, wait_finished) = oneshot::channel();
        let finished_a = Arc::new(Mutex::new(Some(finished)));

        // Handles owned by the second task
        let hits_b = hits_a.clone();
        let finished_b = finished_a.clone();

        // First task
        rt.spawn(async move {
            gated().await;
            gated().await;

            let before = hits_a.fetch_add(1, Relaxed);
            if before == 1 {
                finished_a.lock().unwrap().take().unwrap().send(());
            }
        });

        // Second task
        rt.spawn(async move {
            gated().await;
            gated().await;

            let before = hits_b.fetch_add(1, Relaxed);
            if before == 1 {
                finished_b.lock().unwrap().take().unwrap().send(());
            }
        });

        wait_finished.recv();
    });
}
155 
/// Loom model: spawns two tasks on a pool built by `mk_pool(2)` and then
/// drops the pool right away, without waiting for either task.
///
/// The first task awaits `gated2(true)` and the second awaits
/// `gated2(false)`.
#[test]
fn pool_shutdown() {
    loom::model(|| {
        let rt = mk_pool(2);

        // Same order as before: the `true` variant first, then the `false` one.
        for &on_thread in &[true, false] {
            rt.spawn(async move {
                gated2(on_thread).await;
            });
        }

        drop(rt);
    });
}
172 
/// Loom model: `block_on` must finish while a spawned task keeps yielding.
///
/// The future passed to `block_on` first spawns a task that awaits
/// `yield_once()` twice, and then resolves to `gated2(true)`.
#[test]
fn complete_block_on_under_load() {
    use futures::FutureExt;

    loom::model(|| {
        let mut rt = mk_pool(2);

        let work = futures::future::lazy(|_| ()).then(|_| {
            // Background task that yields twice to keep the pool busy
            crate::spawn(async {
                yield_once().await;
                yield_once().await;
            });

            gated2(true)
        });

        rt.block_on(work);
    });
}
194 
/// Loom model: the runtime is dropped at the end of the model closure while
/// its tasks may still be exchanging notifications.
///
/// The outer task creates an mpsc channel and spawns an inner task that owns
/// the sender. The inner task hands the sender to a `spawn_blocking` closure,
/// which attempts one `try_send`, and then awaits the oneshot receiver. The
/// outer task drains the mpsc receiver until it yields `None`, and only then
/// sends on the oneshot. All send and receive results are ignored.
#[test]
fn shutdown_with_notification() {
    use crate::stream::StreamExt;
    use crate::sync::{mpsc, oneshot};

    loom::model(|| {
        let rt = mk_pool(2);
        let (finish_tx, finish_rx) = oneshot::channel::<()>();

        rt.spawn(async move {
            let (mut msg_tx, mut msg_rx) = mpsc::channel::<()>(10);

            crate::spawn(async move {
                crate::task::spawn_blocking(move || {
                    let _ = msg_tx.try_send(());
                });

                let _ = finish_rx.await;
            });

            // Drain until the stream ends
            while msg_rx.next().await.is_some() {}

            let _ = finish_tx.send(());
        });
    });
}
221 
mk_pool(num_threads: usize) -> Runtime222 fn mk_pool(num_threads: usize) -> Runtime {
223     runtime::Builder::new()
224         .threaded_scheduler()
225         .core_threads(num_threads)
226         .build()
227         .unwrap()
228 }
229 
230 use futures::future::poll_fn;
231 use std::task::Poll;
yield_once()232 async fn yield_once() {
233     let mut yielded = false;
234     poll_fn(|cx| {
235         if yielded {
236             Poll::Ready(())
237         } else {
238             loom::thread::yield_now();
239             yielded = true;
240             cx.waker().wake_by_ref();
241             Poll::Pending
242         }
243     })
244     .await
245 }
246 
gated() -> impl Future<Output = &'static str>247 fn gated() -> impl Future<Output = &'static str> {
248     gated2(false)
249 }
250 
gated2(thread: bool) -> impl Future<Output = &'static str>251 fn gated2(thread: bool) -> impl Future<Output = &'static str> {
252     use loom::thread;
253     use std::sync::Arc;
254 
255     let gate = Arc::new(AtomicBool::new(false));
256     let mut fired = false;
257 
258     poll_fn(move |cx| {
259         if !fired {
260             let gate = gate.clone();
261             let waker = cx.waker().clone();
262 
263             if thread {
264                 thread::spawn(move || {
265                     gate.store(true, Release);
266                     waker.wake_by_ref();
267                 });
268             } else {
269                 spawn(async move {
270                     gate.store(true, Release);
271                     waker.wake_by_ref();
272                 });
273             }
274 
275             fired = true;
276 
277             return Poll::Pending;
278         }
279 
280         if gate.load(Acquire) {
281             Poll::Ready("hello world")
282         } else {
283             Poll::Pending
284         }
285     })
286 }
287