1 #![warn(rust_2018_idioms)]
2 #![cfg(all(unix, feature = "full"))]
3
4 use std::os::unix::io::{AsRawFd, RawFd};
5 use std::sync::{
6 atomic::{AtomicBool, Ordering},
7 Arc,
8 };
9 use std::time::Duration;
10 use std::{
11 future::Future,
12 io::{self, ErrorKind, Read, Write},
13 task::{Context, Waker},
14 };
15
16 use nix::errno::Errno;
17 use nix::unistd::{close, read, write};
18
19 use futures::{poll, FutureExt};
20
21 use tokio::io::unix::{AsyncFd, AsyncFdReadyGuard};
22 use tokio_test::{assert_err, assert_pending};
23
24 struct TestWaker {
25 inner: Arc<TestWakerInner>,
26 waker: Waker,
27 }
28
/// Shared state behind a `TestWaker`: a single "was woken" flag.
#[derive(Default)]
struct TestWakerInner {
    /// Set to `true` when the waker fires; starts out `false`.
    awoken: AtomicBool,
}
33
34 impl futures::task::ArcWake for TestWakerInner {
wake_by_ref(arc_self: &Arc<Self>)35 fn wake_by_ref(arc_self: &Arc<Self>) {
36 arc_self.awoken.store(true, Ordering::SeqCst);
37 }
38 }
39
40 impl TestWaker {
new() -> Self41 fn new() -> Self {
42 let inner: Arc<TestWakerInner> = Default::default();
43
44 Self {
45 inner: inner.clone(),
46 waker: futures::task::waker(inner),
47 }
48 }
49
awoken(&self) -> bool50 fn awoken(&self) -> bool {
51 self.inner.awoken.swap(false, Ordering::SeqCst)
52 }
53
context(&self) -> Context<'_>54 fn context(&self) -> Context<'_> {
55 Context::from_waker(&self.waker)
56 }
57 }
58
is_blocking(e: &nix::Error) -> bool59 fn is_blocking(e: &nix::Error) -> bool {
60 Some(Errno::EAGAIN) == e.as_errno()
61 }
62
/// Thin wrapper around a raw file descriptor used by these tests.
#[derive(Debug)]
struct FileDescriptor {
    /// The wrapped raw descriptor.
    fd: RawFd,
}
67
68 impl AsRawFd for FileDescriptor {
as_raw_fd(&self) -> RawFd69 fn as_raw_fd(&self) -> RawFd {
70 self.fd
71 }
72 }
73
74 impl Read for &FileDescriptor {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>75 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
76 match read(self.fd, buf) {
77 Ok(n) => Ok(n),
78 Err(e) if is_blocking(&e) => Err(ErrorKind::WouldBlock.into()),
79 Err(e) => Err(io::Error::new(ErrorKind::Other, e)),
80 }
81 }
82 }
83
84 impl Read for FileDescriptor {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>85 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
86 (self as &Self).read(buf)
87 }
88 }
89
90 impl Write for &FileDescriptor {
write(&mut self, buf: &[u8]) -> io::Result<usize>91 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
92 match write(self.fd, buf) {
93 Ok(n) => Ok(n),
94 Err(e) if is_blocking(&e) => Err(ErrorKind::WouldBlock.into()),
95 Err(e) => Err(io::Error::new(ErrorKind::Other, e)),
96 }
97 }
98
flush(&mut self) -> io::Result<()>99 fn flush(&mut self) -> io::Result<()> {
100 Ok(())
101 }
102 }
103
104 impl Write for FileDescriptor {
write(&mut self, buf: &[u8]) -> io::Result<usize>105 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
106 (self as &Self).write(buf)
107 }
108
flush(&mut self) -> io::Result<()>109 fn flush(&mut self) -> io::Result<()> {
110 (self as &Self).flush()
111 }
112 }
113
114 impl Drop for FileDescriptor {
drop(&mut self)115 fn drop(&mut self) {
116 let _ = close(self.fd);
117 }
118 }
119
set_nonblocking(fd: RawFd)120 fn set_nonblocking(fd: RawFd) {
121 use nix::fcntl::{OFlag, F_GETFL, F_SETFL};
122
123 let flags = nix::fcntl::fcntl(fd, F_GETFL).expect("fcntl(F_GETFD)");
124
125 if flags < 0 {
126 panic!(
127 "bad return value from fcntl(F_GETFL): {} ({:?})",
128 flags,
129 nix::Error::last()
130 );
131 }
132
133 let flags = OFlag::from_bits_truncate(flags) | OFlag::O_NONBLOCK;
134
135 nix::fcntl::fcntl(fd, F_SETFL(flags)).expect("fcntl(F_SETFD)");
136 }
137
socketpair() -> (FileDescriptor, FileDescriptor)138 fn socketpair() -> (FileDescriptor, FileDescriptor) {
139 use nix::sys::socket::{self, AddressFamily, SockFlag, SockType};
140
141 let (fd_a, fd_b) = socket::socketpair(
142 AddressFamily::Unix,
143 SockType::Stream,
144 None,
145 SockFlag::empty(),
146 )
147 .expect("socketpair");
148 let fds = (FileDescriptor { fd: fd_a }, FileDescriptor { fd: fd_b });
149
150 set_nonblocking(fds.0.fd);
151 set_nonblocking(fds.1.fd);
152
153 fds
154 }
155
drain(mut fd: &FileDescriptor)156 fn drain(mut fd: &FileDescriptor) {
157 let mut buf = [0u8; 512];
158
159 loop {
160 match fd.read(&mut buf[..]) {
161 Err(e) if e.kind() == ErrorKind::WouldBlock => break,
162 Ok(0) => panic!("unexpected EOF"),
163 Err(e) => panic!("unexpected error: {:?}", e),
164 Ok(_) => continue,
165 }
166 }
167 }
168
169 #[tokio::test]
initially_writable()170 async fn initially_writable() {
171 let (a, b) = socketpair();
172
173 let afd_a = AsyncFd::new(a).unwrap();
174 let afd_b = AsyncFd::new(b).unwrap();
175
176 afd_a.writable().await.unwrap().clear_ready();
177 afd_b.writable().await.unwrap().clear_ready();
178
179 futures::select_biased! {
180 _ = tokio::time::sleep(Duration::from_millis(10)).fuse() => {},
181 _ = afd_a.readable().fuse() => panic!("Unexpected readable state"),
182 _ = afd_b.readable().fuse() => panic!("Unexpected readable state"),
183 }
184 }
185
186 #[tokio::test]
reset_readable()187 async fn reset_readable() {
188 let (a, mut b) = socketpair();
189
190 let afd_a = AsyncFd::new(a).unwrap();
191
192 let readable = afd_a.readable();
193 tokio::pin!(readable);
194
195 tokio::select! {
196 _ = readable.as_mut() => panic!(),
197 _ = tokio::time::sleep(Duration::from_millis(10)) => {}
198 }
199
200 b.write_all(b"0").unwrap();
201
202 let mut guard = readable.await.unwrap();
203
204 guard
205 .try_io(|_| afd_a.get_ref().read(&mut [0]))
206 .unwrap()
207 .unwrap();
208
209 // `a` is not readable, but the reactor still thinks it is
210 // (because we have not observed a not-ready error yet)
211 afd_a.readable().await.unwrap().retain_ready();
212
213 // Explicitly clear the ready state
214 guard.clear_ready();
215
216 let readable = afd_a.readable();
217 tokio::pin!(readable);
218
219 tokio::select! {
220 _ = readable.as_mut() => panic!(),
221 _ = tokio::time::sleep(Duration::from_millis(10)) => {}
222 }
223
224 b.write_all(b"0").unwrap();
225
226 // We can observe the new readable event
227 afd_a.readable().await.unwrap().clear_ready();
228 }
229
230 #[tokio::test]
reset_writable()231 async fn reset_writable() {
232 let (a, b) = socketpair();
233
234 let afd_a = AsyncFd::new(a).unwrap();
235
236 let mut guard = afd_a.writable().await.unwrap();
237
238 // Write until we get a WouldBlock. This also clears the ready state.
239 while guard
240 .try_io(|_| afd_a.get_ref().write(&[0; 512][..]))
241 .is_ok()
242 {}
243
244 // Writable state should be cleared now.
245 let writable = afd_a.writable();
246 tokio::pin!(writable);
247
248 tokio::select! {
249 _ = writable.as_mut() => panic!(),
250 _ = tokio::time::sleep(Duration::from_millis(10)) => {}
251 }
252
253 // Read from the other side; we should become writable now.
254 drain(&b);
255
256 let _ = writable.await.unwrap();
257 }
258
/// Wrapper that exposes the raw descriptor of a value held behind an `Arc`.
#[derive(Debug)]
struct ArcFd<T>(Arc<T>);

impl<T: AsRawFd> AsRawFd for ArcFd<T> {
    /// Forwards to the shared inner value's `as_raw_fd`.
    fn as_raw_fd(&self) -> RawFd {
        T::as_raw_fd(&self.0)
    }
}
266
267 #[tokio::test]
drop_closes()268 async fn drop_closes() {
269 let (a, mut b) = socketpair();
270
271 let afd_a = AsyncFd::new(a).unwrap();
272
273 assert_eq!(
274 ErrorKind::WouldBlock,
275 b.read(&mut [0]).err().unwrap().kind()
276 );
277
278 std::mem::drop(afd_a);
279
280 assert_eq!(0, b.read(&mut [0]).unwrap());
281
282 // into_inner does not close the fd
283
284 let (a, mut b) = socketpair();
285 let afd_a = AsyncFd::new(a).unwrap();
286 let _a: FileDescriptor = afd_a.into_inner();
287
288 assert_eq!(
289 ErrorKind::WouldBlock,
290 b.read(&mut [0]).err().unwrap().kind()
291 );
292
293 // Drop closure behavior is delegated to the inner object
294 let (a, mut b) = socketpair();
295 let arc_fd = Arc::new(a);
296 let afd_a = AsyncFd::new(ArcFd(arc_fd.clone())).unwrap();
297 std::mem::drop(afd_a);
298
299 assert_eq!(
300 ErrorKind::WouldBlock,
301 b.read(&mut [0]).err().unwrap().kind()
302 );
303
304 std::mem::drop(arc_fd); // suppress unnecessary clone clippy warning
305 }
306
307 #[tokio::test]
reregister()308 async fn reregister() {
309 let (a, _b) = socketpair();
310
311 let afd_a = AsyncFd::new(a).unwrap();
312 let a = afd_a.into_inner();
313 AsyncFd::new(a).unwrap();
314 }
315
316 #[tokio::test]
try_io()317 async fn try_io() {
318 let (a, mut b) = socketpair();
319
320 b.write_all(b"0").unwrap();
321
322 let afd_a = AsyncFd::new(a).unwrap();
323
324 let mut guard = afd_a.readable().await.unwrap();
325
326 afd_a.get_ref().read_exact(&mut [0]).unwrap();
327
328 // Should not clear the readable state
329 let _ = guard.try_io(|_| Ok(()));
330
331 // Still readable...
332 let _ = afd_a.readable().await.unwrap();
333
334 // Should clear the readable state
335 let _ = guard.try_io(|_| io::Result::<()>::Err(ErrorKind::WouldBlock.into()));
336
337 // Assert not readable
338 let readable = afd_a.readable();
339 tokio::pin!(readable);
340
341 tokio::select! {
342 _ = readable.as_mut() => panic!(),
343 _ = tokio::time::sleep(Duration::from_millis(10)) => {}
344 }
345
346 // Write something down b again and make sure we're reawoken
347 b.write_all(b"0").unwrap();
348 let _ = readable.await.unwrap();
349 }
350
351 #[tokio::test]
multiple_waiters()352 async fn multiple_waiters() {
353 let (a, mut b) = socketpair();
354 let afd_a = Arc::new(AsyncFd::new(a).unwrap());
355
356 let barrier = Arc::new(tokio::sync::Barrier::new(11));
357
358 let mut tasks = Vec::new();
359 for _ in 0..10 {
360 let afd_a = afd_a.clone();
361 let barrier = barrier.clone();
362
363 let f = async move {
364 let notify_barrier = async {
365 barrier.wait().await;
366 futures::future::pending::<()>().await;
367 };
368
369 futures::select_biased! {
370 guard = afd_a.readable().fuse() => {
371 tokio::task::yield_now().await;
372 guard.unwrap().clear_ready()
373 },
374 _ = notify_barrier.fuse() => unreachable!(),
375 }
376
377 std::mem::drop(afd_a);
378 };
379
380 tasks.push(tokio::spawn(f));
381 }
382
383 let mut all_tasks = futures::future::try_join_all(tasks);
384
385 tokio::select! {
386 r = std::pin::Pin::new(&mut all_tasks) => {
387 r.unwrap(); // propagate panic
388 panic!("Tasks exited unexpectedly")
389 },
390 _ = barrier.wait() => {}
391 };
392
393 b.write_all(b"0").unwrap();
394
395 all_tasks.await.unwrap();
396 }
397
398 #[tokio::test]
poll_fns()399 async fn poll_fns() {
400 let (a, b) = socketpair();
401 let afd_a = Arc::new(AsyncFd::new(a).unwrap());
402 let afd_b = Arc::new(AsyncFd::new(b).unwrap());
403
404 // Fill up the write side of A
405 while afd_a.get_ref().write(&[0; 512]).is_ok() {}
406
407 let waker = TestWaker::new();
408
409 assert_pending!(afd_a.as_ref().poll_read_ready(&mut waker.context()));
410
411 let afd_a_2 = afd_a.clone();
412 let r_barrier = Arc::new(tokio::sync::Barrier::new(2));
413 let barrier_clone = r_barrier.clone();
414
415 let read_fut = tokio::spawn(async move {
416 // Move waker onto this task first
417 assert_pending!(poll!(futures::future::poll_fn(|cx| afd_a_2
418 .as_ref()
419 .poll_read_ready(cx))));
420 barrier_clone.wait().await;
421
422 let _ = futures::future::poll_fn(|cx| afd_a_2.as_ref().poll_read_ready(cx)).await;
423 });
424
425 let afd_a_2 = afd_a.clone();
426 let w_barrier = Arc::new(tokio::sync::Barrier::new(2));
427 let barrier_clone = w_barrier.clone();
428
429 let mut write_fut = tokio::spawn(async move {
430 // Move waker onto this task first
431 assert_pending!(poll!(futures::future::poll_fn(|cx| afd_a_2
432 .as_ref()
433 .poll_write_ready(cx))));
434 barrier_clone.wait().await;
435
436 let _ = futures::future::poll_fn(|cx| afd_a_2.as_ref().poll_write_ready(cx)).await;
437 });
438
439 r_barrier.wait().await;
440 w_barrier.wait().await;
441
442 let readable = afd_a.readable();
443 tokio::pin!(readable);
444
445 tokio::select! {
446 _ = &mut readable => unreachable!(),
447 _ = tokio::task::yield_now() => {}
448 }
449
450 // Make A readable. We expect that 'readable' and 'read_fut' will both complete quickly
451 afd_b.get_ref().write_all(b"0").unwrap();
452
453 let _ = tokio::join!(readable, read_fut);
454
455 // Our original waker should _not_ be awoken (poll_read_ready retains only the last context)
456 assert!(!waker.awoken());
457
458 // The writable side should not be awoken
459 tokio::select! {
460 _ = &mut write_fut => unreachable!(),
461 _ = tokio::time::sleep(Duration::from_millis(5)) => {}
462 }
463
464 // Make it writable now
465 drain(afd_b.get_ref());
466
467 // now we should be writable (ie - the waker for poll_write should still be registered after we wake the read side)
468 let _ = write_fut.await;
469 }
470
assert_pending<T: std::fmt::Debug, F: Future<Output = T>>(f: F) -> std::pin::Pin<Box<F>>471 fn assert_pending<T: std::fmt::Debug, F: Future<Output = T>>(f: F) -> std::pin::Pin<Box<F>> {
472 let mut pinned = Box::pin(f);
473
474 assert_pending!(pinned
475 .as_mut()
476 .poll(&mut Context::from_waker(futures::task::noop_waker_ref())));
477
478 pinned
479 }
480
rt() -> tokio::runtime::Runtime481 fn rt() -> tokio::runtime::Runtime {
482 tokio::runtime::Builder::new_current_thread()
483 .enable_all()
484 .build()
485 .unwrap()
486 }
487
/// Dropping the runtime resolves an already-pending `readable()` future with
/// an error, and futures created afterwards fail as well.
#[test]
fn driver_shutdown_wakes_currently_pending() {
    let runtime = rt();

    let (a, _b) = socketpair();
    let afd_a = {
        // Register the descriptor while the runtime context is entered.
        let _enter = runtime.enter();
        AsyncFd::new(a).unwrap()
    };

    let pending_read = assert_pending(afd_a.readable());

    drop(runtime);

    // The future was initialized **before** dropping the rt
    assert_err!(futures::executor::block_on(pending_read));

    // The future is initialized **after** dropping the rt.
    assert_err!(futures::executor::block_on(afd_a.readable()));
}
508
/// A `readable()` future created after the runtime is dropped resolves with
/// an error.
#[test]
fn driver_shutdown_wakes_future_pending() {
    let runtime = rt();

    let (a, _b) = socketpair();
    let afd_a = {
        // Register the descriptor while the runtime context is entered.
        let _enter = runtime.enter();
        AsyncFd::new(a).unwrap()
    };

    drop(runtime);

    let outcome = futures::executor::block_on(afd_a.readable());
    assert_err!(outcome);
}
523
/// Races runtime shutdown (on another thread) against a `readable()` wait,
/// repeated 100 times.
#[test]
fn driver_shutdown_wakes_pending_race() {
    // TODO: make this a loom test
    for _ in 0..100 {
        let runtime = rt();

        let (a, _b) = socketpair();
        let afd_a = {
            // Register the descriptor while the runtime context is entered.
            let _enter = runtime.enter();
            AsyncFd::new(a).unwrap()
        };

        // Shut the runtime down concurrently; the thread is left detached.
        let _ = std::thread::spawn(move || drop(runtime));

        // This may or may not return an error (but will be awoken)
        let _ = futures::executor::block_on(afd_a.readable());

        // However retrying will always return an error
        assert_err!(futures::executor::block_on(afd_a.readable()));
    }
}
545
poll_readable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<AsyncFdReadyGuard<'_, T>>546 async fn poll_readable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<AsyncFdReadyGuard<'_, T>> {
547 futures::future::poll_fn(|cx| fd.poll_read_ready(cx)).await
548 }
549
poll_writable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<AsyncFdReadyGuard<'_, T>>550 async fn poll_writable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<AsyncFdReadyGuard<'_, T>> {
551 futures::future::poll_fn(|cx| fd.poll_write_ready(cx)).await
552 }
553
/// Poll-based readiness futures that were pending before the runtime was
/// dropped resolve with an error afterwards.
#[test]
fn driver_shutdown_wakes_currently_pending_polls() {
    let runtime = rt();

    let (a, _b) = socketpair();
    let afd_a = {
        // Register the descriptor while the runtime context is entered.
        let _enter = runtime.enter();
        AsyncFd::new(a).unwrap()
    };

    while afd_a.get_ref().write(&[0; 512]).is_ok() {} // make not writable

    let pending_read = assert_pending(poll_readable(&afd_a));
    let pending_write = assert_pending(poll_writable(&afd_a));

    drop(runtime);

    // Attempting to poll readiness when the rt is dropped is an error
    assert_err!(futures::executor::block_on(pending_read));
    assert_err!(futures::executor::block_on(pending_write));
}
575
/// Poll-based readiness futures created after the runtime is dropped resolve
/// with an error.
#[test]
fn driver_shutdown_wakes_poll() {
    let runtime = rt();

    let (a, _b) = socketpair();
    let afd_a = {
        // Register the descriptor while the runtime context is entered.
        let _enter = runtime.enter();
        AsyncFd::new(a).unwrap()
    };

    drop(runtime);

    let read_outcome = futures::executor::block_on(poll_readable(&afd_a));
    assert_err!(read_outcome);

    let write_outcome = futures::executor::block_on(poll_writable(&afd_a));
    assert_err!(write_outcome);
}
591
/// Races runtime shutdown (on another thread) against poll-based readiness
/// waits, repeated 100 times.
#[test]
fn driver_shutdown_wakes_poll_race() {
    // TODO: make this a loom test
    for _ in 0..100 {
        let runtime = rt();

        let (a, _b) = socketpair();
        let afd_a = {
            // Register the descriptor while the runtime context is entered.
            let _enter = runtime.enter();
            AsyncFd::new(a).unwrap()
        };

        while afd_a.get_ref().write(&[0; 512]).is_ok() {} // make not writable

        // Shut the runtime down concurrently; the thread is left detached.
        let _ = std::thread::spawn(move || drop(runtime));

        // The poll variants will always return an error in this case
        assert_err!(futures::executor::block_on(poll_readable(&afd_a)));
        assert_err!(futures::executor::block_on(poll_writable(&afd_a)));
    }
}
613