1 #![allow(clippy::needless_range_loop)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(feature = "full")]
4
// Tests to run on both current-thread & thread-pool runtime variants.
6
// Stamps the enclosed items out once per runtime flavor. Every generated
// module receives its own copy of the items together with an `rt()` function
// that builds the flavor under test, so the items only ever call `rt()`.
macro_rules! rt_test {
    ($($item:tt)*) => {
        // Basic scheduler.
        mod basic_scheduler {
            $($item)*

            fn rt() -> Runtime {
                use tokio::runtime::Builder;

                Builder::new().basic_scheduler().enable_all().build().unwrap()
            }
        }

        // Threaded scheduler configured with four core threads.
        mod threaded_scheduler_4_threads {
            $($item)*

            fn rt() -> Runtime {
                use tokio::runtime::Builder;

                Builder::new()
                    .threaded_scheduler()
                    .core_threads(4)
                    .enable_all()
                    .build()
                    .unwrap()
            }
        }

        // Threaded scheduler configured with a single core thread.
        mod threaded_scheduler_1_thread {
            $($item)*

            fn rt() -> Runtime {
                use tokio::runtime::Builder;

                Builder::new()
                    .threaded_scheduler()
                    .core_threads(1)
                    .enable_all()
                    .build()
                    .unwrap()
            }
        }
    }
}
48
// Compile-time check that `Runtime` is both `Send` and `Sync`. Nothing happens
// at run time; the test stops building if either bound no longer holds.
#[test]
fn send_sync_bound() {
    use tokio::runtime::Runtime;

    // Named after what it actually requires: both auto traits, not just `Send`.
    fn assert_send_sync<T: Send + Sync>() {}

    assert_send_sync::<Runtime>();
}
56
57 rt_test! {
58 use tokio::net::{TcpListener, TcpStream, UdpSocket};
59 use tokio::prelude::*;
60 use tokio::runtime::Runtime;
61 use tokio::sync::oneshot;
62 use tokio::{task, time};
63 use tokio_test::{assert_err, assert_ok};
64
65 use futures::future::poll_fn;
66 use std::future::Future;
67 use std::pin::Pin;
68 use std::sync::{mpsc, Arc};
69 use std::task::{Context, Poll};
70 use std::thread;
71 use std::time::{Duration, Instant};
72
// `block_on` runs a future that completes without awaiting anything and lets
// it mutate a local borrowed from the test's stack.
#[test]
fn block_on_sync() {
    let mut runtime = rt();
    let mut ran = false;

    runtime.block_on(async {
        ran = true;
    });

    assert!(ran);
}
84
// Same as `block_on_sync`, but the future is driven through the runtime's
// handle instead of the runtime itself.
#[test]
fn block_on_handle_sync() {
    let runtime = rt();
    let mut ran = false;

    let handle = runtime.handle();
    handle.block_on(async {
        ran = true;
    });

    assert!(ran);
}
96
// `block_on` waits for a value that a plain OS thread sends 50ms later.
#[test]
fn block_on_async() {
    let mut runtime = rt();

    let received = runtime.block_on(async {
        let (sender, receiver) = oneshot::channel();

        // The sender lives on a non-runtime thread and fires after a delay.
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(50));
            sender.send("ZOMG").unwrap();
        });

        assert_ok!(receiver.await)
    });

    assert_eq!(received, "ZOMG");
}
114
// Handle-based variant of `block_on_async`: the awaited value is sent from a
// plain OS thread after 50ms.
#[test]
fn block_on_handle_async() {
    let runtime = rt();
    let handle = runtime.handle();

    let received = handle.block_on(async {
        let (sender, receiver) = oneshot::channel();

        thread::spawn(move || {
            thread::sleep(Duration::from_millis(50));
            sender.send("ZOMG").unwrap();
        });

        assert_ok!(receiver.await)
    });

    assert_eq!(received, "ZOMG");
}
132
// A single spawned task sends a message back to the `block_on` future.
#[test]
fn spawn_one_bg() {
    let mut runtime = rt();

    let received = runtime.block_on(async {
        let (sender, receiver) = oneshot::channel();

        tokio::spawn(async move {
            sender.send("ZOMG").unwrap();
        });

        assert_ok!(receiver.await)
    });

    assert_eq!(received, "ZOMG");
}
149
// A spawned task both sends a message over a channel and yields a value
// through its join handle; both are checked.
#[test]
fn spawn_one_join() {
    let mut runtime = rt();

    let received = runtime.block_on(async {
        let (sender, receiver) = oneshot::channel();

        let join = tokio::spawn(async move {
            sender.send("ZOMG").unwrap();
            "DONE"
        });

        let msg = assert_ok!(receiver.await);

        let task_output = assert_ok!(join.await);
        assert_eq!(task_output, "DONE");

        msg
    });

    assert_eq!(received, "ZOMG");
}
172
// Two spawned tasks relay a message: the first sends it, the second forwards
// it to the `block_on` future.
#[test]
fn spawn_two() {
    let mut runtime = rt();

    let received = runtime.block_on(async {
        let (first_tx, first_rx) = oneshot::channel();
        let (second_tx, second_rx) = oneshot::channel();

        // Producer.
        tokio::spawn(async move {
            assert_ok!(first_tx.send("ZOMG"));
        });

        // Forwarder.
        tokio::spawn(async move {
            let relayed = assert_ok!(first_rx.await);
            assert_ok!(second_tx.send(relayed));
        });

        assert_ok!(second_rx.await)
    });

    assert_eq!(received, "ZOMG");
}
195
// Spawns `ITER` tasks directly from the `block_on` future. Each task waits on
// its own oneshot for its index and reports it on a shared unbounded channel;
// the test checks that every index in `0..ITER` was reported exactly once.
#[test]
fn spawn_many_from_block_on() {
    use tokio::sync::mpsc;

    const ITER: usize = 200;

    let mut rt = rt();

    let out = rt.block_on(async {
        let (done_tx, mut done_rx) = mpsc::unbounded_channel();

        let txs = (0..ITER)
            .map(|i| {
                let (tx, rx) = oneshot::channel();
                let done_tx = done_tx.clone();

                tokio::spawn(async move {
                    let msg = assert_ok!(rx.await);
                    assert_eq!(i, msg);
                    assert_ok!(done_tx.send(msg));
                });

                tx
            })
            .collect::<Vec<_>>();

        // Only the clones held by the tasks remain after this, so the receive
        // loop below can terminate once they are all gone.
        drop(done_tx);

        // Feed every task its index from a non-runtime thread. The vector is
        // owned by the closure, so it is consumed directly.
        thread::spawn(move || {
            for (i, tx) in txs.into_iter().enumerate() {
                assert_ok!(tx.send(i));
            }
        });

        let mut out = vec![];
        while let Some(i) = done_rx.recv().await {
            out.push(i);
        }

        // Completion order is not fixed; sort before comparing.
        out.sort_unstable();
        out
    });

    assert_eq!(ITER, out.len());

    for (i, &value) in out.iter().enumerate() {
        assert_eq!(i, value);
    }
}
245
// Like `spawn_many_from_block_on`, but the `ITER` tasks are spawned from
// inside another spawned task rather than from the `block_on` future itself.
#[test]
fn spawn_many_from_task() {
    use tokio::sync::mpsc;

    const ITER: usize = 500;

    let mut rt = rt();

    let out = rt.block_on(async {
        tokio::spawn(async move {
            let (done_tx, mut done_rx) = mpsc::unbounded_channel();

            let txs = (0..ITER)
                .map(|i| {
                    let (tx, rx) = oneshot::channel();
                    let done_tx = done_tx.clone();

                    tokio::spawn(async move {
                        let msg = assert_ok!(rx.await);
                        assert_eq!(i, msg);
                        assert_ok!(done_tx.send(msg));
                    });

                    tx
                })
                .collect::<Vec<_>>();

            // Only the clones held by the tasks remain after this, so the
            // receive loop below can terminate once they are all gone.
            drop(done_tx);

            // Feed every task its index from a non-runtime thread. The vector
            // is owned by the closure, so it is consumed directly.
            thread::spawn(move || {
                for (i, tx) in txs.into_iter().enumerate() {
                    assert_ok!(tx.send(i));
                }
            });

            let mut out = vec![];
            while let Some(i) = done_rx.recv().await {
                out.push(i);
            }

            // Completion order is not fixed; sort before comparing.
            out.sort_unstable();
            out
        })
        .await
        .unwrap()
    });

    assert_eq!(ITER, out.len());

    for (i, &value) in out.iter().enumerate() {
        assert_eq!(i, value);
    }
}
305
// A spawned task awaits a second spawned task; the innermost value must
// propagate out through both join handles.
#[test]
fn spawn_await_chain() {
    let mut runtime = rt();

    let value = runtime.block_on(async {
        let outer = tokio::spawn(async {
            let inner = tokio::spawn(async { "hello" });
            assert_ok!(inner.await)
        });

        assert_ok!(outer.await)
    });

    assert_eq!(value, "hello");
}
320
// A task that never completes holds a clone of an `Arc`. The clone must still
// be alive after `block_on` returns and must be released when the runtime is
// dropped.
#[test]
fn outstanding_tasks_dropped() {
    let mut runtime = rt();

    let token = Arc::new(());

    runtime.block_on(async {
        let held = Arc::clone(&token);

        // Forever-pending task that owns `held`.
        tokio::spawn(poll_fn(move |_| {
            assert_eq!(2, Arc::strong_count(&held));
            Poll::<()>::Pending
        }));
    });

    // The task has not been dropped yet.
    assert_eq!(2, Arc::strong_count(&token));

    drop(runtime);

    // Only the test's own reference is left.
    assert_eq!(1, Arc::strong_count(&token));
}
342
// Calling `block_on` on one runtime from inside another runtime's `block_on`
// is expected to panic.
#[test]
#[should_panic]
fn nested_rt() {
    let mut outer = rt();
    let mut inner = rt();

    outer.block_on(async {
        inner.block_on(async { "hello" })
    });
}
351
// A runtime may be constructed inside another runtime's `block_on` and then
// used once that `block_on` has returned.
#[test]
fn create_rt_in_block_on() {
    let mut builder_rt = rt();

    let mut built_rt = builder_rt.block_on(async { rt() });
    let value = built_rt.block_on(async { "ZOMG" });

    assert_eq!(value, "ZOMG");
}
360
// The `block_on` future must still complete while a spawned task keeps
// rescheduling itself in a tight loop.
#[test]
fn complete_block_on_under_load() {
    let mut runtime = rt();

    runtime.block_on(async {
        let (wake_tx, wake_rx) = oneshot::channel();

        // Busy task: yields forever and never finishes.
        tokio::spawn(async {
            loop {
                yield_once().await;
            }
        });

        // The wake-up comes from a non-runtime thread after 50ms.
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(50));
            assert_ok!(wake_tx.send(()));
        });

        assert_ok!(wake_rx.await);
    });
}
383
// A spawned task must still be woken and run to completion while another
// spawned task keeps rescheduling itself in a tight loop.
#[test]
fn complete_task_under_load() {
    let mut runtime = rt();

    runtime.block_on(async {
        let (trigger_tx, trigger_rx) = oneshot::channel();
        let (done_tx, done_rx) = oneshot::channel();

        // Busy task: yields forever and never finishes.
        tokio::spawn(async {
            loop {
                yield_once().await;
            }
        });

        // The trigger comes from a non-runtime thread after 50ms.
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(50));
            assert_ok!(trigger_tx.send(()));
        });

        // Task under test: waits for the trigger, then signals completion.
        tokio::spawn(async move {
            assert_ok!(trigger_rx.await);
            assert_ok!(done_tx.send(()));
        });

        assert_ok!(done_rx.await);
    });
}
412
// A task spawned through a cloned handle from a foreign thread must run and
// unblock the `block_on` future.
#[test]
fn spawn_from_other_thread_idle() {
    let mut runtime = rt();
    let spawner = runtime.handle().clone();

    let (done_tx, done_rx) = oneshot::channel();

    thread::spawn(move || {
        // NOTE(review): presumably the 50ms pause lets `block_on` below reach
        // its idle state before the spawn happens (hence the test name).
        thread::sleep(Duration::from_millis(50));

        spawner.spawn(async move {
            assert_ok!(done_tx.send(()));
        });
    });

    runtime.block_on(async move {
        assert_ok!(done_rx.await);
    });
}
432
// A task spawned through a cloned handle from a foreign thread must run even
// though the runtime is kept busy by a task that yields in a tight loop.
#[test]
fn spawn_from_other_thread_under_load() {
    let mut runtime = rt();
    let spawner = runtime.handle().clone();

    let (done_tx, done_rx) = oneshot::channel();

    thread::spawn(move || {
        spawner.spawn(async move {
            assert_ok!(done_tx.send(()));
        });
    });

    runtime.block_on(async move {
        // Busy task: yields forever and never finishes.
        tokio::spawn(async {
            loop {
                yield_once().await;
            }
        });

        assert_ok!(done_rx.await);
    });
}
457
// A delay awaited directly in the `block_on` future must not complete before
// the requested duration has elapsed.
#[test]
fn delay_at_root() {
    let mut runtime = rt();

    let wait = Duration::from_millis(50);
    let started = Instant::now();

    runtime.block_on(async move {
        time::delay_for(wait).await;
    });

    assert!(started.elapsed() >= wait);
}
471
// A delay awaited inside a spawned task must not complete before the
// requested duration has elapsed.
#[test]
fn delay_in_spawn() {
    let mut runtime = rt();

    let wait = Duration::from_millis(50);
    let started = Instant::now();

    runtime.block_on(async move {
        let (done_tx, done_rx) = oneshot::channel();

        tokio::spawn(async move {
            time::delay_for(wait).await;
            assert_ok!(done_tx.send(()));
        });

        assert_ok!(done_rx.await);
    });

    assert!(started.elapsed() >= wait);
}
492
// A spawned task accepts on a listener while the `block_on` future connects
// to it; the accepting task then signals that it got past `accept`.
#[test]
fn block_on_socket() {
    let mut runtime = rt();

    runtime.block_on(async move {
        let (accepted_tx, accepted_rx) = oneshot::channel();

        // Port 0: the bound address is read back from the listener.
        let mut listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let local = listener.local_addr().unwrap();

        tokio::spawn(async move {
            let _ = listener.accept().await;
            accepted_tx.send(()).unwrap();
        });

        TcpStream::connect(&local).await.unwrap();
        accepted_rx.await.unwrap();
    });
}
512
// `tokio::spawn` must work when called from inside a `spawn_blocking`
// closure; the resulting join handle is awaited back in the async context.
#[test]
fn spawn_from_blocking() {
    let mut runtime = rt();

    let value = runtime.block_on(async move {
        let blocking = tokio::task::spawn_blocking(|| tokio::spawn(async move { "hello" }));
        let spawned = assert_ok!(blocking.await);

        assert_ok!(spawned.await)
    });

    assert_eq!(value, "hello");
}
527
// `spawn_blocking` must work when called from inside another
// `spawn_blocking` closure.
#[test]
fn spawn_blocking_from_blocking() {
    let mut runtime = rt();

    let value = runtime.block_on(async move {
        let outer = tokio::task::spawn_blocking(|| tokio::task::spawn_blocking(|| "hello"));
        let nested = assert_ok!(outer.await);

        assert_ok!(nested.await)
    });

    assert_eq!(value, "hello");
}
542
// A Tokio delay must be usable from a `spawn_blocking` closure even when it
// is driven by a non-Tokio executor.
#[test]
fn delay_from_blocking() {
    let mut runtime = rt();

    runtime.block_on(async move {
        let blocking = tokio::task::spawn_blocking(|| {
            let wait = Duration::from_millis(1);
            let started = Instant::now();

            // Driven by the `futures` executor on purpose, so that this call
            // does not set any Tokio context itself.
            futures::executor::block_on(async {
                tokio::time::delay_for(wait).await;
            });

            assert!(started.elapsed() >= wait);
        });

        assert_ok!(blocking.await);
    });
}
562
// A Tokio TCP connect must be usable from a `spawn_blocking` closure even
// when it is driven by a non-Tokio executor.
#[test]
fn socket_from_blocking() {
    let mut runtime = rt();

    runtime.block_on(async move {
        let mut listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
        let local = assert_ok!(listener.local_addr());

        let client = tokio::task::spawn_blocking(move || {
            // Driven by the `futures` executor on purpose, so that this call
            // does not set any Tokio context itself.
            futures::executor::block_on(async {
                assert_ok!(TcpStream::connect(local).await);
            });
        });

        // Block until the client has connected.
        let _ = assert_ok!(listener.accept().await);

        assert_ok!(client.await);
    });
}
585
// After the runtime has been dropped, `spawn_blocking` issued through a
// surviving handle must never run its closure, and its join handle must
// resolve to an error.
#[test]
fn spawn_blocking_after_shutdown() {
    let runtime = rt();
    let stale = runtime.handle().clone();

    // Dropping the runtime shuts it down.
    drop(runtime);

    stale.enter(|| {
        let join = task::spawn_blocking(|| unreachable!());

        // Resolve the handle without going through a Tokio runtime.
        let outcome = futures::executor::block_on(join);
        assert!(outcome.is_err());
    });
}
602
// Socket I/O must still make progress while 100 tasks keep the scheduler
// permanently busy.
#[test]
fn io_driver_called_when_under_load() {
    let mut runtime = rt();

    // Constant background load: every task yields forever.
    for _ in 0..100 {
        runtime.spawn(async {
            loop {
                tokio::task::yield_now().await;
            }
        });
    }

    // One TCP round trip: the server writes, the client reads.
    runtime.block_on(async {
        let mut listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
        let local = assert_ok!(listener.local_addr());

        let server = tokio::spawn(async move {
            let (mut conn, _) = assert_ok!(listener.accept().await);
            assert_ok!(conn.write_all(b"hello world").await);
        });

        let client = tokio::spawn(async move {
            let mut conn = assert_ok!(TcpStream::connect(local).await);
            let mut received = vec![0; 11];

            assert_ok!(conn.read_exact(&mut received).await);
            assert_eq!(received, b"hello world");
        });

        assert_ok!(server.await);
        assert_ok!(client.await);
    });
}
638
// Runs the `client_server` exchange under `block_on` and checks that exactly
// one completion message was sent.
#[test]
fn client_server_block_on() {
    let mut runtime = rt();
    let (done_tx, done_rx) = mpsc::channel();

    runtime.block_on(client_server(done_tx));

    // First receive succeeds, second finds nothing.
    assert_ok!(done_rx.try_recv());
    assert_err!(done_rx.try_recv());
}
649
// A task that panics when polled must have its future dropped while the
// thread is still panicking, and the panic must not stop the runtime from
// completing `block_on`.
#[test]
fn panic_in_task() {
    // Future that panics on its first poll and reports its own drop.
    struct PanicOnPoll(Option<oneshot::Sender<()>>);

    impl Future for PanicOnPoll {
        type Output = ();

        fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
            panic!();
        }
    }

    impl Drop for PanicOnPoll {
        fn drop(&mut self) {
            // The drop has to happen during the panic raised by `poll`.
            assert!(std::thread::panicking());
            self.0.take().unwrap().send(()).unwrap();
        }
    }

    let mut runtime = rt();
    let (dropped_tx, dropped_rx) = oneshot::channel();

    runtime.spawn(PanicOnPoll(Some(dropped_tx)));
    assert_ok!(runtime.block_on(dropped_rx));
}
675
// A panic inside the `block_on` future must propagate to the caller.
#[test]
#[should_panic]
fn panic_in_block_on() {
    let mut runtime = rt();

    runtime.block_on(async {
        panic!()
    });
}
682
683 async fn yield_once() {
684 let mut yielded = false;
685 poll_fn(|cx| {
686 if yielded {
687 Poll::Ready(())
688 } else {
689 yielded = true;
690 cx.waker().wake_by_ref();
691 Poll::Pending
692 }
693 })
694 .await
695 }
696
// `tokio::spawn` must work inside `Runtime::enter`, outside any `block_on`;
// the returned join handle is then driven to completion.
#[test]
fn enter_and_spawn() {
    let mut runtime = rt();

    let join = runtime.enter(|| tokio::spawn(async {}));

    assert_ok!(runtime.block_on(join));
}
706
// A spawned future that never completes must be dropped when the runtime is
// dropped; the future reports its own drop over a std channel.
#[test]
fn eagerly_drops_futures_on_shutdown() {
    use std::sync::mpsc;

    // Always-pending future that sends a message when it is dropped.
    struct PendingForever {
        dropped: mpsc::Sender<()>,
    }

    impl Future for PendingForever {
        type Output = ();

        fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
            Poll::Pending
        }
    }

    impl Drop for PendingForever {
        fn drop(&mut self) {
            self.dropped.send(()).unwrap();
        }
    }

    let mut runtime = rt();

    let (dropped_tx, dropped_rx) = mpsc::channel();
    let (started_tx, started_rx) = oneshot::channel();

    runtime.block_on(async move {
        tokio::spawn(async move {
            // Report that the task is running before it parks forever.
            assert_ok!(started_tx.send(()));

            PendingForever { dropped: dropped_tx }.await
        });

        assert_ok!(started_rx.await);
    });

    drop(runtime);

    assert_ok!(dropped_rx.recv());
}
748
// Dropping the runtime drops three tasks chained through oneshots. One of
// them spawns a fresh task from its destructor, which then tries to wake a
// task belonging to the runtime being torn down. The test has no assertions;
// it passes if the drop completes.
#[test]
fn wake_while_rt_is_dropping() {
    use tokio::task;

    // Runs the wrapped closure when dropped.
    struct RunOnDrop<F: FnMut()>(F);

    impl<F: FnMut()> Drop for RunOnDrop<F> {
        fn drop(&mut self) {
            (self.0)()
        }
    }

    let (tx1, rx1) = oneshot::channel();
    let (tx2, rx2) = oneshot::channel();
    let (tx3, rx3) = oneshot::channel();

    let mut runtime = rt();

    let spawner = runtime.handle().clone();

    // Task 1: parks on oneshot 1, so that oneshot has a waker stored.
    runtime.handle().spawn(async move {
        let _ = rx1.await;
        tx3.send(()).unwrap();
    });

    // Task 2: dropping it happens while the runtime is "closing remotes".
    // Its guard hands `tx1` to a freshly spawned task so that the drop of
    // `tx1` takes place outside this task.
    //
    // Oneshot 1 already holds a waker at that point, so the eventual drop of
    // `tx1` attempts to schedule task 1 again.
    runtime.handle().spawn(async move {
        let mut pending_tx1 = Some(tx1);
        let _guard = RunOnDrop(move || {
            let tx1 = pending_tx1.take().unwrap();
            spawner.spawn(async move {
                tx1.send(()).unwrap();
            });
        });
        let _ = rx2.await;
    });

    // Task 3: never gets past the await. When it is dropped, `tx2` goes with
    // it, which forces task 2 (parked on oneshot 2) to be rescheduled.
    runtime.handle().spawn(async move {
        let _ = rx3.await;
        tx2.send(()).unwrap();
    });

    // Tick the loop once.
    runtime.block_on(async {
        task::yield_now().await;
    });

    // Tear the runtime down.
    drop(runtime);
}
807
// Repeatedly shuts a runtime down while two of its tasks are still exchanging
// UDP datagrams. The test has no assertions beyond the `unwrap`s; it passes
// if every iteration completes.
#[test]
fn io_notify_while_shutting_down() {
    use std::net::Ipv6Addr;

    for _ in 1..10 {
        let mut runtime = rt();

        runtime.block_on(async {
            let socket = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap();
            let local = socket.local_addr().unwrap();
            let (mut reader, mut writer) = socket.split();

            // Receiver: blocks its thread for 2ms after every datagram.
            tokio::spawn(async move {
                let mut scratch = [0];
                loop {
                    reader.recv_from(&mut scratch).await.unwrap();
                    std::thread::sleep(Duration::from_millis(2));
                }
            });

            // Sender: one datagram to the socket's own address every 1ms.
            tokio::spawn(async move {
                let payload = [0];
                loop {
                    writer.send_to(&payload, &local).await.unwrap();
                    tokio::time::delay_for(Duration::from_millis(1)).await;
                }
            });

            // Let the traffic run for 5ms; the runtime is dropped at the end
            // of the loop iteration while both tasks are still alive.
            tokio::time::delay_for(Duration::from_millis(5)).await;
        });
    }
}
840
// `shutdown_timeout` is called with a 100ms limit while a blocking task is
// asleep for 10,000 seconds; the test only finishes if shutdown does not wait
// for that task.
#[test]
fn shutdown_timeout() {
    let mut runtime = rt();
    let (started_tx, started_rx) = oneshot::channel();

    runtime.block_on(async move {
        task::spawn_blocking(move || {
            // Report that the closure is running, then sleep.
            started_tx.send(()).unwrap();
            thread::sleep(Duration::from_secs(10_000));
        });

        started_rx.await.unwrap();
    });

    runtime.shutdown_timeout(Duration::from_millis(100));
}
857
// On a fresh thread, stores one runtime in a thread-local and creates a
// second one as an ordinary local; the thread must exit without panicking.
#[test]
fn runtime_in_thread_local() {
    use std::cell::RefCell;
    use std::thread;

    thread_local!(
        static R: RefCell<Option<Runtime>> = RefCell::new(None);
    );

    let worker = thread::spawn(|| {
        R.with(|slot| {
            *slot.borrow_mut() = Some(rt());
        });

        let _rt = rt();
    });

    worker.join().unwrap();
}
875
876 async fn client_server(tx: mpsc::Sender<()>) {
877 let mut server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
878
879 // Get the assigned address
880 let addr = assert_ok!(server.local_addr());
881
882 // Spawn the server
883 tokio::spawn(async move {
884 // Accept a socket
885 let (mut socket, _) = server.accept().await.unwrap();
886
887 // Write some data
888 socket.write_all(b"hello").await.unwrap();
889 });
890
891 let mut client = TcpStream::connect(&addr).await.unwrap();
892
893 let mut buf = vec![];
894 client.read_to_end(&mut buf).await.unwrap();
895
896 assert_eq!(buf, b"hello");
897 tx.send(()).unwrap();
898 }
899
// `LocalSet` variant of `block_on_socket`: the accepting task is spawned with
// `spawn_local` and the whole future is driven by `LocalSet::block_on`.
#[test]
fn local_set_block_on_socket() {
    let mut runtime = rt();
    let local_set = task::LocalSet::new();

    local_set.block_on(&mut runtime, async move {
        let (accepted_tx, accepted_rx) = oneshot::channel();

        // Port 0: the bound address is read back from the listener.
        let mut listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let local = listener.local_addr().unwrap();

        task::spawn_local(async move {
            let _ = listener.accept().await;
            accepted_tx.send(()).unwrap();
        });

        TcpStream::connect(&local).await.unwrap();
        accepted_rx.await.unwrap();
    });
}
920
// Runs the `client_server_local` exchange on a `LocalSet` and checks that
// exactly one completion message was sent.
#[test]
fn local_set_client_server_block_on() {
    let mut runtime = rt();
    let (done_tx, done_rx) = mpsc::channel();

    let local_set = task::LocalSet::new();

    local_set.block_on(&mut runtime, client_server_local(done_tx));

    // First receive succeeds, second finds nothing.
    assert_ok!(done_rx.try_recv());
    assert_err!(done_rx.try_recv());
}
933
934 async fn client_server_local(tx: mpsc::Sender<()>) {
935 let mut server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
936
937 // Get the assigned address
938 let addr = assert_ok!(server.local_addr());
939
940 // Spawn the server
941 task::spawn_local(async move {
942 // Accept a socket
943 let (mut socket, _) = server.accept().await.unwrap();
944
945 // Write some data
946 socket.write_all(b"hello").await.unwrap();
947 });
948
949 let mut client = TcpStream::connect(&addr).await.unwrap();
950
951 let mut buf = vec![];
952 client.read_to_end(&mut buf).await.unwrap();
953
954 assert_eq!(buf, b"hello");
955 tx.send(()).unwrap();
956 }
957
// Polls the join handles of 1,000 trivial tasks in a row from a single
// `poll_fn` call and requires that at least one poll reports `Pending`;
// otherwise the test panics with "did not yield".
#[test]
fn coop() {
    use std::task::Poll::Ready;

    let mut runtime = rt();

    runtime.block_on(async {
        // Spawn the trivial tasks and keep their join handles.
        let mut handles = (0..1_000)
            .map(|_| tokio::spawn(async {}))
            .collect::<Vec<_>>();

        // Give the tasks time to finish (not guaranteed).
        time::delay_for(Duration::from_millis(100)).await;

        poll_fn(|cx| {
            // Stop at the first handle that is not ready.
            let saw_pending = handles
                .iter_mut()
                .any(|handle| Pin::new(handle).poll(cx).is_pending());

            if !saw_pending {
                panic!("did not yield");
            }

            Ready(())
        })
        .await;
    });
}
985
// Checks that the scheduler's "next task" optimization cannot starve other
// tasks: while many task pairs bounce messages back and forth, a freshly
// spawned task must still run to completion.
#[test]
fn ping_pong_saturation() {
    use tokio::sync::mpsc;

    const NUM: usize = 100;

    let mut runtime = rt();

    runtime.block_on(async {
        let (started_tx, mut started_rx) = mpsc::unbounded_channel();

        // Saturate the runtime with `NUM` pairs of tasks that ping-pong
        // messages between each other forever.
        for _ in 0..NUM {
            let (ping_tx, mut ping_rx) = mpsc::unbounded_channel();
            let (pong_tx, mut pong_rx) = mpsc::unbounded_channel();
            let started_tx = started_tx.clone();

            // Initiator: announces itself, serves first, then answers pongs.
            task::spawn(async move {
                started_tx.send(()).unwrap();

                ping_tx.send(()).unwrap();

                loop {
                    pong_rx.recv().await.unwrap();
                    ping_tx.send(()).unwrap();
                }
            });

            // Responder: answers every ping with a pong.
            task::spawn(async move {
                loop {
                    ping_rx.recv().await.unwrap();
                    pong_tx.send(()).unwrap();
                }
            });
        }

        // Wait until every initiator has started.
        for _ in 0..NUM {
            started_rx.recv().await.unwrap();
        }

        // The task under test: it has to be scheduled despite the load.
        let probe = task::spawn(async {
            for _ in 0..5 {
                // Yielding forces it back into the local queue.
                task::yield_now().await;
            }
        });
        probe.await.unwrap();
    });
}
1039 }
1040