1 #![warn(rust_2018_idioms)]
2 #![cfg(all(target_os = "freebsd", feature = "net"))]
3 
4 use mio_aio::{AioCb, AioFsyncMode, LioCb};
5 use std::{
6     future::Future,
7     mem,
8     os::unix::io::{AsRawFd, RawFd},
9     pin::Pin,
10     task::{Context, Poll},
11 };
12 use tempfile::tempfile;
13 use tokio::io::bsd::{Aio, AioSource};
14 use tokio_test::assert_pending;
15 
16 mod aio {
17     use super::*;
18 
19     /// Adapts mio_aio::AioCb (which implements mio::event::Source) to AioSource
20     struct WrappedAioCb<'a>(AioCb<'a>);
21     impl<'a> AioSource for WrappedAioCb<'a> {
register(&mut self, kq: RawFd, token: usize)22         fn register(&mut self, kq: RawFd, token: usize) {
23             self.0.register_raw(kq, token)
24         }
deregister(&mut self)25         fn deregister(&mut self) {
26             self.0.deregister_raw()
27         }
28     }
29 
30     /// A very crude implementation of an AIO-based future
31     struct FsyncFut(Aio<WrappedAioCb<'static>>);
32 
33     impl Future for FsyncFut {
34         type Output = std::io::Result<()>;
35 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>36         fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
37             let poll_result = self.0.poll_ready(cx);
38             match poll_result {
39                 Poll::Pending => Poll::Pending,
40                 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
41                 Poll::Ready(Ok(_ev)) => {
42                     // At this point, we could clear readiness.  But there's no
43                     // point, since we're about to drop the Aio.
44                     let result = (*self.0).0.aio_return();
45                     match result {
46                         Ok(_) => Poll::Ready(Ok(())),
47                         Err(e) => Poll::Ready(Err(e.into())),
48                     }
49                 }
50             }
51         }
52     }
53 
54     /// Low-level AIO Source
55     ///
56     /// An example bypassing mio_aio and Nix to demonstrate how the kevent
57     /// registration actually works, under the hood.
58     struct LlSource(Pin<Box<libc::aiocb>>);
59 
60     impl AioSource for LlSource {
register(&mut self, kq: RawFd, token: usize)61         fn register(&mut self, kq: RawFd, token: usize) {
62             let mut sev: libc::sigevent = unsafe { mem::MaybeUninit::zeroed().assume_init() };
63             sev.sigev_notify = libc::SIGEV_KEVENT;
64             sev.sigev_signo = kq;
65             sev.sigev_value = libc::sigval {
66                 sival_ptr: token as *mut libc::c_void,
67             };
68             self.0.aio_sigevent = sev;
69         }
70 
deregister(&mut self)71         fn deregister(&mut self) {
72             unsafe {
73                 self.0.aio_sigevent = mem::zeroed();
74             }
75         }
76     }
77 
78     struct LlFut(Aio<LlSource>);
79 
80     impl Future for LlFut {
81         type Output = std::io::Result<()>;
82 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>83         fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
84             let poll_result = self.0.poll_ready(cx);
85             match poll_result {
86                 Poll::Pending => Poll::Pending,
87                 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
88                 Poll::Ready(Ok(_ev)) => {
89                     let r = unsafe { libc::aio_return(self.0 .0.as_mut().get_unchecked_mut()) };
90                     assert_eq!(0, r);
91                     Poll::Ready(Ok(()))
92                 }
93             }
94         }
95     }
96 
97     /// A very simple object that can implement AioSource and can be reused.
98     ///
99     /// mio_aio normally assumes that each AioCb will be consumed on completion.
100     /// This somewhat contrived example shows how an Aio object can be reused
101     /// anyway.
102     struct ReusableFsyncSource {
103         aiocb: Pin<Box<AioCb<'static>>>,
104         fd: RawFd,
105         token: usize,
106     }
107     impl ReusableFsyncSource {
fsync(&mut self)108         fn fsync(&mut self) {
109             self.aiocb.register_raw(self.fd, self.token);
110             self.aiocb.fsync(AioFsyncMode::O_SYNC).unwrap();
111         }
new(aiocb: AioCb<'static>) -> Self112         fn new(aiocb: AioCb<'static>) -> Self {
113             ReusableFsyncSource {
114                 aiocb: Box::pin(aiocb),
115                 fd: 0,
116                 token: 0,
117             }
118         }
reset(&mut self, aiocb: AioCb<'static>)119         fn reset(&mut self, aiocb: AioCb<'static>) {
120             self.aiocb = Box::pin(aiocb);
121         }
122     }
123     impl AioSource for ReusableFsyncSource {
register(&mut self, kq: RawFd, token: usize)124         fn register(&mut self, kq: RawFd, token: usize) {
125             self.fd = kq;
126             self.token = token;
127         }
deregister(&mut self)128         fn deregister(&mut self) {
129             self.fd = 0;
130         }
131     }
132 
133     struct ReusableFsyncFut<'a>(&'a mut Aio<ReusableFsyncSource>);
134     impl<'a> Future for ReusableFsyncFut<'a> {
135         type Output = std::io::Result<()>;
136 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>137         fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
138             let poll_result = self.0.poll_ready(cx);
139             match poll_result {
140                 Poll::Pending => Poll::Pending,
141                 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
142                 Poll::Ready(Ok(ev)) => {
143                     // Since this future uses a reusable Aio, we must clear
144                     // its readiness here.  That makes the future
145                     // non-idempotent; the caller can't poll it repeatedly after
146                     // it has already returned Ready.  But that's ok; most
147                     // futures behave this way.
148                     self.0.clear_ready(ev);
149                     let result = (*self.0).aiocb.aio_return();
150                     match result {
151                         Ok(_) => Poll::Ready(Ok(())),
152                         Err(e) => Poll::Ready(Err(e.into())),
153                     }
154                 }
155             }
156         }
157     }
158 
159     #[tokio::test]
fsync()160     async fn fsync() {
161         let f = tempfile().unwrap();
162         let fd = f.as_raw_fd();
163         let aiocb = AioCb::from_fd(fd, 0);
164         let source = WrappedAioCb(aiocb);
165         let mut poll_aio = Aio::new_for_aio(source).unwrap();
166         (*poll_aio).0.fsync(AioFsyncMode::O_SYNC).unwrap();
167         let fut = FsyncFut(poll_aio);
168         fut.await.unwrap();
169     }
170 
171     #[tokio::test]
ll_fsync()172     async fn ll_fsync() {
173         let f = tempfile().unwrap();
174         let fd = f.as_raw_fd();
175         let mut aiocb: libc::aiocb = unsafe { mem::MaybeUninit::zeroed().assume_init() };
176         aiocb.aio_fildes = fd;
177         let source = LlSource(Box::pin(aiocb));
178         let mut poll_aio = Aio::new_for_aio(source).unwrap();
179         let r = unsafe {
180             let p = (*poll_aio).0.as_mut().get_unchecked_mut();
181             libc::aio_fsync(libc::O_SYNC, p)
182         };
183         assert_eq!(0, r);
184         let fut = LlFut(poll_aio);
185         fut.await.unwrap();
186     }
187 
188     /// A suitably crafted future type can reuse an Aio object
189     #[tokio::test]
reuse()190     async fn reuse() {
191         let f = tempfile().unwrap();
192         let fd = f.as_raw_fd();
193         let aiocb0 = AioCb::from_fd(fd, 0);
194         let source = ReusableFsyncSource::new(aiocb0);
195         let mut poll_aio = Aio::new_for_aio(source).unwrap();
196         poll_aio.fsync();
197         let fut0 = ReusableFsyncFut(&mut poll_aio);
198         fut0.await.unwrap();
199 
200         let aiocb1 = AioCb::from_fd(fd, 0);
201         poll_aio.reset(aiocb1);
202         let mut ctx = Context::from_waker(futures::task::noop_waker_ref());
203         assert_pending!(poll_aio.poll_ready(&mut ctx));
204         poll_aio.fsync();
205         let fut1 = ReusableFsyncFut(&mut poll_aio);
206         fut1.await.unwrap();
207     }
208 }
209 
210 mod lio {
211     use super::*;
212 
213     struct WrappedLioCb<'a>(LioCb<'a>);
214     impl<'a> AioSource for WrappedLioCb<'a> {
register(&mut self, kq: RawFd, token: usize)215         fn register(&mut self, kq: RawFd, token: usize) {
216             self.0.register_raw(kq, token)
217         }
deregister(&mut self)218         fn deregister(&mut self) {
219             self.0.deregister_raw()
220         }
221     }
222 
223     /// A very crude lio_listio-based Future
224     struct LioFut(Option<Aio<WrappedLioCb<'static>>>);
225 
226     impl Future for LioFut {
227         type Output = std::io::Result<Vec<isize>>;
228 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>229         fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
230             let poll_result = self.0.as_mut().unwrap().poll_ready(cx);
231             match poll_result {
232                 Poll::Pending => Poll::Pending,
233                 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
234                 Poll::Ready(Ok(_ev)) => {
235                     // At this point, we could clear readiness.  But there's no
236                     // point, since we're about to drop the Aio.
237                     let r = self.0.take().unwrap().into_inner().0.into_results(|iter| {
238                         iter.map(|lr| lr.result.unwrap()).collect::<Vec<isize>>()
239                     });
240                     Poll::Ready(Ok(r))
241                 }
242             }
243         }
244     }
245 
246     /// Minimal example demonstrating reuse of an Aio object with lio
247     /// readiness.  mio_aio::LioCb actually does something similar under the
248     /// hood.
249     struct ReusableLioSource {
250         liocb: Option<LioCb<'static>>,
251         fd: RawFd,
252         token: usize,
253     }
254     impl ReusableLioSource {
new(liocb: LioCb<'static>) -> Self255         fn new(liocb: LioCb<'static>) -> Self {
256             ReusableLioSource {
257                 liocb: Some(liocb),
258                 fd: 0,
259                 token: 0,
260             }
261         }
reset(&mut self, liocb: LioCb<'static>)262         fn reset(&mut self, liocb: LioCb<'static>) {
263             self.liocb = Some(liocb);
264         }
submit(&mut self)265         fn submit(&mut self) {
266             self.liocb
267                 .as_mut()
268                 .unwrap()
269                 .register_raw(self.fd, self.token);
270             self.liocb.as_mut().unwrap().submit().unwrap();
271         }
272     }
273     impl AioSource for ReusableLioSource {
register(&mut self, kq: RawFd, token: usize)274         fn register(&mut self, kq: RawFd, token: usize) {
275             self.fd = kq;
276             self.token = token;
277         }
deregister(&mut self)278         fn deregister(&mut self) {
279             self.fd = 0;
280         }
281     }
282     struct ReusableLioFut<'a>(&'a mut Aio<ReusableLioSource>);
283     impl<'a> Future for ReusableLioFut<'a> {
284         type Output = std::io::Result<Vec<isize>>;
285 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>286         fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
287             let poll_result = self.0.poll_ready(cx);
288             match poll_result {
289                 Poll::Pending => Poll::Pending,
290                 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
291                 Poll::Ready(Ok(ev)) => {
292                     // Since this future uses a reusable Aio, we must clear
293                     // its readiness here.  That makes the future
294                     // non-idempotent; the caller can't poll it repeatedly after
295                     // it has already returned Ready.  But that's ok; most
296                     // futures behave this way.
297                     self.0.clear_ready(ev);
298                     let r = (*self.0).liocb.take().unwrap().into_results(|iter| {
299                         iter.map(|lr| lr.result.unwrap()).collect::<Vec<isize>>()
300                     });
301                     Poll::Ready(Ok(r))
302                 }
303             }
304         }
305     }
306 
307     /// An lio_listio operation with one write element
308     #[tokio::test]
onewrite()309     async fn onewrite() {
310         const WBUF: &[u8] = b"abcdef";
311         let f = tempfile().unwrap();
312 
313         let mut builder = mio_aio::LioCbBuilder::with_capacity(1);
314         builder = builder.emplace_slice(
315             f.as_raw_fd(),
316             0,
317             &WBUF[..],
318             0,
319             mio_aio::LioOpcode::LIO_WRITE,
320         );
321         let liocb = builder.finish();
322         let source = WrappedLioCb(liocb);
323         let mut poll_aio = Aio::new_for_lio(source).unwrap();
324 
325         // Send the operation to the kernel
326         (*poll_aio).0.submit().unwrap();
327         let fut = LioFut(Some(poll_aio));
328         let v = fut.await.unwrap();
329         assert_eq!(v.len(), 1);
330         assert_eq!(v[0] as usize, WBUF.len());
331     }
332 
333     /// A suitably crafted future type can reuse an Aio object
334     #[tokio::test]
reuse()335     async fn reuse() {
336         const WBUF: &[u8] = b"abcdef";
337         let f = tempfile().unwrap();
338 
339         let mut builder0 = mio_aio::LioCbBuilder::with_capacity(1);
340         builder0 = builder0.emplace_slice(
341             f.as_raw_fd(),
342             0,
343             &WBUF[..],
344             0,
345             mio_aio::LioOpcode::LIO_WRITE,
346         );
347         let liocb0 = builder0.finish();
348         let source = ReusableLioSource::new(liocb0);
349         let mut poll_aio = Aio::new_for_aio(source).unwrap();
350         poll_aio.submit();
351         let fut0 = ReusableLioFut(&mut poll_aio);
352         let v = fut0.await.unwrap();
353         assert_eq!(v.len(), 1);
354         assert_eq!(v[0] as usize, WBUF.len());
355 
356         // Now reuse the same Aio
357         let mut builder1 = mio_aio::LioCbBuilder::with_capacity(1);
358         builder1 = builder1.emplace_slice(
359             f.as_raw_fd(),
360             0,
361             &WBUF[..],
362             0,
363             mio_aio::LioOpcode::LIO_WRITE,
364         );
365         let liocb1 = builder1.finish();
366         poll_aio.reset(liocb1);
367         let mut ctx = Context::from_waker(futures::task::noop_waker_ref());
368         assert_pending!(poll_aio.poll_ready(&mut ctx));
369         poll_aio.submit();
370         let fut1 = ReusableLioFut(&mut poll_aio);
371         let v = fut1.await.unwrap();
372         assert_eq!(v.len(), 1);
373         assert_eq!(v[0] as usize, WBUF.len());
374     }
375 }
376