1 use crate::runtime::tests::loom_oneshot as oneshot;
2 use crate::runtime::{self, Runtime};
3 use crate::spawn;
4
5 use loom::sync::atomic::{AtomicBool, AtomicUsize};
6 use loom::sync::{Arc, Mutex};
7
8 use std::future::Future;
9 use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};
10
/// Two tasks are submitted to a two-thread pool and each one spawns a nested
/// task. Every nested task bumps a shared counter; the one that observes a
/// previous value of `1` (i.e. the second to run) fires the oneshot that the
/// model thread is waiting on.
#[test]
fn pool_multi_spawn() {
    loom::model(|| {
        let pool = mk_pool(2);
        let hits_a = Arc::new(AtomicUsize::new(0));

        let (tx, rx) = oneshot::channel();
        let tx_a = Arc::new(Mutex::new(Some(tx)));

        // Handles used by the second task; the first task takes the originals.
        let hits_b = hits_a.clone();
        let tx_b = tx_a.clone();

        // First outer task.
        pool.spawn(async move {
            spawn(async move {
                let previous = hits_a.fetch_add(1, Relaxed);
                if previous == 1 {
                    tx_a.lock().unwrap().take().unwrap().send(());
                }
            });
        });

        // Second outer task.
        pool.spawn(async move {
            spawn(async move {
                let previous = hits_b.fetch_add(1, Relaxed);
                if previous == 1 {
                    tx_b.lock().unwrap().take().unwrap().send(());
                }
            });
        });

        // Wait until both nested tasks have run.
        rx.recv();
    });
}
43
only_blocking_inner(first_pending: bool)44 fn only_blocking_inner(first_pending: bool) {
45 loom::model(move || {
46 let pool = mk_pool(1);
47 let (block_tx, block_rx) = oneshot::channel();
48
49 pool.spawn(async move {
50 crate::task::block_in_place(move || {
51 block_tx.send(());
52 });
53 if first_pending {
54 yield_once().await
55 }
56 });
57
58 block_rx.recv();
59 drop(pool);
60 });
61 }
62
/// Runs the `block_in_place`-only scenario without the extra yield.
#[test]
fn only_blocking() {
    let first_pending = false;
    only_blocking_inner(first_pending)
}
67
/// Runs the `block_in_place`-only scenario with the task yielding once after
/// the blocking section.
#[test]
fn only_blocking_with_pending() {
    let first_pending = true;
    only_blocking_inner(first_pending)
}
72
blocking_and_regular_inner(first_pending: bool)73 fn blocking_and_regular_inner(first_pending: bool) {
74 const NUM: usize = 3;
75 loom::model(move || {
76 let pool = mk_pool(1);
77 let cnt = Arc::new(AtomicUsize::new(0));
78
79 let (block_tx, block_rx) = oneshot::channel();
80 let (done_tx, done_rx) = oneshot::channel();
81 let done_tx = Arc::new(Mutex::new(Some(done_tx)));
82
83 pool.spawn(async move {
84 crate::task::block_in_place(move || {
85 block_tx.send(());
86 });
87 if first_pending {
88 yield_once().await
89 }
90 });
91
92 for _ in 0..NUM {
93 let cnt = cnt.clone();
94 let done_tx = done_tx.clone();
95
96 pool.spawn(async move {
97 if NUM == cnt.fetch_add(1, Relaxed) + 1 {
98 done_tx.lock().unwrap().take().unwrap().send(());
99 }
100 });
101 }
102
103 done_rx.recv();
104 block_rx.recv();
105
106 drop(pool);
107 });
108 }
109
/// Runs the mixed blocking/regular scenario without the extra yield.
#[test]
fn blocking_and_regular() {
    let first_pending = false;
    blocking_and_regular_inner(first_pending);
}
114
/// Runs the mixed blocking/regular scenario with the blocking task yielding
/// once after its blocking section.
#[test]
fn blocking_and_regular_with_pending() {
    let first_pending = true;
    blocking_and_regular_inner(first_pending);
}
119
/// Two tasks on a two-thread pool each await `gated()` twice and then bump a
/// shared counter. The task that observes a previous value of `1` (the second
/// to finish) fires the oneshot that the model thread is waiting on.
#[test]
fn pool_multi_notify() {
    loom::model(|| {
        let pool = mk_pool(2);

        let finished_a = Arc::new(AtomicUsize::new(0));

        let (done_tx, done_rx) = oneshot::channel();
        let done_a = Arc::new(Mutex::new(Some(done_tx)));

        // Handles used by the second task; the first task takes the originals.
        let finished_b = finished_a.clone();
        let done_b = done_a.clone();

        // First task.
        pool.spawn(async move {
            gated().await;
            gated().await;

            let previous = finished_a.fetch_add(1, Relaxed);
            if previous == 1 {
                done_a.lock().unwrap().take().unwrap().send(());
            }
        });

        // Second task.
        pool.spawn(async move {
            gated().await;
            gated().await;

            let previous = finished_b.fetch_add(1, Relaxed);
            if previous == 1 {
                done_b.lock().unwrap().take().unwrap().send(());
            }
        });

        done_rx.recv();
    });
}
155
/// Spawns two gated tasks on a two-thread pool, one whose gate is opened from
/// a loom thread and one whose gate is opened from a spawned task, then drops
/// the pool immediately without waiting for either of them.
#[test]
fn pool_shutdown() {
    loom::model(|| {
        let pool = mk_pool(2);

        // Gate opened by a separate loom thread.
        pool.spawn(async move {
            gated2(true).await;
        });

        // Gate opened by another spawned task.
        pool.spawn(async move {
            gated2(false).await;
        });

        drop(pool);
    });
}
172
/// Drives a future with `block_on` on a two-thread pool while a background
/// task keeps yielding. The driven future first spawns that background task
/// and then resolves through `gated2(true)`, whose gate is opened from a
/// separate loom thread.
#[test]
fn complete_block_on_under_load() {
    use futures::FutureExt;

    loom::model(|| {
        let mut pool = mk_pool(2);

        let driven = futures::future::lazy(|_| ()).then(|_| {
            // Keep a worker busy: this task reschedules itself twice.
            crate::spawn(async {
                for _ in 0..2 {
                    yield_once().await;
                }
            });

            gated2(true)
        });

        pool.block_on(driven);
    });
}
194
/// Exercises runtime shutdown while a notification may still be in flight.
///
/// A task on a two-thread pool creates an mpsc channel and spawns a child
/// task. The child hands the sender to a `spawn_blocking` closure, which
/// attempts a single `try_send`, and then awaits the `done` oneshot. The
/// parent drains the receiver until the stream ends and then signals `done`.
/// Nothing joins these tasks: the runtime is dropped at the end of each model
/// iteration.
#[test]
fn shutdown_with_notification() {
    use crate::stream::StreamExt;
    use crate::sync::{mpsc, oneshot};

    loom::model(|| {
        let rt = mk_pool(2);
        let (done_tx, done_rx) = oneshot::channel::<()>();

        rt.spawn(async move {
            let (mut tx, mut rx) = mpsc::channel::<()>(10);

            crate::spawn(async move {
                crate::task::spawn_blocking(move || {
                    // Best effort: the result of the send is deliberately ignored.
                    let _ = tx.try_send(());
                });

                let _ = done_rx.await;
            });

            // Drain until the stream yields `None`; the items themselves are
            // not inspected, so `is_some()` replaces the `Some(_)` pattern.
            while rx.next().await.is_some() {}

            let _ = done_tx.send(());
        });
    });
}
221
mk_pool(num_threads: usize) -> Runtime222 fn mk_pool(num_threads: usize) -> Runtime {
223 runtime::Builder::new()
224 .threaded_scheduler()
225 .core_threads(num_threads)
226 .build()
227 .unwrap()
228 }
229
230 use futures::future::poll_fn;
231 use std::task::Poll;
yield_once()232 async fn yield_once() {
233 let mut yielded = false;
234 poll_fn(|cx| {
235 if yielded {
236 Poll::Ready(())
237 } else {
238 loom::thread::yield_now();
239 yielded = true;
240 cx.waker().wake_by_ref();
241 Poll::Pending
242 }
243 })
244 .await
245 }
246
gated() -> impl Future<Output = &'static str>247 fn gated() -> impl Future<Output = &'static str> {
248 gated2(false)
249 }
250
gated2(thread: bool) -> impl Future<Output = &'static str>251 fn gated2(thread: bool) -> impl Future<Output = &'static str> {
252 use loom::thread;
253 use std::sync::Arc;
254
255 let gate = Arc::new(AtomicBool::new(false));
256 let mut fired = false;
257
258 poll_fn(move |cx| {
259 if !fired {
260 let gate = gate.clone();
261 let waker = cx.waker().clone();
262
263 if thread {
264 thread::spawn(move || {
265 gate.store(true, Release);
266 waker.wake_by_ref();
267 });
268 } else {
269 spawn(async move {
270 gate.store(true, Release);
271 waker.wake_by_ref();
272 });
273 }
274
275 fired = true;
276
277 return Poll::Pending;
278 }
279
280 if gate.load(Acquire) {
281 Poll::Ready("hello world")
282 } else {
283 Poll::Pending
284 }
285 })
286 }
287