#![warn(rust_2018_idioms)]
#![cfg(all(unix, feature = "full"))]

use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};
use std::time::Duration;
use std::{
    future::Future,
    io::{self, ErrorKind, Read, Write},
    task::{Context, Waker},
};

use nix::unistd::{close, read, write};

use futures::{poll, FutureExt};

use tokio::io::unix::{AsyncFd, AsyncFdReadyGuard};
use tokio_test::{assert_err, assert_pending};

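// A minimal test waker that records whether it was woken, so tests can check
// waker registration without relying on a full runtime.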
struct TestWaker {
    inner: Arc<TestWakerInner>,
    waker: Waker,
}

#[derive(Default)]
struct TestWakerInner {
    awoken: AtomicBool,
}

impl futures::task::ArcWake for TestWakerInner {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        arc_self.awoken.store(true, Ordering::SeqCst);
    }
}

impl TestWaker {
    fn new() -> Self {
        let inner: Arc<TestWakerInner> = Default::default();

        Self {
            inner: inner.clone(),
            waker: futures::task::waker(inner),
        }
    }

    fn awoken(&self) -> bool {
        self.inner.awoken.swap(false, Ordering::SeqCst)
    }

    fn context(&self) -> Context<'_> {
        Context::from_waker(&self.waker)
    }
}

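// A raw file descriptor wrapper that implements Read/Write via nix and closes
// the descriptor on drop; used as the inner type for AsyncFd in these tests.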
#[derive(Debug)]
struct FileDescriptor {
    fd: RawFd,
}

impl AsRawFd for FileDescriptor {
    fn as_raw_fd(&self) -> RawFd {
        self.fd
    }
}

impl Read for &FileDescriptor {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        read(self.fd, buf).map_err(io::Error::from)
    }
}

impl Read for FileDescriptor {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        (self as &Self).read(buf)
    }
}

impl Write for &FileDescriptor {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        write(self.fd, buf).map_err(io::Error::from)
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

impl Write for FileDescriptor {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        (self as &Self).write(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        (self as &Self).flush()
    }
}

impl Drop for FileDescriptor {
    fn drop(&mut self) {
        let _ = close(self.fd);
    }
}

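// Puts the given file descriptor into non-blocking mode via fcntl.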
fn set_nonblocking(fd: RawFd) {
    use nix::fcntl::{OFlag, F_GETFL, F_SETFL};

    let flags = nix::fcntl::fcntl(fd, F_GETFL).expect("fcntl(F_GETFL)");

    if flags < 0 {
        panic!(
            "bad return value from fcntl(F_GETFL): {} ({:?})",
            flags,
            nix::Error::last()
        );
    }

    let flags = OFlag::from_bits_truncate(flags) | OFlag::O_NONBLOCK;

    nix::fcntl::fcntl(fd, F_SETFL(flags)).expect("fcntl(F_SETFL)");
}

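// Creates a connected pair of non-blocking Unix stream sockets.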
fn socketpair() -> (FileDescriptor, FileDescriptor) {
    use nix::sys::socket::{self, AddressFamily, SockFlag, SockType};

    let (fd_a, fd_b) = socket::socketpair(
        AddressFamily::Unix,
        SockType::Stream,
        None,
        SockFlag::empty(),
    )
    .expect("socketpair");
    let fds = (FileDescriptor { fd: fd_a }, FileDescriptor { fd: fd_b });

    set_nonblocking(fds.0.fd);
    set_nonblocking(fds.1.fd);

    fds
}

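// Reads from the descriptor until it would block, discarding the data.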
fn drain(mut fd: &FileDescriptor) {
    let mut buf = [0u8; 512];

    loop {
        match fd.read(&mut buf[..]) {
            Err(e) if e.kind() == ErrorKind::WouldBlock => break,
            Ok(0) => panic!("unexpected EOF"),
            Err(e) => panic!("unexpected error: {:?}", e),
            Ok(_) => continue,
        }
    }
}

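// A fresh socket pair should be immediately writable on both ends, and neither
// end should report readable.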
#[tokio::test]
async fn initially_writable() {
    let (a, b) = socketpair();

    let afd_a = AsyncFd::new(a).unwrap();
    let afd_b = AsyncFd::new(b).unwrap();

    afd_a.writable().await.unwrap().clear_ready();
    afd_b.writable().await.unwrap().clear_ready();

    futures::select_biased! {
        _ = tokio::time::sleep(Duration::from_millis(10)).fuse() => {},
        _ = afd_a.readable().fuse() => panic!("Unexpected readable state"),
        _ = afd_b.readable().fuse() => panic!("Unexpected readable state"),
    }
}

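// Readable readiness persists until clear_ready() is called, and a subsequent
// readable event can be observed after clearing.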
#[tokio::test]
async fn reset_readable() {
    let (a, mut b) = socketpair();

    let afd_a = AsyncFd::new(a).unwrap();

    let readable = afd_a.readable();
    tokio::pin!(readable);

    tokio::select! {
        _ = readable.as_mut() => panic!(),
        _ = tokio::time::sleep(Duration::from_millis(10)) => {}
    }

    b.write_all(b"0").unwrap();

    let mut guard = readable.await.unwrap();

    guard
        .try_io(|_| afd_a.get_ref().read(&mut [0]))
        .unwrap()
        .unwrap();

    // `a` is not readable, but the reactor still thinks it is
    // (because we have not observed a not-ready error yet)
    afd_a.readable().await.unwrap().retain_ready();

    // Explicitly clear the ready state
    guard.clear_ready();

    let readable = afd_a.readable();
    tokio::pin!(readable);

    tokio::select! {
        _ = readable.as_mut() => panic!(),
        _ = tokio::time::sleep(Duration::from_millis(10)) => {}
    }

    b.write_all(b"0").unwrap();

    // We can observe the new readable event
    afd_a.readable().await.unwrap().clear_ready();
}

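// Writing until WouldBlock clears writable readiness; draining the peer makes
// the fd writable again.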
#[tokio::test]
async fn reset_writable() {
    let (a, b) = socketpair();

    let afd_a = AsyncFd::new(a).unwrap();

    let mut guard = afd_a.writable().await.unwrap();

    // Write until we get a WouldBlock. This also clears the ready state.
    while guard
        .try_io(|_| afd_a.get_ref().write(&[0; 512][..]))
        .is_ok()
    {}

    // Writable state should be cleared now.
    let writable = afd_a.writable();
    tokio::pin!(writable);

    tokio::select! {
        _ = writable.as_mut() => panic!(),
        _ = tokio::time::sleep(Duration::from_millis(10)) => {}
    }

    // Read from the other side; we should become writable now.
    drain(&b);

    let _ = writable.await.unwrap();
}

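// A wrapper that shares ownership of the inner fd, used to check that AsyncFd
// delegates close-on-drop behavior to the wrapped type.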
#[derive(Debug)]
struct ArcFd<T>(Arc<T>);
impl<T: AsRawFd> AsRawFd for ArcFd<T> {
    fn as_raw_fd(&self) -> RawFd {
        self.0.as_raw_fd()
    }
}

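// Dropping the AsyncFd closes the fd when the inner type owns it, while
// into_inner() hands the fd back without closing it.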
#[tokio::test]
async fn drop_closes() {
    let (a, mut b) = socketpair();

    let afd_a = AsyncFd::new(a).unwrap();

    assert_eq!(
        ErrorKind::WouldBlock,
        b.read(&mut [0]).err().unwrap().kind()
    );

    std::mem::drop(afd_a);

    assert_eq!(0, b.read(&mut [0]).unwrap());

    // into_inner does not close the fd

    let (a, mut b) = socketpair();
    let afd_a = AsyncFd::new(a).unwrap();
    let _a: FileDescriptor = afd_a.into_inner();

    assert_eq!(
        ErrorKind::WouldBlock,
        b.read(&mut [0]).err().unwrap().kind()
    );

    // Drop closure behavior is delegated to the inner object
    let (a, mut b) = socketpair();
    let arc_fd = Arc::new(a);
    let afd_a = AsyncFd::new(ArcFd(arc_fd.clone())).unwrap();
    std::mem::drop(afd_a);

    assert_eq!(
        ErrorKind::WouldBlock,
        b.read(&mut [0]).err().unwrap().kind()
    );

    std::mem::drop(arc_fd); // suppress unnecessary clone clippy warning
}

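// An fd recovered via into_inner() can be registered with a new AsyncFd.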
#[tokio::test]
async fn reregister() {
    let (a, _b) = socketpair();

    let afd_a = AsyncFd::new(a).unwrap();
    let a = afd_a.into_inner();
    AsyncFd::new(a).unwrap();
}

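// try_io keeps the ready state unless the closure reports WouldBlock, in which
// case readiness is cleared and a new event must arrive.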
#[tokio::test]
async fn try_io() {
    let (a, mut b) = socketpair();

    b.write_all(b"0").unwrap();

    let afd_a = AsyncFd::new(a).unwrap();

    let mut guard = afd_a.readable().await.unwrap();

    afd_a.get_ref().read_exact(&mut [0]).unwrap();

    // Should not clear the readable state
    let _ = guard.try_io(|_| Ok(()));

    // Still readable...
    let _ = afd_a.readable().await.unwrap();

    // Should clear the readable state
    let _ = guard.try_io(|_| io::Result::<()>::Err(ErrorKind::WouldBlock.into()));

    // Assert not readable
    let readable = afd_a.readable();
    tokio::pin!(readable);

    tokio::select! {
        _ = readable.as_mut() => panic!(),
        _ = tokio::time::sleep(Duration::from_millis(10)) => {}
    }

    // Write something down b again and make sure we're reawoken
    b.write_all(b"0").unwrap();
    let _ = readable.await.unwrap();
}

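// Many tasks can wait on the same AsyncFd; a single readable event wakes all
// of them.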
#[tokio::test]
async fn multiple_waiters() {
    let (a, mut b) = socketpair();
    let afd_a = Arc::new(AsyncFd::new(a).unwrap());

    let barrier = Arc::new(tokio::sync::Barrier::new(11));

    let mut tasks = Vec::new();
    for _ in 0..10 {
        let afd_a = afd_a.clone();
        let barrier = barrier.clone();

        let f = async move {
            let notify_barrier = async {
                barrier.wait().await;
                futures::future::pending::<()>().await;
            };

            futures::select_biased! {
                guard = afd_a.readable().fuse() => {
                    tokio::task::yield_now().await;
                    guard.unwrap().clear_ready()
                },
                _ = notify_barrier.fuse() => unreachable!(),
            }

            std::mem::drop(afd_a);
        };

        tasks.push(tokio::spawn(f));
    }

    let mut all_tasks = futures::future::try_join_all(tasks);

    tokio::select! {
        r = std::pin::Pin::new(&mut all_tasks) => {
            r.unwrap(); // propagate panic
            panic!("Tasks exited unexpectedly")
        },
        _ = barrier.wait() => {}
    };

    b.write_all(b"0").unwrap();

    all_tasks.await.unwrap();
}

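// Exercises poll_read_ready / poll_write_ready directly: only the most
// recently registered waker for each direction is retained, and waking the
// read side does not disturb the write-side registration.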
#[tokio::test]
async fn poll_fns() {
    let (a, b) = socketpair();
    let afd_a = Arc::new(AsyncFd::new(a).unwrap());
    let afd_b = Arc::new(AsyncFd::new(b).unwrap());

    // Fill up the write side of A
    while afd_a.get_ref().write(&[0; 512]).is_ok() {}

    let waker = TestWaker::new();

    assert_pending!(afd_a.as_ref().poll_read_ready(&mut waker.context()));

    let afd_a_2 = afd_a.clone();
    let r_barrier = Arc::new(tokio::sync::Barrier::new(2));
    let barrier_clone = r_barrier.clone();

    let read_fut = tokio::spawn(async move {
        // Move waker onto this task first
        assert_pending!(poll!(futures::future::poll_fn(|cx| afd_a_2
            .as_ref()
            .poll_read_ready(cx))));
        barrier_clone.wait().await;

        let _ = futures::future::poll_fn(|cx| afd_a_2.as_ref().poll_read_ready(cx)).await;
    });

    let afd_a_2 = afd_a.clone();
    let w_barrier = Arc::new(tokio::sync::Barrier::new(2));
    let barrier_clone = w_barrier.clone();

    let mut write_fut = tokio::spawn(async move {
        // Move waker onto this task first
        assert_pending!(poll!(futures::future::poll_fn(|cx| afd_a_2
            .as_ref()
            .poll_write_ready(cx))));
        barrier_clone.wait().await;

        let _ = futures::future::poll_fn(|cx| afd_a_2.as_ref().poll_write_ready(cx)).await;
    });

    r_barrier.wait().await;
    w_barrier.wait().await;

    let readable = afd_a.readable();
    tokio::pin!(readable);

    tokio::select! {
        _ = &mut readable => unreachable!(),
        _ = tokio::task::yield_now() => {}
    }

    // Make A readable. We expect that 'readable' and 'read_fut' will both complete quickly
    afd_b.get_ref().write_all(b"0").unwrap();

    let _ = tokio::join!(readable, read_fut);

    // Our original waker should _not_ be awoken (poll_read_ready retains only the last context)
    assert!(!waker.awoken());

    // The writable side should not be awoken
    tokio::select! {
        _ = &mut write_fut => unreachable!(),
        _ = tokio::time::sleep(Duration::from_millis(5)) => {}
    }

    // Make it writable now
    drain(afd_b.get_ref());

    // Now we should be writable (i.e. the waker for poll_write should still be registered after we wake the read side)
    let _ = write_fut.await;
}

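// Polls the future once with a no-op waker, asserts it is pending, and returns
// it pinned so the test can resume it later.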
fn assert_pending<T: std::fmt::Debug, F: Future<Output = T>>(f: F) -> std::pin::Pin<Box<F>> {
    let mut pinned = Box::pin(f);

    assert_pending!(pinned
        .as_mut()
        .poll(&mut Context::from_waker(futures::task::noop_waker_ref())));

    pinned
}

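// Builds a current-thread runtime so the tests below can drop it deterministically.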
fn rt() -> tokio::runtime::Runtime {
    tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap()
}

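// Dropping the runtime wakes readiness futures that are already pending and
// causes them (and any newly created ones) to resolve to an error.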
#[test]
fn driver_shutdown_wakes_currently_pending() {
    let rt = rt();

    let (a, _b) = socketpair();
    let afd_a = {
        let _enter = rt.enter();
        AsyncFd::new(a).unwrap()
    };

    let readable = assert_pending(afd_a.readable());

    std::mem::drop(rt);

    // The future was initialized **before** dropping the rt
    assert_err!(futures::executor::block_on(readable));

    // The future is initialized **after** dropping the rt.
    assert_err!(futures::executor::block_on(afd_a.readable()));
}

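// A readiness future created after the runtime has been dropped also returns an error.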
#[test]
fn driver_shutdown_wakes_future_pending() {
    let rt = rt();

    let (a, _b) = socketpair();
    let afd_a = {
        let _enter = rt.enter();
        AsyncFd::new(a).unwrap()
    };

    std::mem::drop(rt);

    assert_err!(futures::executor::block_on(afd_a.readable()));
}

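// Races runtime shutdown against a pending readiness future: the first wait may
// or may not error, but it must be woken, and a retry always errors.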
#[test]
fn driver_shutdown_wakes_pending_race() {
    // TODO: make this a loom test
    for _ in 0..100 {
        let rt = rt();

        let (a, _b) = socketpair();
        let afd_a = {
            let _enter = rt.enter();
            AsyncFd::new(a).unwrap()
        };

        let _ = std::thread::spawn(move || std::mem::drop(rt));

        // This may or may not return an error (but will be awoken)
        let _ = futures::executor::block_on(afd_a.readable());

        // However retrying will always return an error
        assert_err!(futures::executor::block_on(afd_a.readable()));
    }
}

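// Helpers that drive the poll_read_ready / poll_write_ready APIs to completion.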
async fn poll_readable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<AsyncFdReadyGuard<'_, T>> {
    futures::future::poll_fn(|cx| fd.poll_read_ready(cx)).await
}

async fn poll_writable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<AsyncFdReadyGuard<'_, T>> {
    futures::future::poll_fn(|cx| fd.poll_write_ready(cx)).await
}

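// Poll-style readiness futures that are already pending also error once the
// runtime is dropped.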
#[test]
fn driver_shutdown_wakes_currently_pending_polls() {
    let rt = rt();

    let (a, _b) = socketpair();
    let afd_a = {
        let _enter = rt.enter();
        AsyncFd::new(a).unwrap()
    };

    while afd_a.get_ref().write(&[0; 512]).is_ok() {} // make not writable

    let readable = assert_pending(poll_readable(&afd_a));
    let writable = assert_pending(poll_writable(&afd_a));

    std::mem::drop(rt);

    // Attempting to poll readiness when the rt is dropped is an error
    assert_err!(futures::executor::block_on(readable));
    assert_err!(futures::executor::block_on(writable));
}

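// Poll-style readiness checks started after the runtime is dropped return an error.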
#[test]
fn driver_shutdown_wakes_poll() {
    let rt = rt();

    let (a, _b) = socketpair();
    let afd_a = {
        let _enter = rt.enter();
        AsyncFd::new(a).unwrap()
    };

    std::mem::drop(rt);

    assert_err!(futures::executor::block_on(poll_readable(&afd_a)));
    assert_err!(futures::executor::block_on(poll_writable(&afd_a)));
}

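// Races runtime shutdown against poll-style readiness checks; unlike the
// future-based variant, these always return an error.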
#[test]
fn driver_shutdown_wakes_poll_race() {
    // TODO: make this a loom test
    for _ in 0..100 {
        let rt = rt();

        let (a, _b) = socketpair();
        let afd_a = {
            let _enter = rt.enter();
            AsyncFd::new(a).unwrap()
        };

        while afd_a.get_ref().write(&[0; 512]).is_ok() {} // make not writable

        let _ = std::thread::spawn(move || std::mem::drop(rt));

        // The poll variants will always return an error in this case
        assert_err!(futures::executor::block_on(poll_readable(&afd_a)));
        assert_err!(futures::executor::block_on(poll_writable(&afd_a)));
    }
}