1 #![warn(rust_2018_idioms)] 2 #![cfg(all(target_os = "freebsd", feature = "net"))] 3 4 use mio_aio::{AioCb, AioFsyncMode, LioCb}; 5 use std::{ 6 future::Future, 7 mem, 8 os::unix::io::{AsRawFd, RawFd}, 9 pin::Pin, 10 task::{Context, Poll}, 11 }; 12 use tempfile::tempfile; 13 use tokio::io::bsd::{Aio, AioSource}; 14 use tokio_test::assert_pending; 15 16 mod aio { 17 use super::*; 18 19 /// Adapts mio_aio::AioCb (which implements mio::event::Source) to AioSource 20 struct WrappedAioCb<'a>(AioCb<'a>); 21 impl<'a> AioSource for WrappedAioCb<'a> { register(&mut self, kq: RawFd, token: usize)22 fn register(&mut self, kq: RawFd, token: usize) { 23 self.0.register_raw(kq, token) 24 } deregister(&mut self)25 fn deregister(&mut self) { 26 self.0.deregister_raw() 27 } 28 } 29 30 /// A very crude implementation of an AIO-based future 31 struct FsyncFut(Aio<WrappedAioCb<'static>>); 32 33 impl Future for FsyncFut { 34 type Output = std::io::Result<()>; 35 poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>36 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 37 let poll_result = self.0.poll_ready(cx); 38 match poll_result { 39 Poll::Pending => Poll::Pending, 40 Poll::Ready(Err(e)) => Poll::Ready(Err(e)), 41 Poll::Ready(Ok(_ev)) => { 42 // At this point, we could clear readiness. But there's no 43 // point, since we're about to drop the Aio. 44 let result = (*self.0).0.aio_return(); 45 match result { 46 Ok(_) => Poll::Ready(Ok(())), 47 Err(e) => Poll::Ready(Err(e.into())), 48 } 49 } 50 } 51 } 52 } 53 54 /// Low-level AIO Source 55 /// 56 /// An example bypassing mio_aio and Nix to demonstrate how the kevent 57 /// registration actually works, under the hood. 58 struct LlSource(Pin<Box<libc::aiocb>>); 59 60 impl AioSource for LlSource { register(&mut self, kq: RawFd, token: usize)61 fn register(&mut self, kq: RawFd, token: usize) { 62 let mut sev: libc::sigevent = unsafe { mem::MaybeUninit::zeroed().assume_init() }; 63 sev.sigev_notify = libc::SIGEV_KEVENT; 64 sev.sigev_signo = kq; 65 sev.sigev_value = libc::sigval { 66 sival_ptr: token as *mut libc::c_void, 67 }; 68 self.0.aio_sigevent = sev; 69 } 70 deregister(&mut self)71 fn deregister(&mut self) { 72 unsafe { 73 self.0.aio_sigevent = mem::zeroed(); 74 } 75 } 76 } 77 78 struct LlFut(Aio<LlSource>); 79 80 impl Future for LlFut { 81 type Output = std::io::Result<()>; 82 poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>83 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 84 let poll_result = self.0.poll_ready(cx); 85 match poll_result { 86 Poll::Pending => Poll::Pending, 87 Poll::Ready(Err(e)) => Poll::Ready(Err(e)), 88 Poll::Ready(Ok(_ev)) => { 89 let r = unsafe { libc::aio_return(self.0 .0.as_mut().get_unchecked_mut()) }; 90 assert_eq!(0, r); 91 Poll::Ready(Ok(())) 92 } 93 } 94 } 95 } 96 97 /// A very simple object that can implement AioSource and can be reused. 98 /// 99 /// mio_aio normally assumes that each AioCb will be consumed on completion. 100 /// This somewhat contrived example shows how an Aio object can be reused 101 /// anyway. 102 struct ReusableFsyncSource { 103 aiocb: Pin<Box<AioCb<'static>>>, 104 fd: RawFd, 105 token: usize, 106 } 107 impl ReusableFsyncSource { fsync(&mut self)108 fn fsync(&mut self) { 109 self.aiocb.register_raw(self.fd, self.token); 110 self.aiocb.fsync(AioFsyncMode::O_SYNC).unwrap(); 111 } new(aiocb: AioCb<'static>) -> Self112 fn new(aiocb: AioCb<'static>) -> Self { 113 ReusableFsyncSource { 114 aiocb: Box::pin(aiocb), 115 fd: 0, 116 token: 0, 117 } 118 } reset(&mut self, aiocb: AioCb<'static>)119 fn reset(&mut self, aiocb: AioCb<'static>) { 120 self.aiocb = Box::pin(aiocb); 121 } 122 } 123 impl AioSource for ReusableFsyncSource { register(&mut self, kq: RawFd, token: usize)124 fn register(&mut self, kq: RawFd, token: usize) { 125 self.fd = kq; 126 self.token = token; 127 } deregister(&mut self)128 fn deregister(&mut self) { 129 self.fd = 0; 130 } 131 } 132 133 struct ReusableFsyncFut<'a>(&'a mut Aio<ReusableFsyncSource>); 134 impl<'a> Future for ReusableFsyncFut<'a> { 135 type Output = std::io::Result<()>; 136 poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>137 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 138 let poll_result = self.0.poll_ready(cx); 139 match poll_result { 140 Poll::Pending => Poll::Pending, 141 Poll::Ready(Err(e)) => Poll::Ready(Err(e)), 142 Poll::Ready(Ok(ev)) => { 143 // Since this future uses a reusable Aio, we must clear 144 // its readiness here. That makes the future 145 // non-idempotent; the caller can't poll it repeatedly after 146 // it has already returned Ready. But that's ok; most 147 // futures behave this way. 148 self.0.clear_ready(ev); 149 let result = (*self.0).aiocb.aio_return(); 150 match result { 151 Ok(_) => Poll::Ready(Ok(())), 152 Err(e) => Poll::Ready(Err(e.into())), 153 } 154 } 155 } 156 } 157 } 158 159 #[tokio::test] fsync()160 async fn fsync() { 161 let f = tempfile().unwrap(); 162 let fd = f.as_raw_fd(); 163 let aiocb = AioCb::from_fd(fd, 0); 164 let source = WrappedAioCb(aiocb); 165 let mut poll_aio = Aio::new_for_aio(source).unwrap(); 166 (*poll_aio).0.fsync(AioFsyncMode::O_SYNC).unwrap(); 167 let fut = FsyncFut(poll_aio); 168 fut.await.unwrap(); 169 } 170 171 #[tokio::test] ll_fsync()172 async fn ll_fsync() { 173 let f = tempfile().unwrap(); 174 let fd = f.as_raw_fd(); 175 let mut aiocb: libc::aiocb = unsafe { mem::MaybeUninit::zeroed().assume_init() }; 176 aiocb.aio_fildes = fd; 177 let source = LlSource(Box::pin(aiocb)); 178 let mut poll_aio = Aio::new_for_aio(source).unwrap(); 179 let r = unsafe { 180 let p = (*poll_aio).0.as_mut().get_unchecked_mut(); 181 libc::aio_fsync(libc::O_SYNC, p) 182 }; 183 assert_eq!(0, r); 184 let fut = LlFut(poll_aio); 185 fut.await.unwrap(); 186 } 187 188 /// A suitably crafted future type can reuse an Aio object 189 #[tokio::test] reuse()190 async fn reuse() { 191 let f = tempfile().unwrap(); 192 let fd = f.as_raw_fd(); 193 let aiocb0 = AioCb::from_fd(fd, 0); 194 let source = ReusableFsyncSource::new(aiocb0); 195 let mut poll_aio = Aio::new_for_aio(source).unwrap(); 196 poll_aio.fsync(); 197 let fut0 = ReusableFsyncFut(&mut poll_aio); 198 fut0.await.unwrap(); 199 200 let aiocb1 = AioCb::from_fd(fd, 0); 201 poll_aio.reset(aiocb1); 202 let mut ctx = Context::from_waker(futures::task::noop_waker_ref()); 203 assert_pending!(poll_aio.poll_ready(&mut ctx)); 204 poll_aio.fsync(); 205 let fut1 = ReusableFsyncFut(&mut poll_aio); 206 fut1.await.unwrap(); 207 } 208 } 209 210 mod lio { 211 use super::*; 212 213 struct WrappedLioCb<'a>(LioCb<'a>); 214 impl<'a> AioSource for WrappedLioCb<'a> { register(&mut self, kq: RawFd, token: usize)215 fn register(&mut self, kq: RawFd, token: usize) { 216 self.0.register_raw(kq, token) 217 } deregister(&mut self)218 fn deregister(&mut self) { 219 self.0.deregister_raw() 220 } 221 } 222 223 /// A very crude lio_listio-based Future 224 struct LioFut(Option<Aio<WrappedLioCb<'static>>>); 225 226 impl Future for LioFut { 227 type Output = std::io::Result<Vec<isize>>; 228 poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>229 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 230 let poll_result = self.0.as_mut().unwrap().poll_ready(cx); 231 match poll_result { 232 Poll::Pending => Poll::Pending, 233 Poll::Ready(Err(e)) => Poll::Ready(Err(e)), 234 Poll::Ready(Ok(_ev)) => { 235 // At this point, we could clear readiness. But there's no 236 // point, since we're about to drop the Aio. 237 let r = self.0.take().unwrap().into_inner().0.into_results(|iter| { 238 iter.map(|lr| lr.result.unwrap()).collect::<Vec<isize>>() 239 }); 240 Poll::Ready(Ok(r)) 241 } 242 } 243 } 244 } 245 246 /// Minimal example demonstrating reuse of an Aio object with lio 247 /// readiness. mio_aio::LioCb actually does something similar under the 248 /// hood. 249 struct ReusableLioSource { 250 liocb: Option<LioCb<'static>>, 251 fd: RawFd, 252 token: usize, 253 } 254 impl ReusableLioSource { new(liocb: LioCb<'static>) -> Self255 fn new(liocb: LioCb<'static>) -> Self { 256 ReusableLioSource { 257 liocb: Some(liocb), 258 fd: 0, 259 token: 0, 260 } 261 } reset(&mut self, liocb: LioCb<'static>)262 fn reset(&mut self, liocb: LioCb<'static>) { 263 self.liocb = Some(liocb); 264 } submit(&mut self)265 fn submit(&mut self) { 266 self.liocb 267 .as_mut() 268 .unwrap() 269 .register_raw(self.fd, self.token); 270 self.liocb.as_mut().unwrap().submit().unwrap(); 271 } 272 } 273 impl AioSource for ReusableLioSource { register(&mut self, kq: RawFd, token: usize)274 fn register(&mut self, kq: RawFd, token: usize) { 275 self.fd = kq; 276 self.token = token; 277 } deregister(&mut self)278 fn deregister(&mut self) { 279 self.fd = 0; 280 } 281 } 282 struct ReusableLioFut<'a>(&'a mut Aio<ReusableLioSource>); 283 impl<'a> Future for ReusableLioFut<'a> { 284 type Output = std::io::Result<Vec<isize>>; 285 poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>286 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 287 let poll_result = self.0.poll_ready(cx); 288 match poll_result { 289 Poll::Pending => Poll::Pending, 290 Poll::Ready(Err(e)) => Poll::Ready(Err(e)), 291 Poll::Ready(Ok(ev)) => { 292 // Since this future uses a reusable Aio, we must clear 293 // its readiness here. That makes the future 294 // non-idempotent; the caller can't poll it repeatedly after 295 // it has already returned Ready. But that's ok; most 296 // futures behave this way. 297 self.0.clear_ready(ev); 298 let r = (*self.0).liocb.take().unwrap().into_results(|iter| { 299 iter.map(|lr| lr.result.unwrap()).collect::<Vec<isize>>() 300 }); 301 Poll::Ready(Ok(r)) 302 } 303 } 304 } 305 } 306 307 /// An lio_listio operation with one write element 308 #[tokio::test] onewrite()309 async fn onewrite() { 310 const WBUF: &[u8] = b"abcdef"; 311 let f = tempfile().unwrap(); 312 313 let mut builder = mio_aio::LioCbBuilder::with_capacity(1); 314 builder = builder.emplace_slice( 315 f.as_raw_fd(), 316 0, 317 &WBUF[..], 318 0, 319 mio_aio::LioOpcode::LIO_WRITE, 320 ); 321 let liocb = builder.finish(); 322 let source = WrappedLioCb(liocb); 323 let mut poll_aio = Aio::new_for_lio(source).unwrap(); 324 325 // Send the operation to the kernel 326 (*poll_aio).0.submit().unwrap(); 327 let fut = LioFut(Some(poll_aio)); 328 let v = fut.await.unwrap(); 329 assert_eq!(v.len(), 1); 330 assert_eq!(v[0] as usize, WBUF.len()); 331 } 332 333 /// A suitably crafted future type can reuse an Aio object 334 #[tokio::test] reuse()335 async fn reuse() { 336 const WBUF: &[u8] = b"abcdef"; 337 let f = tempfile().unwrap(); 338 339 let mut builder0 = mio_aio::LioCbBuilder::with_capacity(1); 340 builder0 = builder0.emplace_slice( 341 f.as_raw_fd(), 342 0, 343 &WBUF[..], 344 0, 345 mio_aio::LioOpcode::LIO_WRITE, 346 ); 347 let liocb0 = builder0.finish(); 348 let source = ReusableLioSource::new(liocb0); 349 let mut poll_aio = Aio::new_for_aio(source).unwrap(); 350 poll_aio.submit(); 351 let fut0 = ReusableLioFut(&mut poll_aio); 352 let v = fut0.await.unwrap(); 353 assert_eq!(v.len(), 1); 354 assert_eq!(v[0] as usize, WBUF.len()); 355 356 // Now reuse the same Aio 357 let mut builder1 = mio_aio::LioCbBuilder::with_capacity(1); 358 builder1 = builder1.emplace_slice( 359 f.as_raw_fd(), 360 0, 361 &WBUF[..], 362 0, 363 mio_aio::LioOpcode::LIO_WRITE, 364 ); 365 let liocb1 = builder1.finish(); 366 poll_aio.reset(liocb1); 367 let mut ctx = Context::from_waker(futures::task::noop_waker_ref()); 368 assert_pending!(poll_aio.poll_ready(&mut ctx)); 369 poll_aio.submit(); 370 let fut1 = ReusableLioFut(&mut poll_aio); 371 let v = fut1.await.unwrap(); 372 assert_eq!(v.len(), 1); 373 assert_eq!(v[0] as usize, WBUF.len()); 374 } 375 } 376