1 use futures::channel::oneshot;
2 use futures::executor::LocalPool;
3 use futures::future::{self, lazy, poll_fn, Future};
4 use futures::task::{Context, LocalSpawn, Poll, Spawn, Waker};
5 use std::cell::{Cell, RefCell};
6 use std::pin::Pin;
7 use std::rc::Rc;
8 use std::sync::atomic::{AtomicBool, Ordering};
9 use std::sync::Arc;
10 use std::thread;
11 use std::time::Duration;
12 
13 struct Pending(Rc<()>);
14 
15 impl Future for Pending {
16     type Output = ();
17 
poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()>18     fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
19         Poll::Pending
20     }
21 }
22 
pending() -> Pending23 fn pending() -> Pending {
24     Pending(Rc::new(()))
25 }
26 
27 #[test]
run_until_single_future()28 fn run_until_single_future() {
29     let mut cnt = 0;
30 
31     {
32         let mut pool = LocalPool::new();
33         let fut = lazy(|_| {
34             cnt += 1;
35         });
36         pool.run_until(fut);
37     }
38 
39     assert_eq!(cnt, 1);
40 }
41 
42 #[test]
run_until_ignores_spawned()43 fn run_until_ignores_spawned() {
44     let mut pool = LocalPool::new();
45     let spawn = pool.spawner();
46     spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();
47     pool.run_until(lazy(|_| ()));
48 }
49 
50 #[test]
run_until_executes_spawned()51 fn run_until_executes_spawned() {
52     let (tx, rx) = oneshot::channel();
53     let mut pool = LocalPool::new();
54     let spawn = pool.spawner();
55     spawn
56         .spawn_local_obj(
57             Box::pin(lazy(move |_| {
58                 tx.send(()).unwrap();
59             }))
60             .into(),
61         )
62         .unwrap();
63     pool.run_until(rx).unwrap();
64 }
65 
66 #[test]
run_returns_if_empty()67 fn run_returns_if_empty() {
68     let mut pool = LocalPool::new();
69     pool.run();
70     pool.run();
71 }
72 
73 #[test]
run_executes_spawned()74 fn run_executes_spawned() {
75     let cnt = Rc::new(Cell::new(0));
76     let cnt2 = cnt.clone();
77 
78     let mut pool = LocalPool::new();
79     let spawn = pool.spawner();
80     let spawn2 = pool.spawner();
81 
82     spawn
83         .spawn_local_obj(
84             Box::pin(lazy(move |_| {
85                 spawn2
86                     .spawn_local_obj(
87                         Box::pin(lazy(move |_| {
88                             cnt2.set(cnt2.get() + 1);
89                         }))
90                         .into(),
91                     )
92                     .unwrap();
93             }))
94             .into(),
95         )
96         .unwrap();
97 
98     pool.run();
99 
100     assert_eq!(cnt.get(), 1);
101 }
102 
103 #[test]
run_spawn_many()104 fn run_spawn_many() {
105     const ITER: usize = 200;
106 
107     let cnt = Rc::new(Cell::new(0));
108 
109     let mut pool = LocalPool::new();
110     let spawn = pool.spawner();
111 
112     for _ in 0..ITER {
113         let cnt = cnt.clone();
114         spawn
115             .spawn_local_obj(
116                 Box::pin(lazy(move |_| {
117                     cnt.set(cnt.get() + 1);
118                 }))
119                 .into(),
120             )
121             .unwrap();
122     }
123 
124     pool.run();
125 
126     assert_eq!(cnt.get(), ITER);
127 }
128 
129 #[test]
try_run_one_returns_if_empty()130 fn try_run_one_returns_if_empty() {
131     let mut pool = LocalPool::new();
132     assert!(!pool.try_run_one());
133 }
134 
135 #[test]
try_run_one_executes_one_ready()136 fn try_run_one_executes_one_ready() {
137     const ITER: usize = 200;
138 
139     let cnt = Rc::new(Cell::new(0));
140 
141     let mut pool = LocalPool::new();
142     let spawn = pool.spawner();
143 
144     for _ in 0..ITER {
145         spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();
146 
147         let cnt = cnt.clone();
148         spawn
149             .spawn_local_obj(
150                 Box::pin(lazy(move |_| {
151                     cnt.set(cnt.get() + 1);
152                 }))
153                 .into(),
154             )
155             .unwrap();
156 
157         spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();
158     }
159 
160     for i in 0..ITER {
161         assert_eq!(cnt.get(), i);
162         assert!(pool.try_run_one());
163         assert_eq!(cnt.get(), i + 1);
164     }
165     assert!(!pool.try_run_one());
166 }
167 
168 #[test]
try_run_one_returns_on_no_progress()169 fn try_run_one_returns_on_no_progress() {
170     const ITER: usize = 10;
171 
172     let cnt = Rc::new(Cell::new(0));
173 
174     let mut pool = LocalPool::new();
175     let spawn = pool.spawner();
176 
177     let waker: Rc<Cell<Option<Waker>>> = Rc::new(Cell::new(None));
178     {
179         let cnt = cnt.clone();
180         let waker = waker.clone();
181         spawn
182             .spawn_local_obj(
183                 Box::pin(poll_fn(move |ctx| {
184                     cnt.set(cnt.get() + 1);
185                     waker.set(Some(ctx.waker().clone()));
186                     if cnt.get() == ITER {
187                         Poll::Ready(())
188                     } else {
189                         Poll::Pending
190                     }
191                 }))
192                 .into(),
193             )
194             .unwrap();
195     }
196 
197     for i in 0..ITER - 1 {
198         assert_eq!(cnt.get(), i);
199         assert!(!pool.try_run_one());
200         assert_eq!(cnt.get(), i + 1);
201         let w = waker.take();
202         assert!(w.is_some());
203         w.unwrap().wake();
204     }
205     assert!(pool.try_run_one());
206     assert_eq!(cnt.get(), ITER);
207 }
208 
209 #[test]
try_run_one_runs_sub_futures()210 fn try_run_one_runs_sub_futures() {
211     let mut pool = LocalPool::new();
212     let spawn = pool.spawner();
213     let cnt = Rc::new(Cell::new(0));
214 
215     let inner_spawner = spawn.clone();
216     let cnt1 = cnt.clone();
217     spawn
218         .spawn_local_obj(
219             Box::pin(poll_fn(move |_| {
220                 cnt1.set(cnt1.get() + 1);
221 
222                 let cnt2 = cnt1.clone();
223                 inner_spawner
224                     .spawn_local_obj(Box::pin(lazy(move |_| cnt2.set(cnt2.get() + 1))).into())
225                     .unwrap();
226 
227                 Poll::Pending
228             }))
229             .into(),
230         )
231         .unwrap();
232 
233     pool.try_run_one();
234     assert_eq!(cnt.get(), 2);
235 }
236 
237 #[test]
run_until_stalled_returns_if_empty()238 fn run_until_stalled_returns_if_empty() {
239     let mut pool = LocalPool::new();
240     pool.run_until_stalled();
241     pool.run_until_stalled();
242 }
243 
244 #[test]
run_until_stalled_returns_multiple_times()245 fn run_until_stalled_returns_multiple_times() {
246     let mut pool = LocalPool::new();
247     let spawn = pool.spawner();
248     let cnt = Rc::new(Cell::new(0));
249 
250     let cnt1 = cnt.clone();
251     spawn.spawn_local_obj(Box::pin(lazy(move |_| cnt1.set(cnt1.get() + 1))).into()).unwrap();
252     pool.run_until_stalled();
253     assert_eq!(cnt.get(), 1);
254 
255     let cnt2 = cnt.clone();
256     spawn.spawn_local_obj(Box::pin(lazy(move |_| cnt2.set(cnt2.get() + 1))).into()).unwrap();
257     pool.run_until_stalled();
258     assert_eq!(cnt.get(), 2);
259 }
260 
261 #[test]
run_until_stalled_runs_spawned_sub_futures()262 fn run_until_stalled_runs_spawned_sub_futures() {
263     let mut pool = LocalPool::new();
264     let spawn = pool.spawner();
265     let cnt = Rc::new(Cell::new(0));
266 
267     let inner_spawner = spawn.clone();
268     let cnt1 = cnt.clone();
269     spawn
270         .spawn_local_obj(
271             Box::pin(poll_fn(move |_| {
272                 cnt1.set(cnt1.get() + 1);
273 
274                 let cnt2 = cnt1.clone();
275                 inner_spawner
276                     .spawn_local_obj(Box::pin(lazy(move |_| cnt2.set(cnt2.get() + 1))).into())
277                     .unwrap();
278 
279                 Poll::Pending
280             }))
281             .into(),
282         )
283         .unwrap();
284 
285     pool.run_until_stalled();
286     assert_eq!(cnt.get(), 2);
287 }
288 
289 #[test]
run_until_stalled_executes_all_ready()290 fn run_until_stalled_executes_all_ready() {
291     const ITER: usize = 200;
292     const PER_ITER: usize = 3;
293 
294     let cnt = Rc::new(Cell::new(0));
295 
296     let mut pool = LocalPool::new();
297     let spawn = pool.spawner();
298 
299     for i in 0..ITER {
300         for _ in 0..PER_ITER {
301             spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();
302 
303             let cnt = cnt.clone();
304             spawn
305                 .spawn_local_obj(
306                     Box::pin(lazy(move |_| {
307                         cnt.set(cnt.get() + 1);
308                     }))
309                     .into(),
310                 )
311                 .unwrap();
312 
313             // also add some pending tasks to test if they are ignored
314             spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();
315         }
316         assert_eq!(cnt.get(), i * PER_ITER);
317         pool.run_until_stalled();
318         assert_eq!(cnt.get(), (i + 1) * PER_ITER);
319     }
320 }
321 
322 #[test]
323 #[should_panic]
nesting_run()324 fn nesting_run() {
325     let mut pool = LocalPool::new();
326     let spawn = pool.spawner();
327 
328     spawn
329         .spawn_obj(
330             Box::pin(lazy(|_| {
331                 let mut pool = LocalPool::new();
332                 pool.run();
333             }))
334             .into(),
335         )
336         .unwrap();
337 
338     pool.run();
339 }
340 
341 #[test]
342 #[should_panic]
nesting_run_run_until_stalled()343 fn nesting_run_run_until_stalled() {
344     let mut pool = LocalPool::new();
345     let spawn = pool.spawner();
346 
347     spawn
348         .spawn_obj(
349             Box::pin(lazy(|_| {
350                 let mut pool = LocalPool::new();
351                 pool.run_until_stalled();
352             }))
353             .into(),
354         )
355         .unwrap();
356 
357     pool.run();
358 }
359 
360 #[test]
tasks_are_scheduled_fairly()361 fn tasks_are_scheduled_fairly() {
362     let state = Rc::new(RefCell::new([0, 0]));
363 
364     struct Spin {
365         state: Rc<RefCell<[i32; 2]>>,
366         idx: usize,
367     }
368 
369     impl Future for Spin {
370         type Output = ();
371 
372         fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
373             let mut state = self.state.borrow_mut();
374 
375             if self.idx == 0 {
376                 let diff = state[0] - state[1];
377 
378                 assert!(diff.abs() <= 1);
379 
380                 if state[0] >= 50 {
381                     return Poll::Ready(());
382                 }
383             }
384 
385             state[self.idx] += 1;
386 
387             if state[self.idx] >= 100 {
388                 return Poll::Ready(());
389             }
390 
391             cx.waker().wake_by_ref();
392             Poll::Pending
393         }
394     }
395 
396     let mut pool = LocalPool::new();
397     let spawn = pool.spawner();
398 
399     spawn.spawn_local_obj(Box::pin(Spin { state: state.clone(), idx: 0 }).into()).unwrap();
400 
401     spawn.spawn_local_obj(Box::pin(Spin { state, idx: 1 }).into()).unwrap();
402 
403     pool.run();
404 }
405 
406 // Tests that the use of park/unpark in user-code has no
407 // effect on the expected behavior of the executor.
408 #[test]
park_unpark_independence()409 fn park_unpark_independence() {
410     let mut done = false;
411 
412     let future = future::poll_fn(move |cx| {
413         if done {
414             return Poll::Ready(());
415         }
416         done = true;
417         cx.waker().clone().wake(); // (*)
418                                    // some user-code that temporarily parks the thread
419         let test = thread::current();
420         let latch = Arc::new(AtomicBool::new(false));
421         let signal = latch.clone();
422         thread::spawn(move || {
423             thread::sleep(Duration::from_millis(10));
424             signal.store(true, Ordering::SeqCst);
425             test.unpark()
426         });
427         while !latch.load(Ordering::Relaxed) {
428             thread::park();
429         }
430         Poll::Pending // Expect to be called again due to (*).
431     });
432 
433     futures::executor::block_on(future)
434 }
435