1 #![allow(clippy::needless_range_loop)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(feature = "full")]
4
5 // Tests to run on both current-thread & therad-pool runtime variants.
6
7 macro_rules! rt_test {
8 ($($t:tt)*) => {
9 mod basic_scheduler {
10 $($t)*
11
12 fn rt() -> Runtime {
13 tokio::runtime::Builder::new()
14 .basic_scheduler()
15 .enable_all()
16 .build()
17 .unwrap()
18 }
19 }
20
21 mod threaded_scheduler_4_threads {
22 $($t)*
23
24 fn rt() -> Runtime {
25 tokio::runtime::Builder::new()
26 .threaded_scheduler()
27 .core_threads(4)
28 .enable_all()
29 .build()
30 .unwrap()
31 }
32 }
33
34 mod threaded_scheduler_1_thread {
35 $($t)*
36
37 fn rt() -> Runtime {
38 tokio::runtime::Builder::new()
39 .threaded_scheduler()
40 .core_threads(1)
41 .enable_all()
42 .build()
43 .unwrap()
44 }
45 }
46 }
47 }
48
49 #[test]
send_sync_bound()50 fn send_sync_bound() {
51 use tokio::runtime::Runtime;
52 fn is_send<T: Send + Sync>() {}
53
54 is_send::<Runtime>();
55 }
56
57 rt_test! {
58 use tokio::net::{TcpListener, TcpStream, UdpSocket};
59 use tokio::prelude::*;
60 use tokio::runtime::Runtime;
61 use tokio::sync::oneshot;
62 use tokio::{task, time};
63 use tokio_test::{assert_err, assert_ok};
64
65 use futures::future::poll_fn;
66 use std::future::Future;
67 use std::pin::Pin;
68 use std::sync::{mpsc, Arc};
69 use std::task::{Context, Poll};
70 use std::thread;
71 use std::time::{Duration, Instant};
72
73 #[test]
74 fn block_on_sync() {
75 let mut rt = rt();
76
77 let mut win = false;
78 rt.block_on(async {
79 win = true;
80 });
81
82 assert!(win);
83 }
84
85 #[test]
86 fn block_on_async() {
87 let mut rt = rt();
88
89 let out = rt.block_on(async {
90 let (tx, rx) = oneshot::channel();
91
92 thread::spawn(move || {
93 thread::sleep(Duration::from_millis(50));
94 tx.send("ZOMG").unwrap();
95 });
96
97 assert_ok!(rx.await)
98 });
99
100 assert_eq!(out, "ZOMG");
101 }
102
103 #[test]
104 fn spawn_one_bg() {
105 let mut rt = rt();
106
107 let out = rt.block_on(async {
108 let (tx, rx) = oneshot::channel();
109
110 tokio::spawn(async move {
111 tx.send("ZOMG").unwrap();
112 });
113
114 assert_ok!(rx.await)
115 });
116
117 assert_eq!(out, "ZOMG");
118 }
119
120 #[test]
121 fn spawn_one_join() {
122 let mut rt = rt();
123
124 let out = rt.block_on(async {
125 let (tx, rx) = oneshot::channel();
126
127 let handle = tokio::spawn(async move {
128 tx.send("ZOMG").unwrap();
129 "DONE"
130 });
131
132 let msg = assert_ok!(rx.await);
133
134 let out = assert_ok!(handle.await);
135 assert_eq!(out, "DONE");
136
137 msg
138 });
139
140 assert_eq!(out, "ZOMG");
141 }
142
143 #[test]
144 fn spawn_two() {
145 let mut rt = rt();
146
147 let out = rt.block_on(async {
148 let (tx1, rx1) = oneshot::channel();
149 let (tx2, rx2) = oneshot::channel();
150
151 tokio::spawn(async move {
152 assert_ok!(tx1.send("ZOMG"));
153 });
154
155 tokio::spawn(async move {
156 let msg = assert_ok!(rx1.await);
157 assert_ok!(tx2.send(msg));
158 });
159
160 assert_ok!(rx2.await)
161 });
162
163 assert_eq!(out, "ZOMG");
164 }
165
166 #[test]
167 fn spawn_many_from_block_on() {
168 use tokio::sync::mpsc;
169
170 const ITER: usize = 200;
171
172 let mut rt = rt();
173
174 let out = rt.block_on(async {
175 let (done_tx, mut done_rx) = mpsc::unbounded_channel();
176
177 let mut txs = (0..ITER)
178 .map(|i| {
179 let (tx, rx) = oneshot::channel();
180 let done_tx = done_tx.clone();
181
182 tokio::spawn(async move {
183 let msg = assert_ok!(rx.await);
184 assert_eq!(i, msg);
185 assert_ok!(done_tx.send(msg));
186 });
187
188 tx
189 })
190 .collect::<Vec<_>>();
191
192 drop(done_tx);
193
194 thread::spawn(move || {
195 for (i, tx) in txs.drain(..).enumerate() {
196 assert_ok!(tx.send(i));
197 }
198 });
199
200 let mut out = vec![];
201 while let Some(i) = done_rx.recv().await {
202 out.push(i);
203 }
204
205 out.sort();
206 out
207 });
208
209 assert_eq!(ITER, out.len());
210
211 for i in 0..ITER {
212 assert_eq!(i, out[i]);
213 }
214 }
215
216 #[test]
217 fn spawn_many_from_task() {
218 use tokio::sync::mpsc;
219
220 const ITER: usize = 500;
221
222 let mut rt = rt();
223
224 let out = rt.block_on(async {
225 tokio::spawn(async move {
226 let (done_tx, mut done_rx) = mpsc::unbounded_channel();
227
228 /*
229 for _ in 0..100 {
230 tokio::spawn(async move { });
231 }
232
233 tokio::task::yield_now().await;
234 */
235
236 let mut txs = (0..ITER)
237 .map(|i| {
238 let (tx, rx) = oneshot::channel();
239 let done_tx = done_tx.clone();
240
241 tokio::spawn(async move {
242 let msg = assert_ok!(rx.await);
243 assert_eq!(i, msg);
244 assert_ok!(done_tx.send(msg));
245 });
246
247 tx
248 })
249 .collect::<Vec<_>>();
250
251 drop(done_tx);
252
253 thread::spawn(move || {
254 for (i, tx) in txs.drain(..).enumerate() {
255 assert_ok!(tx.send(i));
256 }
257 });
258
259 let mut out = vec![];
260 while let Some(i) = done_rx.recv().await {
261 out.push(i);
262 }
263
264 out.sort();
265 out
266 }).await.unwrap()
267 });
268
269 assert_eq!(ITER, out.len());
270
271 for i in 0..ITER {
272 assert_eq!(i, out[i]);
273 }
274 }
275
276 #[test]
277 fn spawn_await_chain() {
278 let mut rt = rt();
279
280 let out = rt.block_on(async {
281 assert_ok!(tokio::spawn(async {
282 assert_ok!(tokio::spawn(async {
283 "hello"
284 }).await)
285 }).await)
286 });
287
288 assert_eq!(out, "hello");
289 }
290
291 #[test]
292 fn outstanding_tasks_dropped() {
293 let mut rt = rt();
294
295 let cnt = Arc::new(());
296
297 rt.block_on(async {
298 let cnt = cnt.clone();
299
300 tokio::spawn(poll_fn(move |_| {
301 assert_eq!(2, Arc::strong_count(&cnt));
302 Poll::<()>::Pending
303 }));
304 });
305
306 assert_eq!(2, Arc::strong_count(&cnt));
307
308 drop(rt);
309
310 assert_eq!(1, Arc::strong_count(&cnt));
311 }
312
313 #[test]
314 #[should_panic]
315 fn nested_rt() {
316 let mut rt1 = rt();
317 let mut rt2 = rt();
318
319 rt1.block_on(async { rt2.block_on(async { "hello" }) });
320 }
321
322 #[test]
323 fn create_rt_in_block_on() {
324 let mut rt1 = rt();
325 let mut rt2 = rt1.block_on(async { rt() });
326 let out = rt2.block_on(async { "ZOMG" });
327
328 assert_eq!(out, "ZOMG");
329 }
330
331 #[test]
332 fn complete_block_on_under_load() {
333 let mut rt = rt();
334
335 rt.block_on(async {
336 let (tx, rx) = oneshot::channel();
337
338 // Spin hard
339 tokio::spawn(async {
340 loop {
341 yield_once().await;
342 }
343 });
344
345 thread::spawn(move || {
346 thread::sleep(Duration::from_millis(50));
347 assert_ok!(tx.send(()));
348 });
349
350 assert_ok!(rx.await);
351 });
352 }
353
354 #[test]
355 fn complete_task_under_load() {
356 let mut rt = rt();
357
358 rt.block_on(async {
359 let (tx1, rx1) = oneshot::channel();
360 let (tx2, rx2) = oneshot::channel();
361
362 // Spin hard
363 tokio::spawn(async {
364 loop {
365 yield_once().await;
366 }
367 });
368
369 thread::spawn(move || {
370 thread::sleep(Duration::from_millis(50));
371 assert_ok!(tx1.send(()));
372 });
373
374 tokio::spawn(async move {
375 assert_ok!(rx1.await);
376 assert_ok!(tx2.send(()));
377 });
378
379 assert_ok!(rx2.await);
380 });
381 }
382
383 #[test]
384 fn spawn_from_other_thread_idle() {
385 let mut rt = rt();
386 let handle = rt.handle().clone();
387
388 let (tx, rx) = oneshot::channel();
389
390 thread::spawn(move || {
391 thread::sleep(Duration::from_millis(50));
392
393 handle.spawn(async move {
394 assert_ok!(tx.send(()));
395 });
396 });
397
398 rt.block_on(async move {
399 assert_ok!(rx.await);
400 });
401 }
402
403 #[test]
404 fn spawn_from_other_thread_under_load() {
405 let mut rt = rt();
406 let handle = rt.handle().clone();
407
408 let (tx, rx) = oneshot::channel();
409
410 thread::spawn(move || {
411 handle.spawn(async move {
412 assert_ok!(tx.send(()));
413 });
414 });
415
416 rt.block_on(async move {
417 // Spin hard
418 tokio::spawn(async {
419 loop {
420 yield_once().await;
421 }
422 });
423
424 assert_ok!(rx.await);
425 });
426 }
427
428 #[test]
429 fn delay_at_root() {
430 let mut rt = rt();
431
432 let now = Instant::now();
433 let dur = Duration::from_millis(50);
434
435 rt.block_on(async move {
436 time::delay_for(dur).await;
437 });
438
439 assert!(now.elapsed() >= dur);
440 }
441
442 #[test]
443 fn delay_in_spawn() {
444 let mut rt = rt();
445
446 let now = Instant::now();
447 let dur = Duration::from_millis(50);
448
449 rt.block_on(async move {
450 let (tx, rx) = oneshot::channel();
451
452 tokio::spawn(async move {
453 time::delay_for(dur).await;
454 assert_ok!(tx.send(()));
455 });
456
457 assert_ok!(rx.await);
458 });
459
460 assert!(now.elapsed() >= dur);
461 }
462
463 #[test]
464 fn block_on_socket() {
465 let mut rt = rt();
466
467 rt.block_on(async move {
468 let (tx, rx) = oneshot::channel();
469
470 let mut listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
471 let addr = listener.local_addr().unwrap();
472
473 tokio::spawn(async move {
474 let _ = listener.accept().await;
475 tx.send(()).unwrap();
476 });
477
478 TcpStream::connect(&addr).await.unwrap();
479 rx.await.unwrap();
480 });
481 }
482
483 #[test]
484 fn spawn_from_blocking() {
485 let mut rt = rt();
486
487 let out = rt.block_on(async move {
488 let inner = assert_ok!(tokio::task::spawn_blocking(|| {
489 tokio::spawn(async move { "hello" })
490 }).await);
491
492 assert_ok!(inner.await)
493 });
494
495 assert_eq!(out, "hello")
496 }
497
498 #[test]
499 fn spawn_blocking_from_blocking() {
500 let mut rt = rt();
501
502 let out = rt.block_on(async move {
503 let inner = assert_ok!(tokio::task::spawn_blocking(|| {
504 tokio::task::spawn_blocking(|| "hello")
505 }).await);
506
507 assert_ok!(inner.await)
508 });
509
510 assert_eq!(out, "hello")
511 }
512
513 #[test]
514 fn delay_from_blocking() {
515 let mut rt = rt();
516
517 rt.block_on(async move {
518 assert_ok!(tokio::task::spawn_blocking(|| {
519 let now = std::time::Instant::now();
520 let dur = Duration::from_millis(1);
521
522 // use the futures' block_on fn to make sure we aren't setting
523 // any Tokio context
524 futures::executor::block_on(async {
525 tokio::time::delay_for(dur).await;
526 });
527
528 assert!(now.elapsed() >= dur);
529 }).await);
530 });
531 }
532
533 #[test]
534 fn socket_from_blocking() {
535 let mut rt = rt();
536
537 rt.block_on(async move {
538 let mut listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
539 let addr = assert_ok!(listener.local_addr());
540
541 let peer = tokio::task::spawn_blocking(move || {
542 // use the futures' block_on fn to make sure we aren't setting
543 // any Tokio context
544 futures::executor::block_on(async {
545 assert_ok!(TcpStream::connect(addr).await);
546 });
547 });
548
549 // Wait for the client to connect
550 let _ = assert_ok!(listener.accept().await);
551
552 assert_ok!(peer.await);
553 });
554 }
555
556 #[test]
557 fn spawn_blocking_after_shutdown() {
558 let rt = rt();
559 let handle = rt.handle().clone();
560
561 // Shutdown
562 drop(rt);
563
564 handle.enter(|| {
565 let res = task::spawn_blocking(|| unreachable!());
566
567 // Avoid using a tokio runtime
568 let out = futures::executor::block_on(res);
569 assert!(out.is_err());
570 });
571 }
572
573 #[test]
574 fn io_driver_called_when_under_load() {
575 let mut rt = rt();
576
577 // Create a lot of constant load. The scheduler will always be busy.
578 for _ in 0..100 {
579 rt.spawn(async {
580 loop {
581 tokio::task::yield_now().await;
582 }
583 });
584 }
585
586 // Do some I/O work
587 rt.block_on(async {
588 let mut listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
589 let addr = assert_ok!(listener.local_addr());
590
591 let srv = tokio::spawn(async move {
592 let (mut stream, _) = assert_ok!(listener.accept().await);
593 assert_ok!(stream.write_all(b"hello world").await);
594 });
595
596 let cli = tokio::spawn(async move {
597 let mut stream = assert_ok!(TcpStream::connect(addr).await);
598 let mut dst = vec![0; 11];
599
600 assert_ok!(stream.read_exact(&mut dst).await);
601 assert_eq!(dst, b"hello world");
602 });
603
604 assert_ok!(srv.await);
605 assert_ok!(cli.await);
606 });
607 }
608
609 #[test]
610 fn client_server_block_on() {
611 let mut rt = rt();
612 let (tx, rx) = mpsc::channel();
613
614 rt.block_on(async move { client_server(tx).await });
615
616 assert_ok!(rx.try_recv());
617 assert_err!(rx.try_recv());
618 }
619
620 #[test]
621 fn panic_in_task() {
622 let mut rt = rt();
623 let (tx, rx) = oneshot::channel();
624
625 struct Boom(Option<oneshot::Sender<()>>);
626
627 impl Future for Boom {
628 type Output = ();
629
630 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
631 panic!();
632 }
633 }
634
635 impl Drop for Boom {
636 fn drop(&mut self) {
637 assert!(std::thread::panicking());
638 self.0.take().unwrap().send(()).unwrap();
639 }
640 }
641
642 rt.spawn(Boom(Some(tx)));
643 assert_ok!(rt.block_on(rx));
644 }
645
646 #[test]
647 #[should_panic]
648 fn panic_in_block_on() {
649 let mut rt = rt();
650 rt.block_on(async { panic!() });
651 }
652
653 async fn yield_once() {
654 let mut yielded = false;
655 poll_fn(|cx| {
656 if yielded {
657 Poll::Ready(())
658 } else {
659 yielded = true;
660 cx.waker().wake_by_ref();
661 Poll::Pending
662 }
663 })
664 .await
665 }
666
667 #[test]
668 fn enter_and_spawn() {
669 let mut rt = rt();
670 let handle = rt.enter(|| {
671 tokio::spawn(async {})
672 });
673
674 assert_ok!(rt.block_on(handle));
675 }
676
677 #[test]
678 fn eagerly_drops_futures_on_shutdown() {
679 use std::sync::mpsc;
680
681 struct Never {
682 drop_tx: mpsc::Sender<()>,
683 }
684
685 impl Future for Never {
686 type Output = ();
687
688 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
689 Poll::Pending
690 }
691 }
692
693 impl Drop for Never {
694 fn drop(&mut self) {
695 self.drop_tx.send(()).unwrap();
696 }
697 }
698
699 let mut rt = rt();
700
701 let (drop_tx, drop_rx) = mpsc::channel();
702 let (run_tx, run_rx) = oneshot::channel();
703
704 rt.block_on(async move {
705 tokio::spawn(async move {
706 assert_ok!(run_tx.send(()));
707
708 Never { drop_tx }.await
709 });
710
711 assert_ok!(run_rx.await);
712 });
713
714 drop(rt);
715
716 assert_ok!(drop_rx.recv());
717 }
718
719 #[test]
720 fn wake_while_rt_is_dropping() {
721 use tokio::task;
722
723 struct OnDrop<F: FnMut()>(F);
724
725 impl<F: FnMut()> Drop for OnDrop<F> {
726 fn drop(&mut self) {
727 (self.0)()
728 }
729 }
730
731 let (tx1, rx1) = oneshot::channel();
732 let (tx2, rx2) = oneshot::channel();
733 let (tx3, rx3) = oneshot::channel();
734
735 let mut rt = rt();
736
737 let h1 = rt.handle().clone();
738
739 rt.handle().spawn(async move {
740 // Ensure a waker gets stored in oneshot 1.
741 let _ = rx1.await;
742 tx3.send(()).unwrap();
743 });
744
745 rt.handle().spawn(async move {
746 // When this task is dropped, we'll be "closing remotes".
747 // We spawn a new task that owns the `tx1`, to move its Drop
748 // out of here.
749 //
750 // Importantly, the oneshot 1 has a waker already stored, so
751 // the eventual drop here will try to re-schedule again.
752 let mut opt_tx1 = Some(tx1);
753 let _d = OnDrop(move || {
754 let tx1 = opt_tx1.take().unwrap();
755 h1.spawn(async move {
756 tx1.send(()).unwrap();
757 });
758 });
759 let _ = rx2.await;
760 });
761
762 rt.handle().spawn(async move {
763 let _ = rx3.await;
764 // We'll never get here, but once task 3 drops, this will
765 // force task 2 to re-schedule since it's waiting on oneshot 2.
766 tx2.send(()).unwrap();
767 });
768
769 // Tick the loop
770 rt.block_on(async {
771 task::yield_now().await;
772 });
773
774 // Drop the rt
775 drop(rt);
776 }
777
778 #[test]
779 fn io_notify_while_shutting_down() {
780 use std::net::Ipv6Addr;
781
782 for _ in 1..10 {
783 let mut runtime = rt();
784
785 runtime.block_on(async {
786 let socket = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap();
787 let addr = socket.local_addr().unwrap();
788 let (mut recv_half, mut send_half) = socket.split();
789
790 tokio::spawn(async move {
791 let mut buf = [0];
792 loop {
793 recv_half.recv_from(&mut buf).await.unwrap();
794 std::thread::sleep(Duration::from_millis(2));
795 }
796 });
797
798 tokio::spawn(async move {
799 let buf = [0];
800 loop {
801 send_half.send_to(&buf, &addr).await.unwrap();
802 tokio::time::delay_for(Duration::from_millis(1)).await;
803 }
804 });
805
806 tokio::time::delay_for(Duration::from_millis(5)).await;
807 });
808 }
809 }
810
811 #[test]
812 fn shutdown_timeout() {
813 let (tx, rx) = oneshot::channel();
814 let mut runtime = rt();
815
816 runtime.block_on(async move {
817 task::spawn_blocking(move || {
818 tx.send(()).unwrap();
819 thread::sleep(Duration::from_secs(10_000));
820 });
821
822 rx.await.unwrap();
823 });
824
825 runtime.shutdown_timeout(Duration::from_millis(100));
826 }
827
828 #[test]
829 fn runtime_in_thread_local() {
830 use std::cell::RefCell;
831 use std::thread;
832
833 thread_local!(
834 static R: RefCell<Option<Runtime>> = RefCell::new(None);
835 );
836
837 thread::spawn(|| {
838 R.with(|cell| {
839 *cell.borrow_mut() = Some(rt());
840 });
841
842 let _rt = rt();
843 }).join().unwrap();
844 }
845
846 async fn client_server(tx: mpsc::Sender<()>) {
847 let mut server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
848
849 // Get the assigned address
850 let addr = assert_ok!(server.local_addr());
851
852 // Spawn the server
853 tokio::spawn(async move {
854 // Accept a socket
855 let (mut socket, _) = server.accept().await.unwrap();
856
857 // Write some data
858 socket.write_all(b"hello").await.unwrap();
859 });
860
861 let mut client = TcpStream::connect(&addr).await.unwrap();
862
863 let mut buf = vec![];
864 client.read_to_end(&mut buf).await.unwrap();
865
866 assert_eq!(buf, b"hello");
867 tx.send(()).unwrap();
868 }
869
870 #[test]
871 fn local_set_block_on_socket() {
872 let mut rt = rt();
873 let local = task::LocalSet::new();
874
875 local.block_on(&mut rt, async move {
876 let (tx, rx) = oneshot::channel();
877
878 let mut listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
879 let addr = listener.local_addr().unwrap();
880
881 task::spawn_local(async move {
882 let _ = listener.accept().await;
883 tx.send(()).unwrap();
884 });
885
886 TcpStream::connect(&addr).await.unwrap();
887 rx.await.unwrap();
888 });
889 }
890
891 #[test]
892 fn local_set_client_server_block_on() {
893 let mut rt = rt();
894 let (tx, rx) = mpsc::channel();
895
896 let local = task::LocalSet::new();
897
898 local.block_on(&mut rt, async move { client_server_local(tx).await });
899
900 assert_ok!(rx.try_recv());
901 assert_err!(rx.try_recv());
902 }
903
904 async fn client_server_local(tx: mpsc::Sender<()>) {
905 let mut server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
906
907 // Get the assigned address
908 let addr = assert_ok!(server.local_addr());
909
910 // Spawn the server
911 task::spawn_local(async move {
912 // Accept a socket
913 let (mut socket, _) = server.accept().await.unwrap();
914
915 // Write some data
916 socket.write_all(b"hello").await.unwrap();
917 });
918
919 let mut client = TcpStream::connect(&addr).await.unwrap();
920
921 let mut buf = vec![];
922 client.read_to_end(&mut buf).await.unwrap();
923
924 assert_eq!(buf, b"hello");
925 tx.send(()).unwrap();
926 }
927
928 #[test]
929 fn coop() {
930 use std::task::Poll::Ready;
931
932 let mut rt = rt();
933
934 rt.block_on(async {
935 // Create a bunch of tasks
936 let mut tasks = (0..1_000).map(|_| {
937 tokio::spawn(async { })
938 }).collect::<Vec<_>>();
939
940 // Hope that all the tasks complete...
941 time::delay_for(Duration::from_millis(100)).await;
942
943 poll_fn(|cx| {
944 // At least one task should not be ready
945 for task in &mut tasks {
946 if Pin::new(task).poll(cx).is_pending() {
947 return Ready(());
948 }
949 }
950
951 panic!("did not yield");
952 }).await;
953 });
954 }
955
956 // Tests that the "next task" scheduler optimization is not able to starve
957 // other tasks.
958 #[test]
959 fn ping_pong_saturation() {
960 use tokio::sync::mpsc;
961
962 const NUM: usize = 100;
963
964 let mut rt = rt();
965
966 rt.block_on(async {
967 let (spawned_tx, mut spawned_rx) = mpsc::unbounded_channel();
968
969 // Spawn a bunch of tasks that ping ping between each other to
970 // saturate the runtime.
971 for _ in 0..NUM {
972 let (tx1, mut rx1) = mpsc::unbounded_channel();
973 let (tx2, mut rx2) = mpsc::unbounded_channel();
974 let spawned_tx = spawned_tx.clone();
975
976 task::spawn(async move {
977 spawned_tx.send(()).unwrap();
978
979 tx1.send(()).unwrap();
980
981 loop {
982 rx2.recv().await.unwrap();
983 tx1.send(()).unwrap();
984 }
985 });
986
987 task::spawn(async move {
988 loop {
989 rx1.recv().await.unwrap();
990 tx2.send(()).unwrap();
991 }
992 });
993 }
994
995 for _ in 0..NUM {
996 spawned_rx.recv().await.unwrap();
997 }
998
999 // spawn another task and wait for it to complete
1000 let handle = task::spawn(async {
1001 for _ in 0..5 {
1002 // Yielding forces it back into the local queue.
1003 task::yield_now().await;
1004 }
1005 });
1006 handle.await.unwrap();
1007 });
1008 }
1009 }
1010