1 #![allow(clippy::needless_range_loop)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(feature = "full")]
4 
// Tests to run on both current-thread & thread-pool runtime variants.
6 
// Pastes the given items into one module per runtime configuration.
//
// The token stream handed to the macro is repeated verbatim inside three
// sibling modules. Each module also defines its own `rt()` constructor, so
// every pasted item sees a different runtime behind the same function name.
// The pasted tokens are expected to bring `Runtime` into scope.
macro_rules! rt_test {
    ($($body:tt)*) => {
        // Runtime built with `basic_scheduler()`.
        mod basic_scheduler {
            $($body)*

            fn rt() -> Runtime {
                let mut builder = tokio::runtime::Builder::new();
                builder.basic_scheduler().enable_all();
                builder.build().unwrap()
            }
        }

        // Runtime built with `threaded_scheduler()` and `core_threads(4)`.
        mod threaded_scheduler_4_threads {
            $($body)*

            fn rt() -> Runtime {
                let mut builder = tokio::runtime::Builder::new();
                builder.threaded_scheduler().core_threads(4).enable_all();
                builder.build().unwrap()
            }
        }

        // Runtime built with `threaded_scheduler()` and `core_threads(1)`.
        mod threaded_scheduler_1_thread {
            $($body)*

            fn rt() -> Runtime {
                let mut builder = tokio::runtime::Builder::new();
                builder.threaded_scheduler().core_threads(1).enable_all();
                builder.build().unwrap()
            }
        }
    };
}
48 
// Compile-time check that `Runtime` satisfies both `Send` and `Sync`.
#[test]
fn send_sync_bound() {
    use tokio::runtime::Runtime;

    // Can only be instantiated for types meeting both bounds; the body is
    // intentionally empty because the check happens at type-check time.
    fn is_send_sync<T: Send + Sync>() {}

    is_send_sync::<Runtime>();
}
56 
57 rt_test! {
58     use tokio::net::{TcpListener, TcpStream, UdpSocket};
59     use tokio::prelude::*;
60     use tokio::runtime::Runtime;
61     use tokio::sync::oneshot;
62     use tokio::{task, time};
63     use tokio_test::{assert_err, assert_ok};
64 
65     use futures::future::poll_fn;
66     use std::future::Future;
67     use std::pin::Pin;
68     use std::sync::{mpsc, Arc};
69     use std::task::{Context, Poll};
70     use std::thread;
71     use std::time::{Duration, Instant};
72 
// `block_on` runs a future that contains no await points to completion.
#[test]
fn block_on_sync() {
    let mut runtime = rt();
    let mut ran = false;

    // The async block mutates `ran` through a borrow, so the flag is only
    // set if the future was actually polled.
    runtime.block_on(async {
        ran = true;
    });

    assert!(ran);
}
84 
// `block_on` waits for a oneshot value that a plain OS thread sends after
// sleeping for 50ms.
#[test]
fn block_on_async() {
    let mut runtime = rt();

    let received = runtime.block_on(async {
        let (sender, receiver) = oneshot::channel();

        thread::spawn(move || {
            thread::sleep(Duration::from_millis(50));
            sender.send("ZOMG").unwrap();
        });

        assert_ok!(receiver.await)
    });

    assert_eq!(received, "ZOMG");
}
102 
// A single spawned task sends a value back to the `block_on` future.
#[test]
fn spawn_one_bg() {
    let mut runtime = rt();

    let received = runtime.block_on(async {
        let (sender, receiver) = oneshot::channel();

        // The join handle is discarded; only the channel is observed.
        tokio::spawn(async move {
            sender.send("ZOMG").unwrap();
        });

        assert_ok!(receiver.await)
    });

    assert_eq!(received, "ZOMG");
}
119 
// A spawned task both sends a message over a oneshot and returns a value
// through its join handle; both are checked.
#[test]
fn spawn_one_join() {
    let mut runtime = rt();

    let message = runtime.block_on(async {
        let (sender, receiver) = oneshot::channel();

        let join = tokio::spawn(async move {
            sender.send("ZOMG").unwrap();
            "DONE"
        });

        let received = assert_ok!(receiver.await);

        let task_output = assert_ok!(join.await);
        assert_eq!(task_output, "DONE");

        received
    });

    assert_eq!(message, "ZOMG");
}
142 
// Two spawned tasks relay a message: the first sends it, the second
// forwards it to the `block_on` future.
#[test]
fn spawn_two() {
    let mut runtime = rt();

    let relayed = runtime.block_on(async {
        let (first_tx, first_rx) = oneshot::channel();
        let (second_tx, second_rx) = oneshot::channel();

        // Producer.
        tokio::spawn(async move {
            assert_ok!(first_tx.send("ZOMG"));
        });

        // Forwarder.
        tokio::spawn(async move {
            let forwarded = assert_ok!(first_rx.await);
            assert_ok!(second_tx.send(forwarded));
        });

        assert_ok!(second_rx.await)
    });

    assert_eq!(relayed, "ZOMG");
}
165 
// Spawns `ITER` tasks from the `block_on` future. Each task waits on its
// own oneshot, checks the value equals its index and reports it on a shared
// unbounded channel. A plain thread feeds the oneshots.
#[test]
fn spawn_many_from_block_on() {
    use tokio::sync::mpsc;

    const ITER: usize = 200;

    let mut runtime = rt();

    let collected = runtime.block_on(async {
        let (done_tx, mut done_rx) = mpsc::unbounded_channel();

        let mut senders = Vec::with_capacity(ITER);
        for expected in 0..ITER {
            let (tx, rx) = oneshot::channel();
            let done_tx = done_tx.clone();

            tokio::spawn(async move {
                let got = assert_ok!(rx.await);
                assert_eq!(expected, got);
                assert_ok!(done_tx.send(got));
            });

            senders.push(tx);
        }

        // Drop the original sender so the receive loop below ends once every
        // task-owned clone has been dropped.
        drop(done_tx);

        thread::spawn(move || {
            for (idx, tx) in senders.into_iter().enumerate() {
                assert_ok!(tx.send(idx));
            }
        });

        let mut collected = Vec::new();
        while let Some(value) = done_rx.recv().await {
            collected.push(value);
        }

        collected.sort();
        collected
    });

    assert_eq!(ITER, collected.len());

    for (idx, value) in collected.iter().enumerate() {
        assert_eq!(idx, *value);
    }
}
215 
// Same shape as `spawn_many_from_block_on`, but the `ITER` tasks are spawned
// from inside another spawned task whose join handle is awaited by
// `block_on`.
#[test]
fn spawn_many_from_task() {
    use tokio::sync::mpsc;

    const ITER: usize = 500;

    let mut runtime = rt();

    let collected = runtime.block_on(async {
        let driver = tokio::spawn(async move {
            let (done_tx, mut done_rx) = mpsc::unbounded_channel();

            let mut senders = Vec::with_capacity(ITER);
            for expected in 0..ITER {
                let (tx, rx) = oneshot::channel();
                let done_tx = done_tx.clone();

                tokio::spawn(async move {
                    let got = assert_ok!(rx.await);
                    assert_eq!(expected, got);
                    assert_ok!(done_tx.send(got));
                });

                senders.push(tx);
            }

            // Drop the original sender so the receive loop below ends once
            // every task-owned clone has been dropped.
            drop(done_tx);

            thread::spawn(move || {
                for (idx, tx) in senders.into_iter().enumerate() {
                    assert_ok!(tx.send(idx));
                }
            });

            let mut collected = Vec::new();
            while let Some(value) = done_rx.recv().await {
                collected.push(value);
            }

            collected.sort();
            collected
        });

        driver.await.unwrap()
    });

    assert_eq!(ITER, collected.len());

    for (idx, value) in collected.iter().enumerate() {
        assert_eq!(idx, *value);
    }
}
275 
// A spawned task awaits a task it spawned itself; the value travels back
// through both join handles.
#[test]
fn spawn_await_chain() {
    let mut runtime = rt();

    let greeting = runtime.block_on(async {
        let outer = tokio::spawn(async {
            let inner = tokio::spawn(async { "hello" });
            assert_ok!(inner.await)
        });

        assert_ok!(outer.await)
    });

    assert_eq!(greeting, "hello");
}
290 
// Uses an `Arc` strong count to observe that a never-completing task is
// still holding its clone after `block_on` returns, and that the clone is
// gone once the runtime has been dropped.
#[test]
fn outstanding_tasks_dropped() {
    let mut runtime = rt();

    let marker = Arc::new(());

    runtime.block_on(async {
        let held = Arc::clone(&marker);

        // The task always returns `Pending`, so it never finishes by itself.
        tokio::spawn(poll_fn(move |_| {
            assert_eq!(2, Arc::strong_count(&held));
            Poll::<()>::Pending
        }));
    });

    assert_eq!(2, Arc::strong_count(&marker));

    drop(runtime);

    assert_eq!(1, Arc::strong_count(&marker));
}
312 
// Calling `block_on` of a second runtime from inside `block_on` of the
// first is expected to panic.
#[test]
#[should_panic]
fn nested_rt() {
    let mut outer = rt();
    let mut inner = rt();

    outer.block_on(async { inner.block_on(async { "hello" }) });
}
321 
// A runtime constructed inside another runtime's `block_on` can be used
// once that `block_on` call has returned.
#[test]
fn create_rt_in_block_on() {
    let mut outer = rt();
    let mut created = outer.block_on(async { rt() });
    let value = created.block_on(async { "ZOMG" });

    assert_eq!(value, "ZOMG");
}
330 
// `block_on` completes while a spawned task keeps yielding in an endless
// loop.
#[test]
fn complete_block_on_under_load() {
    let mut runtime = rt();

    runtime.block_on(async {
        let (done_tx, done_rx) = oneshot::channel();

        // Busy task: never finishes, yields on every iteration.
        tokio::spawn(async {
            loop {
                yield_once().await;
            }
        });

        // Signal completion from a plain thread after 50ms.
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(50));
            assert_ok!(done_tx.send(()));
        });

        assert_ok!(done_rx.await);
    });
}
353 
// A spawned relay task completes while another spawned task keeps yielding
// in an endless loop.
#[test]
fn complete_task_under_load() {
    let mut runtime = rt();

    runtime.block_on(async {
        let (trigger_tx, trigger_rx) = oneshot::channel();
        let (done_tx, done_rx) = oneshot::channel();

        // Busy task: never finishes, yields on every iteration.
        tokio::spawn(async {
            loop {
                yield_once().await;
            }
        });

        // Fire the trigger from a plain thread after 50ms.
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(50));
            assert_ok!(trigger_tx.send(()));
        });

        // Relay: waits for the trigger, then signals `block_on`.
        tokio::spawn(async move {
            assert_ok!(trigger_rx.await);
            assert_ok!(done_tx.send(()));
        });

        assert_ok!(done_rx.await);
    });
}
382 
// A task is spawned through a cloned handle from another thread, 50ms after
// that thread starts; `block_on` waits for the task's signal.
#[test]
fn spawn_from_other_thread_idle() {
    let mut runtime = rt();
    let spawner = runtime.handle().clone();

    let (done_tx, done_rx) = oneshot::channel();

    thread::spawn(move || {
        thread::sleep(Duration::from_millis(50));

        spawner.spawn(async move {
            assert_ok!(done_tx.send(()));
        });
    });

    runtime.block_on(async move {
        assert_ok!(done_rx.await);
    });
}
402 
// A task is spawned through a cloned handle from another thread while the
// runtime also hosts a task that yields in an endless loop.
#[test]
fn spawn_from_other_thread_under_load() {
    let mut runtime = rt();
    let spawner = runtime.handle().clone();

    let (done_tx, done_rx) = oneshot::channel();

    thread::spawn(move || {
        spawner.spawn(async move {
            assert_ok!(done_tx.send(()));
        });
    });

    runtime.block_on(async move {
        // Busy task: never finishes, yields on every iteration.
        tokio::spawn(async {
            loop {
                yield_once().await;
            }
        });

        assert_ok!(done_rx.await);
    });
}
427 
// Awaiting `delay_for` directly in `block_on` takes at least the requested
// duration.
#[test]
fn delay_at_root() {
    let mut runtime = rt();

    let started = Instant::now();
    let pause = Duration::from_millis(50);

    runtime.block_on(async move {
        time::delay_for(pause).await;
    });

    assert!(started.elapsed() >= pause);
}
441 
// Awaiting `delay_for` inside a spawned task takes at least the requested
// duration; completion is reported over a oneshot.
#[test]
fn delay_in_spawn() {
    let mut runtime = rt();

    let started = Instant::now();
    let pause = Duration::from_millis(50);

    runtime.block_on(async move {
        let (done_tx, done_rx) = oneshot::channel();

        tokio::spawn(async move {
            time::delay_for(pause).await;
            assert_ok!(done_tx.send(()));
        });

        assert_ok!(done_rx.await);
    });

    assert!(started.elapsed() >= pause);
}
462 
// A spawned task accepts on a listener while the `block_on` future connects
// to it; the accepting task signals over a oneshot.
#[test]
fn block_on_socket() {
    let mut runtime = rt();

    runtime.block_on(async move {
        let (accepted_tx, accepted_rx) = oneshot::channel();

        // Port 0: the bound address is read back from the listener.
        let mut acceptor = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let local = acceptor.local_addr().unwrap();

        tokio::spawn(async move {
            // The result of `accept` is deliberately ignored.
            let _ = acceptor.accept().await;
            accepted_tx.send(()).unwrap();
        });

        TcpStream::connect(&local).await.unwrap();
        accepted_rx.await.unwrap();
    });
}
482 
// `tokio::spawn` is called from inside a `spawn_blocking` closure; the inner
// join handle is returned and then awaited.
#[test]
fn spawn_from_blocking() {
    let mut runtime = rt();

    let greeting = runtime.block_on(async move {
        let blocking = tokio::task::spawn_blocking(|| tokio::spawn(async move { "hello" }));
        let spawned = assert_ok!(blocking.await);

        assert_ok!(spawned.await)
    });

    assert_eq!(greeting, "hello")
}
497 
// `spawn_blocking` is called from inside another `spawn_blocking` closure;
// the inner join handle is returned and then awaited.
#[test]
fn spawn_blocking_from_blocking() {
    let mut runtime = rt();

    let greeting = runtime.block_on(async move {
        let outer = tokio::task::spawn_blocking(|| tokio::task::spawn_blocking(|| "hello"));
        let nested = assert_ok!(outer.await);

        assert_ok!(nested.await)
    });

    assert_eq!(greeting, "hello")
}
512 
// Awaits a tokio delay from inside a `spawn_blocking` closure, driven by the
// `futures` executor rather than by tokio's `block_on`.
#[test]
fn delay_from_blocking() {
    let mut runtime = rt();

    runtime.block_on(async move {
        let blocking = tokio::task::spawn_blocking(|| {
            let started = std::time::Instant::now();
            let pause = Duration::from_millis(1);

            // use the futures' block_on fn to make sure we aren't setting
            // any Tokio context
            futures::executor::block_on(async {
                tokio::time::delay_for(pause).await;
            });

            assert!(started.elapsed() >= pause);
        });

        assert_ok!(blocking.await);
    });
}
532 
// Connects a `TcpStream` from inside a `spawn_blocking` closure, driven by
// the `futures` executor, while the `block_on` future accepts.
#[test]
fn socket_from_blocking() {
    let mut runtime = rt();

    runtime.block_on(async move {
        let mut acceptor = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
        let local = assert_ok!(acceptor.local_addr());

        let client = tokio::task::spawn_blocking(move || {
            // use the futures' block_on fn to make sure we aren't setting
            // any Tokio context
            futures::executor::block_on(async {
                assert_ok!(TcpStream::connect(local).await);
            });
        });

        // Wait for the client to connect
        let _ = assert_ok!(acceptor.accept().await);

        assert_ok!(client.await);
    });
}
555 
// After the runtime is dropped, `spawn_blocking` through a surviving handle
// yields a join handle that resolves to an error; the closure must not run.
#[test]
fn spawn_blocking_after_shutdown() {
    let runtime = rt();
    let survivor = runtime.handle().clone();

    // Shutdown
    drop(runtime);

    survivor.enter(|| {
        let join = task::spawn_blocking(|| unreachable!());

        // Avoid using a tokio runtime
        let outcome = futures::executor::block_on(join);
        assert!(outcome.is_err());
    });
}
572 
// A TCP exchange between two spawned tasks completes while 100 other tasks
// keep yielding in endless loops.
#[test]
fn io_driver_called_when_under_load() {
    let mut runtime = rt();

    // Create a lot of constant load. The scheduler will always be busy.
    for _ in 0..100 {
        runtime.spawn(async {
            loop {
                tokio::task::yield_now().await;
            }
        });
    }

    // Do some I/O work
    runtime.block_on(async {
        let mut acceptor = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
        let local = assert_ok!(acceptor.local_addr());

        // Server side: accept one connection and write the payload.
        let server = tokio::spawn(async move {
            let (mut conn, _) = assert_ok!(acceptor.accept().await);
            assert_ok!(conn.write_all(b"hello world").await);
        });

        // Client side: connect and read exactly 11 bytes.
        let client = tokio::spawn(async move {
            let mut conn = assert_ok!(TcpStream::connect(local).await);
            let mut received = vec![0; 11];

            assert_ok!(conn.read_exact(&mut received).await);
            assert_eq!(received, b"hello world");
        });

        assert_ok!(server.await);
        assert_ok!(client.await);
    });
}
608 
// Runs `client_server` under `block_on` and checks that exactly one
// completion message was sent on the std channel.
#[test]
fn client_server_block_on() {
    let mut runtime = rt();
    let (done_tx, done_rx) = mpsc::channel();

    runtime.block_on(async move { client_server(done_tx).await });

    // First receive succeeds, second finds nothing.
    assert_ok!(done_rx.try_recv());
    assert_err!(done_rx.try_recv());
}
619 
// A spawned future panics when polled; its `Drop` impl asserts that the
// thread is panicking and sends on a oneshot that `block_on` then receives.
#[test]
fn panic_in_task() {
    // Future whose `poll` always panics. The option holds the sender so that
    // `drop` can move it out.
    struct Boom(Option<oneshot::Sender<()>>);

    impl Future for Boom {
        type Output = ();

        fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
            panic!();
        }
    }

    impl Drop for Boom {
        fn drop(&mut self) {
            assert!(std::thread::panicking());
            self.0.take().unwrap().send(()).unwrap();
        }
    }

    let mut runtime = rt();
    let (dropped_tx, dropped_rx) = oneshot::channel();

    runtime.spawn(Boom(Some(dropped_tx)));
    assert_ok!(runtime.block_on(dropped_rx));
}
645 
// A panic inside the future passed to `block_on` is expected to propagate
// to the caller.
#[test]
#[should_panic]
fn panic_in_block_on() {
    let mut runtime = rt();
    runtime.block_on(async { panic!() });
}
652 
653     async fn yield_once() {
654         let mut yielded = false;
655         poll_fn(|cx| {
656             if yielded {
657                 Poll::Ready(())
658             } else {
659                 yielded = true;
660                 cx.waker().wake_by_ref();
661                 Poll::Pending
662             }
663         })
664         .await
665     }
666 
// `tokio::spawn` is called inside `Runtime::enter`, outside of `block_on`;
// the returned join handle is then awaited with `block_on`.
#[test]
fn enter_and_spawn() {
    let mut runtime = rt();
    let join = runtime.enter(|| tokio::spawn(async {}));

    assert_ok!(runtime.block_on(join));
}
676 
// A never-ready future owned by a spawned task reports its own drop on a
// std channel; after the runtime is dropped that report must be receivable.
#[test]
fn eagerly_drops_futures_on_shutdown() {
    use std::sync::mpsc;

    // Always `Pending`; sends on `drop_tx` when dropped.
    struct Never {
        drop_tx: mpsc::Sender<()>,
    }

    impl Future for Never {
        type Output = ();

        fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
            Poll::Pending
        }
    }

    impl Drop for Never {
        fn drop(&mut self) {
            self.drop_tx.send(()).unwrap();
        }
    }

    let mut runtime = rt();

    let (drop_tx, dropped) = mpsc::channel();
    let (started_tx, started_rx) = oneshot::channel();

    runtime.block_on(async move {
        tokio::spawn(async move {
            // Tell `block_on` the task is running before parking forever.
            assert_ok!(started_tx.send(()));

            Never { drop_tx }.await
        });

        assert_ok!(started_rx.await);
    });

    drop(runtime);

    assert_ok!(dropped.recv());
}
718 
// Sets up three tasks chained through oneshots so that dropping one of them
// spawns a new task and wakes another while the runtime itself is being
// dropped. The test passes if dropping the runtime returns.
#[test]
fn wake_while_rt_is_dropping() {
    use tokio::task;

    // Runs the wrapped closure when dropped.
    struct OnDrop<F: FnMut()>(F);

    impl<F: FnMut()> Drop for OnDrop<F> {
        fn drop(&mut self) {
            (self.0)()
        }
    }

    let (first_tx, first_rx) = oneshot::channel();
    let (second_tx, second_rx) = oneshot::channel();
    let (third_tx, third_rx) = oneshot::channel();

    let mut runtime = rt();

    let spawner = runtime.handle().clone();

    runtime.handle().spawn(async move {
        // Ensure a waker gets stored in oneshot 1.
        let _ = first_rx.await;
        third_tx.send(()).unwrap();
    });

    runtime.handle().spawn(async move {
        // When this task is dropped, we'll be "closing remotes".
        // We spawn a new task that owns the sender of oneshot 1, to move
        // its Drop out of here.
        //
        // Importantly, the oneshot 1 has a waker already stored, so
        // the eventual drop here will try to re-schedule again.
        let mut pending_tx = Some(first_tx);
        let _guard = OnDrop(move || {
            let tx = pending_tx.take().unwrap();
            spawner.spawn(async move {
                tx.send(()).unwrap();
            });
        });
        let _ = second_rx.await;
    });

    runtime.handle().spawn(async move {
        let _ = third_rx.await;
        // We'll never get here, but once task 3 drops, this will
        // force task 2 to re-schedule since it's waiting on oneshot 2.
        second_tx.send(()).unwrap();
    });

    // Tick the loop
    runtime.block_on(async {
        task::yield_now().await;
    });

    // Drop the rt
    drop(runtime);
}
777 
// Repeatedly creates a runtime whose tasks send and receive UDP datagrams
// on one socket in endless loops, then lets the runtime go out of scope
// 5ms later while those tasks are still live.
#[test]
fn io_notify_while_shutting_down() {
    use std::net::Ipv6Addr;

    for _ in 1..10 {
        let mut runtime = rt();

        runtime.block_on(async {
            let socket = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap();
            let local = socket.local_addr().unwrap();
            let (mut rx_half, mut tx_half) = socket.split();

            // Receiver: blocks the thread for 2ms after every datagram.
            tokio::spawn(async move {
                let mut scratch = [0];
                loop {
                    rx_half.recv_from(&mut scratch).await.unwrap();
                    std::thread::sleep(Duration::from_millis(2));
                }
            });

            // Sender: one datagram to the socket's own address, then a 1ms
            // delay.
            tokio::spawn(async move {
                let payload = [0];
                loop {
                    tx_half.send_to(&payload, &local).await.unwrap();
                    tokio::time::delay_for(Duration::from_millis(1)).await;
                }
            });

            tokio::time::delay_for(Duration::from_millis(5)).await;
        });
    }
}
810 
// Starts a blocking task that sleeps for 10_000 seconds, then calls
// `shutdown_timeout` with 100ms once the task has signalled that it started.
#[test]
fn shutdown_timeout() {
    let (started_tx, started_rx) = oneshot::channel();
    let mut runtime = rt();

    runtime.block_on(async move {
        task::spawn_blocking(move || {
            started_tx.send(()).unwrap();
            thread::sleep(Duration::from_secs(10_000));
        });

        started_rx.await.unwrap();
    });

    runtime.shutdown_timeout(Duration::from_millis(100));
}
827 
// On a fresh thread, stores one runtime in a thread-local slot, creates a
// second runtime as a local, and requires the thread to exit without
// panicking.
#[test]
fn runtime_in_thread_local() {
    use std::cell::RefCell;
    use std::thread;

    thread_local!(
        static R: RefCell<Option<Runtime>> = RefCell::new(None);
    );

    let worker = thread::spawn(|| {
        R.with(|slot| {
            // Build the runtime before taking the mutable borrow.
            let stored = rt();
            *slot.borrow_mut() = Some(stored);
        });

        let _rt = rt();
    });

    worker.join().unwrap();
}
845 
846     async fn client_server(tx: mpsc::Sender<()>) {
847         let mut server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
848 
849         // Get the assigned address
850         let addr = assert_ok!(server.local_addr());
851 
852         // Spawn the server
853         tokio::spawn(async move {
854             // Accept a socket
855             let (mut socket, _) = server.accept().await.unwrap();
856 
857             // Write some data
858             socket.write_all(b"hello").await.unwrap();
859         });
860 
861         let mut client = TcpStream::connect(&addr).await.unwrap();
862 
863         let mut buf = vec![];
864         client.read_to_end(&mut buf).await.unwrap();
865 
866         assert_eq!(buf, b"hello");
867         tx.send(()).unwrap();
868     }
869 
// Same exchange as `block_on_socket`, but driven through
// `LocalSet::block_on` with the accepting task started via `spawn_local`.
#[test]
fn local_set_block_on_socket() {
    let mut runtime = rt();
    let local_set = task::LocalSet::new();

    local_set.block_on(&mut runtime, async move {
        let (accepted_tx, accepted_rx) = oneshot::channel();

        let mut acceptor = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let local = acceptor.local_addr().unwrap();

        task::spawn_local(async move {
            // The result of `accept` is deliberately ignored.
            let _ = acceptor.accept().await;
            accepted_tx.send(()).unwrap();
        });

        TcpStream::connect(&local).await.unwrap();
        accepted_rx.await.unwrap();
    });
}
890 
// Runs `client_server_local` under `LocalSet::block_on` and checks that
// exactly one completion message was sent on the std channel.
#[test]
fn local_set_client_server_block_on() {
    let mut runtime = rt();
    let (done_tx, done_rx) = mpsc::channel();

    let local_set = task::LocalSet::new();

    local_set.block_on(&mut runtime, async move { client_server_local(done_tx).await });

    // First receive succeeds, second finds nothing.
    assert_ok!(done_rx.try_recv());
    assert_err!(done_rx.try_recv());
}
903 
904     async fn client_server_local(tx: mpsc::Sender<()>) {
905         let mut server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
906 
907         // Get the assigned address
908         let addr = assert_ok!(server.local_addr());
909 
910         // Spawn the server
911         task::spawn_local(async move {
912             // Accept a socket
913             let (mut socket, _) = server.accept().await.unwrap();
914 
915             // Write some data
916             socket.write_all(b"hello").await.unwrap();
917         });
918 
919         let mut client = TcpStream::connect(&addr).await.unwrap();
920 
921         let mut buf = vec![];
922         client.read_to_end(&mut buf).await.unwrap();
923 
924         assert_eq!(buf, b"hello");
925         tx.send(()).unwrap();
926     }
927 
// Spawns 1_000 empty tasks, waits 100ms, then polls every join handle inside
// a single `poll_fn` call and panics unless at least one reports `Pending`.
#[test]
fn coop() {
    use std::task::Poll::Ready;

    let mut runtime = rt();

    runtime.block_on(async {
        // Create a bunch of tasks
        let mut handles: Vec<_> = (0..1_000).map(|_| tokio::spawn(async {})).collect();

        // Hope that all the tasks complete...
        time::delay_for(Duration::from_millis(100)).await;

        poll_fn(|cx| {
            // At least one task should not be ready. `any` stops at the
            // first pending handle, leaving the rest unpolled.
            let saw_pending = handles
                .iter_mut()
                .any(|handle| Pin::new(handle).poll(cx).is_pending());

            if saw_pending {
                Ready(())
            } else {
                panic!("did not yield");
            }
        })
        .await;
    });
}
955 
// Tests that the "next task" scheduler optimization is not able to starve
// other tasks.
#[test]
fn ping_pong_saturation() {
    use tokio::sync::mpsc;

    const NUM: usize = 100;

    let mut runtime = rt();

    runtime.block_on(async {
        let (started_tx, mut started_rx) = mpsc::unbounded_channel();

        // Spawn a bunch of task pairs that ping-pong between each other to
        // saturate the runtime.
        for _ in 0..NUM {
            let (ping_tx, mut ping_rx) = mpsc::unbounded_channel();
            let (pong_tx, mut pong_rx) = mpsc::unbounded_channel();
            let started_tx = started_tx.clone();

            // Pinger: reports that it started, sends the first ping, then
            // answers every pong with another ping.
            task::spawn(async move {
                started_tx.send(()).unwrap();

                ping_tx.send(()).unwrap();

                loop {
                    pong_rx.recv().await.unwrap();
                    ping_tx.send(()).unwrap();
                }
            });

            // Ponger: answers every ping with a pong.
            task::spawn(async move {
                loop {
                    ping_rx.recv().await.unwrap();
                    pong_tx.send(()).unwrap();
                }
            });
        }

        // Wait until every pinger has reported in.
        for _ in 0..NUM {
            started_rx.recv().await.unwrap();
        }

        // spawn another task and wait for it to complete
        let extra = task::spawn(async {
            for _ in 0..5 {
                // Yielding forces it back into the local queue.
                task::yield_now().await;
            }
        });
        extra.await.unwrap();
    });
}
1009 }
1010