1 extern crate futures;
2 extern crate tokio_current_thread;
3 extern crate tokio_executor;
4 
5 use tokio_current_thread::{block_on_all, CurrentThread};
6 
7 use std::any::Any;
8 use std::cell::{Cell, RefCell};
9 use std::rc::Rc;
10 use std::thread;
11 use std::time::Duration;
12 
13 use futures::future::{self, lazy};
14 use futures::task;
// This is not actually unused --- we need this trait to be in scope for
// the tests that use TaskExecutor::current().execute(). The compiler
// doesn't realise that.
18 #[allow(unused_imports)]
19 use futures::future::Executor as _futures_Executor;
20 use futures::prelude::*;
21 use futures::sync::oneshot;
22 
23 mod from_block_on_all {
24     use super::*;
test<F: Fn(Box<Future<Item = (), Error = ()>>) + 'static>(spawn: F)25     fn test<F: Fn(Box<Future<Item = (), Error = ()>>) + 'static>(spawn: F) {
26         let cnt = Rc::new(Cell::new(0));
27         let c = cnt.clone();
28 
29         let msg = tokio_current_thread::block_on_all(lazy(move || {
30             c.set(1 + c.get());
31 
32             // Spawn!
33             spawn(Box::new(lazy(move || {
34                 c.set(1 + c.get());
35                 Ok::<(), ()>(())
36             })));
37 
38             Ok::<_, ()>("hello")
39         }))
40         .unwrap();
41 
42         assert_eq!(2, cnt.get());
43         assert_eq!(msg, "hello");
44     }
45 
46     #[test]
spawn()47     fn spawn() {
48         test(tokio_current_thread::spawn)
49     }
50 
51     #[test]
execute()52     fn execute() {
53         test(|f| {
54             tokio_current_thread::TaskExecutor::current()
55                 .execute(f)
56                 .unwrap();
57         });
58     }
59 }
60 
/// `block_on_all` must keep the calling thread blocked until a future that
/// is completed from another thread (after a delay) has resolved.
#[test]
fn block_waits() {
    let (tx, rx) = oneshot::channel();

    // Complete the channel from a helper thread after one second.
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(1000));
        tx.send(()).unwrap();
    });

    let observed = Rc::new(Cell::new(0));
    let in_future = observed.clone();

    let waiter = rx.then(move |_| {
        in_future.set(in_future.get() + 1);
        Ok::<_, ()>(())
    });
    block_on_all(waiter).unwrap();

    // The continuation ran exactly once before `block_on_all` returned.
    assert_eq!(1, observed.get());
}
81 
/// Spawns `ITER` counter-incrementing tasks onto a single executor and checks
/// that, after `run()`, every one of them has executed.
#[test]
fn spawn_many() {
    const ITER: usize = 200;

    let mut executor = CurrentThread::new();
    let completed = Rc::new(Cell::new(0));

    for _ in 0..ITER {
        let completed = completed.clone();
        let task = lazy(move || {
            completed.set(completed.get() + 1);
            Ok::<(), ()>(())
        });
        executor.spawn(task);
    }

    executor.run().unwrap();

    assert_eq!(completed.get(), ITER);
}
101 
102 mod does_not_set_global_executor_by_default {
103     use super::*;
104 
test<F: Fn(Box<Future<Item = (), Error = ()> + Send>) -> Result<(), E> + 'static, E>( spawn: F, )105     fn test<F: Fn(Box<Future<Item = (), Error = ()> + Send>) -> Result<(), E> + 'static, E>(
106         spawn: F,
107     ) {
108         block_on_all(lazy(|| {
109             spawn(Box::new(lazy(|| ok()))).unwrap_err();
110             ok()
111         }))
112         .unwrap()
113     }
114 
115     #[test]
spawn()116     fn spawn() {
117         use tokio_executor::Executor;
118         test(|f| tokio_executor::DefaultExecutor::current().spawn(f))
119     }
120 
121     #[test]
execute()122     fn execute() {
123         test(|f| tokio_executor::DefaultExecutor::current().execute(f))
124     }
125 }
126 
127 mod from_block_on_future {
128     use super::*;
129 
test<F: Fn(Box<Future<Item = (), Error = ()>>)>(spawn: F)130     fn test<F: Fn(Box<Future<Item = (), Error = ()>>)>(spawn: F) {
131         let cnt = Rc::new(Cell::new(0));
132 
133         let mut tokio_current_thread = CurrentThread::new();
134 
135         tokio_current_thread
136             .block_on(lazy(|| {
137                 let cnt = cnt.clone();
138 
139                 spawn(Box::new(lazy(move || {
140                     cnt.set(1 + cnt.get());
141                     Ok(())
142                 })));
143 
144                 Ok::<_, ()>(())
145             }))
146             .unwrap();
147 
148         tokio_current_thread.run().unwrap();
149 
150         assert_eq!(1, cnt.get());
151     }
152 
153     #[test]
spawn()154     fn spawn() {
155         test(tokio_current_thread::spawn);
156     }
157 
158     #[test]
execute()159     fn execute() {
160         test(|f| {
161             tokio_current_thread::TaskExecutor::current()
162                 .execute(f)
163                 .unwrap();
164         });
165     }
166 }
167 
168 struct Never(Rc<()>);
169 
170 impl Future for Never {
171     type Item = ();
172     type Error = ();
173 
poll(&mut self) -> Poll<(), ()>174     fn poll(&mut self) -> Poll<(), ()> {
175         Ok(Async::NotReady)
176     }
177 }
178 
179 mod outstanding_tasks_are_dropped_when_executor_is_dropped {
180     use super::*;
181 
test<F, G>(spawn: F, dotspawn: G) where F: Fn(Box<Future<Item = (), Error = ()>>) + 'static, G: Fn(&mut CurrentThread, Box<Future<Item = (), Error = ()>>),182     fn test<F, G>(spawn: F, dotspawn: G)
183     where
184         F: Fn(Box<Future<Item = (), Error = ()>>) + 'static,
185         G: Fn(&mut CurrentThread, Box<Future<Item = (), Error = ()>>),
186     {
187         let mut rc = Rc::new(());
188 
189         let mut tokio_current_thread = CurrentThread::new();
190         dotspawn(&mut tokio_current_thread, Box::new(Never(rc.clone())));
191 
192         drop(tokio_current_thread);
193 
194         // Ensure the daemon is dropped
195         assert!(Rc::get_mut(&mut rc).is_some());
196 
197         // Using the global spawn fn
198 
199         let mut rc = Rc::new(());
200 
201         let mut tokio_current_thread = CurrentThread::new();
202 
203         tokio_current_thread
204             .block_on(lazy(|| {
205                 spawn(Box::new(Never(rc.clone())));
206                 Ok::<_, ()>(())
207             }))
208             .unwrap();
209 
210         drop(tokio_current_thread);
211 
212         // Ensure the daemon is dropped
213         assert!(Rc::get_mut(&mut rc).is_some());
214     }
215 
216     #[test]
spawn()217     fn spawn() {
218         test(tokio_current_thread::spawn, |rt, f| {
219             rt.spawn(f);
220         })
221     }
222 
223     #[test]
execute()224     fn execute() {
225         test(
226             |f| {
227                 tokio_current_thread::TaskExecutor::current()
228                     .execute(f)
229                     .unwrap();
230             },
231             // Note: `CurrentThread` doesn't currently implement
232             // `futures::Executor`, so we'll call `.spawn(...)` rather than
233             // `.execute(...)` for now. If `CurrentThread` is changed to
234             // implement Executor, change this to `.execute(...).unwrap()`.
235             |rt, f| {
236                 rt.spawn(f);
237             },
238         );
239     }
240 }
241 
/// Calling `block_on_all` from inside a future that is itself being driven by
/// `block_on_all` is expected to panic.
#[test]
#[should_panic]
fn nesting_run() {
    let outer = lazy(|| {
        // Nested attempt to run an executor on the same thread.
        let nested = block_on_all(lazy(|| ok()));
        nested.unwrap();

        ok()
    });

    block_on_all(outer).unwrap();
}
252 
mod run_in_future {
    use super::*;

    /// A task started with the free `spawn` function tries to call
    /// `block_on_all` itself; the test is expected to panic.
    #[test]
    #[should_panic]
    fn spawn() {
        let outer = lazy(|| {
            let nested_attempt = lazy(|| {
                block_on_all(lazy(|| ok())).unwrap();
                ok()
            });
            tokio_current_thread::spawn(nested_attempt);
            ok()
        });

        block_on_all(outer).unwrap();
    }

    /// Same scenario as `spawn`, but the inner task is submitted through
    /// `TaskExecutor::current().execute(..)`.
    #[test]
    #[should_panic]
    fn execute() {
        let outer = lazy(|| {
            let nested_attempt = lazy(|| {
                block_on_all(lazy(|| ok())).unwrap();
                ok()
            });
            tokio_current_thread::TaskExecutor::current()
                .execute(nested_attempt)
                .unwrap();
            ok()
        });

        block_on_all(outer).unwrap();
    }
}
284 
/// A single `turn` must poll a perpetually self-notifying future exactly
/// once instead of spinning on it.
#[test]
fn tick_on_infini_future() {
    /// Counts its polls, immediately asks to be polled again and never
    /// completes.
    struct Infini {
        num: Rc<Cell<usize>>,
    }

    impl Future for Infini {
        type Item = ();
        type Error = ();

        fn poll(&mut self) -> Poll<(), ()> {
            self.num.set(self.num.get() + 1);
            task::current().notify();
            Ok(Async::NotReady)
        }
    }

    let polls = Rc::new(Cell::new(0));

    let mut executor = CurrentThread::new();
    executor.spawn(Infini { num: polls.clone() });
    executor.turn(None).unwrap();
    drop(executor);

    assert_eq!(1, polls.get());
}
311 
312 mod tasks_are_scheduled_fairly {
313     use super::*;
314     struct Spin {
315         state: Rc<RefCell<[i32; 2]>>,
316         idx: usize,
317     }
318 
319     impl Future for Spin {
320         type Item = ();
321         type Error = ();
322 
poll(&mut self) -> Poll<(), ()>323         fn poll(&mut self) -> Poll<(), ()> {
324             let mut state = self.state.borrow_mut();
325 
326             if self.idx == 0 {
327                 let diff = state[0] - state[1];
328 
329                 assert!(diff.abs() <= 1);
330 
331                 if state[0] >= 50 {
332                     return Ok(().into());
333                 }
334             }
335 
336             state[self.idx] += 1;
337 
338             if state[self.idx] >= 100 {
339                 return Ok(().into());
340             }
341 
342             task::current().notify();
343             Ok(Async::NotReady)
344         }
345     }
346 
test<F: Fn(Spin)>(spawn: F)347     fn test<F: Fn(Spin)>(spawn: F) {
348         let state = Rc::new(RefCell::new([0, 0]));
349 
350         block_on_all(lazy(|| {
351             spawn(Spin {
352                 state: state.clone(),
353                 idx: 0,
354             });
355 
356             spawn(Spin {
357                 state: state,
358                 idx: 1,
359             });
360 
361             ok()
362         }))
363         .unwrap();
364     }
365 
366     #[test]
spawn()367     fn spawn() {
368         test(tokio_current_thread::spawn)
369     }
370 
371     #[test]
execute()372     fn execute() {
373         test(|f| {
374             tokio_current_thread::TaskExecutor::current()
375                 .execute(f)
376                 .unwrap();
377         })
378     }
379 }
380 
381 mod and_turn {
382     use super::*;
383 
test<F, G>(spawn: F, dotspawn: G) where F: Fn(Box<Future<Item = (), Error = ()>>) + 'static, G: Fn(&mut CurrentThread, Box<Future<Item = (), Error = ()>>),384     fn test<F, G>(spawn: F, dotspawn: G)
385     where
386         F: Fn(Box<Future<Item = (), Error = ()>>) + 'static,
387         G: Fn(&mut CurrentThread, Box<Future<Item = (), Error = ()>>),
388     {
389         let cnt = Rc::new(Cell::new(0));
390         let c = cnt.clone();
391 
392         let mut tokio_current_thread = CurrentThread::new();
393 
394         // Spawn a basic task to get the executor to turn
395         dotspawn(&mut tokio_current_thread, Box::new(lazy(move || Ok(()))));
396 
397         // Turn once...
398         tokio_current_thread.turn(None).unwrap();
399 
400         dotspawn(
401             &mut tokio_current_thread,
402             Box::new(lazy(move || {
403                 c.set(1 + c.get());
404 
405                 // Spawn!
406                 spawn(Box::new(lazy(move || {
407                     c.set(1 + c.get());
408                     Ok::<(), ()>(())
409                 })));
410 
411                 Ok(())
412             })),
413         );
414 
415         // This does not run the newly spawned thread
416         tokio_current_thread.turn(None).unwrap();
417         assert_eq!(1, cnt.get());
418 
419         // This runs the newly spawned thread
420         tokio_current_thread.turn(None).unwrap();
421         assert_eq!(2, cnt.get());
422     }
423 
424     #[test]
spawn()425     fn spawn() {
426         test(tokio_current_thread::spawn, |rt, f| {
427             rt.spawn(f);
428         })
429     }
430 
431     #[test]
execute()432     fn execute() {
433         test(
434             |f| {
435                 tokio_current_thread::TaskExecutor::current()
436                     .execute(f)
437                     .unwrap();
438             },
439             // Note: `CurrentThread` doesn't currently implement
440             // `futures::Executor`, so we'll call `.spawn(...)` rather than
441             // `.execute(...)` for now. If `CurrentThread` is changed to
442             // implement Executor, change this to `.execute(...).unwrap()`.
443             |rt, f| {
444                 rt.spawn(f);
445             },
446         );
447     }
448 
449 }
450 
451 mod in_drop {
452     use super::*;
453     struct OnDrop<F: FnOnce()>(Option<F>);
454 
455     impl<F: FnOnce()> Drop for OnDrop<F> {
drop(&mut self)456         fn drop(&mut self) {
457             (self.0.take().unwrap())();
458         }
459     }
460 
461     struct MyFuture {
462         _data: Box<Any>,
463     }
464 
465     impl Future for MyFuture {
466         type Item = ();
467         type Error = ();
468 
poll(&mut self) -> Poll<(), ()>469         fn poll(&mut self) -> Poll<(), ()> {
470             Ok(().into())
471         }
472     }
473 
test<F, G>(spawn: F, dotspawn: G) where F: Fn(Box<Future<Item = (), Error = ()>>) + 'static, G: Fn(&mut CurrentThread, Box<Future<Item = (), Error = ()>>),474     fn test<F, G>(spawn: F, dotspawn: G)
475     where
476         F: Fn(Box<Future<Item = (), Error = ()>>) + 'static,
477         G: Fn(&mut CurrentThread, Box<Future<Item = (), Error = ()>>),
478     {
479         let mut tokio_current_thread = CurrentThread::new();
480 
481         let (tx, rx) = oneshot::channel();
482 
483         dotspawn(
484             &mut tokio_current_thread,
485             Box::new(MyFuture {
486                 _data: Box::new(OnDrop(Some(move || {
487                     spawn(Box::new(lazy(move || {
488                         tx.send(()).unwrap();
489                         Ok(())
490                     })));
491                 }))),
492             }),
493         );
494 
495         tokio_current_thread.block_on(rx).unwrap();
496         tokio_current_thread.run().unwrap();
497     }
498 
499     #[test]
spawn()500     fn spawn() {
501         test(tokio_current_thread::spawn, |rt, f| {
502             rt.spawn(f);
503         })
504     }
505 
506     #[test]
execute()507     fn execute() {
508         test(
509             |f| {
510                 tokio_current_thread::TaskExecutor::current()
511                     .execute(f)
512                     .unwrap();
513             },
514             // Note: `CurrentThread` doesn't currently implement
515             // `futures::Executor`, so we'll call `.spawn(...)` rather than
516             // `.execute(...)` for now. If `CurrentThread` is changed to
517             // implement Executor, change this to `.execute(...).unwrap()`.
518             |rt, f| {
519                 rt.spawn(f);
520             },
521         );
522     }
523 
524 }
525 
/// Stress test for `turn`: repeatedly starts several threads, each with its
/// own executor consuming `N` messages that a further producer thread sends
/// over an unbounded channel, and turns each executor until it is idle.
#[test]
fn hammer_turn() {
    use futures::sync::mpsc;

    const ITER: usize = 100;
    const N: usize = 100;
    const THREADS: usize = 4;

    for _ in 0..ITER {
        let workers: Vec<_> = (0..THREADS)
            .map(|_| {
                thread::spawn(|| {
                    let mut executor = CurrentThread::new();

                    let (tx, rx) = mpsc::unbounded();

                    let received = Rc::new(Cell::new(0));
                    let tally = received.clone();

                    // Count every message; once the stream ends, all `N`
                    // messages must have been seen.
                    let consumer = rx
                        .for_each(move |_| {
                            tally.set(tally.get() + 1);
                            Ok(())
                        })
                        .map_err(|e| panic!("err={:?}", e))
                        .map(move |v| {
                            assert_eq!(N, received.get());
                            v
                        });
                    executor.spawn(consumer);

                    // Producer: yields between sends to add some jitter.
                    thread::spawn(move || {
                        for _ in 0..N {
                            tx.unbounded_send(()).unwrap();
                            thread::yield_now();
                        }
                    });

                    while !executor.is_idle() {
                        executor.turn(None).unwrap();
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }
    }
}
579 
/// Verifies that `Turn::has_polled` reports whether a call to `turn`
/// actually polled a future.
#[test]
fn turn_has_polled() {
    let no_wait = Duration::from_millis(0);
    let mut executor = CurrentThread::new();

    // A task waiting on a oneshot receiver.
    let (sender, receiver) = oneshot::channel::<()>();
    executor.spawn(receiver.then(|_| Ok(())));

    // First turn: the receiver is polled once and found not ready.
    let first = executor.turn(Some(no_wait)).unwrap();
    assert!(first.has_polled());

    // Second turn: nothing has been woken up, so nothing is polled.
    let second = executor.turn(Some(no_wait)).unwrap();
    assert!(!second.has_polled());

    // Complete the channel so that the receiver becomes ready.
    sender.send(()).unwrap();

    // Third turn: the receiver is polled again and completes.
    let third = executor.turn(Some(no_wait)).unwrap();
    assert!(third.has_polled());

    // The executor has no tasks left...
    assert!(executor.is_idle());

    // ...so a further turn polls nothing.
    let last = executor.turn(Some(no_wait)).unwrap();
    assert!(!last.has_polled());
}
624 
625 // Our own mock Park that is never really waiting and the only
626 // thing it does is to send, on request, something (once) to a oneshot
627 // channel
628 struct MyPark {
629     sender: Option<oneshot::Sender<()>>,
630     send_now: Rc<Cell<bool>>,
631 }
632 
633 struct MyUnpark;
634 
635 impl tokio_executor::park::Park for MyPark {
636     type Unpark = MyUnpark;
637     type Error = ();
638 
unpark(&self) -> Self::Unpark639     fn unpark(&self) -> Self::Unpark {
640         MyUnpark
641     }
642 
park(&mut self) -> Result<(), Self::Error>643     fn park(&mut self) -> Result<(), Self::Error> {
644         // If called twice with send_now, this will intentionally panic
645         if self.send_now.get() {
646             self.sender.take().unwrap().send(()).unwrap();
647         }
648 
649         Ok(())
650     }
651 
park_timeout(&mut self, _duration: Duration) -> Result<(), Self::Error>652     fn park_timeout(&mut self, _duration: Duration) -> Result<(), Self::Error> {
653         self.park()
654     }
655 }
656 
657 impl tokio_executor::park::Unpark for MyUnpark {
unpark(&self)658     fn unpark(&self) {}
659 }
660 
// Checks that a single `turn` treats futures woken up directly (by another
// future) and futures woken up via the `Park` implementation fairly, using
// the `MyPark` mock to control when the third channel is completed.
#[test]
fn turn_fair() {
    let send_now = Rc::new(Cell::new(false));

    let (sender, receiver) = oneshot::channel::<()>();
    let (sender_2, receiver_2) = oneshot::channel::<()>();
    let (sender_3, receiver_3) = oneshot::channel::<()>();

    let my_park = MyPark {
        sender: Some(sender_3),
        send_now: send_now.clone(),
    };

    let mut tokio_current_thread = CurrentThread::new_with_park(my_park);

    let receiver_1_done = Rc::new(Cell::new(false));
    let receiver_1_done_clone = receiver_1_done.clone();

    // Once an item is received on the first oneshot channel, this future
    // will immediately make the second oneshot channel ready
    tokio_current_thread.spawn(receiver.map_err(|_| unreachable!()).and_then(move |_| {
        sender_2.send(()).unwrap();
        receiver_1_done_clone.set(true);

        Ok(())
    }));

    let receiver_2_done = Rc::new(Cell::new(false));
    let receiver_2_done_clone = receiver_2_done.clone();

    tokio_current_thread.spawn(receiver_2.map_err(|_| unreachable!()).and_then(move |_| {
        receiver_2_done_clone.set(true);
        Ok(())
    }));

    // The third receiver is only woken up from our Park implementation, it simulates
    // e.g. a socket that first has to be polled to know if it is ready now
    let receiver_3_done = Rc::new(Cell::new(false));
    let receiver_3_done_clone = receiver_3_done.clone();

    tokio_current_thread.spawn(receiver_3.map_err(|_| unreachable!()).and_then(move |_| {
        receiver_3_done_clone.set(true);
        Ok(())
    }));

    // First turn should've polled the spawned futures and considered them not ready
    let res = tokio_current_thread
        .turn(Some(Duration::from_millis(0)))
        .unwrap();
    assert!(res.has_polled());

    // Next turn should've polled nothing
    let res = tokio_current_thread
        .turn(Some(Duration::from_millis(0)))
        .unwrap();
    assert!(!res.has_polled());

    assert!(!receiver_1_done.get());
    assert!(!receiver_2_done.get());
    assert!(!receiver_3_done.get());

    // After this the first receiver future will wake up the second receiver
    // future, so there are pending futures again
    sender.send(()).unwrap();

    // Now the first receiver should be done, the second receiver should be ready
    // to be polled again and the socket not yet
    let res = tokio_current_thread.turn(None).unwrap();
    assert!(res.has_polled());

    assert!(receiver_1_done.get());
    assert!(!receiver_2_done.get());
    assert!(!receiver_3_done.get());

    // Now let our park implementation know that it should send something to sender 3
    send_now.set(true);

    // This should resolve the second receiver directly, but also poll the socket
    // and read the packet from it. If it didn't do both here, we would treat
    // futures woken up from the reactor and futures woken up directly unfairly,
    // favouring the ones that are woken up directly.
    let res = tokio_current_thread.turn(None).unwrap();
    assert!(res.has_polled());

    assert!(receiver_1_done.get());
    assert!(receiver_2_done.get());
    assert!(receiver_3_done.get());

    // Don't send again
    send_now.set(false);

    // Now we should be idle and turning should not poll anything
    assert!(tokio_current_thread.is_idle());
    let res = tokio_current_thread.turn(None).unwrap();
    assert!(!res.has_polled());
}
757 
/// A task spawned through a `Handle` from another thread must be run by the
/// executor: the task completes the oneshot channel that `block_on` waits on.
#[test]
fn spawn_from_other_thread() {
    let mut executor = CurrentThread::new();
    let remote = executor.handle();

    let (done_tx, done_rx) = oneshot::channel::<()>();

    thread::spawn(move || {
        let task = lazy(move || {
            done_tx.send(()).unwrap();
            Ok::<(), ()>(())
        });
        remote.spawn(task).unwrap();
    });

    executor.block_on(done_rx).unwrap();
}
776 
/// Like `spawn_from_other_thread`, but the remote spawn is only triggered
/// once the executor is already running, so it has to be woken up by it.
#[test]
fn spawn_from_other_thread_unpark() {
    use std::sync::mpsc::channel as mpsc_channel;

    let mut executor = CurrentThread::new();
    let remote = executor.handle();

    let (done_tx, done_rx) = oneshot::channel::<()>();
    let (go_tx, go_rx) = mpsc_channel::<()>();

    thread::spawn(move || {
        // Wait for the go-ahead sent from within the executor.
        go_rx.recv().unwrap();

        let task = lazy(move || {
            done_tx.send(()).unwrap();
            Ok::<(), ()>(())
        });
        remote.spawn(task).unwrap();
    });

    // Ensure that unparking the executor works correctly. It first checks
    // for new futures (there are none), then executes the lazy future below,
    // which causes the other thread to spawn its future. The executor then
    // parks, but should be woken up because *now* there is a new future to
    // schedule.
    executor
        .block_on(
            lazy(move || {
                go_tx.send(()).unwrap();
                Ok(())
            })
            .and_then(|_| done_rx),
        )
        .unwrap();
}
813 
/// A task running on the executor spawns a second task through the
/// executor's own `Handle`; after `run()` the second task must have completed
/// the oneshot channel.
#[test]
fn spawn_from_executor_with_handle() {
    let mut current_thread = CurrentThread::new();
    let handle = current_thread.handle();
    let (tx, rx) = oneshot::channel();

    current_thread.spawn(lazy(move || {
        handle
            .spawn(lazy(move || {
                tx.send(()).unwrap();
                Ok(())
            }))
            .unwrap();
        Ok::<_, ()>(())
    }));

    // Do not discard the result of `run()`: an executor failure should fail
    // the test right here, as in the other tests of this file, rather than
    // surface indirectly through the channel below.
    current_thread.run().unwrap();

    rx.wait().unwrap();
}
834 
ok() -> future::FutureResult<(), ()>835 fn ok() -> future::FutureResult<(), ()> {
836     future::ok(())
837 }
838