1 #![allow(clippy::needless_range_loop)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(feature = "full")]
4
5 // Tests to run on both current-thread & thread-pool runtime variants.
6
7 macro_rules! rt_test {
8 ($($t:tt)*) => {
9 mod current_thread_scheduler {
10 $($t)*
11
12 fn rt() -> Arc<Runtime> {
13 tokio::runtime::Builder::new_current_thread()
14 .enable_all()
15 .build()
16 .unwrap()
17 .into()
18 }
19 }
20
21 mod threaded_scheduler_4_threads {
22 $($t)*
23
24 fn rt() -> Arc<Runtime> {
25 tokio::runtime::Builder::new_multi_thread()
26 .worker_threads(4)
27 .enable_all()
28 .build()
29 .unwrap()
30 .into()
31 }
32 }
33
34 mod threaded_scheduler_1_thread {
35 $($t)*
36
37 fn rt() -> Arc<Runtime> {
38 tokio::runtime::Builder::new_multi_thread()
39 .worker_threads(1)
40 .enable_all()
41 .build()
42 .unwrap()
43 .into()
44 }
45 }
46 }
47 }
48
49 #[test]
send_sync_bound()50 fn send_sync_bound() {
51 use tokio::runtime::Runtime;
52 fn is_send<T: Send + Sync>() {}
53
54 is_send::<Runtime>();
55 }
56
57 rt_test! {
58 use tokio::net::{TcpListener, TcpStream, UdpSocket};
59 use tokio::io::{AsyncReadExt, AsyncWriteExt};
60 use tokio::runtime::Runtime;
61 use tokio::sync::oneshot;
62 use tokio::{task, time};
63 use tokio_test::{assert_err, assert_ok};
64
65 use futures::future::poll_fn;
66 use std::future::Future;
67 use std::pin::Pin;
68 use std::sync::{mpsc, Arc};
69 use std::task::{Context, Poll};
70 use std::thread;
71 use std::time::{Duration, Instant};
72
73 #[test]
74 fn block_on_sync() {
75 let rt = rt();
76
77 let mut win = false;
78 rt.block_on(async {
79 win = true;
80 });
81
82 assert!(win);
83 }
84
85
86 #[test]
87 fn block_on_async() {
88 let rt = rt();
89
90 let out = rt.block_on(async {
91 let (tx, rx) = oneshot::channel();
92
93 thread::spawn(move || {
94 thread::sleep(Duration::from_millis(50));
95 tx.send("ZOMG").unwrap();
96 });
97
98 assert_ok!(rx.await)
99 });
100
101 assert_eq!(out, "ZOMG");
102 }
103
104 #[test]
105 fn spawn_one_bg() {
106 let rt = rt();
107
108 let out = rt.block_on(async {
109 let (tx, rx) = oneshot::channel();
110
111 tokio::spawn(async move {
112 tx.send("ZOMG").unwrap();
113 });
114
115 assert_ok!(rx.await)
116 });
117
118 assert_eq!(out, "ZOMG");
119 }
120
121 #[test]
122 fn spawn_one_join() {
123 let rt = rt();
124
125 let out = rt.block_on(async {
126 let (tx, rx) = oneshot::channel();
127
128 let handle = tokio::spawn(async move {
129 tx.send("ZOMG").unwrap();
130 "DONE"
131 });
132
133 let msg = assert_ok!(rx.await);
134
135 let out = assert_ok!(handle.await);
136 assert_eq!(out, "DONE");
137
138 msg
139 });
140
141 assert_eq!(out, "ZOMG");
142 }
143
144 #[test]
145 fn spawn_two() {
146 let rt = rt();
147
148 let out = rt.block_on(async {
149 let (tx1, rx1) = oneshot::channel();
150 let (tx2, rx2) = oneshot::channel();
151
152 tokio::spawn(async move {
153 assert_ok!(tx1.send("ZOMG"));
154 });
155
156 tokio::spawn(async move {
157 let msg = assert_ok!(rx1.await);
158 assert_ok!(tx2.send(msg));
159 });
160
161 assert_ok!(rx2.await)
162 });
163
164 assert_eq!(out, "ZOMG");
165 }
166
167 #[test]
168 fn spawn_many_from_block_on() {
169 use tokio::sync::mpsc;
170
171 const ITER: usize = 200;
172
173 let rt = rt();
174
175 let out = rt.block_on(async {
176 let (done_tx, mut done_rx) = mpsc::unbounded_channel();
177
178 let mut txs = (0..ITER)
179 .map(|i| {
180 let (tx, rx) = oneshot::channel();
181 let done_tx = done_tx.clone();
182
183 tokio::spawn(async move {
184 let msg = assert_ok!(rx.await);
185 assert_eq!(i, msg);
186 assert_ok!(done_tx.send(msg));
187 });
188
189 tx
190 })
191 .collect::<Vec<_>>();
192
193 drop(done_tx);
194
195 thread::spawn(move || {
196 for (i, tx) in txs.drain(..).enumerate() {
197 assert_ok!(tx.send(i));
198 }
199 });
200
201 let mut out = vec![];
202 while let Some(i) = done_rx.recv().await {
203 out.push(i);
204 }
205
206 out.sort_unstable();
207 out
208 });
209
210 assert_eq!(ITER, out.len());
211
212 for i in 0..ITER {
213 assert_eq!(i, out[i]);
214 }
215 }
216
217 #[test]
218 fn spawn_many_from_task() {
219 use tokio::sync::mpsc;
220
221 const ITER: usize = 500;
222
223 let rt = rt();
224
225 let out = rt.block_on(async {
226 tokio::spawn(async move {
227 let (done_tx, mut done_rx) = mpsc::unbounded_channel();
228
229 /*
230 for _ in 0..100 {
231 tokio::spawn(async move { });
232 }
233
234 tokio::task::yield_now().await;
235 */
236
237 let mut txs = (0..ITER)
238 .map(|i| {
239 let (tx, rx) = oneshot::channel();
240 let done_tx = done_tx.clone();
241
242 tokio::spawn(async move {
243 let msg = assert_ok!(rx.await);
244 assert_eq!(i, msg);
245 assert_ok!(done_tx.send(msg));
246 });
247
248 tx
249 })
250 .collect::<Vec<_>>();
251
252 drop(done_tx);
253
254 thread::spawn(move || {
255 for (i, tx) in txs.drain(..).enumerate() {
256 assert_ok!(tx.send(i));
257 }
258 });
259
260 let mut out = vec![];
261 while let Some(i) = done_rx.recv().await {
262 out.push(i);
263 }
264
265 out.sort_unstable();
266 out
267 }).await.unwrap()
268 });
269
270 assert_eq!(ITER, out.len());
271
272 for i in 0..ITER {
273 assert_eq!(i, out[i]);
274 }
275 }
276
277 #[test]
278 fn spawn_await_chain() {
279 let rt = rt();
280
281 let out = rt.block_on(async {
282 assert_ok!(tokio::spawn(async {
283 assert_ok!(tokio::spawn(async {
284 "hello"
285 }).await)
286 }).await)
287 });
288
289 assert_eq!(out, "hello");
290 }
291
292 #[test]
293 fn outstanding_tasks_dropped() {
294 let rt = rt();
295
296 let cnt = Arc::new(());
297
298 rt.block_on(async {
299 let cnt = cnt.clone();
300
301 tokio::spawn(poll_fn(move |_| {
302 assert_eq!(2, Arc::strong_count(&cnt));
303 Poll::<()>::Pending
304 }));
305 });
306
307 assert_eq!(2, Arc::strong_count(&cnt));
308
309 drop(rt);
310
311 assert_eq!(1, Arc::strong_count(&cnt));
312 }
313
314 #[test]
315 #[should_panic]
316 fn nested_rt() {
317 let rt1 = rt();
318 let rt2 = rt();
319
320 rt1.block_on(async { rt2.block_on(async { "hello" }) });
321 }
322
323 #[test]
324 fn create_rt_in_block_on() {
325 let rt1 = rt();
326 let rt2 = rt1.block_on(async { rt() });
327 let out = rt2.block_on(async { "ZOMG" });
328
329 assert_eq!(out, "ZOMG");
330 }
331
332 #[test]
333 fn complete_block_on_under_load() {
334 let rt = rt();
335
336 rt.block_on(async {
337 let (tx, rx) = oneshot::channel();
338
339 // Spin hard
340 tokio::spawn(async {
341 loop {
342 yield_once().await;
343 }
344 });
345
346 thread::spawn(move || {
347 thread::sleep(Duration::from_millis(50));
348 assert_ok!(tx.send(()));
349 });
350
351 assert_ok!(rx.await);
352 });
353 }
354
355 #[test]
356 fn complete_task_under_load() {
357 let rt = rt();
358
359 rt.block_on(async {
360 let (tx1, rx1) = oneshot::channel();
361 let (tx2, rx2) = oneshot::channel();
362
363 // Spin hard
364 tokio::spawn(async {
365 loop {
366 yield_once().await;
367 }
368 });
369
370 thread::spawn(move || {
371 thread::sleep(Duration::from_millis(50));
372 assert_ok!(tx1.send(()));
373 });
374
375 tokio::spawn(async move {
376 assert_ok!(rx1.await);
377 assert_ok!(tx2.send(()));
378 });
379
380 assert_ok!(rx2.await);
381 });
382 }
383
384 #[test]
385 fn spawn_from_other_thread_idle() {
386 let rt = rt();
387 let handle = rt.clone();
388
389 let (tx, rx) = oneshot::channel();
390
391 thread::spawn(move || {
392 thread::sleep(Duration::from_millis(50));
393
394 handle.spawn(async move {
395 assert_ok!(tx.send(()));
396 });
397 });
398
399 rt.block_on(async move {
400 assert_ok!(rx.await);
401 });
402 }
403
404 #[test]
405 fn spawn_from_other_thread_under_load() {
406 let rt = rt();
407 let handle = rt.clone();
408
409 let (tx, rx) = oneshot::channel();
410
411 thread::spawn(move || {
412 handle.spawn(async move {
413 assert_ok!(tx.send(()));
414 });
415 });
416
417 rt.block_on(async move {
418 // Spin hard
419 tokio::spawn(async {
420 loop {
421 yield_once().await;
422 }
423 });
424
425 assert_ok!(rx.await);
426 });
427 }
428
429 #[test]
430 fn sleep_at_root() {
431 let rt = rt();
432
433 let now = Instant::now();
434 let dur = Duration::from_millis(50);
435
436 rt.block_on(async move {
437 time::sleep(dur).await;
438 });
439
440 assert!(now.elapsed() >= dur);
441 }
442
443 #[test]
444 fn sleep_in_spawn() {
445 let rt = rt();
446
447 let now = Instant::now();
448 let dur = Duration::from_millis(50);
449
450 rt.block_on(async move {
451 let (tx, rx) = oneshot::channel();
452
453 tokio::spawn(async move {
454 time::sleep(dur).await;
455 assert_ok!(tx.send(()));
456 });
457
458 assert_ok!(rx.await);
459 });
460
461 assert!(now.elapsed() >= dur);
462 }
463
464 #[test]
465 fn block_on_socket() {
466 let rt = rt();
467
468 rt.block_on(async move {
469 let (tx, rx) = oneshot::channel();
470
471 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
472 let addr = listener.local_addr().unwrap();
473
474 tokio::spawn(async move {
475 let _ = listener.accept().await;
476 tx.send(()).unwrap();
477 });
478
479 TcpStream::connect(&addr).await.unwrap();
480 rx.await.unwrap();
481 });
482 }
483
484 #[test]
485 fn spawn_from_blocking() {
486 let rt = rt();
487
488 let out = rt.block_on(async move {
489 let inner = assert_ok!(tokio::task::spawn_blocking(|| {
490 tokio::spawn(async move { "hello" })
491 }).await);
492
493 assert_ok!(inner.await)
494 });
495
496 assert_eq!(out, "hello")
497 }
498
499 #[test]
500 fn spawn_blocking_from_blocking() {
501 let rt = rt();
502
503 let out = rt.block_on(async move {
504 let inner = assert_ok!(tokio::task::spawn_blocking(|| {
505 tokio::task::spawn_blocking(|| "hello")
506 }).await);
507
508 assert_ok!(inner.await)
509 });
510
511 assert_eq!(out, "hello")
512 }
513
514 #[test]
515 fn sleep_from_blocking() {
516 let rt = rt();
517
518 rt.block_on(async move {
519 assert_ok!(tokio::task::spawn_blocking(|| {
520 let now = std::time::Instant::now();
521 let dur = Duration::from_millis(1);
522
523 // use the futures' block_on fn to make sure we aren't setting
524 // any Tokio context
525 futures::executor::block_on(async {
526 tokio::time::sleep(dur).await;
527 });
528
529 assert!(now.elapsed() >= dur);
530 }).await);
531 });
532 }
533
534 #[test]
535 fn socket_from_blocking() {
536 let rt = rt();
537
538 rt.block_on(async move {
539 let listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
540 let addr = assert_ok!(listener.local_addr());
541
542 let peer = tokio::task::spawn_blocking(move || {
543 // use the futures' block_on fn to make sure we aren't setting
544 // any Tokio context
545 futures::executor::block_on(async {
546 assert_ok!(TcpStream::connect(addr).await);
547 });
548 });
549
550 // Wait for the client to connect
551 let _ = assert_ok!(listener.accept().await);
552
553 assert_ok!(peer.await);
554 });
555 }
556
557 #[test]
558 fn always_active_parker() {
559 // This test it to show that we will always have
560 // an active parker even if we call block_on concurrently
561
562 let rt = rt();
563 let rt2 = rt.clone();
564
565 let (tx1, rx1) = oneshot::channel();
566 let (tx2, rx2) = oneshot::channel();
567
568 let jh1 = thread::spawn(move || {
569 rt.block_on(async move {
570 rx2.await.unwrap();
571 time::sleep(Duration::from_millis(5)).await;
572 tx1.send(()).unwrap();
573 });
574 });
575
576 let jh2 = thread::spawn(move || {
577 rt2.block_on(async move {
578 tx2.send(()).unwrap();
579 time::sleep(Duration::from_millis(5)).await;
580 rx1.await.unwrap();
581 time::sleep(Duration::from_millis(5)).await;
582 });
583 });
584
585 jh1.join().unwrap();
586 jh2.join().unwrap();
587 }
588
589 #[test]
590 // IOCP requires setting the "max thread" concurrency value. The sane,
591 // default, is to set this to the number of cores. Threads that poll I/O
592 // become associated with the IOCP handle. Once those threads sleep for any
593 // reason (mutex), they yield their ownership.
594 //
595 // This test hits an edge case on windows where more threads than cores are
596 // created, none of those threads ever yield due to being at capacity, so
597 // IOCP gets "starved".
598 //
599 // For now, this is a very edge case that is probably not a real production
600 // concern. There also isn't a great/obvious solution to take. For now, the
601 // test is disabled.
602 #[cfg(not(windows))]
603 fn io_driver_called_when_under_load() {
604 let rt = rt();
605
606 // Create a lot of constant load. The scheduler will always be busy.
607 for _ in 0..100 {
608 rt.spawn(async {
609 loop {
610 tokio::task::yield_now().await;
611 }
612 });
613 }
614
615 // Do some I/O work
616 rt.block_on(async {
617 let listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
618 let addr = assert_ok!(listener.local_addr());
619
620 let srv = tokio::spawn(async move {
621 let (mut stream, _) = assert_ok!(listener.accept().await);
622 assert_ok!(stream.write_all(b"hello world").await);
623 });
624
625 let cli = tokio::spawn(async move {
626 let mut stream = assert_ok!(TcpStream::connect(addr).await);
627 let mut dst = vec![0; 11];
628
629 assert_ok!(stream.read_exact(&mut dst).await);
630 assert_eq!(dst, b"hello world");
631 });
632
633 assert_ok!(srv.await);
634 assert_ok!(cli.await);
635 });
636 }
637
638 #[test]
639 fn client_server_block_on() {
640 let rt = rt();
641 let (tx, rx) = mpsc::channel();
642
643 rt.block_on(async move { client_server(tx).await });
644
645 assert_ok!(rx.try_recv());
646 assert_err!(rx.try_recv());
647 }
648
649 #[test]
650 fn panic_in_task() {
651 let rt = rt();
652 let (tx, rx) = oneshot::channel();
653
654 struct Boom(Option<oneshot::Sender<()>>);
655
656 impl Future for Boom {
657 type Output = ();
658
659 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
660 panic!();
661 }
662 }
663
664 impl Drop for Boom {
665 fn drop(&mut self) {
666 assert!(std::thread::panicking());
667 self.0.take().unwrap().send(()).unwrap();
668 }
669 }
670
671 rt.spawn(Boom(Some(tx)));
672 assert_ok!(rt.block_on(rx));
673 }
674
675 #[test]
676 #[should_panic]
677 fn panic_in_block_on() {
678 let rt = rt();
679 rt.block_on(async { panic!() });
680 }
681
682 async fn yield_once() {
683 let mut yielded = false;
684 poll_fn(|cx| {
685 if yielded {
686 Poll::Ready(())
687 } else {
688 yielded = true;
689 cx.waker().wake_by_ref();
690 Poll::Pending
691 }
692 })
693 .await
694 }
695
696 #[test]
697 fn enter_and_spawn() {
698 let rt = rt();
699 let handle = {
700 let _enter = rt.enter();
701 tokio::spawn(async {})
702 };
703
704 assert_ok!(rt.block_on(handle));
705 }
706
707 #[test]
708 fn eagerly_drops_futures_on_shutdown() {
709 use std::sync::mpsc;
710
711 struct Never {
712 drop_tx: mpsc::Sender<()>,
713 }
714
715 impl Future for Never {
716 type Output = ();
717
718 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
719 Poll::Pending
720 }
721 }
722
723 impl Drop for Never {
724 fn drop(&mut self) {
725 self.drop_tx.send(()).unwrap();
726 }
727 }
728
729 let rt = rt();
730
731 let (drop_tx, drop_rx) = mpsc::channel();
732 let (run_tx, run_rx) = oneshot::channel();
733
734 rt.block_on(async move {
735 tokio::spawn(async move {
736 assert_ok!(run_tx.send(()));
737
738 Never { drop_tx }.await
739 });
740
741 assert_ok!(run_rx.await);
742 });
743
744 drop(rt);
745
746 assert_ok!(drop_rx.recv());
747 }
748
749 #[test]
750 fn wake_while_rt_is_dropping() {
751 use tokio::task;
752
753 struct OnDrop<F: FnMut()>(F);
754
755 impl<F: FnMut()> Drop for OnDrop<F> {
756 fn drop(&mut self) {
757 (self.0)()
758 }
759 }
760
761 let (tx1, rx1) = oneshot::channel();
762 let (tx2, rx2) = oneshot::channel();
763 let (tx3, rx3) = oneshot::channel();
764
765 let rt = rt();
766
767 let h1 = rt.clone();
768
769 rt.spawn(async move {
770 // Ensure a waker gets stored in oneshot 1.
771 let _ = rx1.await;
772 tx3.send(()).unwrap();
773 });
774
775 rt.spawn(async move {
776 // When this task is dropped, we'll be "closing remotes".
777 // We spawn a new task that owns the `tx1`, to move its Drop
778 // out of here.
779 //
780 // Importantly, the oneshot 1 has a waker already stored, so
781 // the eventual drop here will try to re-schedule again.
782 let mut opt_tx1 = Some(tx1);
783 let _d = OnDrop(move || {
784 let tx1 = opt_tx1.take().unwrap();
785 h1.spawn(async move {
786 tx1.send(()).unwrap();
787 });
788 });
789 let _ = rx2.await;
790 });
791
792 rt.spawn(async move {
793 let _ = rx3.await;
794 // We'll never get here, but once task 3 drops, this will
795 // force task 2 to re-schedule since it's waiting on oneshot 2.
796 tx2.send(()).unwrap();
797 });
798
799 // Tick the loop
800 rt.block_on(async {
801 task::yield_now().await;
802 });
803
804 // Drop the rt
805 drop(rt);
806 }
807
808 #[test]
809 fn io_notify_while_shutting_down() {
810 use std::net::Ipv6Addr;
811 use std::sync::Arc;
812
813 for _ in 1..10 {
814 let runtime = rt();
815
816 runtime.block_on(async {
817 let socket = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap();
818 let addr = socket.local_addr().unwrap();
819 let send_half = Arc::new(socket);
820 let recv_half = send_half.clone();
821
822 tokio::spawn(async move {
823 let mut buf = [0];
824 loop {
825 recv_half.recv_from(&mut buf).await.unwrap();
826 std::thread::sleep(Duration::from_millis(2));
827 }
828 });
829
830 tokio::spawn(async move {
831 let buf = [0];
832 loop {
833 send_half.send_to(&buf, &addr).await.unwrap();
834 tokio::time::sleep(Duration::from_millis(1)).await;
835 }
836 });
837
838 tokio::time::sleep(Duration::from_millis(5)).await;
839 });
840 }
841 }
842
843 #[test]
844 fn shutdown_timeout() {
845 let (tx, rx) = oneshot::channel();
846 let runtime = rt();
847
848 runtime.block_on(async move {
849 task::spawn_blocking(move || {
850 tx.send(()).unwrap();
851 thread::sleep(Duration::from_secs(10_000));
852 });
853
854 rx.await.unwrap();
855 });
856
857 Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_millis(100));
858 }
859
860 #[test]
861 fn shutdown_timeout_0() {
862 let runtime = rt();
863
864 runtime.block_on(async move {
865 task::spawn_blocking(move || {
866 thread::sleep(Duration::from_secs(10_000));
867 });
868 });
869
870 let now = Instant::now();
871 Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_nanos(0));
872 assert!(now.elapsed().as_secs() < 1);
873 }
874
875 #[test]
876 fn shutdown_wakeup_time() {
877 let runtime = rt();
878
879 runtime.block_on(async move {
880 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
881 });
882
883 Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_secs(10_000));
884 }
885
886 // This test is currently ignored on Windows because of a
887 // rust-lang issue in thread local storage destructors.
888 // See https://github.com/rust-lang/rust/issues/74875
889 #[test]
890 #[cfg(not(windows))]
891 fn runtime_in_thread_local() {
892 use std::cell::RefCell;
893 use std::thread;
894
895 thread_local!(
896 static R: RefCell<Option<Runtime>> = RefCell::new(None);
897 );
898
899 thread::spawn(|| {
900 R.with(|cell| {
901 let rt = rt();
902 let rt = Arc::try_unwrap(rt).unwrap();
903 *cell.borrow_mut() = Some(rt);
904 });
905
906 let _rt = rt();
907 }).join().unwrap();
908 }
909
910 async fn client_server(tx: mpsc::Sender<()>) {
911 let server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
912
913 // Get the assigned address
914 let addr = assert_ok!(server.local_addr());
915
916 // Spawn the server
917 tokio::spawn(async move {
918 // Accept a socket
919 let (mut socket, _) = server.accept().await.unwrap();
920
921 // Write some data
922 socket.write_all(b"hello").await.unwrap();
923 });
924
925 let mut client = TcpStream::connect(&addr).await.unwrap();
926
927 let mut buf = vec![];
928 client.read_to_end(&mut buf).await.unwrap();
929
930 assert_eq!(buf, b"hello");
931 tx.send(()).unwrap();
932 }
933
934 #[test]
935 fn local_set_block_on_socket() {
936 let rt = rt();
937 let local = task::LocalSet::new();
938
939 local.block_on(&rt, async move {
940 let (tx, rx) = oneshot::channel();
941
942 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
943 let addr = listener.local_addr().unwrap();
944
945 task::spawn_local(async move {
946 let _ = listener.accept().await;
947 tx.send(()).unwrap();
948 });
949
950 TcpStream::connect(&addr).await.unwrap();
951 rx.await.unwrap();
952 });
953 }
954
955 #[test]
956 fn local_set_client_server_block_on() {
957 let rt = rt();
958 let (tx, rx) = mpsc::channel();
959
960 let local = task::LocalSet::new();
961
962 local.block_on(&rt, async move { client_server_local(tx).await });
963
964 assert_ok!(rx.try_recv());
965 assert_err!(rx.try_recv());
966 }
967
968 async fn client_server_local(tx: mpsc::Sender<()>) {
969 let server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
970
971 // Get the assigned address
972 let addr = assert_ok!(server.local_addr());
973
974 // Spawn the server
975 task::spawn_local(async move {
976 // Accept a socket
977 let (mut socket, _) = server.accept().await.unwrap();
978
979 // Write some data
980 socket.write_all(b"hello").await.unwrap();
981 });
982
983 let mut client = TcpStream::connect(&addr).await.unwrap();
984
985 let mut buf = vec![];
986 client.read_to_end(&mut buf).await.unwrap();
987
988 assert_eq!(buf, b"hello");
989 tx.send(()).unwrap();
990 }
991
992 #[test]
993 fn coop() {
994 use std::task::Poll::Ready;
995
996 let rt = rt();
997
998 rt.block_on(async {
999 // Create a bunch of tasks
1000 let mut tasks = (0..1_000).map(|_| {
1001 tokio::spawn(async { })
1002 }).collect::<Vec<_>>();
1003
1004 // Hope that all the tasks complete...
1005 time::sleep(Duration::from_millis(100)).await;
1006
1007 poll_fn(|cx| {
1008 // At least one task should not be ready
1009 for task in &mut tasks {
1010 if Pin::new(task).poll(cx).is_pending() {
1011 return Ready(());
1012 }
1013 }
1014
1015 panic!("did not yield");
1016 }).await;
1017 });
1018 }
1019
1020 #[test]
1021 fn coop_unconstrained() {
1022 use std::task::Poll::Ready;
1023
1024 let rt = rt();
1025
1026 rt.block_on(async {
1027 // Create a bunch of tasks
1028 let mut tasks = (0..1_000).map(|_| {
1029 tokio::spawn(async { })
1030 }).collect::<Vec<_>>();
1031
1032 // Hope that all the tasks complete...
1033 time::sleep(Duration::from_millis(100)).await;
1034
1035 tokio::task::unconstrained(poll_fn(|cx| {
1036 // All the tasks should be ready
1037 for task in &mut tasks {
1038 assert!(Pin::new(task).poll(cx).is_ready());
1039 }
1040
1041 Ready(())
1042 })).await;
1043 });
1044 }
1045
1046 // Tests that the "next task" scheduler optimization is not able to starve
1047 // other tasks.
1048 #[test]
1049 fn ping_pong_saturation() {
1050 use std::sync::atomic::{Ordering, AtomicBool};
1051 use tokio::sync::mpsc;
1052
1053 const NUM: usize = 100;
1054
1055 let rt = rt();
1056
1057 let running = Arc::new(AtomicBool::new(true));
1058
1059 rt.block_on(async {
1060 let (spawned_tx, mut spawned_rx) = mpsc::unbounded_channel();
1061
1062 let mut tasks = vec![];
1063 // Spawn a bunch of tasks that ping ping between each other to
1064 // saturate the runtime.
1065 for _ in 0..NUM {
1066 let (tx1, mut rx1) = mpsc::unbounded_channel();
1067 let (tx2, mut rx2) = mpsc::unbounded_channel();
1068 let spawned_tx = spawned_tx.clone();
1069 let running = running.clone();
1070 tasks.push(task::spawn(async move {
1071 spawned_tx.send(()).unwrap();
1072
1073
1074 while running.load(Ordering::Relaxed) {
1075 tx1.send(()).unwrap();
1076 rx2.recv().await.unwrap();
1077 }
1078
1079 // Close the channel and wait for the other task to exit.
1080 drop(tx1);
1081 assert!(rx2.recv().await.is_none());
1082 }));
1083
1084 tasks.push(task::spawn(async move {
1085 while rx1.recv().await.is_some() {
1086 tx2.send(()).unwrap();
1087 }
1088 }));
1089 }
1090
1091 for _ in 0..NUM {
1092 spawned_rx.recv().await.unwrap();
1093 }
1094
1095 // spawn another task and wait for it to complete
1096 let handle = task::spawn(async {
1097 for _ in 0..5 {
1098 // Yielding forces it back into the local queue.
1099 task::yield_now().await;
1100 }
1101 });
1102 handle.await.unwrap();
1103 running.store(false, Ordering::Relaxed);
1104 for t in tasks {
1105 t.await.unwrap();
1106 }
1107 });
1108 }
1109 }
1110