1 extern crate futures;
2 extern crate tokio_current_thread;
3 extern crate tokio_executor;
4
5 use tokio_current_thread::{block_on_all, CurrentThread};
6
7 use std::any::Any;
8 use std::cell::{Cell, RefCell};
9 use std::rc::Rc;
10 use std::thread;
11 use std::time::Duration;
12
13 use futures::future::{self, lazy};
14 use futures::task;
15 // This is not actually unused --- we need this trait to be in scope for
16 // the tests that sue TaskExecutor::current().execute(). The compiler
17 // doesn't realise that.
18 #[allow(unused_imports)]
19 use futures::future::Executor as _futures_Executor;
20 use futures::prelude::*;
21 use futures::sync::oneshot;
22
23 mod from_block_on_all {
24 use super::*;
test<F: Fn(Box<Future<Item = (), Error = ()>>) + 'static>(spawn: F)25 fn test<F: Fn(Box<Future<Item = (), Error = ()>>) + 'static>(spawn: F) {
26 let cnt = Rc::new(Cell::new(0));
27 let c = cnt.clone();
28
29 let msg = tokio_current_thread::block_on_all(lazy(move || {
30 c.set(1 + c.get());
31
32 // Spawn!
33 spawn(Box::new(lazy(move || {
34 c.set(1 + c.get());
35 Ok::<(), ()>(())
36 })));
37
38 Ok::<_, ()>("hello")
39 }))
40 .unwrap();
41
42 assert_eq!(2, cnt.get());
43 assert_eq!(msg, "hello");
44 }
45
46 #[test]
spawn()47 fn spawn() {
48 test(tokio_current_thread::spawn)
49 }
50
51 #[test]
execute()52 fn execute() {
53 test(|f| {
54 tokio_current_thread::TaskExecutor::current()
55 .execute(f)
56 .unwrap();
57 });
58 }
59 }
60
61 #[test]
block_waits()62 fn block_waits() {
63 let (tx, rx) = oneshot::channel();
64
65 thread::spawn(|| {
66 thread::sleep(Duration::from_millis(1000));
67 tx.send(()).unwrap();
68 });
69
70 let cnt = Rc::new(Cell::new(0));
71 let cnt2 = cnt.clone();
72
73 block_on_all(rx.then(move |_| {
74 cnt.set(1 + cnt.get());
75 Ok::<_, ()>(())
76 }))
77 .unwrap();
78
79 assert_eq!(1, cnt2.get());
80 }
81
82 #[test]
spawn_many()83 fn spawn_many() {
84 const ITER: usize = 200;
85
86 let cnt = Rc::new(Cell::new(0));
87 let mut tokio_current_thread = CurrentThread::new();
88
89 for _ in 0..ITER {
90 let cnt = cnt.clone();
91 tokio_current_thread.spawn(lazy(move || {
92 cnt.set(1 + cnt.get());
93 Ok::<(), ()>(())
94 }));
95 }
96
97 tokio_current_thread.run().unwrap();
98
99 assert_eq!(cnt.get(), ITER);
100 }
101
102 mod does_not_set_global_executor_by_default {
103 use super::*;
104
test<F: Fn(Box<Future<Item = (), Error = ()> + Send>) -> Result<(), E> + 'static, E>( spawn: F, )105 fn test<F: Fn(Box<Future<Item = (), Error = ()> + Send>) -> Result<(), E> + 'static, E>(
106 spawn: F,
107 ) {
108 block_on_all(lazy(|| {
109 spawn(Box::new(lazy(|| ok()))).unwrap_err();
110 ok()
111 }))
112 .unwrap()
113 }
114
115 #[test]
spawn()116 fn spawn() {
117 use tokio_executor::Executor;
118 test(|f| tokio_executor::DefaultExecutor::current().spawn(f))
119 }
120
121 #[test]
execute()122 fn execute() {
123 test(|f| tokio_executor::DefaultExecutor::current().execute(f))
124 }
125 }
126
127 mod from_block_on_future {
128 use super::*;
129
test<F: Fn(Box<Future<Item = (), Error = ()>>)>(spawn: F)130 fn test<F: Fn(Box<Future<Item = (), Error = ()>>)>(spawn: F) {
131 let cnt = Rc::new(Cell::new(0));
132
133 let mut tokio_current_thread = CurrentThread::new();
134
135 tokio_current_thread
136 .block_on(lazy(|| {
137 let cnt = cnt.clone();
138
139 spawn(Box::new(lazy(move || {
140 cnt.set(1 + cnt.get());
141 Ok(())
142 })));
143
144 Ok::<_, ()>(())
145 }))
146 .unwrap();
147
148 tokio_current_thread.run().unwrap();
149
150 assert_eq!(1, cnt.get());
151 }
152
153 #[test]
spawn()154 fn spawn() {
155 test(tokio_current_thread::spawn);
156 }
157
158 #[test]
execute()159 fn execute() {
160 test(|f| {
161 tokio_current_thread::TaskExecutor::current()
162 .execute(f)
163 .unwrap();
164 });
165 }
166 }
167
168 struct Never(Rc<()>);
169
170 impl Future for Never {
171 type Item = ();
172 type Error = ();
173
poll(&mut self) -> Poll<(), ()>174 fn poll(&mut self) -> Poll<(), ()> {
175 Ok(Async::NotReady)
176 }
177 }
178
179 mod outstanding_tasks_are_dropped_when_executor_is_dropped {
180 use super::*;
181
test<F, G>(spawn: F, dotspawn: G) where F: Fn(Box<Future<Item = (), Error = ()>>) + 'static, G: Fn(&mut CurrentThread, Box<Future<Item = (), Error = ()>>),182 fn test<F, G>(spawn: F, dotspawn: G)
183 where
184 F: Fn(Box<Future<Item = (), Error = ()>>) + 'static,
185 G: Fn(&mut CurrentThread, Box<Future<Item = (), Error = ()>>),
186 {
187 let mut rc = Rc::new(());
188
189 let mut tokio_current_thread = CurrentThread::new();
190 dotspawn(&mut tokio_current_thread, Box::new(Never(rc.clone())));
191
192 drop(tokio_current_thread);
193
194 // Ensure the daemon is dropped
195 assert!(Rc::get_mut(&mut rc).is_some());
196
197 // Using the global spawn fn
198
199 let mut rc = Rc::new(());
200
201 let mut tokio_current_thread = CurrentThread::new();
202
203 tokio_current_thread
204 .block_on(lazy(|| {
205 spawn(Box::new(Never(rc.clone())));
206 Ok::<_, ()>(())
207 }))
208 .unwrap();
209
210 drop(tokio_current_thread);
211
212 // Ensure the daemon is dropped
213 assert!(Rc::get_mut(&mut rc).is_some());
214 }
215
216 #[test]
spawn()217 fn spawn() {
218 test(tokio_current_thread::spawn, |rt, f| {
219 rt.spawn(f);
220 })
221 }
222
223 #[test]
execute()224 fn execute() {
225 test(
226 |f| {
227 tokio_current_thread::TaskExecutor::current()
228 .execute(f)
229 .unwrap();
230 },
231 // Note: `CurrentThread` doesn't currently implement
232 // `futures::Executor`, so we'll call `.spawn(...)` rather than
233 // `.execute(...)` for now. If `CurrentThread` is changed to
234 // implement Executor, change this to `.execute(...).unwrap()`.
235 |rt, f| {
236 rt.spawn(f);
237 },
238 );
239 }
240 }
241
242 #[test]
243 #[should_panic]
nesting_run()244 fn nesting_run() {
245 block_on_all(lazy(|| {
246 block_on_all(lazy(|| ok())).unwrap();
247
248 ok()
249 }))
250 .unwrap();
251 }
252
253 mod run_in_future {
254 use super::*;
255
256 #[test]
257 #[should_panic]
spawn()258 fn spawn() {
259 block_on_all(lazy(|| {
260 tokio_current_thread::spawn(lazy(|| {
261 block_on_all(lazy(|| ok())).unwrap();
262 ok()
263 }));
264 ok()
265 }))
266 .unwrap();
267 }
268
269 #[test]
270 #[should_panic]
execute()271 fn execute() {
272 block_on_all(lazy(|| {
273 tokio_current_thread::TaskExecutor::current()
274 .execute(lazy(|| {
275 block_on_all(lazy(|| ok())).unwrap();
276 ok()
277 }))
278 .unwrap();
279 ok()
280 }))
281 .unwrap();
282 }
283 }
284
285 #[test]
tick_on_infini_future()286 fn tick_on_infini_future() {
287 let num = Rc::new(Cell::new(0));
288
289 struct Infini {
290 num: Rc<Cell<usize>>,
291 }
292
293 impl Future for Infini {
294 type Item = ();
295 type Error = ();
296
297 fn poll(&mut self) -> Poll<(), ()> {
298 self.num.set(1 + self.num.get());
299 task::current().notify();
300 Ok(Async::NotReady)
301 }
302 }
303
304 CurrentThread::new()
305 .spawn(Infini { num: num.clone() })
306 .turn(None)
307 .unwrap();
308
309 assert_eq!(1, num.get());
310 }
311
312 mod tasks_are_scheduled_fairly {
313 use super::*;
314 struct Spin {
315 state: Rc<RefCell<[i32; 2]>>,
316 idx: usize,
317 }
318
319 impl Future for Spin {
320 type Item = ();
321 type Error = ();
322
poll(&mut self) -> Poll<(), ()>323 fn poll(&mut self) -> Poll<(), ()> {
324 let mut state = self.state.borrow_mut();
325
326 if self.idx == 0 {
327 let diff = state[0] - state[1];
328
329 assert!(diff.abs() <= 1);
330
331 if state[0] >= 50 {
332 return Ok(().into());
333 }
334 }
335
336 state[self.idx] += 1;
337
338 if state[self.idx] >= 100 {
339 return Ok(().into());
340 }
341
342 task::current().notify();
343 Ok(Async::NotReady)
344 }
345 }
346
test<F: Fn(Spin)>(spawn: F)347 fn test<F: Fn(Spin)>(spawn: F) {
348 let state = Rc::new(RefCell::new([0, 0]));
349
350 block_on_all(lazy(|| {
351 spawn(Spin {
352 state: state.clone(),
353 idx: 0,
354 });
355
356 spawn(Spin {
357 state: state,
358 idx: 1,
359 });
360
361 ok()
362 }))
363 .unwrap();
364 }
365
366 #[test]
spawn()367 fn spawn() {
368 test(tokio_current_thread::spawn)
369 }
370
371 #[test]
execute()372 fn execute() {
373 test(|f| {
374 tokio_current_thread::TaskExecutor::current()
375 .execute(f)
376 .unwrap();
377 })
378 }
379 }
380
381 mod and_turn {
382 use super::*;
383
test<F, G>(spawn: F, dotspawn: G) where F: Fn(Box<Future<Item = (), Error = ()>>) + 'static, G: Fn(&mut CurrentThread, Box<Future<Item = (), Error = ()>>),384 fn test<F, G>(spawn: F, dotspawn: G)
385 where
386 F: Fn(Box<Future<Item = (), Error = ()>>) + 'static,
387 G: Fn(&mut CurrentThread, Box<Future<Item = (), Error = ()>>),
388 {
389 let cnt = Rc::new(Cell::new(0));
390 let c = cnt.clone();
391
392 let mut tokio_current_thread = CurrentThread::new();
393
394 // Spawn a basic task to get the executor to turn
395 dotspawn(&mut tokio_current_thread, Box::new(lazy(move || Ok(()))));
396
397 // Turn once...
398 tokio_current_thread.turn(None).unwrap();
399
400 dotspawn(
401 &mut tokio_current_thread,
402 Box::new(lazy(move || {
403 c.set(1 + c.get());
404
405 // Spawn!
406 spawn(Box::new(lazy(move || {
407 c.set(1 + c.get());
408 Ok::<(), ()>(())
409 })));
410
411 Ok(())
412 })),
413 );
414
415 // This does not run the newly spawned thread
416 tokio_current_thread.turn(None).unwrap();
417 assert_eq!(1, cnt.get());
418
419 // This runs the newly spawned thread
420 tokio_current_thread.turn(None).unwrap();
421 assert_eq!(2, cnt.get());
422 }
423
424 #[test]
spawn()425 fn spawn() {
426 test(tokio_current_thread::spawn, |rt, f| {
427 rt.spawn(f);
428 })
429 }
430
431 #[test]
execute()432 fn execute() {
433 test(
434 |f| {
435 tokio_current_thread::TaskExecutor::current()
436 .execute(f)
437 .unwrap();
438 },
439 // Note: `CurrentThread` doesn't currently implement
440 // `futures::Executor`, so we'll call `.spawn(...)` rather than
441 // `.execute(...)` for now. If `CurrentThread` is changed to
442 // implement Executor, change this to `.execute(...).unwrap()`.
443 |rt, f| {
444 rt.spawn(f);
445 },
446 );
447 }
448
449 }
450
451 mod in_drop {
452 use super::*;
453 struct OnDrop<F: FnOnce()>(Option<F>);
454
455 impl<F: FnOnce()> Drop for OnDrop<F> {
drop(&mut self)456 fn drop(&mut self) {
457 (self.0.take().unwrap())();
458 }
459 }
460
461 struct MyFuture {
462 _data: Box<Any>,
463 }
464
465 impl Future for MyFuture {
466 type Item = ();
467 type Error = ();
468
poll(&mut self) -> Poll<(), ()>469 fn poll(&mut self) -> Poll<(), ()> {
470 Ok(().into())
471 }
472 }
473
test<F, G>(spawn: F, dotspawn: G) where F: Fn(Box<Future<Item = (), Error = ()>>) + 'static, G: Fn(&mut CurrentThread, Box<Future<Item = (), Error = ()>>),474 fn test<F, G>(spawn: F, dotspawn: G)
475 where
476 F: Fn(Box<Future<Item = (), Error = ()>>) + 'static,
477 G: Fn(&mut CurrentThread, Box<Future<Item = (), Error = ()>>),
478 {
479 let mut tokio_current_thread = CurrentThread::new();
480
481 let (tx, rx) = oneshot::channel();
482
483 dotspawn(
484 &mut tokio_current_thread,
485 Box::new(MyFuture {
486 _data: Box::new(OnDrop(Some(move || {
487 spawn(Box::new(lazy(move || {
488 tx.send(()).unwrap();
489 Ok(())
490 })));
491 }))),
492 }),
493 );
494
495 tokio_current_thread.block_on(rx).unwrap();
496 tokio_current_thread.run().unwrap();
497 }
498
499 #[test]
spawn()500 fn spawn() {
501 test(tokio_current_thread::spawn, |rt, f| {
502 rt.spawn(f);
503 })
504 }
505
506 #[test]
execute()507 fn execute() {
508 test(
509 |f| {
510 tokio_current_thread::TaskExecutor::current()
511 .execute(f)
512 .unwrap();
513 },
514 // Note: `CurrentThread` doesn't currently implement
515 // `futures::Executor`, so we'll call `.spawn(...)` rather than
516 // `.execute(...)` for now. If `CurrentThread` is changed to
517 // implement Executor, change this to `.execute(...).unwrap()`.
518 |rt, f| {
519 rt.spawn(f);
520 },
521 );
522 }
523
524 }
525
526 #[test]
hammer_turn()527 fn hammer_turn() {
528 use futures::sync::mpsc;
529
530 const ITER: usize = 100;
531 const N: usize = 100;
532 const THREADS: usize = 4;
533
534 for _ in 0..ITER {
535 let mut ths = vec![];
536
537 // Add some jitter
538 for _ in 0..THREADS {
539 let th = thread::spawn(|| {
540 let mut tokio_current_thread = CurrentThread::new();
541
542 let (tx, rx) = mpsc::unbounded();
543
544 tokio_current_thread.spawn({
545 let cnt = Rc::new(Cell::new(0));
546 let c = cnt.clone();
547
548 rx.for_each(move |_| {
549 c.set(1 + c.get());
550 Ok(())
551 })
552 .map_err(|e| panic!("err={:?}", e))
553 .map(move |v| {
554 assert_eq!(N, cnt.get());
555 v
556 })
557 });
558
559 thread::spawn(move || {
560 for _ in 0..N {
561 tx.unbounded_send(()).unwrap();
562 thread::yield_now();
563 }
564 });
565
566 while !tokio_current_thread.is_idle() {
567 tokio_current_thread.turn(None).unwrap();
568 }
569 });
570
571 ths.push(th);
572 }
573
574 for th in ths {
575 th.join().unwrap();
576 }
577 }
578 }
579
580 #[test]
turn_has_polled()581 fn turn_has_polled() {
582 let mut tokio_current_thread = CurrentThread::new();
583
584 // Spawn oneshot receiver
585 let (sender, receiver) = oneshot::channel::<()>();
586 tokio_current_thread.spawn(receiver.then(|_| Ok(())));
587
588 // Turn once...
589 let res = tokio_current_thread
590 .turn(Some(Duration::from_millis(0)))
591 .unwrap();
592
593 // Should've polled the receiver once, but considered it not ready
594 assert!(res.has_polled());
595
596 // Turn another time
597 let res = tokio_current_thread
598 .turn(Some(Duration::from_millis(0)))
599 .unwrap();
600
601 // Should've polled nothing, the receiver is not ready yet
602 assert!(!res.has_polled());
603
604 // Make the receiver ready
605 sender.send(()).unwrap();
606
607 // Turn another time
608 let res = tokio_current_thread
609 .turn(Some(Duration::from_millis(0)))
610 .unwrap();
611
612 // Should've polled the receiver, it's ready now
613 assert!(res.has_polled());
614
615 // Now the executor should be empty
616 assert!(tokio_current_thread.is_idle());
617 let res = tokio_current_thread
618 .turn(Some(Duration::from_millis(0)))
619 .unwrap();
620
621 // So should've polled nothing
622 assert!(!res.has_polled());
623 }
624
625 // Our own mock Park that is never really waiting and the only
626 // thing it does is to send, on request, something (once) to a oneshot
627 // channel
628 struct MyPark {
629 sender: Option<oneshot::Sender<()>>,
630 send_now: Rc<Cell<bool>>,
631 }
632
633 struct MyUnpark;
634
635 impl tokio_executor::park::Park for MyPark {
636 type Unpark = MyUnpark;
637 type Error = ();
638
unpark(&self) -> Self::Unpark639 fn unpark(&self) -> Self::Unpark {
640 MyUnpark
641 }
642
park(&mut self) -> Result<(), Self::Error>643 fn park(&mut self) -> Result<(), Self::Error> {
644 // If called twice with send_now, this will intentionally panic
645 if self.send_now.get() {
646 self.sender.take().unwrap().send(()).unwrap();
647 }
648
649 Ok(())
650 }
651
park_timeout(&mut self, _duration: Duration) -> Result<(), Self::Error>652 fn park_timeout(&mut self, _duration: Duration) -> Result<(), Self::Error> {
653 self.park()
654 }
655 }
656
657 impl tokio_executor::park::Unpark for MyUnpark {
unpark(&self)658 fn unpark(&self) {}
659 }
660
661 #[test]
turn_fair()662 fn turn_fair() {
663 let send_now = Rc::new(Cell::new(false));
664
665 let (sender, receiver) = oneshot::channel::<()>();
666 let (sender_2, receiver_2) = oneshot::channel::<()>();
667 let (sender_3, receiver_3) = oneshot::channel::<()>();
668
669 let my_park = MyPark {
670 sender: Some(sender_3),
671 send_now: send_now.clone(),
672 };
673
674 let mut tokio_current_thread = CurrentThread::new_with_park(my_park);
675
676 let receiver_1_done = Rc::new(Cell::new(false));
677 let receiver_1_done_clone = receiver_1_done.clone();
678
679 // Once an item is received on the oneshot channel, it will immediately
680 // immediately make the second oneshot channel ready
681 tokio_current_thread.spawn(receiver.map_err(|_| unreachable!()).and_then(move |_| {
682 sender_2.send(()).unwrap();
683 receiver_1_done_clone.set(true);
684
685 Ok(())
686 }));
687
688 let receiver_2_done = Rc::new(Cell::new(false));
689 let receiver_2_done_clone = receiver_2_done.clone();
690
691 tokio_current_thread.spawn(receiver_2.map_err(|_| unreachable!()).and_then(move |_| {
692 receiver_2_done_clone.set(true);
693 Ok(())
694 }));
695
696 // The third receiver is only woken up from our Park implementation, it simulates
697 // e.g. a socket that first has to be polled to know if it is ready now
698 let receiver_3_done = Rc::new(Cell::new(false));
699 let receiver_3_done_clone = receiver_3_done.clone();
700
701 tokio_current_thread.spawn(receiver_3.map_err(|_| unreachable!()).and_then(move |_| {
702 receiver_3_done_clone.set(true);
703 Ok(())
704 }));
705
706 // First turn should've polled both and considered them not ready
707 let res = tokio_current_thread
708 .turn(Some(Duration::from_millis(0)))
709 .unwrap();
710 assert!(res.has_polled());
711
712 // Next turn should've polled nothing
713 let res = tokio_current_thread
714 .turn(Some(Duration::from_millis(0)))
715 .unwrap();
716 assert!(!res.has_polled());
717
718 assert!(!receiver_1_done.get());
719 assert!(!receiver_2_done.get());
720 assert!(!receiver_3_done.get());
721
722 // After this the receiver future will wake up the second receiver future,
723 // so there are pending futures again
724 sender.send(()).unwrap();
725
726 // Now the first receiver should be done, the second receiver should be ready
727 // to be polled again and the socket not yet
728 let res = tokio_current_thread.turn(None).unwrap();
729 assert!(res.has_polled());
730
731 assert!(receiver_1_done.get());
732 assert!(!receiver_2_done.get());
733 assert!(!receiver_3_done.get());
734
735 // Now let our park implementation know that it should send something to sender 3
736 send_now.set(true);
737
738 // This should resolve the second receiver directly, but also poll the socket
739 // and read the packet from it. If it didn't do both here, we would handle
740 // futures that are woken up from the reactor and directly unfairly and would
741 // favour the ones that are woken up directly.
742 let res = tokio_current_thread.turn(None).unwrap();
743 assert!(res.has_polled());
744
745 assert!(receiver_1_done.get());
746 assert!(receiver_2_done.get());
747 assert!(receiver_3_done.get());
748
749 // Don't send again
750 send_now.set(false);
751
752 // Now we should be idle and turning should not poll anything
753 assert!(tokio_current_thread.is_idle());
754 let res = tokio_current_thread.turn(None).unwrap();
755 assert!(!res.has_polled());
756 }
757
758 #[test]
spawn_from_other_thread()759 fn spawn_from_other_thread() {
760 let mut current_thread = CurrentThread::new();
761
762 let handle = current_thread.handle();
763 let (sender, receiver) = oneshot::channel::<()>();
764
765 thread::spawn(move || {
766 handle
767 .spawn(lazy(move || {
768 sender.send(()).unwrap();
769 Ok(())
770 }))
771 .unwrap();
772 });
773
774 let _ = current_thread.block_on(receiver).unwrap();
775 }
776
777 #[test]
spawn_from_other_thread_unpark()778 fn spawn_from_other_thread_unpark() {
779 use std::sync::mpsc::channel as mpsc_channel;
780
781 let mut current_thread = CurrentThread::new();
782
783 let handle = current_thread.handle();
784 let (sender_1, receiver_1) = oneshot::channel::<()>();
785 let (sender_2, receiver_2) = mpsc_channel::<()>();
786
787 thread::spawn(move || {
788 let _ = receiver_2.recv().unwrap();
789
790 handle
791 .spawn(lazy(move || {
792 sender_1.send(()).unwrap();
793 Ok(())
794 }))
795 .unwrap();
796 });
797
798 // Ensure that unparking the executor works correctly. It will first
799 // check if there are new futures (there are none), then execute the
800 // lazy future below which will cause the future to be spawned from
801 // the other thread. Then the executor will park but should be woken
802 // up because *now* we have a new future to schedule
803 let _ = current_thread
804 .block_on(
805 lazy(move || {
806 sender_2.send(()).unwrap();
807 Ok(())
808 })
809 .and_then(|_| receiver_1),
810 )
811 .unwrap();
812 }
813
814 #[test]
spawn_from_executor_with_handle()815 fn spawn_from_executor_with_handle() {
816 let mut current_thread = CurrentThread::new();
817 let handle = current_thread.handle();
818 let (tx, rx) = oneshot::channel();
819
820 current_thread.spawn(lazy(move || {
821 handle
822 .spawn(lazy(move || {
823 tx.send(()).unwrap();
824 Ok(())
825 }))
826 .unwrap();
827 Ok::<_, ()>(())
828 }));
829
830 current_thread.run();
831
832 rx.wait().unwrap();
833 }
834
ok() -> future::FutureResult<(), ()>835 fn ok() -> future::FutureResult<(), ()> {
836 future::ok(())
837 }
838