1 extern crate futures;
2 extern crate tokio_current_thread;
3 extern crate tokio_executor;
4 
5 use tokio_current_thread::{block_on_all, CurrentThread};
6 
7 use std::any::Any;
8 use std::cell::{Cell, RefCell};
9 use std::rc::Rc;
10 use std::thread;
11 use std::time::Duration;
12 
13 use futures::future::{self, lazy};
14 use futures::task;
// This is not actually unused --- we need this trait to be in scope for
// the tests that use TaskExecutor::current().execute(). The compiler
// doesn't realise that.
18 #[allow(unused_imports)]
19 use futures::future::Executor as _futures_Executor;
20 use futures::prelude::*;
21 use futures::sync::oneshot;
22 
23 mod from_block_on_all {
24     use super::*;
test<F: Fn(Box<dyn Future<Item = (), Error = ()>>) + 'static>(spawn: F)25     fn test<F: Fn(Box<dyn Future<Item = (), Error = ()>>) + 'static>(spawn: F) {
26         let cnt = Rc::new(Cell::new(0));
27         let c = cnt.clone();
28 
29         let msg = tokio_current_thread::block_on_all(lazy(move || {
30             c.set(1 + c.get());
31 
32             // Spawn!
33             spawn(Box::new(lazy(move || {
34                 c.set(1 + c.get());
35                 Ok::<(), ()>(())
36             })));
37 
38             Ok::<_, ()>("hello")
39         }))
40         .unwrap();
41 
42         assert_eq!(2, cnt.get());
43         assert_eq!(msg, "hello");
44     }
45 
46     #[test]
spawn()47     fn spawn() {
48         test(tokio_current_thread::spawn)
49     }
50 
51     #[test]
execute()52     fn execute() {
53         test(|f| {
54             tokio_current_thread::TaskExecutor::current()
55                 .execute(f)
56                 .unwrap();
57         });
58     }
59 }
60 
/// `block_on_all` is expected to keep waiting until a oneshot that is only
/// completed by another thread (after a one second sleep) has fired.
#[test]
fn block_waits() {
    let (sender, receiver) = oneshot::channel();

    // Complete the channel from a separate thread, after a delay.
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(1000));
        sender.send(()).unwrap();
    });

    let hits = Rc::new(Cell::new(0));
    let observer = Rc::clone(&hits);

    let waiter = receiver.then(move |_| {
        hits.set(hits.get() + 1);
        Ok::<_, ()>(())
    });
    block_on_all(waiter).unwrap();

    // The continuation must have run exactly once.
    assert_eq!(1, observer.get());
}
81 
/// Spawns `ITER` counter-bumping tasks onto one `CurrentThread` and checks
/// that `run` executed every one of them.
#[test]
fn spawn_many() {
    const ITER: usize = 200;

    let total = Rc::new(Cell::new(0));
    let mut executor = CurrentThread::new();

    (0..ITER).for_each(|_| {
        let total = Rc::clone(&total);
        executor.spawn(lazy(move || {
            total.set(total.get() + 1);
            Ok::<(), ()>(())
        }));
    });

    executor.run().unwrap();

    assert_eq!(total.get(), ITER);
}
101 
102 mod does_not_set_global_executor_by_default {
103     use super::*;
104 
test<F: Fn(Box<dyn Future<Item = (), Error = ()> + Send>) -> Result<(), E> + 'static, E>( spawn: F, )105     fn test<F: Fn(Box<dyn Future<Item = (), Error = ()> + Send>) -> Result<(), E> + 'static, E>(
106         spawn: F,
107     ) {
108         block_on_all(lazy(|| {
109             spawn(Box::new(lazy(|| ok()))).unwrap_err();
110             ok()
111         }))
112         .unwrap()
113     }
114 
115     #[test]
spawn()116     fn spawn() {
117         use tokio_executor::Executor;
118         test(|f| tokio_executor::DefaultExecutor::current().spawn(f))
119     }
120 
121     #[test]
execute()122     fn execute() {
123         test(|f| tokio_executor::DefaultExecutor::current().execute(f))
124     }
125 }
126 
127 mod from_block_on_future {
128     use super::*;
129 
test<F: Fn(Box<dyn Future<Item = (), Error = ()>>)>(spawn: F)130     fn test<F: Fn(Box<dyn Future<Item = (), Error = ()>>)>(spawn: F) {
131         let cnt = Rc::new(Cell::new(0));
132 
133         let mut tokio_current_thread = CurrentThread::new();
134 
135         tokio_current_thread
136             .block_on(lazy(|| {
137                 let cnt = cnt.clone();
138 
139                 spawn(Box::new(lazy(move || {
140                     cnt.set(1 + cnt.get());
141                     Ok(())
142                 })));
143 
144                 Ok::<_, ()>(())
145             }))
146             .unwrap();
147 
148         tokio_current_thread.run().unwrap();
149 
150         assert_eq!(1, cnt.get());
151     }
152 
153     #[test]
spawn()154     fn spawn() {
155         test(tokio_current_thread::spawn);
156     }
157 
158     #[test]
execute()159     fn execute() {
160         test(|f| {
161             tokio_current_thread::TaskExecutor::current()
162                 .execute(f)
163                 .unwrap();
164         });
165     }
166 }
167 
168 struct Never(Rc<()>);
169 
170 impl Future for Never {
171     type Item = ();
172     type Error = ();
173 
poll(&mut self) -> Poll<(), ()>174     fn poll(&mut self) -> Poll<(), ()> {
175         Ok(Async::NotReady)
176     }
177 }
178 
179 mod outstanding_tasks_are_dropped_when_executor_is_dropped {
180     use super::*;
181 
test<F, G>(spawn: F, dotspawn: G) where F: Fn(Box<dyn Future<Item = (), Error = ()>>) + 'static, G: Fn(&mut CurrentThread, Box<dyn Future<Item = (), Error = ()>>),182     fn test<F, G>(spawn: F, dotspawn: G)
183     where
184         F: Fn(Box<dyn Future<Item = (), Error = ()>>) + 'static,
185         G: Fn(&mut CurrentThread, Box<dyn Future<Item = (), Error = ()>>),
186     {
187         let mut rc = Rc::new(());
188 
189         let mut tokio_current_thread = CurrentThread::new();
190         dotspawn(&mut tokio_current_thread, Box::new(Never(rc.clone())));
191 
192         drop(tokio_current_thread);
193 
194         // Ensure the daemon is dropped
195         assert!(Rc::get_mut(&mut rc).is_some());
196 
197         // Using the global spawn fn
198 
199         let mut rc = Rc::new(());
200 
201         let mut tokio_current_thread = CurrentThread::new();
202 
203         tokio_current_thread
204             .block_on(lazy(|| {
205                 spawn(Box::new(Never(rc.clone())));
206                 Ok::<_, ()>(())
207             }))
208             .unwrap();
209 
210         drop(tokio_current_thread);
211 
212         // Ensure the daemon is dropped
213         assert!(Rc::get_mut(&mut rc).is_some());
214     }
215 
216     #[test]
spawn()217     fn spawn() {
218         test(tokio_current_thread::spawn, |rt, f| {
219             rt.spawn(f);
220         })
221     }
222 
223     #[test]
execute()224     fn execute() {
225         test(
226             |f| {
227                 tokio_current_thread::TaskExecutor::current()
228                     .execute(f)
229                     .unwrap();
230             },
231             // Note: `CurrentThread` doesn't currently implement
232             // `futures::Executor`, so we'll call `.spawn(...)` rather than
233             // `.execute(...)` for now. If `CurrentThread` is changed to
234             // implement Executor, change this to `.execute(...).unwrap()`.
235             |rt, f| {
236                 rt.spawn(f);
237             },
238         );
239     }
240 }
241 
/// Calling `block_on_all` from inside a future that is itself being run by
/// `block_on_all` is expected to panic.
#[test]
#[should_panic]
fn nesting_run() {
    let outer = lazy(|| {
        // Nested blocking call on the same thread.
        let inner = lazy(|| ok());
        block_on_all(inner).unwrap();

        ok()
    });

    block_on_all(outer).unwrap();
}
252 
mod run_in_future {
    use super::*;

    /// A task spawned with `tokio_current_thread::spawn` that itself calls
    /// `block_on_all` is expected to panic.
    #[test]
    #[should_panic]
    fn spawn() {
        let outer = lazy(|| {
            tokio_current_thread::spawn(lazy(|| {
                let nested = lazy(|| ok());
                block_on_all(nested).unwrap();
                ok()
            }));
            ok()
        });

        block_on_all(outer).unwrap();
    }

    /// Same as `spawn`, but the task is submitted through
    /// `TaskExecutor::current().execute(..)`.
    #[test]
    #[should_panic]
    fn execute() {
        let outer = lazy(|| {
            let current = tokio_current_thread::TaskExecutor::current();
            current
                .execute(lazy(|| {
                    let nested = lazy(|| ok());
                    block_on_all(nested).unwrap();
                    ok()
                }))
                .unwrap();
            ok()
        });

        block_on_all(outer).unwrap();
    }
}
284 
/// A future that re-notifies itself on every poll must still be polled only
/// once per call to `turn`.
#[test]
fn tick_on_infini_future() {
    /// Counts its polls, immediately requests another wakeup and never
    /// completes.
    struct Infini {
        polls: Rc<Cell<usize>>,
    }

    impl Future for Infini {
        type Item = ();
        type Error = ();

        fn poll(&mut self) -> Poll<(), ()> {
            self.polls.set(self.polls.get() + 1);
            task::current().notify();
            Ok(Async::NotReady)
        }
    }

    let polls = Rc::new(Cell::new(0));

    {
        // Scoped so the executor is gone before the assertion below.
        let mut executor = CurrentThread::new();
        executor.spawn(Infini {
            polls: Rc::clone(&polls),
        });
        executor.turn(None).unwrap();
    }

    assert_eq!(1, polls.get());
}
311 
312 mod tasks_are_scheduled_fairly {
313     use super::*;
314     struct Spin {
315         state: Rc<RefCell<[i32; 2]>>,
316         idx: usize,
317     }
318 
319     impl Future for Spin {
320         type Item = ();
321         type Error = ();
322 
poll(&mut self) -> Poll<(), ()>323         fn poll(&mut self) -> Poll<(), ()> {
324             let mut state = self.state.borrow_mut();
325 
326             if self.idx == 0 {
327                 let diff = state[0] - state[1];
328 
329                 assert!(diff.abs() <= 1);
330 
331                 if state[0] >= 50 {
332                     return Ok(().into());
333                 }
334             }
335 
336             state[self.idx] += 1;
337 
338             if state[self.idx] >= 100 {
339                 return Ok(().into());
340             }
341 
342             task::current().notify();
343             Ok(Async::NotReady)
344         }
345     }
346 
test<F: Fn(Spin)>(spawn: F)347     fn test<F: Fn(Spin)>(spawn: F) {
348         let state = Rc::new(RefCell::new([0, 0]));
349 
350         block_on_all(lazy(|| {
351             spawn(Spin {
352                 state: state.clone(),
353                 idx: 0,
354             });
355 
356             spawn(Spin {
357                 state: state,
358                 idx: 1,
359             });
360 
361             ok()
362         }))
363         .unwrap();
364     }
365 
366     #[test]
spawn()367     fn spawn() {
368         test(tokio_current_thread::spawn)
369     }
370 
371     #[test]
execute()372     fn execute() {
373         test(|f| {
374             tokio_current_thread::TaskExecutor::current()
375                 .execute(f)
376                 .unwrap();
377         })
378     }
379 }
380 
381 mod and_turn {
382     use super::*;
383 
test<F, G>(spawn: F, dotspawn: G) where F: Fn(Box<dyn Future<Item = (), Error = ()>>) + 'static, G: Fn(&mut CurrentThread, Box<dyn Future<Item = (), Error = ()>>),384     fn test<F, G>(spawn: F, dotspawn: G)
385     where
386         F: Fn(Box<dyn Future<Item = (), Error = ()>>) + 'static,
387         G: Fn(&mut CurrentThread, Box<dyn Future<Item = (), Error = ()>>),
388     {
389         let cnt = Rc::new(Cell::new(0));
390         let c = cnt.clone();
391 
392         let mut tokio_current_thread = CurrentThread::new();
393 
394         // Spawn a basic task to get the executor to turn
395         dotspawn(&mut tokio_current_thread, Box::new(lazy(move || Ok(()))));
396 
397         // Turn once...
398         tokio_current_thread.turn(None).unwrap();
399 
400         dotspawn(
401             &mut tokio_current_thread,
402             Box::new(lazy(move || {
403                 c.set(1 + c.get());
404 
405                 // Spawn!
406                 spawn(Box::new(lazy(move || {
407                     c.set(1 + c.get());
408                     Ok::<(), ()>(())
409                 })));
410 
411                 Ok(())
412             })),
413         );
414 
415         // This does not run the newly spawned thread
416         tokio_current_thread.turn(None).unwrap();
417         assert_eq!(1, cnt.get());
418 
419         // This runs the newly spawned thread
420         tokio_current_thread.turn(None).unwrap();
421         assert_eq!(2, cnt.get());
422     }
423 
424     #[test]
spawn()425     fn spawn() {
426         test(tokio_current_thread::spawn, |rt, f| {
427             rt.spawn(f);
428         })
429     }
430 
431     #[test]
execute()432     fn execute() {
433         test(
434             |f| {
435                 tokio_current_thread::TaskExecutor::current()
436                     .execute(f)
437                     .unwrap();
438             },
439             // Note: `CurrentThread` doesn't currently implement
440             // `futures::Executor`, so we'll call `.spawn(...)` rather than
441             // `.execute(...)` for now. If `CurrentThread` is changed to
442             // implement Executor, change this to `.execute(...).unwrap()`.
443             |rt, f| {
444                 rt.spawn(f);
445             },
446         );
447     }
448 }
449 
450 mod in_drop {
451     use super::*;
452     struct OnDrop<F: FnOnce()>(Option<F>);
453 
454     impl<F: FnOnce()> Drop for OnDrop<F> {
drop(&mut self)455         fn drop(&mut self) {
456             (self.0.take().unwrap())();
457         }
458     }
459 
460     struct MyFuture {
461         _data: Box<dyn Any>,
462     }
463 
464     impl Future for MyFuture {
465         type Item = ();
466         type Error = ();
467 
poll(&mut self) -> Poll<(), ()>468         fn poll(&mut self) -> Poll<(), ()> {
469             Ok(().into())
470         }
471     }
472 
test<F, G>(spawn: F, dotspawn: G) where F: Fn(Box<dyn Future<Item = (), Error = ()>>) + 'static, G: Fn(&mut CurrentThread, Box<dyn Future<Item = (), Error = ()>>),473     fn test<F, G>(spawn: F, dotspawn: G)
474     where
475         F: Fn(Box<dyn Future<Item = (), Error = ()>>) + 'static,
476         G: Fn(&mut CurrentThread, Box<dyn Future<Item = (), Error = ()>>),
477     {
478         let mut tokio_current_thread = CurrentThread::new();
479 
480         let (tx, rx) = oneshot::channel();
481 
482         dotspawn(
483             &mut tokio_current_thread,
484             Box::new(MyFuture {
485                 _data: Box::new(OnDrop(Some(move || {
486                     spawn(Box::new(lazy(move || {
487                         tx.send(()).unwrap();
488                         Ok(())
489                     })));
490                 }))),
491             }),
492         );
493 
494         tokio_current_thread.block_on(rx).unwrap();
495         tokio_current_thread.run().unwrap();
496     }
497 
498     #[test]
spawn()499     fn spawn() {
500         test(tokio_current_thread::spawn, |rt, f| {
501             rt.spawn(f);
502         })
503     }
504 
505     #[test]
execute()506     fn execute() {
507         test(
508             |f| {
509                 tokio_current_thread::TaskExecutor::current()
510                     .execute(f)
511                     .unwrap();
512             },
513             // Note: `CurrentThread` doesn't currently implement
514             // `futures::Executor`, so we'll call `.spawn(...)` rather than
515             // `.execute(...)` for now. If `CurrentThread` is changed to
516             // implement Executor, change this to `.execute(...).unwrap()`.
517             |rt, f| {
518                 rt.spawn(f);
519             },
520         );
521     }
522 }
523 
/// Stress test: several threads each drive their own `CurrentThread` with
/// `turn` while a helper thread feeds `N` messages through an unbounded
/// channel; every message must have been counted when the stream ends.
#[test]
fn hammer_turn() {
    use futures::sync::mpsc;

    const ITER: usize = 100;
    const N: usize = 100;
    const THREADS: usize = 4;

    for _ in 0..ITER {
        // Start all workers for this round before joining any of them.
        let workers: Vec<_> = (0..THREADS)
            .map(|_| {
                thread::spawn(|| {
                    let mut executor = CurrentThread::new();
                    let (tx, rx) = mpsc::unbounded();

                    executor.spawn({
                        let received = Rc::new(Cell::new(0));
                        let tally = Rc::clone(&received);

                        rx.for_each(move |_| {
                            tally.set(tally.get() + 1);
                            Ok(())
                        })
                        .map_err(|e| panic!("err={:?}", e))
                        .map(move |v| {
                            // The stream ended: all messages were seen.
                            assert_eq!(N, received.get());
                            v
                        })
                    });

                    // Producer; dropping `tx` at the end closes the stream.
                    thread::spawn(move || {
                        for _ in 0..N {
                            tx.unbounded_send(()).unwrap();
                            thread::yield_now();
                        }
                    });

                    while !executor.is_idle() {
                        executor.turn(None).unwrap();
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }
    }
}
577 
/// Checks what `Turn::has_polled` reports across a sequence of turns while
/// a single oneshot receiver goes from pending to complete.
#[test]
fn turn_has_polled() {
    // The same zero-length duration is handed to every `turn` call below.
    let no_wait = Some(Duration::from_millis(0));

    let mut executor = CurrentThread::new();

    // Spawn a task that waits on a oneshot receiver.
    let (sender, receiver) = oneshot::channel::<()>();
    executor.spawn(receiver.then(|_| Ok(())));

    // First turn: the receiver gets polled once and is not ready.
    let first = executor.turn(no_wait).unwrap();
    assert!(first.has_polled());

    // Second turn: nothing was notified, so nothing is polled.
    let second = executor.turn(no_wait).unwrap();
    assert!(!second.has_polled());

    // Complete the channel, which makes the receiver ready.
    sender.send(()).unwrap();

    // Third turn: the receiver is polled again and completes.
    let third = executor.turn(no_wait).unwrap();
    assert!(third.has_polled());

    // No tasks remain, so a further turn polls nothing.
    assert!(executor.is_idle());
    let last = executor.turn(no_wait).unwrap();
    assert!(!last.has_polled());
}
622 
623 // Our own mock Park that is never really waiting and the only
624 // thing it does is to send, on request, something (once) to a oneshot
625 // channel
626 struct MyPark {
627     sender: Option<oneshot::Sender<()>>,
628     send_now: Rc<Cell<bool>>,
629 }
630 
631 struct MyUnpark;
632 
633 impl tokio_executor::park::Park for MyPark {
634     type Unpark = MyUnpark;
635     type Error = ();
636 
unpark(&self) -> Self::Unpark637     fn unpark(&self) -> Self::Unpark {
638         MyUnpark
639     }
640 
park(&mut self) -> Result<(), Self::Error>641     fn park(&mut self) -> Result<(), Self::Error> {
642         // If called twice with send_now, this will intentionally panic
643         if self.send_now.get() {
644             self.sender.take().unwrap().send(()).unwrap();
645         }
646 
647         Ok(())
648     }
649 
park_timeout(&mut self, _duration: Duration) -> Result<(), Self::Error>650     fn park_timeout(&mut self, _duration: Duration) -> Result<(), Self::Error> {
651         self.park()
652     }
653 }
654 
655 impl tokio_executor::park::Unpark for MyUnpark {
unpark(&self)656     fn unpark(&self) {}
657 }
658 
/// Checks that a single `turn` both runs tasks that were woken directly and
/// gives the `Park` implementation a chance to wake further tasks, so that
/// neither kind of wakeup is favoured over the other.
#[test]
fn turn_fair() {
    let no_wait = Some(Duration::from_millis(0));
    let send_now = Rc::new(Cell::new(false));

    let (sender, receiver) = oneshot::channel::<()>();
    let (sender_2, receiver_2) = oneshot::channel::<()>();
    let (sender_3, receiver_3) = oneshot::channel::<()>();

    let mut executor = CurrentThread::new_with_park(MyPark {
        sender: Some(sender_3),
        send_now: Rc::clone(&send_now),
    });

    let first_done = Rc::new(Cell::new(false));
    let second_done = Rc::new(Cell::new(false));
    let third_done = Rc::new(Cell::new(false));

    // Once an item is received on the first oneshot channel, this task
    // immediately makes the second oneshot channel ready.
    {
        let flag = Rc::clone(&first_done);
        executor.spawn(receiver.map_err(|_| unreachable!()).and_then(move |_| {
            sender_2.send(()).unwrap();
            flag.set(true);

            Ok(())
        }));
    }

    {
        let flag = Rc::clone(&second_done);
        executor.spawn(receiver_2.map_err(|_| unreachable!()).and_then(move |_| {
            flag.set(true);
            Ok(())
        }));
    }

    // The third receiver is only woken up from our Park implementation. It
    // simulates e.g. a socket that first has to be polled to find out
    // whether it is ready.
    {
        let flag = Rc::clone(&third_done);
        executor.spawn(receiver_3.map_err(|_| unreachable!()).and_then(move |_| {
            flag.set(true);
            Ok(())
        }));
    }

    // The first turn should have polled the tasks and found them not ready.
    let turn = executor.turn(no_wait).unwrap();
    assert!(turn.has_polled());

    // The next turn should have polled nothing.
    let turn = executor.turn(no_wait).unwrap();
    assert!(!turn.has_polled());

    assert!(!first_done.get());
    assert!(!second_done.get());
    assert!(!third_done.get());

    // After this the first receiver future will wake up the second receiver
    // future, so there are pending futures again.
    sender.send(()).unwrap();

    // Now the first receiver should be done, the second receiver should be
    // ready to be polled again, and the "socket" not yet.
    let turn = executor.turn(None).unwrap();
    assert!(turn.has_polled());

    assert!(first_done.get());
    assert!(!second_done.get());
    assert!(!third_done.get());

    // Tell our park implementation to send something to sender 3.
    send_now.set(true);

    // This turn should resolve the second receiver directly, but also poll
    // the "socket" and read the packet from it. If it did not do both, tasks
    // woken from the reactor would be treated unfairly compared to tasks
    // that are woken up directly.
    let turn = executor.turn(None).unwrap();
    assert!(turn.has_polled());

    assert!(first_done.get());
    assert!(second_done.get());
    assert!(third_done.get());

    // Don't send again.
    send_now.set(false);

    // Now the executor should be idle and a turn should poll nothing.
    assert!(executor.is_idle());
    let turn = executor.turn(None).unwrap();
    assert!(!turn.has_polled());
}
755 
/// A task spawned through a `Handle` from another thread must run on the
/// executor: the main thread blocks on a oneshot that only that task fires.
#[test]
fn spawn_from_other_thread() {
    let mut executor = CurrentThread::new();
    let remote = executor.handle();

    let (sender, receiver) = oneshot::channel::<()>();

    thread::spawn(move || {
        let task = lazy(move || {
            sender.send(()).unwrap();
            Ok::<(), ()>(())
        });
        remote.spawn(task).unwrap();
    });

    executor.block_on(receiver).unwrap();
}
774 
/// Like `spawn_from_other_thread`, but the remote spawn is only triggered
/// after the executor has already started running the blocked-on future.
#[test]
fn spawn_from_other_thread_unpark() {
    use std::sync::mpsc::channel as mpsc_channel;

    let mut executor = CurrentThread::new();
    let remote = executor.handle();

    let (done_tx, done_rx) = oneshot::channel::<()>();
    let (go_tx, go_rx) = mpsc_channel::<()>();

    thread::spawn(move || {
        // Wait for the go-ahead from the executor thread.
        go_rx.recv().unwrap();

        let task = lazy(move || {
            done_tx.send(()).unwrap();
            Ok::<(), ()>(())
        });
        remote.spawn(task).unwrap();
    });

    // Ensure that unparking the executor works correctly. It will first
    // check for new futures (there are none), then run the lazy future
    // below, which causes the other thread to spawn its future. The
    // executor will then park, but should be woken up because *now* there
    // is a new future to schedule.
    let kickoff = lazy(move || {
        go_tx.send(()).unwrap();
        Ok(())
    })
    .and_then(|_| done_rx);

    executor.block_on(kickoff).unwrap();
}
811 
/// A task running on the executor spawns a second task through the
/// executor's own `Handle`; after `run` returns, the oneshot fired by the
/// second task must be complete.
#[test]
fn spawn_from_executor_with_handle() {
    let mut current_thread = CurrentThread::new();
    let handle = current_thread.handle();
    let (tx, rx) = oneshot::channel();

    current_thread.spawn(lazy(move || {
        handle
            .spawn(lazy(move || {
                tx.send(()).unwrap();
                Ok(())
            }))
            .unwrap();
        Ok::<_, ()>(())
    }));

    // Do not silently discard a run error; every other test in this file
    // unwraps the result of `run` as well.
    current_thread.run().unwrap();

    rx.wait().unwrap();
}
832 
ok() -> future::FutureResult<(), ()>833 fn ok() -> future::FutureResult<(), ()> {
834     future::ok(())
835 }
836