1 use futures::channel::oneshot;
2 use futures::executor::LocalPool;
3 use futures::future::{self, lazy, poll_fn, Future};
4 use futures::task::{Context, LocalSpawn, Poll, Spawn, Waker};
5 use std::cell::{Cell, RefCell};
6 use std::pin::Pin;
7 use std::rc::Rc;
8 use std::sync::atomic::{AtomicBool, Ordering};
9 use std::sync::Arc;
10 use std::thread;
11 use std::time::Duration;
12
13 struct Pending(Rc<()>);
14
15 impl Future for Pending {
16 type Output = ();
17
poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()>18 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
19 Poll::Pending
20 }
21 }
22
pending() -> Pending23 fn pending() -> Pending {
24 Pending(Rc::new(()))
25 }
26
27 #[test]
run_until_single_future()28 fn run_until_single_future() {
29 let mut cnt = 0;
30
31 {
32 let mut pool = LocalPool::new();
33 let fut = lazy(|_| {
34 cnt += 1;
35 });
36 pool.run_until(fut);
37 }
38
39 assert_eq!(cnt, 1);
40 }
41
42 #[test]
run_until_ignores_spawned()43 fn run_until_ignores_spawned() {
44 let mut pool = LocalPool::new();
45 let spawn = pool.spawner();
46 spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();
47 pool.run_until(lazy(|_| ()));
48 }
49
50 #[test]
run_until_executes_spawned()51 fn run_until_executes_spawned() {
52 let (tx, rx) = oneshot::channel();
53 let mut pool = LocalPool::new();
54 let spawn = pool.spawner();
55 spawn
56 .spawn_local_obj(
57 Box::pin(lazy(move |_| {
58 tx.send(()).unwrap();
59 }))
60 .into(),
61 )
62 .unwrap();
63 pool.run_until(rx).unwrap();
64 }
65
66 #[test]
run_returns_if_empty()67 fn run_returns_if_empty() {
68 let mut pool = LocalPool::new();
69 pool.run();
70 pool.run();
71 }
72
73 #[test]
run_executes_spawned()74 fn run_executes_spawned() {
75 let cnt = Rc::new(Cell::new(0));
76 let cnt2 = cnt.clone();
77
78 let mut pool = LocalPool::new();
79 let spawn = pool.spawner();
80 let spawn2 = pool.spawner();
81
82 spawn
83 .spawn_local_obj(
84 Box::pin(lazy(move |_| {
85 spawn2
86 .spawn_local_obj(
87 Box::pin(lazy(move |_| {
88 cnt2.set(cnt2.get() + 1);
89 }))
90 .into(),
91 )
92 .unwrap();
93 }))
94 .into(),
95 )
96 .unwrap();
97
98 pool.run();
99
100 assert_eq!(cnt.get(), 1);
101 }
102
103 #[test]
run_spawn_many()104 fn run_spawn_many() {
105 const ITER: usize = 200;
106
107 let cnt = Rc::new(Cell::new(0));
108
109 let mut pool = LocalPool::new();
110 let spawn = pool.spawner();
111
112 for _ in 0..ITER {
113 let cnt = cnt.clone();
114 spawn
115 .spawn_local_obj(
116 Box::pin(lazy(move |_| {
117 cnt.set(cnt.get() + 1);
118 }))
119 .into(),
120 )
121 .unwrap();
122 }
123
124 pool.run();
125
126 assert_eq!(cnt.get(), ITER);
127 }
128
129 #[test]
try_run_one_returns_if_empty()130 fn try_run_one_returns_if_empty() {
131 let mut pool = LocalPool::new();
132 assert!(!pool.try_run_one());
133 }
134
135 #[test]
try_run_one_executes_one_ready()136 fn try_run_one_executes_one_ready() {
137 const ITER: usize = 200;
138
139 let cnt = Rc::new(Cell::new(0));
140
141 let mut pool = LocalPool::new();
142 let spawn = pool.spawner();
143
144 for _ in 0..ITER {
145 spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();
146
147 let cnt = cnt.clone();
148 spawn
149 .spawn_local_obj(
150 Box::pin(lazy(move |_| {
151 cnt.set(cnt.get() + 1);
152 }))
153 .into(),
154 )
155 .unwrap();
156
157 spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();
158 }
159
160 for i in 0..ITER {
161 assert_eq!(cnt.get(), i);
162 assert!(pool.try_run_one());
163 assert_eq!(cnt.get(), i + 1);
164 }
165 assert!(!pool.try_run_one());
166 }
167
168 #[test]
try_run_one_returns_on_no_progress()169 fn try_run_one_returns_on_no_progress() {
170 const ITER: usize = 10;
171
172 let cnt = Rc::new(Cell::new(0));
173
174 let mut pool = LocalPool::new();
175 let spawn = pool.spawner();
176
177 let waker: Rc<Cell<Option<Waker>>> = Rc::new(Cell::new(None));
178 {
179 let cnt = cnt.clone();
180 let waker = waker.clone();
181 spawn
182 .spawn_local_obj(
183 Box::pin(poll_fn(move |ctx| {
184 cnt.set(cnt.get() + 1);
185 waker.set(Some(ctx.waker().clone()));
186 if cnt.get() == ITER {
187 Poll::Ready(())
188 } else {
189 Poll::Pending
190 }
191 }))
192 .into(),
193 )
194 .unwrap();
195 }
196
197 for i in 0..ITER - 1 {
198 assert_eq!(cnt.get(), i);
199 assert!(!pool.try_run_one());
200 assert_eq!(cnt.get(), i + 1);
201 let w = waker.take();
202 assert!(w.is_some());
203 w.unwrap().wake();
204 }
205 assert!(pool.try_run_one());
206 assert_eq!(cnt.get(), ITER);
207 }
208
209 #[test]
try_run_one_runs_sub_futures()210 fn try_run_one_runs_sub_futures() {
211 let mut pool = LocalPool::new();
212 let spawn = pool.spawner();
213 let cnt = Rc::new(Cell::new(0));
214
215 let inner_spawner = spawn.clone();
216 let cnt1 = cnt.clone();
217 spawn
218 .spawn_local_obj(
219 Box::pin(poll_fn(move |_| {
220 cnt1.set(cnt1.get() + 1);
221
222 let cnt2 = cnt1.clone();
223 inner_spawner
224 .spawn_local_obj(Box::pin(lazy(move |_| cnt2.set(cnt2.get() + 1))).into())
225 .unwrap();
226
227 Poll::Pending
228 }))
229 .into(),
230 )
231 .unwrap();
232
233 pool.try_run_one();
234 assert_eq!(cnt.get(), 2);
235 }
236
237 #[test]
run_until_stalled_returns_if_empty()238 fn run_until_stalled_returns_if_empty() {
239 let mut pool = LocalPool::new();
240 pool.run_until_stalled();
241 pool.run_until_stalled();
242 }
243
244 #[test]
run_until_stalled_returns_multiple_times()245 fn run_until_stalled_returns_multiple_times() {
246 let mut pool = LocalPool::new();
247 let spawn = pool.spawner();
248 let cnt = Rc::new(Cell::new(0));
249
250 let cnt1 = cnt.clone();
251 spawn.spawn_local_obj(Box::pin(lazy(move |_| cnt1.set(cnt1.get() + 1))).into()).unwrap();
252 pool.run_until_stalled();
253 assert_eq!(cnt.get(), 1);
254
255 let cnt2 = cnt.clone();
256 spawn.spawn_local_obj(Box::pin(lazy(move |_| cnt2.set(cnt2.get() + 1))).into()).unwrap();
257 pool.run_until_stalled();
258 assert_eq!(cnt.get(), 2);
259 }
260
261 #[test]
run_until_stalled_runs_spawned_sub_futures()262 fn run_until_stalled_runs_spawned_sub_futures() {
263 let mut pool = LocalPool::new();
264 let spawn = pool.spawner();
265 let cnt = Rc::new(Cell::new(0));
266
267 let inner_spawner = spawn.clone();
268 let cnt1 = cnt.clone();
269 spawn
270 .spawn_local_obj(
271 Box::pin(poll_fn(move |_| {
272 cnt1.set(cnt1.get() + 1);
273
274 let cnt2 = cnt1.clone();
275 inner_spawner
276 .spawn_local_obj(Box::pin(lazy(move |_| cnt2.set(cnt2.get() + 1))).into())
277 .unwrap();
278
279 Poll::Pending
280 }))
281 .into(),
282 )
283 .unwrap();
284
285 pool.run_until_stalled();
286 assert_eq!(cnt.get(), 2);
287 }
288
289 #[test]
run_until_stalled_executes_all_ready()290 fn run_until_stalled_executes_all_ready() {
291 const ITER: usize = 200;
292 const PER_ITER: usize = 3;
293
294 let cnt = Rc::new(Cell::new(0));
295
296 let mut pool = LocalPool::new();
297 let spawn = pool.spawner();
298
299 for i in 0..ITER {
300 for _ in 0..PER_ITER {
301 spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();
302
303 let cnt = cnt.clone();
304 spawn
305 .spawn_local_obj(
306 Box::pin(lazy(move |_| {
307 cnt.set(cnt.get() + 1);
308 }))
309 .into(),
310 )
311 .unwrap();
312
313 // also add some pending tasks to test if they are ignored
314 spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();
315 }
316 assert_eq!(cnt.get(), i * PER_ITER);
317 pool.run_until_stalled();
318 assert_eq!(cnt.get(), (i + 1) * PER_ITER);
319 }
320 }
321
322 #[test]
323 #[should_panic]
nesting_run()324 fn nesting_run() {
325 let mut pool = LocalPool::new();
326 let spawn = pool.spawner();
327
328 spawn
329 .spawn_obj(
330 Box::pin(lazy(|_| {
331 let mut pool = LocalPool::new();
332 pool.run();
333 }))
334 .into(),
335 )
336 .unwrap();
337
338 pool.run();
339 }
340
341 #[test]
342 #[should_panic]
nesting_run_run_until_stalled()343 fn nesting_run_run_until_stalled() {
344 let mut pool = LocalPool::new();
345 let spawn = pool.spawner();
346
347 spawn
348 .spawn_obj(
349 Box::pin(lazy(|_| {
350 let mut pool = LocalPool::new();
351 pool.run_until_stalled();
352 }))
353 .into(),
354 )
355 .unwrap();
356
357 pool.run();
358 }
359
360 #[test]
tasks_are_scheduled_fairly()361 fn tasks_are_scheduled_fairly() {
362 let state = Rc::new(RefCell::new([0, 0]));
363
364 struct Spin {
365 state: Rc<RefCell<[i32; 2]>>,
366 idx: usize,
367 }
368
369 impl Future for Spin {
370 type Output = ();
371
372 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
373 let mut state = self.state.borrow_mut();
374
375 if self.idx == 0 {
376 let diff = state[0] - state[1];
377
378 assert!(diff.abs() <= 1);
379
380 if state[0] >= 50 {
381 return Poll::Ready(());
382 }
383 }
384
385 state[self.idx] += 1;
386
387 if state[self.idx] >= 100 {
388 return Poll::Ready(());
389 }
390
391 cx.waker().wake_by_ref();
392 Poll::Pending
393 }
394 }
395
396 let mut pool = LocalPool::new();
397 let spawn = pool.spawner();
398
399 spawn.spawn_local_obj(Box::pin(Spin { state: state.clone(), idx: 0 }).into()).unwrap();
400
401 spawn.spawn_local_obj(Box::pin(Spin { state, idx: 1 }).into()).unwrap();
402
403 pool.run();
404 }
405
406 // Tests that the use of park/unpark in user-code has no
407 // effect on the expected behavior of the executor.
408 #[test]
park_unpark_independence()409 fn park_unpark_independence() {
410 let mut done = false;
411
412 let future = future::poll_fn(move |cx| {
413 if done {
414 return Poll::Ready(());
415 }
416 done = true;
417 cx.waker().clone().wake(); // (*)
418 // some user-code that temporarily parks the thread
419 let test = thread::current();
420 let latch = Arc::new(AtomicBool::new(false));
421 let signal = latch.clone();
422 thread::spawn(move || {
423 thread::sleep(Duration::from_millis(10));
424 signal.store(true, Ordering::SeqCst);
425 test.unpark()
426 });
427 while !latch.load(Ordering::Relaxed) {
428 thread::park();
429 }
430 Poll::Pending // Expect to be called again due to (*).
431 });
432
433 futures::executor::block_on(future)
434 }
435