1 #![allow(clippy::needless_range_loop)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(feature = "full")]
4 
5 // Tests to run on both current-thread & thread-pool runtime variants.
6 
// Expands the enclosed test items once per scheduler flavor. Every generated
// module receives its own `rt()` constructor, so a test body only has to call
// `rt()` to obtain the runtime variant it is being compiled for.
//
// The tokens passed in must bring `Arc` and `Runtime` into scope, because the
// generated `rt()` functions name both types unqualified.
macro_rules! rt_test {
    ($($body:tt)*) => {
        mod current_thread_scheduler {
            $($body)*

            // Current-thread scheduler.
            fn rt() -> Arc<Runtime> {
                let runtime = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .unwrap();

                Arc::new(runtime)
            }
        }

        mod threaded_scheduler_4_threads {
            $($body)*

            // Multi-thread scheduler with four worker threads.
            fn rt() -> Arc<Runtime> {
                let runtime = tokio::runtime::Builder::new_multi_thread()
                    .worker_threads(4)
                    .enable_all()
                    .build()
                    .unwrap();

                Arc::new(runtime)
            }
        }

        mod threaded_scheduler_1_thread {
            $($body)*

            // Multi-thread scheduler restricted to a single worker thread.
            fn rt() -> Arc<Runtime> {
                let runtime = tokio::runtime::Builder::new_multi_thread()
                    .worker_threads(1)
                    .enable_all()
                    .build()
                    .unwrap();

                Arc::new(runtime)
            }
        }
    }
}
48 
// Compile-time check that `Runtime` is both `Send` and `Sync`.
#[test]
fn send_sync_bound() {
    use tokio::runtime::Runtime;

    // Can only be instantiated for types that satisfy both bounds, so the call
    // below stops compiling if `Runtime` ever loses one of them.
    fn assert_send_sync<T: Send + Sync>() {}

    assert_send_sync::<Runtime>();
}
56 
57 rt_test! {
58     use tokio::net::{TcpListener, TcpStream, UdpSocket};
59     use tokio::io::{AsyncReadExt, AsyncWriteExt};
60     use tokio::runtime::Runtime;
61     use tokio::sync::oneshot;
62     use tokio::{task, time};
63     use tokio_test::{assert_err, assert_ok};
64 
65     use futures::future::poll_fn;
66     use std::future::Future;
67     use std::pin::Pin;
68     use std::sync::{mpsc, Arc};
69     use std::task::{Context, Poll};
70     use std::thread;
71     use std::time::{Duration, Instant};
72 
73     #[test]
74     fn block_on_sync() {
75         let rt = rt();
76 
77         let mut win = false;
78         rt.block_on(async {
79             win = true;
80         });
81 
82         assert!(win);
83     }
84 
85 
86     #[test]
87     fn block_on_async() {
88         let rt = rt();
89 
90         let out = rt.block_on(async {
91             let (tx, rx) = oneshot::channel();
92 
93             thread::spawn(move || {
94                 thread::sleep(Duration::from_millis(50));
95                 tx.send("ZOMG").unwrap();
96             });
97 
98             assert_ok!(rx.await)
99         });
100 
101         assert_eq!(out, "ZOMG");
102     }
103 
104     #[test]
105     fn spawn_one_bg() {
106         let rt = rt();
107 
108         let out = rt.block_on(async {
109             let (tx, rx) = oneshot::channel();
110 
111             tokio::spawn(async move {
112                 tx.send("ZOMG").unwrap();
113             });
114 
115             assert_ok!(rx.await)
116         });
117 
118         assert_eq!(out, "ZOMG");
119     }
120 
121     #[test]
122     fn spawn_one_join() {
123         let rt = rt();
124 
125         let out = rt.block_on(async {
126             let (tx, rx) = oneshot::channel();
127 
128             let handle = tokio::spawn(async move {
129                 tx.send("ZOMG").unwrap();
130                 "DONE"
131             });
132 
133             let msg = assert_ok!(rx.await);
134 
135             let out = assert_ok!(handle.await);
136             assert_eq!(out, "DONE");
137 
138             msg
139         });
140 
141         assert_eq!(out, "ZOMG");
142     }
143 
144     #[test]
145     fn spawn_two() {
146         let rt = rt();
147 
148         let out = rt.block_on(async {
149             let (tx1, rx1) = oneshot::channel();
150             let (tx2, rx2) = oneshot::channel();
151 
152             tokio::spawn(async move {
153                 assert_ok!(tx1.send("ZOMG"));
154             });
155 
156             tokio::spawn(async move {
157                 let msg = assert_ok!(rx1.await);
158                 assert_ok!(tx2.send(msg));
159             });
160 
161             assert_ok!(rx2.await)
162         });
163 
164         assert_eq!(out, "ZOMG");
165     }
166 
167     #[test]
168     fn spawn_many_from_block_on() {
169         use tokio::sync::mpsc;
170 
171         const ITER: usize = 200;
172 
173         let rt = rt();
174 
175         let out = rt.block_on(async {
176             let (done_tx, mut done_rx) = mpsc::unbounded_channel();
177 
178             let mut txs = (0..ITER)
179                 .map(|i| {
180                     let (tx, rx) = oneshot::channel();
181                     let done_tx = done_tx.clone();
182 
183                     tokio::spawn(async move {
184                         let msg = assert_ok!(rx.await);
185                         assert_eq!(i, msg);
186                         assert_ok!(done_tx.send(msg));
187                     });
188 
189                     tx
190                 })
191                 .collect::<Vec<_>>();
192 
193             drop(done_tx);
194 
195             thread::spawn(move || {
196                 for (i, tx) in txs.drain(..).enumerate() {
197                     assert_ok!(tx.send(i));
198                 }
199             });
200 
201             let mut out = vec![];
202             while let Some(i) = done_rx.recv().await {
203                 out.push(i);
204             }
205 
206             out.sort_unstable();
207             out
208         });
209 
210         assert_eq!(ITER, out.len());
211 
212         for i in 0..ITER {
213             assert_eq!(i, out[i]);
214         }
215     }
216 
217     #[test]
218     fn spawn_many_from_task() {
219         use tokio::sync::mpsc;
220 
221         const ITER: usize = 500;
222 
223         let rt = rt();
224 
225         let out = rt.block_on(async {
226             tokio::spawn(async move {
227                 let (done_tx, mut done_rx) = mpsc::unbounded_channel();
228 
229                 /*
230                 for _ in 0..100 {
231                     tokio::spawn(async move { });
232                 }
233 
234                 tokio::task::yield_now().await;
235                 */
236 
237                 let mut txs = (0..ITER)
238                     .map(|i| {
239                         let (tx, rx) = oneshot::channel();
240                         let done_tx = done_tx.clone();
241 
242                         tokio::spawn(async move {
243                             let msg = assert_ok!(rx.await);
244                             assert_eq!(i, msg);
245                             assert_ok!(done_tx.send(msg));
246                         });
247 
248                         tx
249                     })
250                     .collect::<Vec<_>>();
251 
252                 drop(done_tx);
253 
254                 thread::spawn(move || {
255                     for (i, tx) in txs.drain(..).enumerate() {
256                         assert_ok!(tx.send(i));
257                     }
258                 });
259 
260                 let mut out = vec![];
261                 while let Some(i) = done_rx.recv().await {
262                     out.push(i);
263                 }
264 
265                 out.sort_unstable();
266                 out
267             }).await.unwrap()
268         });
269 
270         assert_eq!(ITER, out.len());
271 
272         for i in 0..ITER {
273             assert_eq!(i, out[i]);
274         }
275     }
276 
277     #[test]
278     fn spawn_await_chain() {
279         let rt = rt();
280 
281         let out = rt.block_on(async {
282             assert_ok!(tokio::spawn(async {
283                 assert_ok!(tokio::spawn(async {
284                     "hello"
285                 }).await)
286             }).await)
287         });
288 
289         assert_eq!(out, "hello");
290     }
291 
292     #[test]
293     fn outstanding_tasks_dropped() {
294         let rt = rt();
295 
296         let cnt = Arc::new(());
297 
298         rt.block_on(async {
299             let cnt = cnt.clone();
300 
301             tokio::spawn(poll_fn(move |_| {
302                 assert_eq!(2, Arc::strong_count(&cnt));
303                 Poll::<()>::Pending
304             }));
305         });
306 
307         assert_eq!(2, Arc::strong_count(&cnt));
308 
309         drop(rt);
310 
311         assert_eq!(1, Arc::strong_count(&cnt));
312     }
313 
314     #[test]
315     #[should_panic]
316     fn nested_rt() {
317         let rt1 = rt();
318         let rt2 = rt();
319 
320         rt1.block_on(async { rt2.block_on(async { "hello" }) });
321     }
322 
323     #[test]
324     fn create_rt_in_block_on() {
325         let rt1 = rt();
326         let rt2 = rt1.block_on(async { rt() });
327         let out = rt2.block_on(async { "ZOMG" });
328 
329         assert_eq!(out, "ZOMG");
330     }
331 
332     #[test]
333     fn complete_block_on_under_load() {
334         let rt = rt();
335 
336         rt.block_on(async {
337             let (tx, rx) = oneshot::channel();
338 
339             // Spin hard
340             tokio::spawn(async {
341                 loop {
342                     yield_once().await;
343                 }
344             });
345 
346             thread::spawn(move || {
347                 thread::sleep(Duration::from_millis(50));
348                 assert_ok!(tx.send(()));
349             });
350 
351             assert_ok!(rx.await);
352         });
353     }
354 
355     #[test]
356     fn complete_task_under_load() {
357         let rt = rt();
358 
359         rt.block_on(async {
360             let (tx1, rx1) = oneshot::channel();
361             let (tx2, rx2) = oneshot::channel();
362 
363             // Spin hard
364             tokio::spawn(async {
365                 loop {
366                     yield_once().await;
367                 }
368             });
369 
370             thread::spawn(move || {
371                 thread::sleep(Duration::from_millis(50));
372                 assert_ok!(tx1.send(()));
373             });
374 
375             tokio::spawn(async move {
376                 assert_ok!(rx1.await);
377                 assert_ok!(tx2.send(()));
378             });
379 
380             assert_ok!(rx2.await);
381         });
382     }
383 
384     #[test]
385     fn spawn_from_other_thread_idle() {
386         let rt = rt();
387         let handle = rt.clone();
388 
389         let (tx, rx) = oneshot::channel();
390 
391         thread::spawn(move || {
392             thread::sleep(Duration::from_millis(50));
393 
394             handle.spawn(async move {
395                 assert_ok!(tx.send(()));
396             });
397         });
398 
399         rt.block_on(async move {
400             assert_ok!(rx.await);
401         });
402     }
403 
404     #[test]
405     fn spawn_from_other_thread_under_load() {
406         let rt = rt();
407         let handle = rt.clone();
408 
409         let (tx, rx) = oneshot::channel();
410 
411         thread::spawn(move || {
412             handle.spawn(async move {
413                 assert_ok!(tx.send(()));
414             });
415         });
416 
417         rt.block_on(async move {
418             // Spin hard
419             tokio::spawn(async {
420                 loop {
421                     yield_once().await;
422                 }
423             });
424 
425             assert_ok!(rx.await);
426         });
427     }
428 
429     #[test]
430     fn sleep_at_root() {
431         let rt = rt();
432 
433         let now = Instant::now();
434         let dur = Duration::from_millis(50);
435 
436         rt.block_on(async move {
437             time::sleep(dur).await;
438         });
439 
440         assert!(now.elapsed() >= dur);
441     }
442 
443     #[test]
444     fn sleep_in_spawn() {
445         let rt = rt();
446 
447         let now = Instant::now();
448         let dur = Duration::from_millis(50);
449 
450         rt.block_on(async move {
451             let (tx, rx) = oneshot::channel();
452 
453             tokio::spawn(async move {
454                 time::sleep(dur).await;
455                 assert_ok!(tx.send(()));
456             });
457 
458             assert_ok!(rx.await);
459         });
460 
461         assert!(now.elapsed() >= dur);
462     }
463 
464     #[test]
465     fn block_on_socket() {
466         let rt = rt();
467 
468         rt.block_on(async move {
469             let (tx, rx) = oneshot::channel();
470 
471             let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
472             let addr = listener.local_addr().unwrap();
473 
474             tokio::spawn(async move {
475                 let _ = listener.accept().await;
476                 tx.send(()).unwrap();
477             });
478 
479             TcpStream::connect(&addr).await.unwrap();
480             rx.await.unwrap();
481         });
482     }
483 
484     #[test]
485     fn spawn_from_blocking() {
486         let rt = rt();
487 
488         let out = rt.block_on(async move {
489             let inner = assert_ok!(tokio::task::spawn_blocking(|| {
490                 tokio::spawn(async move { "hello" })
491             }).await);
492 
493             assert_ok!(inner.await)
494         });
495 
496         assert_eq!(out, "hello")
497     }
498 
499     #[test]
500     fn spawn_blocking_from_blocking() {
501         let rt = rt();
502 
503         let out = rt.block_on(async move {
504             let inner = assert_ok!(tokio::task::spawn_blocking(|| {
505                 tokio::task::spawn_blocking(|| "hello")
506             }).await);
507 
508             assert_ok!(inner.await)
509         });
510 
511         assert_eq!(out, "hello")
512     }
513 
514     #[test]
515     fn sleep_from_blocking() {
516         let rt = rt();
517 
518         rt.block_on(async move {
519             assert_ok!(tokio::task::spawn_blocking(|| {
520                 let now = std::time::Instant::now();
521                 let dur = Duration::from_millis(1);
522 
523                 // use the futures' block_on fn to make sure we aren't setting
524                 // any Tokio context
525                 futures::executor::block_on(async {
526                     tokio::time::sleep(dur).await;
527                 });
528 
529                 assert!(now.elapsed() >= dur);
530             }).await);
531         });
532     }
533 
534     #[test]
535     fn socket_from_blocking() {
536         let rt = rt();
537 
538         rt.block_on(async move {
539             let listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
540             let addr = assert_ok!(listener.local_addr());
541 
542             let peer = tokio::task::spawn_blocking(move || {
543                 // use the futures' block_on fn to make sure we aren't setting
544                 // any Tokio context
545                 futures::executor::block_on(async {
546                     assert_ok!(TcpStream::connect(addr).await);
547                 });
548             });
549 
550             // Wait for the client to connect
551             let _ = assert_ok!(listener.accept().await);
552 
553             assert_ok!(peer.await);
554         });
555     }
556 
557     #[test]
558     fn always_active_parker() {
559         // This test it to show that we will always have
560         // an active parker even if we call block_on concurrently
561 
562         let rt = rt();
563         let rt2 = rt.clone();
564 
565         let (tx1, rx1) = oneshot::channel();
566         let (tx2, rx2) = oneshot::channel();
567 
568         let jh1 = thread::spawn(move || {
569                 rt.block_on(async move {
570                     rx2.await.unwrap();
571                     time::sleep(Duration::from_millis(5)).await;
572                     tx1.send(()).unwrap();
573                 });
574         });
575 
576         let jh2 = thread::spawn(move || {
577             rt2.block_on(async move {
578                 tx2.send(()).unwrap();
579                 time::sleep(Duration::from_millis(5)).await;
580                 rx1.await.unwrap();
581                 time::sleep(Duration::from_millis(5)).await;
582             });
583         });
584 
585         jh1.join().unwrap();
586         jh2.join().unwrap();
587     }
588 
589     #[test]
590     // IOCP requires setting the "max thread" concurrency value. The sane,
591     // default, is to set this to the number of cores. Threads that poll I/O
592     // become associated with the IOCP handle. Once those threads sleep for any
593     // reason (mutex), they yield their ownership.
594     //
595     // This test hits an edge case on windows where more threads than cores are
596     // created, none of those threads ever yield due to being at capacity, so
597     // IOCP gets "starved".
598     //
599     // For now, this is a very edge case that is probably not a real production
600     // concern. There also isn't a great/obvious solution to take. For now, the
601     // test is disabled.
602     #[cfg(not(windows))]
603     fn io_driver_called_when_under_load() {
604         let rt = rt();
605 
606         // Create a lot of constant load. The scheduler will always be busy.
607         for _ in 0..100 {
608             rt.spawn(async {
609                 loop {
610                     tokio::task::yield_now().await;
611                 }
612             });
613         }
614 
615         // Do some I/O work
616         rt.block_on(async {
617             let listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
618             let addr = assert_ok!(listener.local_addr());
619 
620             let srv = tokio::spawn(async move {
621                 let (mut stream, _) = assert_ok!(listener.accept().await);
622                 assert_ok!(stream.write_all(b"hello world").await);
623             });
624 
625             let cli = tokio::spawn(async move {
626                 let mut stream = assert_ok!(TcpStream::connect(addr).await);
627                 let mut dst = vec![0; 11];
628 
629                 assert_ok!(stream.read_exact(&mut dst).await);
630                 assert_eq!(dst, b"hello world");
631             });
632 
633             assert_ok!(srv.await);
634             assert_ok!(cli.await);
635         });
636     }
637 
638     #[test]
639     fn client_server_block_on() {
640         let rt = rt();
641         let (tx, rx) = mpsc::channel();
642 
643         rt.block_on(async move { client_server(tx).await });
644 
645         assert_ok!(rx.try_recv());
646         assert_err!(rx.try_recv());
647     }
648 
649     #[test]
650     fn panic_in_task() {
651         let rt = rt();
652         let (tx, rx) = oneshot::channel();
653 
654         struct Boom(Option<oneshot::Sender<()>>);
655 
656         impl Future for Boom {
657             type Output = ();
658 
659             fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
660                 panic!();
661             }
662         }
663 
664         impl Drop for Boom {
665             fn drop(&mut self) {
666                 assert!(std::thread::panicking());
667                 self.0.take().unwrap().send(()).unwrap();
668             }
669         }
670 
671         rt.spawn(Boom(Some(tx)));
672         assert_ok!(rt.block_on(rx));
673     }
674 
675     #[test]
676     #[should_panic]
677     fn panic_in_block_on() {
678         let rt = rt();
679         rt.block_on(async { panic!() });
680     }
681 
682     async fn yield_once() {
683         let mut yielded = false;
684         poll_fn(|cx| {
685             if yielded {
686                 Poll::Ready(())
687             } else {
688                 yielded = true;
689                 cx.waker().wake_by_ref();
690                 Poll::Pending
691             }
692         })
693         .await
694     }
695 
696     #[test]
697     fn enter_and_spawn() {
698         let rt = rt();
699         let handle = {
700             let _enter = rt.enter();
701             tokio::spawn(async {})
702         };
703 
704         assert_ok!(rt.block_on(handle));
705     }
706 
707     #[test]
708     fn eagerly_drops_futures_on_shutdown() {
709         use std::sync::mpsc;
710 
711         struct Never {
712             drop_tx: mpsc::Sender<()>,
713         }
714 
715         impl Future for Never {
716             type Output = ();
717 
718             fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
719                 Poll::Pending
720             }
721         }
722 
723         impl Drop for Never {
724             fn drop(&mut self) {
725                 self.drop_tx.send(()).unwrap();
726             }
727         }
728 
729         let rt = rt();
730 
731         let (drop_tx, drop_rx) = mpsc::channel();
732         let (run_tx, run_rx) = oneshot::channel();
733 
734         rt.block_on(async move {
735             tokio::spawn(async move {
736                 assert_ok!(run_tx.send(()));
737 
738                 Never { drop_tx }.await
739             });
740 
741             assert_ok!(run_rx.await);
742         });
743 
744         drop(rt);
745 
746         assert_ok!(drop_rx.recv());
747     }
748 
749     #[test]
750     fn wake_while_rt_is_dropping() {
751         use tokio::task;
752 
753         struct OnDrop<F: FnMut()>(F);
754 
755         impl<F: FnMut()> Drop for OnDrop<F> {
756             fn drop(&mut self) {
757                 (self.0)()
758             }
759         }
760 
761         let (tx1, rx1) = oneshot::channel();
762         let (tx2, rx2) = oneshot::channel();
763         let (tx3, rx3) = oneshot::channel();
764 
765         let rt = rt();
766 
767         let h1 = rt.clone();
768 
769         rt.spawn(async move {
770             // Ensure a waker gets stored in oneshot 1.
771             let _ = rx1.await;
772             tx3.send(()).unwrap();
773         });
774 
775         rt.spawn(async move {
776             // When this task is dropped, we'll be "closing remotes".
777             // We spawn a new task that owns the `tx1`, to move its Drop
778             // out of here.
779             //
780             // Importantly, the oneshot 1 has a waker already stored, so
781             // the eventual drop here will try to re-schedule again.
782             let mut opt_tx1 = Some(tx1);
783             let _d = OnDrop(move || {
784                 let tx1 = opt_tx1.take().unwrap();
785                 h1.spawn(async move {
786                     tx1.send(()).unwrap();
787                 });
788             });
789             let _ = rx2.await;
790         });
791 
792         rt.spawn(async move {
793             let _ = rx3.await;
794             // We'll never get here, but once task 3 drops, this will
795             // force task 2 to re-schedule since it's waiting on oneshot 2.
796             tx2.send(()).unwrap();
797         });
798 
799         // Tick the loop
800         rt.block_on(async {
801             task::yield_now().await;
802         });
803 
804         // Drop the rt
805         drop(rt);
806     }
807 
808     #[test]
809     fn io_notify_while_shutting_down() {
810         use std::net::Ipv6Addr;
811         use std::sync::Arc;
812 
813         for _ in 1..10 {
814             let runtime = rt();
815 
816             runtime.block_on(async {
817                 let socket = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap();
818                 let addr = socket.local_addr().unwrap();
819                 let send_half = Arc::new(socket);
820                 let recv_half = send_half.clone();
821 
822                 tokio::spawn(async move {
823                     let mut buf = [0];
824                     loop {
825                         recv_half.recv_from(&mut buf).await.unwrap();
826                         std::thread::sleep(Duration::from_millis(2));
827                     }
828                 });
829 
830                 tokio::spawn(async move {
831                     let buf = [0];
832                     loop {
833                         send_half.send_to(&buf, &addr).await.unwrap();
834                         tokio::time::sleep(Duration::from_millis(1)).await;
835                     }
836                 });
837 
838                 tokio::time::sleep(Duration::from_millis(5)).await;
839             });
840         }
841     }
842 
843     #[test]
844     fn shutdown_timeout() {
845         let (tx, rx) = oneshot::channel();
846         let runtime = rt();
847 
848         runtime.block_on(async move {
849             task::spawn_blocking(move || {
850                 tx.send(()).unwrap();
851                 thread::sleep(Duration::from_secs(10_000));
852             });
853 
854             rx.await.unwrap();
855         });
856 
857         Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_millis(100));
858     }
859 
860     #[test]
861     fn shutdown_timeout_0() {
862         let runtime = rt();
863 
864         runtime.block_on(async move {
865             task::spawn_blocking(move || {
866                 thread::sleep(Duration::from_secs(10_000));
867             });
868         });
869 
870         let now = Instant::now();
871         Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_nanos(0));
872         assert!(now.elapsed().as_secs() < 1);
873     }
874 
875     #[test]
876     fn shutdown_wakeup_time() {
877         let runtime = rt();
878 
879         runtime.block_on(async move {
880             tokio::time::sleep(std::time::Duration::from_millis(100)).await;
881         });
882 
883         Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_secs(10_000));
884     }
885 
886     // This test is currently ignored on Windows because of a
887     // rust-lang issue in thread local storage destructors.
888     // See https://github.com/rust-lang/rust/issues/74875
889     #[test]
890     #[cfg(not(windows))]
891     fn runtime_in_thread_local() {
892         use std::cell::RefCell;
893         use std::thread;
894 
895         thread_local!(
896             static R: RefCell<Option<Runtime>> = RefCell::new(None);
897         );
898 
899         thread::spawn(|| {
900             R.with(|cell| {
901                 let rt = rt();
902                 let rt = Arc::try_unwrap(rt).unwrap();
903                 *cell.borrow_mut() = Some(rt);
904             });
905 
906             let _rt = rt();
907         }).join().unwrap();
908     }
909 
910     async fn client_server(tx: mpsc::Sender<()>) {
911         let server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
912 
913         // Get the assigned address
914         let addr = assert_ok!(server.local_addr());
915 
916         // Spawn the server
917         tokio::spawn(async move {
918             // Accept a socket
919             let (mut socket, _) = server.accept().await.unwrap();
920 
921             // Write some data
922             socket.write_all(b"hello").await.unwrap();
923         });
924 
925         let mut client = TcpStream::connect(&addr).await.unwrap();
926 
927         let mut buf = vec![];
928         client.read_to_end(&mut buf).await.unwrap();
929 
930         assert_eq!(buf, b"hello");
931         tx.send(()).unwrap();
932     }
933 
934     #[test]
935     fn local_set_block_on_socket() {
936         let rt = rt();
937         let local = task::LocalSet::new();
938 
939         local.block_on(&rt, async move {
940             let (tx, rx) = oneshot::channel();
941 
942             let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
943             let addr = listener.local_addr().unwrap();
944 
945             task::spawn_local(async move {
946                 let _ = listener.accept().await;
947                 tx.send(()).unwrap();
948             });
949 
950             TcpStream::connect(&addr).await.unwrap();
951             rx.await.unwrap();
952         });
953     }
954 
955     #[test]
956     fn local_set_client_server_block_on() {
957         let rt = rt();
958         let (tx, rx) = mpsc::channel();
959 
960         let local = task::LocalSet::new();
961 
962         local.block_on(&rt, async move { client_server_local(tx).await });
963 
964         assert_ok!(rx.try_recv());
965         assert_err!(rx.try_recv());
966     }
967 
968     async fn client_server_local(tx: mpsc::Sender<()>) {
969         let server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
970 
971         // Get the assigned address
972         let addr = assert_ok!(server.local_addr());
973 
974         // Spawn the server
975         task::spawn_local(async move {
976             // Accept a socket
977             let (mut socket, _) = server.accept().await.unwrap();
978 
979             // Write some data
980             socket.write_all(b"hello").await.unwrap();
981         });
982 
983         let mut client = TcpStream::connect(&addr).await.unwrap();
984 
985         let mut buf = vec![];
986         client.read_to_end(&mut buf).await.unwrap();
987 
988         assert_eq!(buf, b"hello");
989         tx.send(()).unwrap();
990     }
991 
992     #[test]
993     fn coop() {
994         use std::task::Poll::Ready;
995 
996         let rt = rt();
997 
998         rt.block_on(async {
999             // Create a bunch of tasks
1000             let mut tasks = (0..1_000).map(|_| {
1001                 tokio::spawn(async { })
1002             }).collect::<Vec<_>>();
1003 
1004             // Hope that all the tasks complete...
1005             time::sleep(Duration::from_millis(100)).await;
1006 
1007             poll_fn(|cx| {
1008                 // At least one task should not be ready
1009                 for task in &mut tasks {
1010                     if Pin::new(task).poll(cx).is_pending() {
1011                         return Ready(());
1012                     }
1013                 }
1014 
1015                 panic!("did not yield");
1016             }).await;
1017         });
1018     }
1019 
1020     #[test]
1021     fn coop_unconstrained() {
1022         use std::task::Poll::Ready;
1023 
1024         let rt = rt();
1025 
1026         rt.block_on(async {
1027             // Create a bunch of tasks
1028             let mut tasks = (0..1_000).map(|_| {
1029                 tokio::spawn(async { })
1030             }).collect::<Vec<_>>();
1031 
1032             // Hope that all the tasks complete...
1033             time::sleep(Duration::from_millis(100)).await;
1034 
1035             tokio::task::unconstrained(poll_fn(|cx| {
1036                 // All the tasks should be ready
1037                 for task in &mut tasks {
1038                     assert!(Pin::new(task).poll(cx).is_ready());
1039                 }
1040 
1041                 Ready(())
1042             })).await;
1043         });
1044     }
1045 
1046     // Tests that the "next task" scheduler optimization is not able to starve
1047     // other tasks.
1048     #[test]
1049     fn ping_pong_saturation() {
1050         use std::sync::atomic::{Ordering, AtomicBool};
1051         use tokio::sync::mpsc;
1052 
1053         const NUM: usize = 100;
1054 
1055         let rt = rt();
1056 
1057         let running = Arc::new(AtomicBool::new(true));
1058 
1059         rt.block_on(async {
1060             let (spawned_tx, mut spawned_rx) = mpsc::unbounded_channel();
1061 
1062             let mut tasks = vec![];
1063             // Spawn a bunch of tasks that ping ping between each other to
1064             // saturate the runtime.
1065             for _ in 0..NUM {
1066                 let (tx1, mut rx1) = mpsc::unbounded_channel();
1067                 let (tx2, mut rx2) = mpsc::unbounded_channel();
1068                 let spawned_tx = spawned_tx.clone();
1069                 let running = running.clone();
1070                 tasks.push(task::spawn(async move {
1071                     spawned_tx.send(()).unwrap();
1072 
1073 
1074                     while running.load(Ordering::Relaxed) {
1075                         tx1.send(()).unwrap();
1076                         rx2.recv().await.unwrap();
1077                     }
1078 
1079                     // Close the channel and wait for the other task to exit.
1080                     drop(tx1);
1081                     assert!(rx2.recv().await.is_none());
1082                 }));
1083 
1084                 tasks.push(task::spawn(async move {
1085                     while rx1.recv().await.is_some() {
1086                         tx2.send(()).unwrap();
1087                     }
1088                 }));
1089             }
1090 
1091             for _ in 0..NUM {
1092                 spawned_rx.recv().await.unwrap();
1093             }
1094 
1095             // spawn another task and wait for it to complete
1096             let handle = task::spawn(async {
1097                 for _ in 0..5 {
1098                     // Yielding forces it back into the local queue.
1099                     task::yield_now().await;
1100                 }
1101             });
1102             handle.await.unwrap();
1103             running.store(false, Ordering::Relaxed);
1104             for t in tasks {
1105                 t.await.unwrap();
1106             }
1107         });
1108     }
1109 }
1110