1 #![allow(clippy::needless_range_loop)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(feature = "full")]
4 
5 // Tests to run on both current-thread & therad-pool runtime variants.
6 
7 macro_rules! rt_test {
8     ($($t:tt)*) => {
9         mod basic_scheduler {
10             $($t)*
11 
12             fn rt() -> Runtime {
13                 tokio::runtime::Builder::new()
14                     .basic_scheduler()
15                     .enable_all()
16                     .build()
17                     .unwrap()
18             }
19         }
20 
21         mod threaded_scheduler_4_threads {
22             $($t)*
23 
24             fn rt() -> Runtime {
25                 tokio::runtime::Builder::new()
26                     .threaded_scheduler()
27                     .core_threads(4)
28                     .enable_all()
29                     .build()
30                     .unwrap()
31             }
32         }
33 
34         mod threaded_scheduler_1_thread {
35             $($t)*
36 
37             fn rt() -> Runtime {
38                 tokio::runtime::Builder::new()
39                     .threaded_scheduler()
40                     .core_threads(1)
41                     .enable_all()
42                     .build()
43                     .unwrap()
44             }
45         }
46     }
47 }
48 
49 #[test]
send_sync_bound()50 fn send_sync_bound() {
51     use tokio::runtime::Runtime;
52     fn is_send<T: Send + Sync>() {}
53 
54     is_send::<Runtime>();
55 }
56 
57 rt_test! {
58     use tokio::net::{TcpListener, TcpStream, UdpSocket};
59     use tokio::prelude::*;
60     use tokio::runtime::Runtime;
61     use tokio::sync::oneshot;
62     use tokio::{task, time};
63     use tokio_test::{assert_err, assert_ok};
64 
65     use futures::future::poll_fn;
66     use std::future::Future;
67     use std::pin::Pin;
68     use std::sync::{mpsc, Arc};
69     use std::task::{Context, Poll};
70     use std::thread;
71     use std::time::{Duration, Instant};
72 
73     #[test]
74     fn block_on_sync() {
75         let mut rt = rt();
76 
77         let mut win = false;
78         rt.block_on(async {
79             win = true;
80         });
81 
82         assert!(win);
83     }
84 
85     #[test]
86     fn block_on_handle_sync() {
87         let rt = rt();
88 
89         let mut win = false;
90         rt.handle().block_on(async {
91             win = true;
92         });
93 
94         assert!(win);
95     }
96 
97     #[test]
98     fn block_on_async() {
99         let mut rt = rt();
100 
101         let out = rt.block_on(async {
102             let (tx, rx) = oneshot::channel();
103 
104             thread::spawn(move || {
105                 thread::sleep(Duration::from_millis(50));
106                 tx.send("ZOMG").unwrap();
107             });
108 
109             assert_ok!(rx.await)
110         });
111 
112         assert_eq!(out, "ZOMG");
113     }
114 
115     #[test]
116     fn block_on_handle_async() {
117         let rt = rt();
118 
119         let out = rt.handle().block_on(async {
120             let (tx, rx) = oneshot::channel();
121 
122             thread::spawn(move || {
123                 thread::sleep(Duration::from_millis(50));
124                 tx.send("ZOMG").unwrap();
125             });
126 
127             assert_ok!(rx.await)
128         });
129 
130         assert_eq!(out, "ZOMG");
131     }
132 
133     #[test]
134     fn spawn_one_bg() {
135         let mut rt = rt();
136 
137         let out = rt.block_on(async {
138             let (tx, rx) = oneshot::channel();
139 
140             tokio::spawn(async move {
141                 tx.send("ZOMG").unwrap();
142             });
143 
144             assert_ok!(rx.await)
145         });
146 
147         assert_eq!(out, "ZOMG");
148     }
149 
150     #[test]
151     fn spawn_one_join() {
152         let mut rt = rt();
153 
154         let out = rt.block_on(async {
155             let (tx, rx) = oneshot::channel();
156 
157             let handle = tokio::spawn(async move {
158                 tx.send("ZOMG").unwrap();
159                 "DONE"
160             });
161 
162             let msg = assert_ok!(rx.await);
163 
164             let out = assert_ok!(handle.await);
165             assert_eq!(out, "DONE");
166 
167             msg
168         });
169 
170         assert_eq!(out, "ZOMG");
171     }
172 
173     #[test]
174     fn spawn_two() {
175         let mut rt = rt();
176 
177         let out = rt.block_on(async {
178             let (tx1, rx1) = oneshot::channel();
179             let (tx2, rx2) = oneshot::channel();
180 
181             tokio::spawn(async move {
182                 assert_ok!(tx1.send("ZOMG"));
183             });
184 
185             tokio::spawn(async move {
186                 let msg = assert_ok!(rx1.await);
187                 assert_ok!(tx2.send(msg));
188             });
189 
190             assert_ok!(rx2.await)
191         });
192 
193         assert_eq!(out, "ZOMG");
194     }
195 
196     #[test]
197     fn spawn_many_from_block_on() {
198         use tokio::sync::mpsc;
199 
200         const ITER: usize = 200;
201 
202         let mut rt = rt();
203 
204         let out = rt.block_on(async {
205             let (done_tx, mut done_rx) = mpsc::unbounded_channel();
206 
207             let mut txs = (0..ITER)
208                 .map(|i| {
209                     let (tx, rx) = oneshot::channel();
210                     let done_tx = done_tx.clone();
211 
212                     tokio::spawn(async move {
213                         let msg = assert_ok!(rx.await);
214                         assert_eq!(i, msg);
215                         assert_ok!(done_tx.send(msg));
216                     });
217 
218                     tx
219                 })
220                 .collect::<Vec<_>>();
221 
222             drop(done_tx);
223 
224             thread::spawn(move || {
225                 for (i, tx) in txs.drain(..).enumerate() {
226                     assert_ok!(tx.send(i));
227                 }
228             });
229 
230             let mut out = vec![];
231             while let Some(i) = done_rx.recv().await {
232                 out.push(i);
233             }
234 
235             out.sort();
236             out
237         });
238 
239         assert_eq!(ITER, out.len());
240 
241         for i in 0..ITER {
242             assert_eq!(i, out[i]);
243         }
244     }
245 
246     #[test]
247     fn spawn_many_from_task() {
248         use tokio::sync::mpsc;
249 
250         const ITER: usize = 500;
251 
252         let mut rt = rt();
253 
254         let out = rt.block_on(async {
255             tokio::spawn(async move {
256                 let (done_tx, mut done_rx) = mpsc::unbounded_channel();
257 
258                 /*
259                 for _ in 0..100 {
260                     tokio::spawn(async move { });
261                 }
262 
263                 tokio::task::yield_now().await;
264                 */
265 
266                 let mut txs = (0..ITER)
267                     .map(|i| {
268                         let (tx, rx) = oneshot::channel();
269                         let done_tx = done_tx.clone();
270 
271                         tokio::spawn(async move {
272                             let msg = assert_ok!(rx.await);
273                             assert_eq!(i, msg);
274                             assert_ok!(done_tx.send(msg));
275                         });
276 
277                         tx
278                     })
279                     .collect::<Vec<_>>();
280 
281                 drop(done_tx);
282 
283                 thread::spawn(move || {
284                     for (i, tx) in txs.drain(..).enumerate() {
285                         assert_ok!(tx.send(i));
286                     }
287                 });
288 
289                 let mut out = vec![];
290                 while let Some(i) = done_rx.recv().await {
291                     out.push(i);
292                 }
293 
294                 out.sort();
295                 out
296             }).await.unwrap()
297         });
298 
299         assert_eq!(ITER, out.len());
300 
301         for i in 0..ITER {
302             assert_eq!(i, out[i]);
303         }
304     }
305 
306     #[test]
307     fn spawn_await_chain() {
308         let mut rt = rt();
309 
310         let out = rt.block_on(async {
311             assert_ok!(tokio::spawn(async {
312                 assert_ok!(tokio::spawn(async {
313                     "hello"
314                 }).await)
315             }).await)
316         });
317 
318         assert_eq!(out, "hello");
319     }
320 
321     #[test]
322     fn outstanding_tasks_dropped() {
323         let mut rt = rt();
324 
325         let cnt = Arc::new(());
326 
327         rt.block_on(async {
328             let cnt = cnt.clone();
329 
330             tokio::spawn(poll_fn(move |_| {
331                 assert_eq!(2, Arc::strong_count(&cnt));
332                 Poll::<()>::Pending
333             }));
334         });
335 
336         assert_eq!(2, Arc::strong_count(&cnt));
337 
338         drop(rt);
339 
340         assert_eq!(1, Arc::strong_count(&cnt));
341     }
342 
343     #[test]
344     #[should_panic]
345     fn nested_rt() {
346         let mut rt1 = rt();
347         let mut rt2 = rt();
348 
349         rt1.block_on(async { rt2.block_on(async { "hello" }) });
350     }
351 
352     #[test]
353     fn create_rt_in_block_on() {
354         let mut rt1 = rt();
355         let mut rt2 = rt1.block_on(async { rt() });
356         let out = rt2.block_on(async { "ZOMG" });
357 
358         assert_eq!(out, "ZOMG");
359     }
360 
361     #[test]
362     fn complete_block_on_under_load() {
363         let mut rt = rt();
364 
365         rt.block_on(async {
366             let (tx, rx) = oneshot::channel();
367 
368             // Spin hard
369             tokio::spawn(async {
370                 loop {
371                     yield_once().await;
372                 }
373             });
374 
375             thread::spawn(move || {
376                 thread::sleep(Duration::from_millis(50));
377                 assert_ok!(tx.send(()));
378             });
379 
380             assert_ok!(rx.await);
381         });
382     }
383 
384     #[test]
385     fn complete_task_under_load() {
386         let mut rt = rt();
387 
388         rt.block_on(async {
389             let (tx1, rx1) = oneshot::channel();
390             let (tx2, rx2) = oneshot::channel();
391 
392             // Spin hard
393             tokio::spawn(async {
394                 loop {
395                     yield_once().await;
396                 }
397             });
398 
399             thread::spawn(move || {
400                 thread::sleep(Duration::from_millis(50));
401                 assert_ok!(tx1.send(()));
402             });
403 
404             tokio::spawn(async move {
405                 assert_ok!(rx1.await);
406                 assert_ok!(tx2.send(()));
407             });
408 
409             assert_ok!(rx2.await);
410         });
411     }
412 
413     #[test]
414     fn spawn_from_other_thread_idle() {
415         let mut rt = rt();
416         let handle = rt.handle().clone();
417 
418         let (tx, rx) = oneshot::channel();
419 
420         thread::spawn(move || {
421             thread::sleep(Duration::from_millis(50));
422 
423             handle.spawn(async move {
424                 assert_ok!(tx.send(()));
425             });
426         });
427 
428         rt.block_on(async move {
429             assert_ok!(rx.await);
430         });
431     }
432 
433     #[test]
434     fn spawn_from_other_thread_under_load() {
435         let mut rt = rt();
436         let handle = rt.handle().clone();
437 
438         let (tx, rx) = oneshot::channel();
439 
440         thread::spawn(move || {
441             handle.spawn(async move {
442                 assert_ok!(tx.send(()));
443             });
444         });
445 
446         rt.block_on(async move {
447             // Spin hard
448             tokio::spawn(async {
449                 loop {
450                     yield_once().await;
451                 }
452             });
453 
454             assert_ok!(rx.await);
455         });
456     }
457 
458     #[test]
459     fn delay_at_root() {
460         let mut rt = rt();
461 
462         let now = Instant::now();
463         let dur = Duration::from_millis(50);
464 
465         rt.block_on(async move {
466             time::delay_for(dur).await;
467         });
468 
469         assert!(now.elapsed() >= dur);
470     }
471 
472     #[test]
473     fn delay_in_spawn() {
474         let mut rt = rt();
475 
476         let now = Instant::now();
477         let dur = Duration::from_millis(50);
478 
479         rt.block_on(async move {
480             let (tx, rx) = oneshot::channel();
481 
482             tokio::spawn(async move {
483                 time::delay_for(dur).await;
484                 assert_ok!(tx.send(()));
485             });
486 
487             assert_ok!(rx.await);
488         });
489 
490         assert!(now.elapsed() >= dur);
491     }
492 
493     #[test]
494     fn block_on_socket() {
495         let mut rt = rt();
496 
497         rt.block_on(async move {
498             let (tx, rx) = oneshot::channel();
499 
500             let mut listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
501             let addr = listener.local_addr().unwrap();
502 
503             tokio::spawn(async move {
504                 let _ = listener.accept().await;
505                 tx.send(()).unwrap();
506             });
507 
508             TcpStream::connect(&addr).await.unwrap();
509             rx.await.unwrap();
510         });
511     }
512 
513     #[test]
514     fn spawn_from_blocking() {
515         let mut rt = rt();
516 
517         let out = rt.block_on(async move {
518             let inner = assert_ok!(tokio::task::spawn_blocking(|| {
519                 tokio::spawn(async move { "hello" })
520             }).await);
521 
522             assert_ok!(inner.await)
523         });
524 
525         assert_eq!(out, "hello")
526     }
527 
528     #[test]
529     fn spawn_blocking_from_blocking() {
530         let mut rt = rt();
531 
532         let out = rt.block_on(async move {
533             let inner = assert_ok!(tokio::task::spawn_blocking(|| {
534                 tokio::task::spawn_blocking(|| "hello")
535             }).await);
536 
537             assert_ok!(inner.await)
538         });
539 
540         assert_eq!(out, "hello")
541     }
542 
543     #[test]
544     fn delay_from_blocking() {
545         let mut rt = rt();
546 
547         rt.block_on(async move {
548             assert_ok!(tokio::task::spawn_blocking(|| {
549                 let now = std::time::Instant::now();
550                 let dur = Duration::from_millis(1);
551 
552                 // use the futures' block_on fn to make sure we aren't setting
553                 // any Tokio context
554                 futures::executor::block_on(async {
555                     tokio::time::delay_for(dur).await;
556                 });
557 
558                 assert!(now.elapsed() >= dur);
559             }).await);
560         });
561     }
562 
563     #[test]
564     fn socket_from_blocking() {
565         let mut rt = rt();
566 
567         rt.block_on(async move {
568             let mut listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
569             let addr = assert_ok!(listener.local_addr());
570 
571             let peer = tokio::task::spawn_blocking(move || {
572                 // use the futures' block_on fn to make sure we aren't setting
573                 // any Tokio context
574                 futures::executor::block_on(async {
575                     assert_ok!(TcpStream::connect(addr).await);
576                 });
577             });
578 
579             // Wait for the client to connect
580             let _ = assert_ok!(listener.accept().await);
581 
582             assert_ok!(peer.await);
583         });
584     }
585 
586     #[test]
587     fn spawn_blocking_after_shutdown() {
588         let rt = rt();
589         let handle = rt.handle().clone();
590 
591         // Shutdown
592         drop(rt);
593 
594         handle.enter(|| {
595             let res = task::spawn_blocking(|| unreachable!());
596 
597             // Avoid using a tokio runtime
598             let out = futures::executor::block_on(res);
599             assert!(out.is_err());
600         });
601     }
602 
603     #[test]
604     fn io_driver_called_when_under_load() {
605         let mut rt = rt();
606 
607         // Create a lot of constant load. The scheduler will always be busy.
608         for _ in 0..100 {
609             rt.spawn(async {
610                 loop {
611                     tokio::task::yield_now().await;
612                 }
613             });
614         }
615 
616         // Do some I/O work
617         rt.block_on(async {
618             let mut listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
619             let addr = assert_ok!(listener.local_addr());
620 
621             let srv = tokio::spawn(async move {
622                 let (mut stream, _) = assert_ok!(listener.accept().await);
623                 assert_ok!(stream.write_all(b"hello world").await);
624             });
625 
626             let cli = tokio::spawn(async move {
627                 let mut stream = assert_ok!(TcpStream::connect(addr).await);
628                 let mut dst = vec![0; 11];
629 
630                 assert_ok!(stream.read_exact(&mut dst).await);
631                 assert_eq!(dst, b"hello world");
632             });
633 
634             assert_ok!(srv.await);
635             assert_ok!(cli.await);
636         });
637     }
638 
639     #[test]
640     fn client_server_block_on() {
641         let mut rt = rt();
642         let (tx, rx) = mpsc::channel();
643 
644         rt.block_on(async move { client_server(tx).await });
645 
646         assert_ok!(rx.try_recv());
647         assert_err!(rx.try_recv());
648     }
649 
650     #[test]
651     fn panic_in_task() {
652         let mut rt = rt();
653         let (tx, rx) = oneshot::channel();
654 
655         struct Boom(Option<oneshot::Sender<()>>);
656 
657         impl Future for Boom {
658             type Output = ();
659 
660             fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
661                 panic!();
662             }
663         }
664 
665         impl Drop for Boom {
666             fn drop(&mut self) {
667                 assert!(std::thread::panicking());
668                 self.0.take().unwrap().send(()).unwrap();
669             }
670         }
671 
672         rt.spawn(Boom(Some(tx)));
673         assert_ok!(rt.block_on(rx));
674     }
675 
676     #[test]
677     #[should_panic]
678     fn panic_in_block_on() {
679         let mut rt = rt();
680         rt.block_on(async { panic!() });
681     }
682 
683     async fn yield_once() {
684         let mut yielded = false;
685         poll_fn(|cx| {
686             if yielded {
687                 Poll::Ready(())
688             } else {
689                 yielded = true;
690                 cx.waker().wake_by_ref();
691                 Poll::Pending
692             }
693         })
694         .await
695     }
696 
697     #[test]
698     fn enter_and_spawn() {
699         let mut rt = rt();
700         let handle = rt.enter(|| {
701             tokio::spawn(async {})
702         });
703 
704         assert_ok!(rt.block_on(handle));
705     }
706 
707     #[test]
708     fn eagerly_drops_futures_on_shutdown() {
709         use std::sync::mpsc;
710 
711         struct Never {
712             drop_tx: mpsc::Sender<()>,
713         }
714 
715         impl Future for Never {
716             type Output = ();
717 
718             fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
719                 Poll::Pending
720             }
721         }
722 
723         impl Drop for Never {
724             fn drop(&mut self) {
725                 self.drop_tx.send(()).unwrap();
726             }
727         }
728 
729         let mut rt = rt();
730 
731         let (drop_tx, drop_rx) = mpsc::channel();
732         let (run_tx, run_rx) = oneshot::channel();
733 
734         rt.block_on(async move {
735             tokio::spawn(async move {
736                 assert_ok!(run_tx.send(()));
737 
738                 Never { drop_tx }.await
739             });
740 
741             assert_ok!(run_rx.await);
742         });
743 
744         drop(rt);
745 
746         assert_ok!(drop_rx.recv());
747     }
748 
749     #[test]
750     fn wake_while_rt_is_dropping() {
751         use tokio::task;
752 
753         struct OnDrop<F: FnMut()>(F);
754 
755         impl<F: FnMut()> Drop for OnDrop<F> {
756             fn drop(&mut self) {
757                 (self.0)()
758             }
759         }
760 
761         let (tx1, rx1) = oneshot::channel();
762         let (tx2, rx2) = oneshot::channel();
763         let (tx3, rx3) = oneshot::channel();
764 
765         let mut rt = rt();
766 
767         let h1 = rt.handle().clone();
768 
769         rt.handle().spawn(async move {
770             // Ensure a waker gets stored in oneshot 1.
771             let _ = rx1.await;
772             tx3.send(()).unwrap();
773         });
774 
775         rt.handle().spawn(async move {
776             // When this task is dropped, we'll be "closing remotes".
777             // We spawn a new task that owns the `tx1`, to move its Drop
778             // out of here.
779             //
780             // Importantly, the oneshot 1 has a waker already stored, so
781             // the eventual drop here will try to re-schedule again.
782             let mut opt_tx1 = Some(tx1);
783             let _d = OnDrop(move || {
784                 let tx1 = opt_tx1.take().unwrap();
785                 h1.spawn(async move {
786                     tx1.send(()).unwrap();
787                 });
788             });
789             let _ = rx2.await;
790         });
791 
792         rt.handle().spawn(async move {
793             let _ = rx3.await;
794             // We'll never get here, but once task 3 drops, this will
795             // force task 2 to re-schedule since it's waiting on oneshot 2.
796             tx2.send(()).unwrap();
797         });
798 
799         // Tick the loop
800         rt.block_on(async {
801             task::yield_now().await;
802         });
803 
804         // Drop the rt
805         drop(rt);
806     }
807 
808     #[test]
809     fn io_notify_while_shutting_down() {
810         use std::net::Ipv6Addr;
811 
812         for _ in 1..10 {
813             let mut runtime = rt();
814 
815             runtime.block_on(async {
816                 let socket = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap();
817                 let addr = socket.local_addr().unwrap();
818                 let (mut recv_half, mut send_half) = socket.split();
819 
820                 tokio::spawn(async move {
821                     let mut buf = [0];
822                     loop {
823                         recv_half.recv_from(&mut buf).await.unwrap();
824                         std::thread::sleep(Duration::from_millis(2));
825                     }
826                 });
827 
828                 tokio::spawn(async move {
829                     let buf = [0];
830                     loop {
831                         send_half.send_to(&buf, &addr).await.unwrap();
832                         tokio::time::delay_for(Duration::from_millis(1)).await;
833                     }
834                 });
835 
836                 tokio::time::delay_for(Duration::from_millis(5)).await;
837             });
838         }
839     }
840 
841     #[test]
842     fn shutdown_timeout() {
843         let (tx, rx) = oneshot::channel();
844         let mut runtime = rt();
845 
846         runtime.block_on(async move {
847             task::spawn_blocking(move || {
848                 tx.send(()).unwrap();
849                 thread::sleep(Duration::from_secs(10_000));
850             });
851 
852             rx.await.unwrap();
853         });
854 
855         runtime.shutdown_timeout(Duration::from_millis(100));
856     }
857 
858     #[test]
859     fn runtime_in_thread_local() {
860         use std::cell::RefCell;
861         use std::thread;
862 
863         thread_local!(
864             static R: RefCell<Option<Runtime>> = RefCell::new(None);
865         );
866 
867         thread::spawn(|| {
868             R.with(|cell| {
869                 *cell.borrow_mut() = Some(rt());
870             });
871 
872             let _rt = rt();
873         }).join().unwrap();
874     }
875 
876     async fn client_server(tx: mpsc::Sender<()>) {
877         let mut server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
878 
879         // Get the assigned address
880         let addr = assert_ok!(server.local_addr());
881 
882         // Spawn the server
883         tokio::spawn(async move {
884             // Accept a socket
885             let (mut socket, _) = server.accept().await.unwrap();
886 
887             // Write some data
888             socket.write_all(b"hello").await.unwrap();
889         });
890 
891         let mut client = TcpStream::connect(&addr).await.unwrap();
892 
893         let mut buf = vec![];
894         client.read_to_end(&mut buf).await.unwrap();
895 
896         assert_eq!(buf, b"hello");
897         tx.send(()).unwrap();
898     }
899 
900     #[test]
901     fn local_set_block_on_socket() {
902         let mut rt = rt();
903         let local = task::LocalSet::new();
904 
905         local.block_on(&mut rt, async move {
906             let (tx, rx) = oneshot::channel();
907 
908             let mut listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
909             let addr = listener.local_addr().unwrap();
910 
911             task::spawn_local(async move {
912                 let _ = listener.accept().await;
913                 tx.send(()).unwrap();
914             });
915 
916             TcpStream::connect(&addr).await.unwrap();
917             rx.await.unwrap();
918         });
919     }
920 
921     #[test]
922     fn local_set_client_server_block_on() {
923         let mut rt = rt();
924         let (tx, rx) = mpsc::channel();
925 
926         let local = task::LocalSet::new();
927 
928         local.block_on(&mut rt, async move { client_server_local(tx).await });
929 
930         assert_ok!(rx.try_recv());
931         assert_err!(rx.try_recv());
932     }
933 
934     async fn client_server_local(tx: mpsc::Sender<()>) {
935         let mut server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
936 
937         // Get the assigned address
938         let addr = assert_ok!(server.local_addr());
939 
940         // Spawn the server
941         task::spawn_local(async move {
942             // Accept a socket
943             let (mut socket, _) = server.accept().await.unwrap();
944 
945             // Write some data
946             socket.write_all(b"hello").await.unwrap();
947         });
948 
949         let mut client = TcpStream::connect(&addr).await.unwrap();
950 
951         let mut buf = vec![];
952         client.read_to_end(&mut buf).await.unwrap();
953 
954         assert_eq!(buf, b"hello");
955         tx.send(()).unwrap();
956     }
957 
958     #[test]
959     fn coop() {
960         use std::task::Poll::Ready;
961 
962         let mut rt = rt();
963 
964         rt.block_on(async {
965             // Create a bunch of tasks
966             let mut tasks = (0..1_000).map(|_| {
967                 tokio::spawn(async { })
968             }).collect::<Vec<_>>();
969 
970             // Hope that all the tasks complete...
971             time::delay_for(Duration::from_millis(100)).await;
972 
973             poll_fn(|cx| {
974                 // At least one task should not be ready
975                 for task in &mut tasks {
976                     if Pin::new(task).poll(cx).is_pending() {
977                         return Ready(());
978                     }
979                 }
980 
981                 panic!("did not yield");
982             }).await;
983         });
984     }
985 
986     // Tests that the "next task" scheduler optimization is not able to starve
987     // other tasks.
988     #[test]
989     fn ping_pong_saturation() {
990         use tokio::sync::mpsc;
991 
992         const NUM: usize = 100;
993 
994         let mut rt = rt();
995 
996         rt.block_on(async {
997             let (spawned_tx, mut spawned_rx) = mpsc::unbounded_channel();
998 
999             // Spawn a bunch of tasks that ping ping between each other to
1000             // saturate the runtime.
1001             for _ in 0..NUM {
1002                 let (tx1, mut rx1) = mpsc::unbounded_channel();
1003                 let (tx2, mut rx2) = mpsc::unbounded_channel();
1004                 let spawned_tx = spawned_tx.clone();
1005 
1006                 task::spawn(async move {
1007                     spawned_tx.send(()).unwrap();
1008 
1009                     tx1.send(()).unwrap();
1010 
1011                     loop {
1012                         rx2.recv().await.unwrap();
1013                         tx1.send(()).unwrap();
1014                     }
1015                 });
1016 
1017                 task::spawn(async move {
1018                     loop {
1019                         rx1.recv().await.unwrap();
1020                         tx2.send(()).unwrap();
1021                     }
1022                 });
1023             }
1024 
1025             for _ in 0..NUM {
1026                 spawned_rx.recv().await.unwrap();
1027             }
1028 
1029             // spawn another task and wait for it to complete
1030             let handle = task::spawn(async {
1031                 for _ in 0..5 {
1032                     // Yielding forces it back into the local queue.
1033                     task::yield_now().await;
1034                 }
1035             });
1036             handle.await.unwrap();
1037         });
1038     }
1039 }
1040