#![warn(rust_2018_idioms)]
#![cfg(all(unix, feature = "full"))]

use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};
use std::time::Duration;
use std::{
    future::Future,
    io::{self, ErrorKind, Read, Write},
    task::{Context, Waker},
};

use nix::errno::Errno;
use nix::unistd::{close, read, write};

use futures::{poll, FutureExt};

use tokio::io::unix::{AsyncFd, AsyncFdReadyGuard};
use tokio_test::{assert_err, assert_pending};

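// A test waker that records whether it has been woken, so tests can assert
// that a poll did (or did not) trigger a wakeup.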
struct TestWaker {
    inner: Arc<TestWakerInner>,
    waker: Waker,
}

#[derive(Default)]
struct TestWakerInner {
    awoken: AtomicBool,
}

impl futures::task::ArcWake for TestWakerInner {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        arc_self.awoken.store(true, Ordering::SeqCst);
    }
}

impl TestWaker {
    fn new() -> Self {
        let inner: Arc<TestWakerInner> = Default::default();

        Self {
            inner: inner.clone(),
            waker: futures::task::waker(inner),
        }
    }

    fn awoken(&self) -> bool {
        self.inner.awoken.swap(false, Ordering::SeqCst)
    }

    fn context(&self) -> Context<'_> {
        Context::from_waker(&self.waker)
    }
}

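// Returns true if the nix error is EAGAIN, i.e. the non-blocking operation would block.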
fn is_blocking(e: &nix::Error) -> bool {
    Some(Errno::EAGAIN) == e.as_errno()
}

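// A raw fd wrapper that implements Read/Write directly over the read(2)/write(2)
// syscalls and closes the fd on drop.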
#[derive(Debug)]
struct FileDescriptor {
    fd: RawFd,
}

impl AsRawFd for FileDescriptor {
    fn as_raw_fd(&self) -> RawFd {
        self.fd
    }
}

impl Read for &FileDescriptor {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        match read(self.fd, buf) {
            Ok(n) => Ok(n),
            Err(e) if is_blocking(&e) => Err(ErrorKind::WouldBlock.into()),
            Err(e) => Err(io::Error::new(ErrorKind::Other, e)),
        }
    }
}

impl Read for FileDescriptor {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        (self as &Self).read(buf)
    }
}

impl Write for &FileDescriptor {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        match write(self.fd, buf) {
            Ok(n) => Ok(n),
            Err(e) if is_blocking(&e) => Err(ErrorKind::WouldBlock.into()),
            Err(e) => Err(io::Error::new(ErrorKind::Other, e)),
        }
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

impl Write for FileDescriptor {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        (self as &Self).write(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        (self as &Self).flush()
    }
}

impl Drop for FileDescriptor {
    fn drop(&mut self) {
        let _ = close(self.fd);
    }
}

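// Puts the fd into non-blocking mode by OR-ing O_NONBLOCK into its fcntl flags.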
fn set_nonblocking(fd: RawFd) {
    use nix::fcntl::{OFlag, F_GETFL, F_SETFL};

    let flags = nix::fcntl::fcntl(fd, F_GETFL).expect("fcntl(F_GETFL)");

    if flags < 0 {
        panic!(
            "bad return value from fcntl(F_GETFL): {} ({:?})",
            flags,
            nix::Error::last()
        );
    }

    let flags = OFlag::from_bits_truncate(flags) | OFlag::O_NONBLOCK;

    nix::fcntl::fcntl(fd, F_SETFL(flags)).expect("fcntl(F_SETFL)");
}

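// Creates a connected Unix stream socket pair and puts both ends into non-blocking mode.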
fn socketpair() -> (FileDescriptor, FileDescriptor) {
    use nix::sys::socket::{self, AddressFamily, SockFlag, SockType};

    let (fd_a, fd_b) = socket::socketpair(
        AddressFamily::Unix,
        SockType::Stream,
        None,
        SockFlag::empty(),
    )
    .expect("socketpair");
    let fds = (FileDescriptor { fd: fd_a }, FileDescriptor { fd: fd_b });

    set_nonblocking(fds.0.fd);
    set_nonblocking(fds.1.fd);

    fds
}

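// Reads from the fd until it would block, discarding all data read.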
fn drain(mut fd: &FileDescriptor) {
    let mut buf = [0u8; 512];

    loop {
        match fd.read(&mut buf[..]) {
            Err(e) if e.kind() == ErrorKind::WouldBlock => break,
            Ok(0) => panic!("unexpected EOF"),
            Err(e) => panic!("unexpected error: {:?}", e),
            Ok(_) => continue,
        }
    }
}

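// A fresh socket pair is immediately writable on both ends, and neither end is readable.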
#[tokio::test]
async fn initially_writable() {
    let (a, b) = socketpair();

    let afd_a = AsyncFd::new(a).unwrap();
    let afd_b = AsyncFd::new(b).unwrap();

    afd_a.writable().await.unwrap().clear_ready();
    afd_b.writable().await.unwrap().clear_ready();

    futures::select_biased! {
        _ = tokio::time::sleep(Duration::from_millis(10)).fuse() => {},
        _ = afd_a.readable().fuse() => panic!("Unexpected readable state"),
        _ = afd_b.readable().fuse() => panic!("Unexpected readable state"),
    }
}

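// Readable readiness persists across `retain_ready` and is only reset by `clear_ready`;
// after clearing, a new write on the peer produces a fresh readable event.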
#[tokio::test]
async fn reset_readable() {
    let (a, mut b) = socketpair();

    let afd_a = AsyncFd::new(a).unwrap();

    let readable = afd_a.readable();
    tokio::pin!(readable);

    tokio::select! {
        _ = readable.as_mut() => panic!(),
        _ = tokio::time::sleep(Duration::from_millis(10)) => {}
    }

    b.write_all(b"0").unwrap();

    let mut guard = readable.await.unwrap();

    guard
        .try_io(|_| afd_a.get_ref().read(&mut [0]))
        .unwrap()
        .unwrap();

    // `a` is not readable, but the reactor still thinks it is
    // (because we have not observed a not-ready error yet)
    afd_a.readable().await.unwrap().retain_ready();

    // Explicitly clear the ready state
    guard.clear_ready();

    let readable = afd_a.readable();
    tokio::pin!(readable);

    tokio::select! {
        _ = readable.as_mut() => panic!(),
        _ = tokio::time::sleep(Duration::from_millis(10)) => {}
    }

    b.write_all(b"0").unwrap();

    // We can observe the new readable event
    afd_a.readable().await.unwrap().clear_ready();
}

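// Writable readiness is cleared once `try_io` observes WouldBlock, and comes back
// after the peer drains the socket.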
#[tokio::test]
async fn reset_writable() {
    let (a, b) = socketpair();

    let afd_a = AsyncFd::new(a).unwrap();

    let mut guard = afd_a.writable().await.unwrap();

    // Write until we get a WouldBlock. This also clears the ready state.
    while guard
        .try_io(|_| afd_a.get_ref().write(&[0; 512][..]))
        .is_ok()
    {}

    // Writable state should be cleared now.
    let writable = afd_a.writable();
    tokio::pin!(writable);

    tokio::select! {
        _ = writable.as_mut() => panic!(),
        _ = tokio::time::sleep(Duration::from_millis(10)) => {}
    }

    // Read from the other side; we should become writable now.
    drain(&b);

    let _ = writable.await.unwrap();
}

#[derive(Debug)]
struct ArcFd<T>(Arc<T>);
impl<T: AsRawFd> AsRawFd for ArcFd<T> {
    fn as_raw_fd(&self) -> RawFd {
        self.0.as_raw_fd()
    }
}

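// Dropping an AsyncFd closes the wrapped fd (the peer sees EOF), while `into_inner`
// returns it without closing; when the inner type only borrows the fd (ArcFd above),
// drop behavior follows the inner object.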
#[tokio::test]
async fn drop_closes() {
    let (a, mut b) = socketpair();

    let afd_a = AsyncFd::new(a).unwrap();

    assert_eq!(
        ErrorKind::WouldBlock,
        b.read(&mut [0]).err().unwrap().kind()
    );

    std::mem::drop(afd_a);

    assert_eq!(0, b.read(&mut [0]).unwrap());

    // into_inner does not close the fd

    let (a, mut b) = socketpair();
    let afd_a = AsyncFd::new(a).unwrap();
    let _a: FileDescriptor = afd_a.into_inner();

    assert_eq!(
        ErrorKind::WouldBlock,
        b.read(&mut [0]).err().unwrap().kind()
    );

    // Drop closure behavior is delegated to the inner object
    let (a, mut b) = socketpair();
    let arc_fd = Arc::new(a);
    let afd_a = AsyncFd::new(ArcFd(arc_fd.clone())).unwrap();
    std::mem::drop(afd_a);

    assert_eq!(
        ErrorKind::WouldBlock,
        b.read(&mut [0]).err().unwrap().kind()
    );

    std::mem::drop(arc_fd); // suppress unnecessary clone clippy warning
}

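// An fd recovered via `into_inner` can be registered again with a new AsyncFd.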
#[tokio::test]
async fn reregister() {
    let (a, _b) = socketpair();

    let afd_a = AsyncFd::new(a).unwrap();
    let a = afd_a.into_inner();
    AsyncFd::new(a).unwrap();
}

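// `try_io` leaves readiness intact unless the closure reports WouldBlock.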
#[tokio::test]
async fn try_io() {
    let (a, mut b) = socketpair();

    b.write_all(b"0").unwrap();

    let afd_a = AsyncFd::new(a).unwrap();

    let mut guard = afd_a.readable().await.unwrap();

    afd_a.get_ref().read_exact(&mut [0]).unwrap();

    // Should not clear the readable state
    let _ = guard.try_io(|_| Ok(()));

    // Still readable...
    let _ = afd_a.readable().await.unwrap();

    // Should clear the readable state
    let _ = guard.try_io(|_| io::Result::<()>::Err(ErrorKind::WouldBlock.into()));

    // Assert not readable
    let readable = afd_a.readable();
    tokio::pin!(readable);

    tokio::select! {
        _ = readable.as_mut() => panic!(),
        _ = tokio::time::sleep(Duration::from_millis(10)) => {}
    }

    // Write to b again and make sure we're woken up again
    b.write_all(b"0").unwrap();
    let _ = readable.await.unwrap();
}

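// Every task waiting on the same AsyncFd is woken once the fd becomes readable.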
#[tokio::test]
async fn multiple_waiters() {
    let (a, mut b) = socketpair();
    let afd_a = Arc::new(AsyncFd::new(a).unwrap());

    let barrier = Arc::new(tokio::sync::Barrier::new(11));

    let mut tasks = Vec::new();
    for _ in 0..10 {
        let afd_a = afd_a.clone();
        let barrier = barrier.clone();

        let f = async move {
            let notify_barrier = async {
                barrier.wait().await;
                futures::future::pending::<()>().await;
            };

            futures::select_biased! {
                guard = afd_a.readable().fuse() => {
                    tokio::task::yield_now().await;
                    guard.unwrap().clear_ready()
                },
                _ = notify_barrier.fuse() => unreachable!(),
            }

            std::mem::drop(afd_a);
        };

        tasks.push(tokio::spawn(f));
    }

    let mut all_tasks = futures::future::try_join_all(tasks);

    tokio::select! {
        r = std::pin::Pin::new(&mut all_tasks) => {
            r.unwrap(); // propagate panic
            panic!("Tasks exited unexpectedly")
        },
        _ = barrier.wait() => {}
    };

    b.write_all(b"0").unwrap();

    all_tasks.await.unwrap();
}

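// Exercises poll_read_ready/poll_write_ready directly: each direction retains only the
// most recently supplied waker, and the read and write wakers are tracked independently.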
#[tokio::test]
async fn poll_fns() {
    let (a, b) = socketpair();
    let afd_a = Arc::new(AsyncFd::new(a).unwrap());
    let afd_b = Arc::new(AsyncFd::new(b).unwrap());

    // Fill up the write side of A
    while afd_a.get_ref().write(&[0; 512]).is_ok() {}

    let waker = TestWaker::new();

    assert_pending!(afd_a.as_ref().poll_read_ready(&mut waker.context()));

    let afd_a_2 = afd_a.clone();
    let r_barrier = Arc::new(tokio::sync::Barrier::new(2));
    let barrier_clone = r_barrier.clone();

    let read_fut = tokio::spawn(async move {
        // Move waker onto this task first
        assert_pending!(poll!(futures::future::poll_fn(|cx| afd_a_2
            .as_ref()
            .poll_read_ready(cx))));
        barrier_clone.wait().await;

        let _ = futures::future::poll_fn(|cx| afd_a_2.as_ref().poll_read_ready(cx)).await;
    });

    let afd_a_2 = afd_a.clone();
    let w_barrier = Arc::new(tokio::sync::Barrier::new(2));
    let barrier_clone = w_barrier.clone();

    let mut write_fut = tokio::spawn(async move {
        // Move waker onto this task first
        assert_pending!(poll!(futures::future::poll_fn(|cx| afd_a_2
            .as_ref()
            .poll_write_ready(cx))));
        barrier_clone.wait().await;

        let _ = futures::future::poll_fn(|cx| afd_a_2.as_ref().poll_write_ready(cx)).await;
    });

    r_barrier.wait().await;
    w_barrier.wait().await;

    let readable = afd_a.readable();
    tokio::pin!(readable);

    tokio::select! {
        _ = &mut readable => unreachable!(),
        _ = tokio::task::yield_now() => {}
    }

    // Make A readable. We expect that 'readable' and 'read_fut' will both complete quickly
    afd_b.get_ref().write_all(b"0").unwrap();

    let _ = tokio::join!(readable, read_fut);

    // Our original waker should _not_ be awoken (poll_read_ready retains only the last context)
    assert!(!waker.awoken());

    // The writable side should not be awoken
    tokio::select! {
        _ = &mut write_fut => unreachable!(),
        _ = tokio::time::sleep(Duration::from_millis(5)) => {}
    }

    // Make it writable now
    drain(afd_b.get_ref());

    // Now we should be writable (i.e., the waker for poll_write should still be
    // registered after we wake the read side)
    let _ = write_fut.await;
}

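// Polls the future once with a no-op waker, asserts that it is pending, and returns
// the pinned future so the caller can resume it later.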
fn assert_pending<T: std::fmt::Debug, F: Future<Output = T>>(f: F) -> std::pin::Pin<Box<F>> {
    let mut pinned = Box::pin(f);

    assert_pending!(pinned
        .as_mut()
        .poll(&mut Context::from_waker(futures::task::noop_waker_ref())));

    pinned
}

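// Builds a current-thread runtime so the I/O driver can be dropped deterministically
// in the shutdown tests below.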
fn rt() -> tokio::runtime::Runtime {
    tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap()
}

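// Dropping the runtime (and with it the I/O driver) must wake any pending readiness
// future and make it resolve to an error.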
#[test]
fn driver_shutdown_wakes_currently_pending() {
    let rt = rt();

    let (a, _b) = socketpair();
    let afd_a = {
        let _enter = rt.enter();
        AsyncFd::new(a).unwrap()
    };

    let readable = assert_pending(afd_a.readable());

    std::mem::drop(rt);

    // The future was initialized **before** dropping the rt
    assert_err!(futures::executor::block_on(readable));

    // The future is initialized **after** dropping the rt.
    assert_err!(futures::executor::block_on(afd_a.readable()));
}

#[test]
fn driver_shutdown_wakes_future_pending() {
    let rt = rt();

    let (a, _b) = socketpair();
    let afd_a = {
        let _enter = rt.enter();
        AsyncFd::new(a).unwrap()
    };

    std::mem::drop(rt);

    assert_err!(futures::executor::block_on(afd_a.readable()));
}

#[test]
fn driver_shutdown_wakes_pending_race() {
    // TODO: make this a loom test
    for _ in 0..100 {
        let rt = rt();

        let (a, _b) = socketpair();
        let afd_a = {
            let _enter = rt.enter();
            AsyncFd::new(a).unwrap()
        };

        let _ = std::thread::spawn(move || std::mem::drop(rt));

        // This may or may not return an error (but will be awoken)
        let _ = futures::executor::block_on(afd_a.readable());

        // However retrying will always return an error
        assert_err!(futures::executor::block_on(afd_a.readable()));
    }
}

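// Helpers that await readiness through the poll_* APIs rather than the
// readable()/writable() futures.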
async fn poll_readable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<AsyncFdReadyGuard<'_, T>> {
    futures::future::poll_fn(|cx| fd.poll_read_ready(cx)).await
}

async fn poll_writable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<AsyncFdReadyGuard<'_, T>> {
    futures::future::poll_fn(|cx| fd.poll_write_ready(cx)).await
}

#[test]
fn driver_shutdown_wakes_currently_pending_polls() {
    let rt = rt();

    let (a, _b) = socketpair();
    let afd_a = {
        let _enter = rt.enter();
        AsyncFd::new(a).unwrap()
    };

    while afd_a.get_ref().write(&[0; 512]).is_ok() {} // make not writable

    let readable = assert_pending(poll_readable(&afd_a));
    let writable = assert_pending(poll_writable(&afd_a));

    std::mem::drop(rt);

    // Attempting to poll readiness when the rt is dropped is an error
    assert_err!(futures::executor::block_on(readable));
    assert_err!(futures::executor::block_on(writable));
}

#[test]
fn driver_shutdown_wakes_poll() {
    let rt = rt();

    let (a, _b) = socketpair();
    let afd_a = {
        let _enter = rt.enter();
        AsyncFd::new(a).unwrap()
    };

    std::mem::drop(rt);

    assert_err!(futures::executor::block_on(poll_readable(&afd_a)));
    assert_err!(futures::executor::block_on(poll_writable(&afd_a)));
}

#[test]
fn driver_shutdown_wakes_poll_race() {
    // TODO: make this a loom test
    for _ in 0..100 {
        let rt = rt();

        let (a, _b) = socketpair();
        let afd_a = {
            let _enter = rt.enter();
            AsyncFd::new(a).unwrap()
        };

        while afd_a.get_ref().write(&[0; 512]).is_ok() {} // make not writable

        let _ = std::thread::spawn(move || std::mem::drop(rt));

        // The poll variants will always return an error in this case
        assert_err!(futures::executor::block_on(poll_readable(&afd_a)));
        assert_err!(futures::executor::block_on(poll_writable(&afd_a)));
    }
}