1 #![allow(clippy::needless_range_loop, clippy::stable_sort_primitive)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(feature = "full")]
4 
5 // Tests to run on both current-thread & thread-pool runtime variants.
6 
7 macro_rules! rt_test {
8     ($($t:tt)*) => {
9         mod basic_scheduler {
10             $($t)*
11 
12             fn rt() -> Runtime {
13                 tokio::runtime::Builder::new()
14                     .basic_scheduler()
15                     .enable_all()
16                     .build()
17                     .unwrap()
18             }
19         }
20 
21         mod threaded_scheduler_4_threads {
22             $($t)*
23 
24             fn rt() -> Runtime {
25                 tokio::runtime::Builder::new()
26                     .threaded_scheduler()
27                     .core_threads(4)
28                     .enable_all()
29                     .build()
30                     .unwrap()
31             }
32         }
33 
34         mod threaded_scheduler_1_thread {
35             $($t)*
36 
37             fn rt() -> Runtime {
38                 tokio::runtime::Builder::new()
39                     .threaded_scheduler()
40                     .core_threads(1)
41                     .enable_all()
42                     .build()
43                     .unwrap()
44             }
45         }
46     }
47 }
48 
49 #[test]
send_sync_bound()50 fn send_sync_bound() {
51     use tokio::runtime::Runtime;
52     fn is_send<T: Send + Sync>() {}
53 
54     is_send::<Runtime>();
55 }
56 
57 rt_test! {
58     use tokio::net::{TcpListener, TcpStream, UdpSocket};
59     use tokio::prelude::*;
60     use tokio::runtime::Runtime;
61     use tokio::sync::oneshot;
62     use tokio::{task, time};
63     use tokio_test::{assert_err, assert_ok};
64 
65     use futures::future::poll_fn;
66     use std::future::Future;
67     use std::pin::Pin;
68     use std::sync::{mpsc, Arc};
69     use std::task::{Context, Poll};
70     use std::thread;
71     use std::time::{Duration, Instant};
72 
73     #[test]
74     fn block_on_sync() {
75         let mut rt = rt();
76 
77         let mut win = false;
78         rt.block_on(async {
79             win = true;
80         });
81 
82         assert!(win);
83     }
84 
85     #[test]
86     fn block_on_handle_sync() {
87         let rt = rt();
88 
89         let mut win = false;
90         rt.handle().block_on(async {
91             win = true;
92         });
93 
94         assert!(win);
95     }
96 
97     #[test]
98     fn block_on_async() {
99         let mut rt = rt();
100 
101         let out = rt.block_on(async {
102             let (tx, rx) = oneshot::channel();
103 
104             thread::spawn(move || {
105                 thread::sleep(Duration::from_millis(50));
106                 tx.send("ZOMG").unwrap();
107             });
108 
109             assert_ok!(rx.await)
110         });
111 
112         assert_eq!(out, "ZOMG");
113     }
114 
115     #[test]
116     fn block_on_handle_async() {
117         let rt = rt();
118 
119         let out = rt.handle().block_on(async {
120             let (tx, rx) = oneshot::channel();
121 
122             thread::spawn(move || {
123                 thread::sleep(Duration::from_millis(50));
124                 tx.send("ZOMG").unwrap();
125             });
126 
127             assert_ok!(rx.await)
128         });
129 
130         assert_eq!(out, "ZOMG");
131     }
132 
133     #[test]
134     fn spawn_one_bg() {
135         let mut rt = rt();
136 
137         let out = rt.block_on(async {
138             let (tx, rx) = oneshot::channel();
139 
140             tokio::spawn(async move {
141                 tx.send("ZOMG").unwrap();
142             });
143 
144             assert_ok!(rx.await)
145         });
146 
147         assert_eq!(out, "ZOMG");
148     }
149 
150     #[test]
151     fn spawn_one_join() {
152         let mut rt = rt();
153 
154         let out = rt.block_on(async {
155             let (tx, rx) = oneshot::channel();
156 
157             let handle = tokio::spawn(async move {
158                 tx.send("ZOMG").unwrap();
159                 "DONE"
160             });
161 
162             let msg = assert_ok!(rx.await);
163 
164             let out = assert_ok!(handle.await);
165             assert_eq!(out, "DONE");
166 
167             msg
168         });
169 
170         assert_eq!(out, "ZOMG");
171     }
172 
173     #[test]
174     fn spawn_two() {
175         let mut rt = rt();
176 
177         let out = rt.block_on(async {
178             let (tx1, rx1) = oneshot::channel();
179             let (tx2, rx2) = oneshot::channel();
180 
181             tokio::spawn(async move {
182                 assert_ok!(tx1.send("ZOMG"));
183             });
184 
185             tokio::spawn(async move {
186                 let msg = assert_ok!(rx1.await);
187                 assert_ok!(tx2.send(msg));
188             });
189 
190             assert_ok!(rx2.await)
191         });
192 
193         assert_eq!(out, "ZOMG");
194     }
195 
196     #[test]
197     fn spawn_many_from_block_on() {
198         use tokio::sync::mpsc;
199 
200         const ITER: usize = 200;
201 
202         let mut rt = rt();
203 
204         let out = rt.block_on(async {
205             let (done_tx, mut done_rx) = mpsc::unbounded_channel();
206 
207             let mut txs = (0..ITER)
208                 .map(|i| {
209                     let (tx, rx) = oneshot::channel();
210                     let done_tx = done_tx.clone();
211 
212                     tokio::spawn(async move {
213                         let msg = assert_ok!(rx.await);
214                         assert_eq!(i, msg);
215                         assert_ok!(done_tx.send(msg));
216                     });
217 
218                     tx
219                 })
220                 .collect::<Vec<_>>();
221 
222             drop(done_tx);
223 
224             thread::spawn(move || {
225                 for (i, tx) in txs.drain(..).enumerate() {
226                     assert_ok!(tx.send(i));
227                 }
228             });
229 
230             let mut out = vec![];
231             while let Some(i) = done_rx.recv().await {
232                 out.push(i);
233             }
234 
235             out.sort();
236             out
237         });
238 
239         assert_eq!(ITER, out.len());
240 
241         for i in 0..ITER {
242             assert_eq!(i, out[i]);
243         }
244     }
245 
246     #[test]
247     fn spawn_many_from_task() {
248         use tokio::sync::mpsc;
249 
250         const ITER: usize = 500;
251 
252         let mut rt = rt();
253 
254         let out = rt.block_on(async {
255             tokio::spawn(async move {
256                 let (done_tx, mut done_rx) = mpsc::unbounded_channel();
257 
258                 /*
259                 for _ in 0..100 {
260                     tokio::spawn(async move { });
261                 }
262 
263                 tokio::task::yield_now().await;
264                 */
265 
266                 let mut txs = (0..ITER)
267                     .map(|i| {
268                         let (tx, rx) = oneshot::channel();
269                         let done_tx = done_tx.clone();
270 
271                         tokio::spawn(async move {
272                             let msg = assert_ok!(rx.await);
273                             assert_eq!(i, msg);
274                             assert_ok!(done_tx.send(msg));
275                         });
276 
277                         tx
278                     })
279                     .collect::<Vec<_>>();
280 
281                 drop(done_tx);
282 
283                 thread::spawn(move || {
284                     for (i, tx) in txs.drain(..).enumerate() {
285                         assert_ok!(tx.send(i));
286                     }
287                 });
288 
289                 let mut out = vec![];
290                 while let Some(i) = done_rx.recv().await {
291                     out.push(i);
292                 }
293 
294                 out.sort();
295                 out
296             }).await.unwrap()
297         });
298 
299         assert_eq!(ITER, out.len());
300 
301         for i in 0..ITER {
302             assert_eq!(i, out[i]);
303         }
304     }
305 
306     #[test]
307     fn spawn_await_chain() {
308         let mut rt = rt();
309 
310         let out = rt.block_on(async {
311             assert_ok!(tokio::spawn(async {
312                 assert_ok!(tokio::spawn(async {
313                     "hello"
314                 }).await)
315             }).await)
316         });
317 
318         assert_eq!(out, "hello");
319     }
320 
321     #[test]
322     fn outstanding_tasks_dropped() {
323         let mut rt = rt();
324 
325         let cnt = Arc::new(());
326 
327         rt.block_on(async {
328             let cnt = cnt.clone();
329 
330             tokio::spawn(poll_fn(move |_| {
331                 assert_eq!(2, Arc::strong_count(&cnt));
332                 Poll::<()>::Pending
333             }));
334         });
335 
336         assert_eq!(2, Arc::strong_count(&cnt));
337 
338         drop(rt);
339 
340         assert_eq!(1, Arc::strong_count(&cnt));
341     }
342 
343     #[test]
344     #[should_panic]
345     fn nested_rt() {
346         let mut rt1 = rt();
347         let mut rt2 = rt();
348 
349         rt1.block_on(async { rt2.block_on(async { "hello" }) });
350     }
351 
352     #[test]
353     fn create_rt_in_block_on() {
354         let mut rt1 = rt();
355         let mut rt2 = rt1.block_on(async { rt() });
356         let out = rt2.block_on(async { "ZOMG" });
357 
358         assert_eq!(out, "ZOMG");
359     }
360 
361     #[test]
362     fn complete_block_on_under_load() {
363         let mut rt = rt();
364 
365         rt.block_on(async {
366             let (tx, rx) = oneshot::channel();
367 
368             // Spin hard
369             tokio::spawn(async {
370                 loop {
371                     yield_once().await;
372                 }
373             });
374 
375             thread::spawn(move || {
376                 thread::sleep(Duration::from_millis(50));
377                 assert_ok!(tx.send(()));
378             });
379 
380             assert_ok!(rx.await);
381         });
382     }
383 
384     #[test]
385     fn complete_task_under_load() {
386         let mut rt = rt();
387 
388         rt.block_on(async {
389             let (tx1, rx1) = oneshot::channel();
390             let (tx2, rx2) = oneshot::channel();
391 
392             // Spin hard
393             tokio::spawn(async {
394                 loop {
395                     yield_once().await;
396                 }
397             });
398 
399             thread::spawn(move || {
400                 thread::sleep(Duration::from_millis(50));
401                 assert_ok!(tx1.send(()));
402             });
403 
404             tokio::spawn(async move {
405                 assert_ok!(rx1.await);
406                 assert_ok!(tx2.send(()));
407             });
408 
409             assert_ok!(rx2.await);
410         });
411     }
412 
413     #[test]
414     fn spawn_from_other_thread_idle() {
415         let mut rt = rt();
416         let handle = rt.handle().clone();
417 
418         let (tx, rx) = oneshot::channel();
419 
420         thread::spawn(move || {
421             thread::sleep(Duration::from_millis(50));
422 
423             handle.spawn(async move {
424                 assert_ok!(tx.send(()));
425             });
426         });
427 
428         rt.block_on(async move {
429             assert_ok!(rx.await);
430         });
431     }
432 
433     #[test]
434     fn spawn_from_other_thread_under_load() {
435         let mut rt = rt();
436         let handle = rt.handle().clone();
437 
438         let (tx, rx) = oneshot::channel();
439 
440         thread::spawn(move || {
441             handle.spawn(async move {
442                 assert_ok!(tx.send(()));
443             });
444         });
445 
446         rt.block_on(async move {
447             // Spin hard
448             tokio::spawn(async {
449                 loop {
450                     yield_once().await;
451                 }
452             });
453 
454             assert_ok!(rx.await);
455         });
456     }
457 
458     #[test]
459     fn delay_at_root() {
460         let mut rt = rt();
461 
462         let now = Instant::now();
463         let dur = Duration::from_millis(50);
464 
465         rt.block_on(async move {
466             time::delay_for(dur).await;
467         });
468 
469         assert!(now.elapsed() >= dur);
470     }
471 
472     #[test]
473     fn delay_in_spawn() {
474         let mut rt = rt();
475 
476         let now = Instant::now();
477         let dur = Duration::from_millis(50);
478 
479         rt.block_on(async move {
480             let (tx, rx) = oneshot::channel();
481 
482             tokio::spawn(async move {
483                 time::delay_for(dur).await;
484                 assert_ok!(tx.send(()));
485             });
486 
487             assert_ok!(rx.await);
488         });
489 
490         assert!(now.elapsed() >= dur);
491     }
492 
493     #[test]
494     fn block_on_socket() {
495         let mut rt = rt();
496 
497         rt.block_on(async move {
498             let (tx, rx) = oneshot::channel();
499 
500             let mut listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
501             let addr = listener.local_addr().unwrap();
502 
503             tokio::spawn(async move {
504                 let _ = listener.accept().await;
505                 tx.send(()).unwrap();
506             });
507 
508             TcpStream::connect(&addr).await.unwrap();
509             rx.await.unwrap();
510         });
511     }
512 
513     #[test]
514     fn spawn_from_blocking() {
515         let mut rt = rt();
516 
517         let out = rt.block_on(async move {
518             let inner = assert_ok!(tokio::task::spawn_blocking(|| {
519                 tokio::spawn(async move { "hello" })
520             }).await);
521 
522             assert_ok!(inner.await)
523         });
524 
525         assert_eq!(out, "hello")
526     }
527 
528     #[test]
529     fn spawn_blocking_from_blocking() {
530         let mut rt = rt();
531 
532         let out = rt.block_on(async move {
533             let inner = assert_ok!(tokio::task::spawn_blocking(|| {
534                 tokio::task::spawn_blocking(|| "hello")
535             }).await);
536 
537             assert_ok!(inner.await)
538         });
539 
540         assert_eq!(out, "hello")
541     }
542 
543     #[test]
544     fn delay_from_blocking() {
545         let mut rt = rt();
546 
547         rt.block_on(async move {
548             assert_ok!(tokio::task::spawn_blocking(|| {
549                 let now = std::time::Instant::now();
550                 let dur = Duration::from_millis(1);
551 
552                 // use the futures' block_on fn to make sure we aren't setting
553                 // any Tokio context
554                 futures::executor::block_on(async {
555                     tokio::time::delay_for(dur).await;
556                 });
557 
558                 assert!(now.elapsed() >= dur);
559             }).await);
560         });
561     }
562 
563     #[test]
564     fn socket_from_blocking() {
565         let mut rt = rt();
566 
567         rt.block_on(async move {
568             let mut listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
569             let addr = assert_ok!(listener.local_addr());
570 
571             let peer = tokio::task::spawn_blocking(move || {
572                 // use the futures' block_on fn to make sure we aren't setting
573                 // any Tokio context
574                 futures::executor::block_on(async {
575                     assert_ok!(TcpStream::connect(addr).await);
576                 });
577             });
578 
579             // Wait for the client to connect
580             let _ = assert_ok!(listener.accept().await);
581 
582             assert_ok!(peer.await);
583         });
584     }
585 
586     #[test]
587     fn spawn_blocking_after_shutdown() {
588         let rt = rt();
589         let handle = rt.handle().clone();
590 
591         // Shutdown
592         drop(rt);
593 
594         handle.enter(|| {
595             let res = task::spawn_blocking(|| unreachable!());
596 
597             // Avoid using a tokio runtime
598             let out = futures::executor::block_on(res);
599             assert!(out.is_err());
600         });
601     }
602 
603     #[test]
604     // IOCP requires setting the "max thread" concurrency value. The sane,
605     // default, is to set this to the number of cores. Threads that poll I/O
606     // become associated with the IOCP handle. Once those threads sleep for any
607     // reason (mutex), they yield their ownership.
608     //
609     // This test hits an edge case on windows where more threads than cores are
610     // created, none of those threads ever yield due to being at capacity, so
611     // IOCP gets "starved".
612     //
613     // For now, this is a very edge case that is probably not a real production
614     // concern. There also isn't a great/obvious solution to take. For now, the
615     // test is disabled.
616     #[cfg(not(windows))]
617     fn io_driver_called_when_under_load() {
618         let mut rt = rt();
619 
620         // Create a lot of constant load. The scheduler will always be busy.
621         for _ in 0..100 {
622             rt.spawn(async {
623                 loop {
624                     tokio::task::yield_now().await;
625                 }
626             });
627         }
628 
629         // Do some I/O work
630         rt.block_on(async {
631             let mut listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
632             let addr = assert_ok!(listener.local_addr());
633 
634             let srv = tokio::spawn(async move {
635                 let (mut stream, _) = assert_ok!(listener.accept().await);
636                 assert_ok!(stream.write_all(b"hello world").await);
637             });
638 
639             let cli = tokio::spawn(async move {
640                 let mut stream = assert_ok!(TcpStream::connect(addr).await);
641                 let mut dst = vec![0; 11];
642 
643                 assert_ok!(stream.read_exact(&mut dst).await);
644                 assert_eq!(dst, b"hello world");
645             });
646 
647             assert_ok!(srv.await);
648             assert_ok!(cli.await);
649         });
650     }
651 
652     #[test]
653     fn client_server_block_on() {
654         let mut rt = rt();
655         let (tx, rx) = mpsc::channel();
656 
657         rt.block_on(async move { client_server(tx).await });
658 
659         assert_ok!(rx.try_recv());
660         assert_err!(rx.try_recv());
661     }
662 
663     #[test]
664     fn panic_in_task() {
665         let mut rt = rt();
666         let (tx, rx) = oneshot::channel();
667 
668         struct Boom(Option<oneshot::Sender<()>>);
669 
670         impl Future for Boom {
671             type Output = ();
672 
673             fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
674                 panic!();
675             }
676         }
677 
678         impl Drop for Boom {
679             fn drop(&mut self) {
680                 assert!(std::thread::panicking());
681                 self.0.take().unwrap().send(()).unwrap();
682             }
683         }
684 
685         rt.spawn(Boom(Some(tx)));
686         assert_ok!(rt.block_on(rx));
687     }
688 
689     #[test]
690     #[should_panic]
691     fn panic_in_block_on() {
692         let mut rt = rt();
693         rt.block_on(async { panic!() });
694     }
695 
696     async fn yield_once() {
697         let mut yielded = false;
698         poll_fn(|cx| {
699             if yielded {
700                 Poll::Ready(())
701             } else {
702                 yielded = true;
703                 cx.waker().wake_by_ref();
704                 Poll::Pending
705             }
706         })
707         .await
708     }
709 
710     #[test]
711     fn enter_and_spawn() {
712         let mut rt = rt();
713         let handle = rt.enter(|| {
714             tokio::spawn(async {})
715         });
716 
717         assert_ok!(rt.block_on(handle));
718     }
719 
720     #[test]
721     fn eagerly_drops_futures_on_shutdown() {
722         use std::sync::mpsc;
723 
724         struct Never {
725             drop_tx: mpsc::Sender<()>,
726         }
727 
728         impl Future for Never {
729             type Output = ();
730 
731             fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
732                 Poll::Pending
733             }
734         }
735 
736         impl Drop for Never {
737             fn drop(&mut self) {
738                 self.drop_tx.send(()).unwrap();
739             }
740         }
741 
742         let mut rt = rt();
743 
744         let (drop_tx, drop_rx) = mpsc::channel();
745         let (run_tx, run_rx) = oneshot::channel();
746 
747         rt.block_on(async move {
748             tokio::spawn(async move {
749                 assert_ok!(run_tx.send(()));
750 
751                 Never { drop_tx }.await
752             });
753 
754             assert_ok!(run_rx.await);
755         });
756 
757         drop(rt);
758 
759         assert_ok!(drop_rx.recv());
760     }
761 
762     #[test]
763     fn wake_while_rt_is_dropping() {
764         use tokio::task;
765 
766         struct OnDrop<F: FnMut()>(F);
767 
768         impl<F: FnMut()> Drop for OnDrop<F> {
769             fn drop(&mut self) {
770                 (self.0)()
771             }
772         }
773 
774         let (tx1, rx1) = oneshot::channel();
775         let (tx2, rx2) = oneshot::channel();
776         let (tx3, rx3) = oneshot::channel();
777 
778         let mut rt = rt();
779 
780         let h1 = rt.handle().clone();
781 
782         rt.handle().spawn(async move {
783             // Ensure a waker gets stored in oneshot 1.
784             let _ = rx1.await;
785             tx3.send(()).unwrap();
786         });
787 
788         rt.handle().spawn(async move {
789             // When this task is dropped, we'll be "closing remotes".
790             // We spawn a new task that owns the `tx1`, to move its Drop
791             // out of here.
792             //
793             // Importantly, the oneshot 1 has a waker already stored, so
794             // the eventual drop here will try to re-schedule again.
795             let mut opt_tx1 = Some(tx1);
796             let _d = OnDrop(move || {
797                 let tx1 = opt_tx1.take().unwrap();
798                 h1.spawn(async move {
799                     tx1.send(()).unwrap();
800                 });
801             });
802             let _ = rx2.await;
803         });
804 
805         rt.handle().spawn(async move {
806             let _ = rx3.await;
807             // We'll never get here, but once task 3 drops, this will
808             // force task 2 to re-schedule since it's waiting on oneshot 2.
809             tx2.send(()).unwrap();
810         });
811 
812         // Tick the loop
813         rt.block_on(async {
814             task::yield_now().await;
815         });
816 
817         // Drop the rt
818         drop(rt);
819     }
820 
821     #[test]
822     fn io_notify_while_shutting_down() {
823         use std::net::Ipv6Addr;
824 
825         for _ in 1..10 {
826             let mut runtime = rt();
827 
828             runtime.block_on(async {
829                 let socket = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap();
830                 let addr = socket.local_addr().unwrap();
831                 let (mut recv_half, mut send_half) = socket.split();
832 
833                 tokio::spawn(async move {
834                     let mut buf = [0];
835                     loop {
836                         recv_half.recv_from(&mut buf).await.unwrap();
837                         std::thread::sleep(Duration::from_millis(2));
838                     }
839                 });
840 
841                 tokio::spawn(async move {
842                     let buf = [0];
843                     loop {
844                         send_half.send_to(&buf, &addr).await.unwrap();
845                         tokio::time::delay_for(Duration::from_millis(1)).await;
846                     }
847                 });
848 
849                 tokio::time::delay_for(Duration::from_millis(5)).await;
850             });
851         }
852     }
853 
854     #[test]
855     fn shutdown_timeout() {
856         let (tx, rx) = oneshot::channel();
857         let mut runtime = rt();
858 
859         runtime.block_on(async move {
860             task::spawn_blocking(move || {
861                 tx.send(()).unwrap();
862                 thread::sleep(Duration::from_secs(10_000));
863             });
864 
865             rx.await.unwrap();
866         });
867 
868         runtime.shutdown_timeout(Duration::from_millis(100));
869     }
870 
871     #[test]
872     fn shutdown_wakeup_time() {
873         let mut runtime = rt();
874 
875         runtime.block_on(async move {
876             tokio::time::delay_for(std::time::Duration::from_millis(100)).await;
877         });
878 
879         runtime.shutdown_timeout(Duration::from_secs(10_000));
880     }
881 
882     // This test is currently ignored on Windows because of a
883     // rust-lang issue in thread local storage destructors.
884     // See https://github.com/rust-lang/rust/issues/74875
885     #[test]
886     #[cfg(not(windows))]
887     fn runtime_in_thread_local() {
888         use std::cell::RefCell;
889         use std::thread;
890 
891         thread_local!(
892             static R: RefCell<Option<Runtime>> = RefCell::new(None);
893         );
894 
895         thread::spawn(|| {
896             R.with(|cell| {
897                 *cell.borrow_mut() = Some(rt());
898             });
899 
900             let _rt = rt();
901         }).join().unwrap();
902     }
903 
904     async fn client_server(tx: mpsc::Sender<()>) {
905         let mut server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
906 
907         // Get the assigned address
908         let addr = assert_ok!(server.local_addr());
909 
910         // Spawn the server
911         tokio::spawn(async move {
912             // Accept a socket
913             let (mut socket, _) = server.accept().await.unwrap();
914 
915             // Write some data
916             socket.write_all(b"hello").await.unwrap();
917         });
918 
919         let mut client = TcpStream::connect(&addr).await.unwrap();
920 
921         let mut buf = vec![];
922         client.read_to_end(&mut buf).await.unwrap();
923 
924         assert_eq!(buf, b"hello");
925         tx.send(()).unwrap();
926     }
927 
928     #[test]
929     fn local_set_block_on_socket() {
930         let mut rt = rt();
931         let local = task::LocalSet::new();
932 
933         local.block_on(&mut rt, async move {
934             let (tx, rx) = oneshot::channel();
935 
936             let mut listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
937             let addr = listener.local_addr().unwrap();
938 
939             task::spawn_local(async move {
940                 let _ = listener.accept().await;
941                 tx.send(()).unwrap();
942             });
943 
944             TcpStream::connect(&addr).await.unwrap();
945             rx.await.unwrap();
946         });
947     }
948 
949     #[test]
950     fn local_set_client_server_block_on() {
951         let mut rt = rt();
952         let (tx, rx) = mpsc::channel();
953 
954         let local = task::LocalSet::new();
955 
956         local.block_on(&mut rt, async move { client_server_local(tx).await });
957 
958         assert_ok!(rx.try_recv());
959         assert_err!(rx.try_recv());
960     }
961 
962     async fn client_server_local(tx: mpsc::Sender<()>) {
963         let mut server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
964 
965         // Get the assigned address
966         let addr = assert_ok!(server.local_addr());
967 
968         // Spawn the server
969         task::spawn_local(async move {
970             // Accept a socket
971             let (mut socket, _) = server.accept().await.unwrap();
972 
973             // Write some data
974             socket.write_all(b"hello").await.unwrap();
975         });
976 
977         let mut client = TcpStream::connect(&addr).await.unwrap();
978 
979         let mut buf = vec![];
980         client.read_to_end(&mut buf).await.unwrap();
981 
982         assert_eq!(buf, b"hello");
983         tx.send(()).unwrap();
984     }
985 
986     #[test]
987     fn coop() {
988         use std::task::Poll::Ready;
989 
990         let mut rt = rt();
991 
992         rt.block_on(async {
993             // Create a bunch of tasks
994             let mut tasks = (0..1_000).map(|_| {
995                 tokio::spawn(async { })
996             }).collect::<Vec<_>>();
997 
998             // Hope that all the tasks complete...
999             time::delay_for(Duration::from_millis(100)).await;
1000 
1001             poll_fn(|cx| {
1002                 // At least one task should not be ready
1003                 for task in &mut tasks {
1004                     if Pin::new(task).poll(cx).is_pending() {
1005                         return Ready(());
1006                     }
1007                 }
1008 
1009                 panic!("did not yield");
1010             }).await;
1011         });
1012     }
1013 
1014     // Tests that the "next task" scheduler optimization is not able to starve
1015     // other tasks.
1016     #[test]
1017     fn ping_pong_saturation() {
1018         use tokio::sync::mpsc;
1019 
1020         const NUM: usize = 100;
1021 
1022         let mut rt = rt();
1023 
1024         rt.block_on(async {
1025             let (spawned_tx, mut spawned_rx) = mpsc::unbounded_channel();
1026 
1027             // Spawn a bunch of tasks that ping ping between each other to
1028             // saturate the runtime.
1029             for _ in 0..NUM {
1030                 let (tx1, mut rx1) = mpsc::unbounded_channel();
1031                 let (tx2, mut rx2) = mpsc::unbounded_channel();
1032                 let spawned_tx = spawned_tx.clone();
1033 
1034                 task::spawn(async move {
1035                     spawned_tx.send(()).unwrap();
1036 
1037                     tx1.send(()).unwrap();
1038 
1039                     loop {
1040                         rx2.recv().await.unwrap();
1041                         tx1.send(()).unwrap();
1042                     }
1043                 });
1044 
1045                 task::spawn(async move {
1046                     loop {
1047                         rx1.recv().await.unwrap();
1048                         tx2.send(()).unwrap();
1049                     }
1050                 });
1051             }
1052 
1053             for _ in 0..NUM {
1054                 spawned_rx.recv().await.unwrap();
1055             }
1056 
1057             // spawn another task and wait for it to complete
1058             let handle = task::spawn(async {
1059                 for _ in 0..5 {
1060                     // Yielding forces it back into the local queue.
1061                     task::yield_now().await;
1062                 }
1063             });
1064             handle.await.unwrap();
1065         });
1066     }
1067 }
1068