1 #![allow(clippy::needless_range_loop)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(feature = "full")]
4
5 // Tests to run on both current-thread & therad-pool runtime variants.
6
7 macro_rules! rt_test {
8 ($($t:tt)*) => {
9 mod basic_scheduler {
10 $($t)*
11
12 fn rt() -> Runtime {
13 tokio::runtime::Builder::new()
14 .basic_scheduler()
15 .enable_all()
16 .build()
17 .unwrap()
18 }
19 }
20
21 mod threaded_scheduler_4_threads {
22 $($t)*
23
24 fn rt() -> Runtime {
25 tokio::runtime::Builder::new()
26 .threaded_scheduler()
27 .core_threads(4)
28 .enable_all()
29 .build()
30 .unwrap()
31 }
32 }
33
34 mod threaded_scheduler_1_thread {
35 $($t)*
36
37 fn rt() -> Runtime {
38 tokio::runtime::Builder::new()
39 .threaded_scheduler()
40 .core_threads(1)
41 .enable_all()
42 .build()
43 .unwrap()
44 }
45 }
46 }
47 }
48
49 #[test]
send_sync_bound()50 fn send_sync_bound() {
51 use tokio::runtime::Runtime;
52 fn is_send<T: Send + Sync>() {}
53
54 is_send::<Runtime>();
55 }
56
57 rt_test! {
58 use tokio::net::{TcpListener, TcpStream, UdpSocket};
59 use tokio::prelude::*;
60 use tokio::runtime::Runtime;
61 use tokio::sync::oneshot;
62 use tokio::{task, time};
63 use tokio_test::{assert_err, assert_ok};
64
65 use futures::future::poll_fn;
66 use std::future::Future;
67 use std::pin::Pin;
68 use std::sync::{mpsc, Arc};
69 use std::task::{Context, Poll};
70 use std::thread;
71 use std::time::{Duration, Instant};
72
73 #[test]
74 fn block_on_sync() {
75 let mut rt = rt();
76
77 let mut win = false;
78 rt.block_on(async {
79 win = true;
80 });
81
82 assert!(win);
83 }
84
85 #[test]
86 fn block_on_handle_sync() {
87 let rt = rt();
88
89 let mut win = false;
90 rt.handle().block_on(async {
91 win = true;
92 });
93
94 assert!(win);
95 }
96
97 #[test]
98 fn block_on_async() {
99 let mut rt = rt();
100
101 let out = rt.block_on(async {
102 let (tx, rx) = oneshot::channel();
103
104 thread::spawn(move || {
105 thread::sleep(Duration::from_millis(50));
106 tx.send("ZOMG").unwrap();
107 });
108
109 assert_ok!(rx.await)
110 });
111
112 assert_eq!(out, "ZOMG");
113 }
114
115 #[test]
116 fn block_on_handle_async() {
117 let rt = rt();
118
119 let out = rt.handle().block_on(async {
120 let (tx, rx) = oneshot::channel();
121
122 thread::spawn(move || {
123 thread::sleep(Duration::from_millis(50));
124 tx.send("ZOMG").unwrap();
125 });
126
127 assert_ok!(rx.await)
128 });
129
130 assert_eq!(out, "ZOMG");
131 }
132
133 #[test]
134 fn spawn_one_bg() {
135 let mut rt = rt();
136
137 let out = rt.block_on(async {
138 let (tx, rx) = oneshot::channel();
139
140 tokio::spawn(async move {
141 tx.send("ZOMG").unwrap();
142 });
143
144 assert_ok!(rx.await)
145 });
146
147 assert_eq!(out, "ZOMG");
148 }
149
150 #[test]
151 fn spawn_one_join() {
152 let mut rt = rt();
153
154 let out = rt.block_on(async {
155 let (tx, rx) = oneshot::channel();
156
157 let handle = tokio::spawn(async move {
158 tx.send("ZOMG").unwrap();
159 "DONE"
160 });
161
162 let msg = assert_ok!(rx.await);
163
164 let out = assert_ok!(handle.await);
165 assert_eq!(out, "DONE");
166
167 msg
168 });
169
170 assert_eq!(out, "ZOMG");
171 }
172
173 #[test]
174 fn spawn_two() {
175 let mut rt = rt();
176
177 let out = rt.block_on(async {
178 let (tx1, rx1) = oneshot::channel();
179 let (tx2, rx2) = oneshot::channel();
180
181 tokio::spawn(async move {
182 assert_ok!(tx1.send("ZOMG"));
183 });
184
185 tokio::spawn(async move {
186 let msg = assert_ok!(rx1.await);
187 assert_ok!(tx2.send(msg));
188 });
189
190 assert_ok!(rx2.await)
191 });
192
193 assert_eq!(out, "ZOMG");
194 }
195
196 #[test]
197 fn spawn_many_from_block_on() {
198 use tokio::sync::mpsc;
199
200 const ITER: usize = 200;
201
202 let mut rt = rt();
203
204 let out = rt.block_on(async {
205 let (done_tx, mut done_rx) = mpsc::unbounded_channel();
206
207 let mut txs = (0..ITER)
208 .map(|i| {
209 let (tx, rx) = oneshot::channel();
210 let done_tx = done_tx.clone();
211
212 tokio::spawn(async move {
213 let msg = assert_ok!(rx.await);
214 assert_eq!(i, msg);
215 assert_ok!(done_tx.send(msg));
216 });
217
218 tx
219 })
220 .collect::<Vec<_>>();
221
222 drop(done_tx);
223
224 thread::spawn(move || {
225 for (i, tx) in txs.drain(..).enumerate() {
226 assert_ok!(tx.send(i));
227 }
228 });
229
230 let mut out = vec![];
231 while let Some(i) = done_rx.recv().await {
232 out.push(i);
233 }
234
235 out.sort();
236 out
237 });
238
239 assert_eq!(ITER, out.len());
240
241 for i in 0..ITER {
242 assert_eq!(i, out[i]);
243 }
244 }
245
246 #[test]
247 fn spawn_many_from_task() {
248 use tokio::sync::mpsc;
249
250 const ITER: usize = 500;
251
252 let mut rt = rt();
253
254 let out = rt.block_on(async {
255 tokio::spawn(async move {
256 let (done_tx, mut done_rx) = mpsc::unbounded_channel();
257
258 /*
259 for _ in 0..100 {
260 tokio::spawn(async move { });
261 }
262
263 tokio::task::yield_now().await;
264 */
265
266 let mut txs = (0..ITER)
267 .map(|i| {
268 let (tx, rx) = oneshot::channel();
269 let done_tx = done_tx.clone();
270
271 tokio::spawn(async move {
272 let msg = assert_ok!(rx.await);
273 assert_eq!(i, msg);
274 assert_ok!(done_tx.send(msg));
275 });
276
277 tx
278 })
279 .collect::<Vec<_>>();
280
281 drop(done_tx);
282
283 thread::spawn(move || {
284 for (i, tx) in txs.drain(..).enumerate() {
285 assert_ok!(tx.send(i));
286 }
287 });
288
289 let mut out = vec![];
290 while let Some(i) = done_rx.recv().await {
291 out.push(i);
292 }
293
294 out.sort();
295 out
296 }).await.unwrap()
297 });
298
299 assert_eq!(ITER, out.len());
300
301 for i in 0..ITER {
302 assert_eq!(i, out[i]);
303 }
304 }
305
306 #[test]
307 fn spawn_await_chain() {
308 let mut rt = rt();
309
310 let out = rt.block_on(async {
311 assert_ok!(tokio::spawn(async {
312 assert_ok!(tokio::spawn(async {
313 "hello"
314 }).await)
315 }).await)
316 });
317
318 assert_eq!(out, "hello");
319 }
320
321 #[test]
322 fn outstanding_tasks_dropped() {
323 let mut rt = rt();
324
325 let cnt = Arc::new(());
326
327 rt.block_on(async {
328 let cnt = cnt.clone();
329
330 tokio::spawn(poll_fn(move |_| {
331 assert_eq!(2, Arc::strong_count(&cnt));
332 Poll::<()>::Pending
333 }));
334 });
335
336 assert_eq!(2, Arc::strong_count(&cnt));
337
338 drop(rt);
339
340 assert_eq!(1, Arc::strong_count(&cnt));
341 }
342
343 #[test]
344 #[should_panic]
345 fn nested_rt() {
346 let mut rt1 = rt();
347 let mut rt2 = rt();
348
349 rt1.block_on(async { rt2.block_on(async { "hello" }) });
350 }
351
352 #[test]
353 fn create_rt_in_block_on() {
354 let mut rt1 = rt();
355 let mut rt2 = rt1.block_on(async { rt() });
356 let out = rt2.block_on(async { "ZOMG" });
357
358 assert_eq!(out, "ZOMG");
359 }
360
361 #[test]
362 fn complete_block_on_under_load() {
363 let mut rt = rt();
364
365 rt.block_on(async {
366 let (tx, rx) = oneshot::channel();
367
368 // Spin hard
369 tokio::spawn(async {
370 loop {
371 yield_once().await;
372 }
373 });
374
375 thread::spawn(move || {
376 thread::sleep(Duration::from_millis(50));
377 assert_ok!(tx.send(()));
378 });
379
380 assert_ok!(rx.await);
381 });
382 }
383
384 #[test]
385 fn complete_task_under_load() {
386 let mut rt = rt();
387
388 rt.block_on(async {
389 let (tx1, rx1) = oneshot::channel();
390 let (tx2, rx2) = oneshot::channel();
391
392 // Spin hard
393 tokio::spawn(async {
394 loop {
395 yield_once().await;
396 }
397 });
398
399 thread::spawn(move || {
400 thread::sleep(Duration::from_millis(50));
401 assert_ok!(tx1.send(()));
402 });
403
404 tokio::spawn(async move {
405 assert_ok!(rx1.await);
406 assert_ok!(tx2.send(()));
407 });
408
409 assert_ok!(rx2.await);
410 });
411 }
412
413 #[test]
414 fn spawn_from_other_thread_idle() {
415 let mut rt = rt();
416 let handle = rt.handle().clone();
417
418 let (tx, rx) = oneshot::channel();
419
420 thread::spawn(move || {
421 thread::sleep(Duration::from_millis(50));
422
423 handle.spawn(async move {
424 assert_ok!(tx.send(()));
425 });
426 });
427
428 rt.block_on(async move {
429 assert_ok!(rx.await);
430 });
431 }
432
433 #[test]
434 fn spawn_from_other_thread_under_load() {
435 let mut rt = rt();
436 let handle = rt.handle().clone();
437
438 let (tx, rx) = oneshot::channel();
439
440 thread::spawn(move || {
441 handle.spawn(async move {
442 assert_ok!(tx.send(()));
443 });
444 });
445
446 rt.block_on(async move {
447 // Spin hard
448 tokio::spawn(async {
449 loop {
450 yield_once().await;
451 }
452 });
453
454 assert_ok!(rx.await);
455 });
456 }
457
458 #[test]
459 fn delay_at_root() {
460 let mut rt = rt();
461
462 let now = Instant::now();
463 let dur = Duration::from_millis(50);
464
465 rt.block_on(async move {
466 time::delay_for(dur).await;
467 });
468
469 assert!(now.elapsed() >= dur);
470 }
471
472 #[test]
473 fn delay_in_spawn() {
474 let mut rt = rt();
475
476 let now = Instant::now();
477 let dur = Duration::from_millis(50);
478
479 rt.block_on(async move {
480 let (tx, rx) = oneshot::channel();
481
482 tokio::spawn(async move {
483 time::delay_for(dur).await;
484 assert_ok!(tx.send(()));
485 });
486
487 assert_ok!(rx.await);
488 });
489
490 assert!(now.elapsed() >= dur);
491 }
492
493 #[test]
494 fn block_on_socket() {
495 let mut rt = rt();
496
497 rt.block_on(async move {
498 let (tx, rx) = oneshot::channel();
499
500 let mut listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
501 let addr = listener.local_addr().unwrap();
502
503 tokio::spawn(async move {
504 let _ = listener.accept().await;
505 tx.send(()).unwrap();
506 });
507
508 TcpStream::connect(&addr).await.unwrap();
509 rx.await.unwrap();
510 });
511 }
512
513 #[test]
514 fn spawn_from_blocking() {
515 let mut rt = rt();
516
517 let out = rt.block_on(async move {
518 let inner = assert_ok!(tokio::task::spawn_blocking(|| {
519 tokio::spawn(async move { "hello" })
520 }).await);
521
522 assert_ok!(inner.await)
523 });
524
525 assert_eq!(out, "hello")
526 }
527
528 #[test]
529 fn spawn_blocking_from_blocking() {
530 let mut rt = rt();
531
532 let out = rt.block_on(async move {
533 let inner = assert_ok!(tokio::task::spawn_blocking(|| {
534 tokio::task::spawn_blocking(|| "hello")
535 }).await);
536
537 assert_ok!(inner.await)
538 });
539
540 assert_eq!(out, "hello")
541 }
542
543 #[test]
544 fn delay_from_blocking() {
545 let mut rt = rt();
546
547 rt.block_on(async move {
548 assert_ok!(tokio::task::spawn_blocking(|| {
549 let now = std::time::Instant::now();
550 let dur = Duration::from_millis(1);
551
552 // use the futures' block_on fn to make sure we aren't setting
553 // any Tokio context
554 futures::executor::block_on(async {
555 tokio::time::delay_for(dur).await;
556 });
557
558 assert!(now.elapsed() >= dur);
559 }).await);
560 });
561 }
562
563 #[test]
564 fn socket_from_blocking() {
565 let mut rt = rt();
566
567 rt.block_on(async move {
568 let mut listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
569 let addr = assert_ok!(listener.local_addr());
570
571 let peer = tokio::task::spawn_blocking(move || {
572 // use the futures' block_on fn to make sure we aren't setting
573 // any Tokio context
574 futures::executor::block_on(async {
575 assert_ok!(TcpStream::connect(addr).await);
576 });
577 });
578
579 // Wait for the client to connect
580 let _ = assert_ok!(listener.accept().await);
581
582 assert_ok!(peer.await);
583 });
584 }
585
586 #[test]
587 fn spawn_blocking_after_shutdown() {
588 let rt = rt();
589 let handle = rt.handle().clone();
590
591 // Shutdown
592 drop(rt);
593
594 handle.enter(|| {
595 let res = task::spawn_blocking(|| unreachable!());
596
597 // Avoid using a tokio runtime
598 let out = futures::executor::block_on(res);
599 assert!(out.is_err());
600 });
601 }
602
603 #[test]
604 // IOCP requires setting the "max thread" concurrency value. The sane,
605 // default, is to set this to the number of cores. Threads that poll I/O
606 // become associated with the IOCP handle. Once those threads sleep for any
607 // reason (mutex), they yield their ownership.
608 //
609 // This test hits an edge case on windows where more threads than cores are
610 // created, none of those threads ever yield due to being at capacity, so
611 // IOCP gets "starved".
612 //
613 // For now, this is a very edge case that is probably not a real production
614 // concern. There also isn't a great/obvious solution to take. For now, the
615 // test is disabled.
616 #[cfg(not(windows))]
617 fn io_driver_called_when_under_load() {
618 let mut rt = rt();
619
620 // Create a lot of constant load. The scheduler will always be busy.
621 for _ in 0..100 {
622 rt.spawn(async {
623 loop {
624 tokio::task::yield_now().await;
625 }
626 });
627 }
628
629 // Do some I/O work
630 rt.block_on(async {
631 let mut listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
632 let addr = assert_ok!(listener.local_addr());
633
634 let srv = tokio::spawn(async move {
635 let (mut stream, _) = assert_ok!(listener.accept().await);
636 assert_ok!(stream.write_all(b"hello world").await);
637 });
638
639 let cli = tokio::spawn(async move {
640 let mut stream = assert_ok!(TcpStream::connect(addr).await);
641 let mut dst = vec![0; 11];
642
643 assert_ok!(stream.read_exact(&mut dst).await);
644 assert_eq!(dst, b"hello world");
645 });
646
647 assert_ok!(srv.await);
648 assert_ok!(cli.await);
649 });
650 }
651
652 #[test]
653 fn client_server_block_on() {
654 let mut rt = rt();
655 let (tx, rx) = mpsc::channel();
656
657 rt.block_on(async move { client_server(tx).await });
658
659 assert_ok!(rx.try_recv());
660 assert_err!(rx.try_recv());
661 }
662
663 #[test]
664 fn panic_in_task() {
665 let mut rt = rt();
666 let (tx, rx) = oneshot::channel();
667
668 struct Boom(Option<oneshot::Sender<()>>);
669
670 impl Future for Boom {
671 type Output = ();
672
673 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
674 panic!();
675 }
676 }
677
678 impl Drop for Boom {
679 fn drop(&mut self) {
680 assert!(std::thread::panicking());
681 self.0.take().unwrap().send(()).unwrap();
682 }
683 }
684
685 rt.spawn(Boom(Some(tx)));
686 assert_ok!(rt.block_on(rx));
687 }
688
689 #[test]
690 #[should_panic]
691 fn panic_in_block_on() {
692 let mut rt = rt();
693 rt.block_on(async { panic!() });
694 }
695
696 async fn yield_once() {
697 let mut yielded = false;
698 poll_fn(|cx| {
699 if yielded {
700 Poll::Ready(())
701 } else {
702 yielded = true;
703 cx.waker().wake_by_ref();
704 Poll::Pending
705 }
706 })
707 .await
708 }
709
710 #[test]
711 fn enter_and_spawn() {
712 let mut rt = rt();
713 let handle = rt.enter(|| {
714 tokio::spawn(async {})
715 });
716
717 assert_ok!(rt.block_on(handle));
718 }
719
720 #[test]
721 fn eagerly_drops_futures_on_shutdown() {
722 use std::sync::mpsc;
723
724 struct Never {
725 drop_tx: mpsc::Sender<()>,
726 }
727
728 impl Future for Never {
729 type Output = ();
730
731 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
732 Poll::Pending
733 }
734 }
735
736 impl Drop for Never {
737 fn drop(&mut self) {
738 self.drop_tx.send(()).unwrap();
739 }
740 }
741
742 let mut rt = rt();
743
744 let (drop_tx, drop_rx) = mpsc::channel();
745 let (run_tx, run_rx) = oneshot::channel();
746
747 rt.block_on(async move {
748 tokio::spawn(async move {
749 assert_ok!(run_tx.send(()));
750
751 Never { drop_tx }.await
752 });
753
754 assert_ok!(run_rx.await);
755 });
756
757 drop(rt);
758
759 assert_ok!(drop_rx.recv());
760 }
761
762 #[test]
763 fn wake_while_rt_is_dropping() {
764 use tokio::task;
765
766 struct OnDrop<F: FnMut()>(F);
767
768 impl<F: FnMut()> Drop for OnDrop<F> {
769 fn drop(&mut self) {
770 (self.0)()
771 }
772 }
773
774 let (tx1, rx1) = oneshot::channel();
775 let (tx2, rx2) = oneshot::channel();
776 let (tx3, rx3) = oneshot::channel();
777
778 let mut rt = rt();
779
780 let h1 = rt.handle().clone();
781
782 rt.handle().spawn(async move {
783 // Ensure a waker gets stored in oneshot 1.
784 let _ = rx1.await;
785 tx3.send(()).unwrap();
786 });
787
788 rt.handle().spawn(async move {
789 // When this task is dropped, we'll be "closing remotes".
790 // We spawn a new task that owns the `tx1`, to move its Drop
791 // out of here.
792 //
793 // Importantly, the oneshot 1 has a waker already stored, so
794 // the eventual drop here will try to re-schedule again.
795 let mut opt_tx1 = Some(tx1);
796 let _d = OnDrop(move || {
797 let tx1 = opt_tx1.take().unwrap();
798 h1.spawn(async move {
799 tx1.send(()).unwrap();
800 });
801 });
802 let _ = rx2.await;
803 });
804
805 rt.handle().spawn(async move {
806 let _ = rx3.await;
807 // We'll never get here, but once task 3 drops, this will
808 // force task 2 to re-schedule since it's waiting on oneshot 2.
809 tx2.send(()).unwrap();
810 });
811
812 // Tick the loop
813 rt.block_on(async {
814 task::yield_now().await;
815 });
816
817 // Drop the rt
818 drop(rt);
819 }
820
821 #[test]
822 fn io_notify_while_shutting_down() {
823 use std::net::Ipv6Addr;
824
825 for _ in 1..10 {
826 let mut runtime = rt();
827
828 runtime.block_on(async {
829 let socket = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap();
830 let addr = socket.local_addr().unwrap();
831 let (mut recv_half, mut send_half) = socket.split();
832
833 tokio::spawn(async move {
834 let mut buf = [0];
835 loop {
836 recv_half.recv_from(&mut buf).await.unwrap();
837 std::thread::sleep(Duration::from_millis(2));
838 }
839 });
840
841 tokio::spawn(async move {
842 let buf = [0];
843 loop {
844 send_half.send_to(&buf, &addr).await.unwrap();
845 tokio::time::delay_for(Duration::from_millis(1)).await;
846 }
847 });
848
849 tokio::time::delay_for(Duration::from_millis(5)).await;
850 });
851 }
852 }
853
854 #[test]
855 fn shutdown_timeout() {
856 let (tx, rx) = oneshot::channel();
857 let mut runtime = rt();
858
859 runtime.block_on(async move {
860 task::spawn_blocking(move || {
861 tx.send(()).unwrap();
862 thread::sleep(Duration::from_secs(10_000));
863 });
864
865 rx.await.unwrap();
866 });
867
868 runtime.shutdown_timeout(Duration::from_millis(100));
869 }
870
871 #[test]
872 fn runtime_in_thread_local() {
873 use std::cell::RefCell;
874 use std::thread;
875
876 thread_local!(
877 static R: RefCell<Option<Runtime>> = RefCell::new(None);
878 );
879
880 thread::spawn(|| {
881 R.with(|cell| {
882 *cell.borrow_mut() = Some(rt());
883 });
884
885 let _rt = rt();
886 }).join().unwrap();
887 }
888
889 async fn client_server(tx: mpsc::Sender<()>) {
890 let mut server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
891
892 // Get the assigned address
893 let addr = assert_ok!(server.local_addr());
894
895 // Spawn the server
896 tokio::spawn(async move {
897 // Accept a socket
898 let (mut socket, _) = server.accept().await.unwrap();
899
900 // Write some data
901 socket.write_all(b"hello").await.unwrap();
902 });
903
904 let mut client = TcpStream::connect(&addr).await.unwrap();
905
906 let mut buf = vec![];
907 client.read_to_end(&mut buf).await.unwrap();
908
909 assert_eq!(buf, b"hello");
910 tx.send(()).unwrap();
911 }
912
913 #[test]
914 fn local_set_block_on_socket() {
915 let mut rt = rt();
916 let local = task::LocalSet::new();
917
918 local.block_on(&mut rt, async move {
919 let (tx, rx) = oneshot::channel();
920
921 let mut listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
922 let addr = listener.local_addr().unwrap();
923
924 task::spawn_local(async move {
925 let _ = listener.accept().await;
926 tx.send(()).unwrap();
927 });
928
929 TcpStream::connect(&addr).await.unwrap();
930 rx.await.unwrap();
931 });
932 }
933
934 #[test]
935 fn local_set_client_server_block_on() {
936 let mut rt = rt();
937 let (tx, rx) = mpsc::channel();
938
939 let local = task::LocalSet::new();
940
941 local.block_on(&mut rt, async move { client_server_local(tx).await });
942
943 assert_ok!(rx.try_recv());
944 assert_err!(rx.try_recv());
945 }
946
947 async fn client_server_local(tx: mpsc::Sender<()>) {
948 let mut server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
949
950 // Get the assigned address
951 let addr = assert_ok!(server.local_addr());
952
953 // Spawn the server
954 task::spawn_local(async move {
955 // Accept a socket
956 let (mut socket, _) = server.accept().await.unwrap();
957
958 // Write some data
959 socket.write_all(b"hello").await.unwrap();
960 });
961
962 let mut client = TcpStream::connect(&addr).await.unwrap();
963
964 let mut buf = vec![];
965 client.read_to_end(&mut buf).await.unwrap();
966
967 assert_eq!(buf, b"hello");
968 tx.send(()).unwrap();
969 }
970
971 #[test]
972 fn coop() {
973 use std::task::Poll::Ready;
974
975 let mut rt = rt();
976
977 rt.block_on(async {
978 // Create a bunch of tasks
979 let mut tasks = (0..1_000).map(|_| {
980 tokio::spawn(async { })
981 }).collect::<Vec<_>>();
982
983 // Hope that all the tasks complete...
984 time::delay_for(Duration::from_millis(100)).await;
985
986 poll_fn(|cx| {
987 // At least one task should not be ready
988 for task in &mut tasks {
989 if Pin::new(task).poll(cx).is_pending() {
990 return Ready(());
991 }
992 }
993
994 panic!("did not yield");
995 }).await;
996 });
997 }
998
999 // Tests that the "next task" scheduler optimization is not able to starve
1000 // other tasks.
1001 #[test]
1002 fn ping_pong_saturation() {
1003 use tokio::sync::mpsc;
1004
1005 const NUM: usize = 100;
1006
1007 let mut rt = rt();
1008
1009 rt.block_on(async {
1010 let (spawned_tx, mut spawned_rx) = mpsc::unbounded_channel();
1011
1012 // Spawn a bunch of tasks that ping ping between each other to
1013 // saturate the runtime.
1014 for _ in 0..NUM {
1015 let (tx1, mut rx1) = mpsc::unbounded_channel();
1016 let (tx2, mut rx2) = mpsc::unbounded_channel();
1017 let spawned_tx = spawned_tx.clone();
1018
1019 task::spawn(async move {
1020 spawned_tx.send(()).unwrap();
1021
1022 tx1.send(()).unwrap();
1023
1024 loop {
1025 rx2.recv().await.unwrap();
1026 tx1.send(()).unwrap();
1027 }
1028 });
1029
1030 task::spawn(async move {
1031 loop {
1032 rx1.recv().await.unwrap();
1033 tx2.send(()).unwrap();
1034 }
1035 });
1036 }
1037
1038 for _ in 0..NUM {
1039 spawned_rx.recv().await.unwrap();
1040 }
1041
1042 // spawn another task and wait for it to complete
1043 let handle = task::spawn(async {
1044 for _ in 0..5 {
1045 // Yielding forces it back into the local queue.
1046 task::yield_now().await;
1047 }
1048 });
1049 handle.await.unwrap();
1050 });
1051 }
1052 }
1053