1 #![allow(clippy::needless_range_loop)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(feature = "full")]
4 
5 // Tests to run on both current-thread & therad-pool runtime variants.
6 
7 macro_rules! rt_test {
8     ($($t:tt)*) => {
9         mod basic_scheduler {
10             $($t)*
11 
12             fn rt() -> Runtime {
13                 tokio::runtime::Builder::new()
14                     .basic_scheduler()
15                     .enable_all()
16                     .build()
17                     .unwrap()
18             }
19         }
20 
21         mod threaded_scheduler_4_threads {
22             $($t)*
23 
24             fn rt() -> Runtime {
25                 tokio::runtime::Builder::new()
26                     .threaded_scheduler()
27                     .core_threads(4)
28                     .enable_all()
29                     .build()
30                     .unwrap()
31             }
32         }
33 
34         mod threaded_scheduler_1_thread {
35             $($t)*
36 
37             fn rt() -> Runtime {
38                 tokio::runtime::Builder::new()
39                     .threaded_scheduler()
40                     .core_threads(1)
41                     .enable_all()
42                     .build()
43                     .unwrap()
44             }
45         }
46     }
47 }
48 
49 #[test]
send_sync_bound()50 fn send_sync_bound() {
51     use tokio::runtime::Runtime;
52     fn is_send<T: Send + Sync>() {}
53 
54     is_send::<Runtime>();
55 }
56 
57 rt_test! {
58     use tokio::net::{TcpListener, TcpStream, UdpSocket};
59     use tokio::prelude::*;
60     use tokio::runtime::Runtime;
61     use tokio::sync::oneshot;
62     use tokio::{task, time};
63     use tokio_test::{assert_err, assert_ok};
64 
65     use futures::future::poll_fn;
66     use std::future::Future;
67     use std::pin::Pin;
68     use std::sync::{mpsc, Arc};
69     use std::task::{Context, Poll};
70     use std::thread;
71     use std::time::{Duration, Instant};
72 
73     #[test]
74     fn block_on_sync() {
75         let mut rt = rt();
76 
77         let mut win = false;
78         rt.block_on(async {
79             win = true;
80         });
81 
82         assert!(win);
83     }
84 
85     #[test]
86     fn block_on_handle_sync() {
87         let rt = rt();
88 
89         let mut win = false;
90         rt.handle().block_on(async {
91             win = true;
92         });
93 
94         assert!(win);
95     }
96 
97     #[test]
98     fn block_on_async() {
99         let mut rt = rt();
100 
101         let out = rt.block_on(async {
102             let (tx, rx) = oneshot::channel();
103 
104             thread::spawn(move || {
105                 thread::sleep(Duration::from_millis(50));
106                 tx.send("ZOMG").unwrap();
107             });
108 
109             assert_ok!(rx.await)
110         });
111 
112         assert_eq!(out, "ZOMG");
113     }
114 
115     #[test]
116     fn block_on_handle_async() {
117         let rt = rt();
118 
119         let out = rt.handle().block_on(async {
120             let (tx, rx) = oneshot::channel();
121 
122             thread::spawn(move || {
123                 thread::sleep(Duration::from_millis(50));
124                 tx.send("ZOMG").unwrap();
125             });
126 
127             assert_ok!(rx.await)
128         });
129 
130         assert_eq!(out, "ZOMG");
131     }
132 
133     #[test]
134     fn spawn_one_bg() {
135         let mut rt = rt();
136 
137         let out = rt.block_on(async {
138             let (tx, rx) = oneshot::channel();
139 
140             tokio::spawn(async move {
141                 tx.send("ZOMG").unwrap();
142             });
143 
144             assert_ok!(rx.await)
145         });
146 
147         assert_eq!(out, "ZOMG");
148     }
149 
150     #[test]
151     fn spawn_one_join() {
152         let mut rt = rt();
153 
154         let out = rt.block_on(async {
155             let (tx, rx) = oneshot::channel();
156 
157             let handle = tokio::spawn(async move {
158                 tx.send("ZOMG").unwrap();
159                 "DONE"
160             });
161 
162             let msg = assert_ok!(rx.await);
163 
164             let out = assert_ok!(handle.await);
165             assert_eq!(out, "DONE");
166 
167             msg
168         });
169 
170         assert_eq!(out, "ZOMG");
171     }
172 
173     #[test]
174     fn spawn_two() {
175         let mut rt = rt();
176 
177         let out = rt.block_on(async {
178             let (tx1, rx1) = oneshot::channel();
179             let (tx2, rx2) = oneshot::channel();
180 
181             tokio::spawn(async move {
182                 assert_ok!(tx1.send("ZOMG"));
183             });
184 
185             tokio::spawn(async move {
186                 let msg = assert_ok!(rx1.await);
187                 assert_ok!(tx2.send(msg));
188             });
189 
190             assert_ok!(rx2.await)
191         });
192 
193         assert_eq!(out, "ZOMG");
194     }
195 
196     #[test]
197     fn spawn_many_from_block_on() {
198         use tokio::sync::mpsc;
199 
200         const ITER: usize = 200;
201 
202         let mut rt = rt();
203 
204         let out = rt.block_on(async {
205             let (done_tx, mut done_rx) = mpsc::unbounded_channel();
206 
207             let mut txs = (0..ITER)
208                 .map(|i| {
209                     let (tx, rx) = oneshot::channel();
210                     let done_tx = done_tx.clone();
211 
212                     tokio::spawn(async move {
213                         let msg = assert_ok!(rx.await);
214                         assert_eq!(i, msg);
215                         assert_ok!(done_tx.send(msg));
216                     });
217 
218                     tx
219                 })
220                 .collect::<Vec<_>>();
221 
222             drop(done_tx);
223 
224             thread::spawn(move || {
225                 for (i, tx) in txs.drain(..).enumerate() {
226                     assert_ok!(tx.send(i));
227                 }
228             });
229 
230             let mut out = vec![];
231             while let Some(i) = done_rx.recv().await {
232                 out.push(i);
233             }
234 
235             out.sort();
236             out
237         });
238 
239         assert_eq!(ITER, out.len());
240 
241         for i in 0..ITER {
242             assert_eq!(i, out[i]);
243         }
244     }
245 
246     #[test]
247     fn spawn_many_from_task() {
248         use tokio::sync::mpsc;
249 
250         const ITER: usize = 500;
251 
252         let mut rt = rt();
253 
254         let out = rt.block_on(async {
255             tokio::spawn(async move {
256                 let (done_tx, mut done_rx) = mpsc::unbounded_channel();
257 
258                 /*
259                 for _ in 0..100 {
260                     tokio::spawn(async move { });
261                 }
262 
263                 tokio::task::yield_now().await;
264                 */
265 
266                 let mut txs = (0..ITER)
267                     .map(|i| {
268                         let (tx, rx) = oneshot::channel();
269                         let done_tx = done_tx.clone();
270 
271                         tokio::spawn(async move {
272                             let msg = assert_ok!(rx.await);
273                             assert_eq!(i, msg);
274                             assert_ok!(done_tx.send(msg));
275                         });
276 
277                         tx
278                     })
279                     .collect::<Vec<_>>();
280 
281                 drop(done_tx);
282 
283                 thread::spawn(move || {
284                     for (i, tx) in txs.drain(..).enumerate() {
285                         assert_ok!(tx.send(i));
286                     }
287                 });
288 
289                 let mut out = vec![];
290                 while let Some(i) = done_rx.recv().await {
291                     out.push(i);
292                 }
293 
294                 out.sort();
295                 out
296             }).await.unwrap()
297         });
298 
299         assert_eq!(ITER, out.len());
300 
301         for i in 0..ITER {
302             assert_eq!(i, out[i]);
303         }
304     }
305 
306     #[test]
307     fn spawn_await_chain() {
308         let mut rt = rt();
309 
310         let out = rt.block_on(async {
311             assert_ok!(tokio::spawn(async {
312                 assert_ok!(tokio::spawn(async {
313                     "hello"
314                 }).await)
315             }).await)
316         });
317 
318         assert_eq!(out, "hello");
319     }
320 
321     #[test]
322     fn outstanding_tasks_dropped() {
323         let mut rt = rt();
324 
325         let cnt = Arc::new(());
326 
327         rt.block_on(async {
328             let cnt = cnt.clone();
329 
330             tokio::spawn(poll_fn(move |_| {
331                 assert_eq!(2, Arc::strong_count(&cnt));
332                 Poll::<()>::Pending
333             }));
334         });
335 
336         assert_eq!(2, Arc::strong_count(&cnt));
337 
338         drop(rt);
339 
340         assert_eq!(1, Arc::strong_count(&cnt));
341     }
342 
343     #[test]
344     #[should_panic]
345     fn nested_rt() {
346         let mut rt1 = rt();
347         let mut rt2 = rt();
348 
349         rt1.block_on(async { rt2.block_on(async { "hello" }) });
350     }
351 
352     #[test]
353     fn create_rt_in_block_on() {
354         let mut rt1 = rt();
355         let mut rt2 = rt1.block_on(async { rt() });
356         let out = rt2.block_on(async { "ZOMG" });
357 
358         assert_eq!(out, "ZOMG");
359     }
360 
361     #[test]
362     fn complete_block_on_under_load() {
363         let mut rt = rt();
364 
365         rt.block_on(async {
366             let (tx, rx) = oneshot::channel();
367 
368             // Spin hard
369             tokio::spawn(async {
370                 loop {
371                     yield_once().await;
372                 }
373             });
374 
375             thread::spawn(move || {
376                 thread::sleep(Duration::from_millis(50));
377                 assert_ok!(tx.send(()));
378             });
379 
380             assert_ok!(rx.await);
381         });
382     }
383 
384     #[test]
385     fn complete_task_under_load() {
386         let mut rt = rt();
387 
388         rt.block_on(async {
389             let (tx1, rx1) = oneshot::channel();
390             let (tx2, rx2) = oneshot::channel();
391 
392             // Spin hard
393             tokio::spawn(async {
394                 loop {
395                     yield_once().await;
396                 }
397             });
398 
399             thread::spawn(move || {
400                 thread::sleep(Duration::from_millis(50));
401                 assert_ok!(tx1.send(()));
402             });
403 
404             tokio::spawn(async move {
405                 assert_ok!(rx1.await);
406                 assert_ok!(tx2.send(()));
407             });
408 
409             assert_ok!(rx2.await);
410         });
411     }
412 
413     #[test]
414     fn spawn_from_other_thread_idle() {
415         let mut rt = rt();
416         let handle = rt.handle().clone();
417 
418         let (tx, rx) = oneshot::channel();
419 
420         thread::spawn(move || {
421             thread::sleep(Duration::from_millis(50));
422 
423             handle.spawn(async move {
424                 assert_ok!(tx.send(()));
425             });
426         });
427 
428         rt.block_on(async move {
429             assert_ok!(rx.await);
430         });
431     }
432 
433     #[test]
434     fn spawn_from_other_thread_under_load() {
435         let mut rt = rt();
436         let handle = rt.handle().clone();
437 
438         let (tx, rx) = oneshot::channel();
439 
440         thread::spawn(move || {
441             handle.spawn(async move {
442                 assert_ok!(tx.send(()));
443             });
444         });
445 
446         rt.block_on(async move {
447             // Spin hard
448             tokio::spawn(async {
449                 loop {
450                     yield_once().await;
451                 }
452             });
453 
454             assert_ok!(rx.await);
455         });
456     }
457 
458     #[test]
459     fn delay_at_root() {
460         let mut rt = rt();
461 
462         let now = Instant::now();
463         let dur = Duration::from_millis(50);
464 
465         rt.block_on(async move {
466             time::delay_for(dur).await;
467         });
468 
469         assert!(now.elapsed() >= dur);
470     }
471 
472     #[test]
473     fn delay_in_spawn() {
474         let mut rt = rt();
475 
476         let now = Instant::now();
477         let dur = Duration::from_millis(50);
478 
479         rt.block_on(async move {
480             let (tx, rx) = oneshot::channel();
481 
482             tokio::spawn(async move {
483                 time::delay_for(dur).await;
484                 assert_ok!(tx.send(()));
485             });
486 
487             assert_ok!(rx.await);
488         });
489 
490         assert!(now.elapsed() >= dur);
491     }
492 
493     #[test]
494     fn block_on_socket() {
495         let mut rt = rt();
496 
497         rt.block_on(async move {
498             let (tx, rx) = oneshot::channel();
499 
500             let mut listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
501             let addr = listener.local_addr().unwrap();
502 
503             tokio::spawn(async move {
504                 let _ = listener.accept().await;
505                 tx.send(()).unwrap();
506             });
507 
508             TcpStream::connect(&addr).await.unwrap();
509             rx.await.unwrap();
510         });
511     }
512 
513     #[test]
514     fn spawn_from_blocking() {
515         let mut rt = rt();
516 
517         let out = rt.block_on(async move {
518             let inner = assert_ok!(tokio::task::spawn_blocking(|| {
519                 tokio::spawn(async move { "hello" })
520             }).await);
521 
522             assert_ok!(inner.await)
523         });
524 
525         assert_eq!(out, "hello")
526     }
527 
528     #[test]
529     fn spawn_blocking_from_blocking() {
530         let mut rt = rt();
531 
532         let out = rt.block_on(async move {
533             let inner = assert_ok!(tokio::task::spawn_blocking(|| {
534                 tokio::task::spawn_blocking(|| "hello")
535             }).await);
536 
537             assert_ok!(inner.await)
538         });
539 
540         assert_eq!(out, "hello")
541     }
542 
543     #[test]
544     fn delay_from_blocking() {
545         let mut rt = rt();
546 
547         rt.block_on(async move {
548             assert_ok!(tokio::task::spawn_blocking(|| {
549                 let now = std::time::Instant::now();
550                 let dur = Duration::from_millis(1);
551 
552                 // use the futures' block_on fn to make sure we aren't setting
553                 // any Tokio context
554                 futures::executor::block_on(async {
555                     tokio::time::delay_for(dur).await;
556                 });
557 
558                 assert!(now.elapsed() >= dur);
559             }).await);
560         });
561     }
562 
563     #[test]
564     fn socket_from_blocking() {
565         let mut rt = rt();
566 
567         rt.block_on(async move {
568             let mut listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
569             let addr = assert_ok!(listener.local_addr());
570 
571             let peer = tokio::task::spawn_blocking(move || {
572                 // use the futures' block_on fn to make sure we aren't setting
573                 // any Tokio context
574                 futures::executor::block_on(async {
575                     assert_ok!(TcpStream::connect(addr).await);
576                 });
577             });
578 
579             // Wait for the client to connect
580             let _ = assert_ok!(listener.accept().await);
581 
582             assert_ok!(peer.await);
583         });
584     }
585 
586     #[test]
587     fn spawn_blocking_after_shutdown() {
588         let rt = rt();
589         let handle = rt.handle().clone();
590 
591         // Shutdown
592         drop(rt);
593 
594         handle.enter(|| {
595             let res = task::spawn_blocking(|| unreachable!());
596 
597             // Avoid using a tokio runtime
598             let out = futures::executor::block_on(res);
599             assert!(out.is_err());
600         });
601     }
602 
603     #[test]
604     // IOCP requires setting the "max thread" concurrency value. The sane,
605     // default, is to set this to the number of cores. Threads that poll I/O
606     // become associated with the IOCP handle. Once those threads sleep for any
607     // reason (mutex), they yield their ownership.
608     //
609     // This test hits an edge case on windows where more threads than cores are
610     // created, none of those threads ever yield due to being at capacity, so
611     // IOCP gets "starved".
612     //
613     // For now, this is a very edge case that is probably not a real production
614     // concern. There also isn't a great/obvious solution to take. For now, the
615     // test is disabled.
616     #[cfg(not(windows))]
617     fn io_driver_called_when_under_load() {
618         let mut rt = rt();
619 
620         // Create a lot of constant load. The scheduler will always be busy.
621         for _ in 0..100 {
622             rt.spawn(async {
623                 loop {
624                     tokio::task::yield_now().await;
625                 }
626             });
627         }
628 
629         // Do some I/O work
630         rt.block_on(async {
631             let mut listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
632             let addr = assert_ok!(listener.local_addr());
633 
634             let srv = tokio::spawn(async move {
635                 let (mut stream, _) = assert_ok!(listener.accept().await);
636                 assert_ok!(stream.write_all(b"hello world").await);
637             });
638 
639             let cli = tokio::spawn(async move {
640                 let mut stream = assert_ok!(TcpStream::connect(addr).await);
641                 let mut dst = vec![0; 11];
642 
643                 assert_ok!(stream.read_exact(&mut dst).await);
644                 assert_eq!(dst, b"hello world");
645             });
646 
647             assert_ok!(srv.await);
648             assert_ok!(cli.await);
649         });
650     }
651 
652     #[test]
653     fn client_server_block_on() {
654         let mut rt = rt();
655         let (tx, rx) = mpsc::channel();
656 
657         rt.block_on(async move { client_server(tx).await });
658 
659         assert_ok!(rx.try_recv());
660         assert_err!(rx.try_recv());
661     }
662 
663     #[test]
664     fn panic_in_task() {
665         let mut rt = rt();
666         let (tx, rx) = oneshot::channel();
667 
668         struct Boom(Option<oneshot::Sender<()>>);
669 
670         impl Future for Boom {
671             type Output = ();
672 
673             fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
674                 panic!();
675             }
676         }
677 
678         impl Drop for Boom {
679             fn drop(&mut self) {
680                 assert!(std::thread::panicking());
681                 self.0.take().unwrap().send(()).unwrap();
682             }
683         }
684 
685         rt.spawn(Boom(Some(tx)));
686         assert_ok!(rt.block_on(rx));
687     }
688 
689     #[test]
690     #[should_panic]
691     fn panic_in_block_on() {
692         let mut rt = rt();
693         rt.block_on(async { panic!() });
694     }
695 
696     async fn yield_once() {
697         let mut yielded = false;
698         poll_fn(|cx| {
699             if yielded {
700                 Poll::Ready(())
701             } else {
702                 yielded = true;
703                 cx.waker().wake_by_ref();
704                 Poll::Pending
705             }
706         })
707         .await
708     }
709 
710     #[test]
711     fn enter_and_spawn() {
712         let mut rt = rt();
713         let handle = rt.enter(|| {
714             tokio::spawn(async {})
715         });
716 
717         assert_ok!(rt.block_on(handle));
718     }
719 
720     #[test]
721     fn eagerly_drops_futures_on_shutdown() {
722         use std::sync::mpsc;
723 
724         struct Never {
725             drop_tx: mpsc::Sender<()>,
726         }
727 
728         impl Future for Never {
729             type Output = ();
730 
731             fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
732                 Poll::Pending
733             }
734         }
735 
736         impl Drop for Never {
737             fn drop(&mut self) {
738                 self.drop_tx.send(()).unwrap();
739             }
740         }
741 
742         let mut rt = rt();
743 
744         let (drop_tx, drop_rx) = mpsc::channel();
745         let (run_tx, run_rx) = oneshot::channel();
746 
747         rt.block_on(async move {
748             tokio::spawn(async move {
749                 assert_ok!(run_tx.send(()));
750 
751                 Never { drop_tx }.await
752             });
753 
754             assert_ok!(run_rx.await);
755         });
756 
757         drop(rt);
758 
759         assert_ok!(drop_rx.recv());
760     }
761 
762     #[test]
763     fn wake_while_rt_is_dropping() {
764         use tokio::task;
765 
766         struct OnDrop<F: FnMut()>(F);
767 
768         impl<F: FnMut()> Drop for OnDrop<F> {
769             fn drop(&mut self) {
770                 (self.0)()
771             }
772         }
773 
774         let (tx1, rx1) = oneshot::channel();
775         let (tx2, rx2) = oneshot::channel();
776         let (tx3, rx3) = oneshot::channel();
777 
778         let mut rt = rt();
779 
780         let h1 = rt.handle().clone();
781 
782         rt.handle().spawn(async move {
783             // Ensure a waker gets stored in oneshot 1.
784             let _ = rx1.await;
785             tx3.send(()).unwrap();
786         });
787 
788         rt.handle().spawn(async move {
789             // When this task is dropped, we'll be "closing remotes".
790             // We spawn a new task that owns the `tx1`, to move its Drop
791             // out of here.
792             //
793             // Importantly, the oneshot 1 has a waker already stored, so
794             // the eventual drop here will try to re-schedule again.
795             let mut opt_tx1 = Some(tx1);
796             let _d = OnDrop(move || {
797                 let tx1 = opt_tx1.take().unwrap();
798                 h1.spawn(async move {
799                     tx1.send(()).unwrap();
800                 });
801             });
802             let _ = rx2.await;
803         });
804 
805         rt.handle().spawn(async move {
806             let _ = rx3.await;
807             // We'll never get here, but once task 3 drops, this will
808             // force task 2 to re-schedule since it's waiting on oneshot 2.
809             tx2.send(()).unwrap();
810         });
811 
812         // Tick the loop
813         rt.block_on(async {
814             task::yield_now().await;
815         });
816 
817         // Drop the rt
818         drop(rt);
819     }
820 
821     #[test]
822     fn io_notify_while_shutting_down() {
823         use std::net::Ipv6Addr;
824 
825         for _ in 1..10 {
826             let mut runtime = rt();
827 
828             runtime.block_on(async {
829                 let socket = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap();
830                 let addr = socket.local_addr().unwrap();
831                 let (mut recv_half, mut send_half) = socket.split();
832 
833                 tokio::spawn(async move {
834                     let mut buf = [0];
835                     loop {
836                         recv_half.recv_from(&mut buf).await.unwrap();
837                         std::thread::sleep(Duration::from_millis(2));
838                     }
839                 });
840 
841                 tokio::spawn(async move {
842                     let buf = [0];
843                     loop {
844                         send_half.send_to(&buf, &addr).await.unwrap();
845                         tokio::time::delay_for(Duration::from_millis(1)).await;
846                     }
847                 });
848 
849                 tokio::time::delay_for(Duration::from_millis(5)).await;
850             });
851         }
852     }
853 
854     #[test]
855     fn shutdown_timeout() {
856         let (tx, rx) = oneshot::channel();
857         let mut runtime = rt();
858 
859         runtime.block_on(async move {
860             task::spawn_blocking(move || {
861                 tx.send(()).unwrap();
862                 thread::sleep(Duration::from_secs(10_000));
863             });
864 
865             rx.await.unwrap();
866         });
867 
868         runtime.shutdown_timeout(Duration::from_millis(100));
869     }
870 
871     #[test]
872     fn runtime_in_thread_local() {
873         use std::cell::RefCell;
874         use std::thread;
875 
876         thread_local!(
877             static R: RefCell<Option<Runtime>> = RefCell::new(None);
878         );
879 
880         thread::spawn(|| {
881             R.with(|cell| {
882                 *cell.borrow_mut() = Some(rt());
883             });
884 
885             let _rt = rt();
886         }).join().unwrap();
887     }
888 
889     async fn client_server(tx: mpsc::Sender<()>) {
890         let mut server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
891 
892         // Get the assigned address
893         let addr = assert_ok!(server.local_addr());
894 
895         // Spawn the server
896         tokio::spawn(async move {
897             // Accept a socket
898             let (mut socket, _) = server.accept().await.unwrap();
899 
900             // Write some data
901             socket.write_all(b"hello").await.unwrap();
902         });
903 
904         let mut client = TcpStream::connect(&addr).await.unwrap();
905 
906         let mut buf = vec![];
907         client.read_to_end(&mut buf).await.unwrap();
908 
909         assert_eq!(buf, b"hello");
910         tx.send(()).unwrap();
911     }
912 
913     #[test]
914     fn local_set_block_on_socket() {
915         let mut rt = rt();
916         let local = task::LocalSet::new();
917 
918         local.block_on(&mut rt, async move {
919             let (tx, rx) = oneshot::channel();
920 
921             let mut listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
922             let addr = listener.local_addr().unwrap();
923 
924             task::spawn_local(async move {
925                 let _ = listener.accept().await;
926                 tx.send(()).unwrap();
927             });
928 
929             TcpStream::connect(&addr).await.unwrap();
930             rx.await.unwrap();
931         });
932     }
933 
934     #[test]
935     fn local_set_client_server_block_on() {
936         let mut rt = rt();
937         let (tx, rx) = mpsc::channel();
938 
939         let local = task::LocalSet::new();
940 
941         local.block_on(&mut rt, async move { client_server_local(tx).await });
942 
943         assert_ok!(rx.try_recv());
944         assert_err!(rx.try_recv());
945     }
946 
947     async fn client_server_local(tx: mpsc::Sender<()>) {
948         let mut server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
949 
950         // Get the assigned address
951         let addr = assert_ok!(server.local_addr());
952 
953         // Spawn the server
954         task::spawn_local(async move {
955             // Accept a socket
956             let (mut socket, _) = server.accept().await.unwrap();
957 
958             // Write some data
959             socket.write_all(b"hello").await.unwrap();
960         });
961 
962         let mut client = TcpStream::connect(&addr).await.unwrap();
963 
964         let mut buf = vec![];
965         client.read_to_end(&mut buf).await.unwrap();
966 
967         assert_eq!(buf, b"hello");
968         tx.send(()).unwrap();
969     }
970 
971     #[test]
972     fn coop() {
973         use std::task::Poll::Ready;
974 
975         let mut rt = rt();
976 
977         rt.block_on(async {
978             // Create a bunch of tasks
979             let mut tasks = (0..1_000).map(|_| {
980                 tokio::spawn(async { })
981             }).collect::<Vec<_>>();
982 
983             // Hope that all the tasks complete...
984             time::delay_for(Duration::from_millis(100)).await;
985 
986             poll_fn(|cx| {
987                 // At least one task should not be ready
988                 for task in &mut tasks {
989                     if Pin::new(task).poll(cx).is_pending() {
990                         return Ready(());
991                     }
992                 }
993 
994                 panic!("did not yield");
995             }).await;
996         });
997     }
998 
999     // Tests that the "next task" scheduler optimization is not able to starve
1000     // other tasks.
1001     #[test]
1002     fn ping_pong_saturation() {
1003         use tokio::sync::mpsc;
1004 
1005         const NUM: usize = 100;
1006 
1007         let mut rt = rt();
1008 
1009         rt.block_on(async {
1010             let (spawned_tx, mut spawned_rx) = mpsc::unbounded_channel();
1011 
1012             // Spawn a bunch of tasks that ping ping between each other to
1013             // saturate the runtime.
1014             for _ in 0..NUM {
1015                 let (tx1, mut rx1) = mpsc::unbounded_channel();
1016                 let (tx2, mut rx2) = mpsc::unbounded_channel();
1017                 let spawned_tx = spawned_tx.clone();
1018 
1019                 task::spawn(async move {
1020                     spawned_tx.send(()).unwrap();
1021 
1022                     tx1.send(()).unwrap();
1023 
1024                     loop {
1025                         rx2.recv().await.unwrap();
1026                         tx1.send(()).unwrap();
1027                     }
1028                 });
1029 
1030                 task::spawn(async move {
1031                     loop {
1032                         rx1.recv().await.unwrap();
1033                         tx2.send(()).unwrap();
1034                     }
1035                 });
1036             }
1037 
1038             for _ in 0..NUM {
1039                 spawned_rx.recv().await.unwrap();
1040             }
1041 
1042             // spawn another task and wait for it to complete
1043             let handle = task::spawn(async {
1044                 for _ in 0..5 {
1045                     // Yielding forces it back into the local queue.
1046                     task::yield_now().await;
1047                 }
1048             });
1049             handle.await.unwrap();
1050         });
1051     }
1052 }
1053