1 use futures_core::task::{Context, Poll}; 2 #[cfg(feature = "read-initializer")] 3 use futures_io::Initializer; 4 use futures_io::{AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead, IoSlice, IoSliceMut, SeekFrom}; 5 use std::{fmt, io}; 6 use std::pin::Pin; 7 8 /// A simple wrapper type which allows types which implement only 9 /// implement `std::io::Read` or `std::io::Write` 10 /// to be used in contexts which expect an `AsyncRead` or `AsyncWrite`. 11 /// 12 /// If these types issue an error with the kind `io::ErrorKind::WouldBlock`, 13 /// it is expected that they will notify the current task on readiness. 14 /// Synchronous `std` types should not issue errors of this kind and 15 /// are safe to use in this context. However, using these types with 16 /// `AllowStdIo` will cause the event loop to block, so they should be used 17 /// with care. 18 #[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] 19 pub struct AllowStdIo<T>(T); 20 21 impl<T> Unpin for AllowStdIo<T> {} 22 23 macro_rules! try_with_interrupt { 24 ($e:expr) => { 25 loop { 26 match $e { 27 Ok(e) => { 28 break e; 29 } 30 Err(ref e) if e.kind() == ::std::io::ErrorKind::Interrupted => { 31 continue; 32 } 33 Err(e) => { 34 return Poll::Ready(Err(e)); 35 } 36 } 37 } 38 } 39 } 40 41 impl<T> AllowStdIo<T> { 42 /// Creates a new `AllowStdIo` from an existing IO object. new(io: T) -> Self43 pub fn new(io: T) -> Self { 44 AllowStdIo(io) 45 } 46 47 /// Returns a reference to the contained IO object. get_ref(&self) -> &T48 pub fn get_ref(&self) -> &T { 49 &self.0 50 } 51 52 /// Returns a mutable reference to the contained IO object. get_mut(&mut self) -> &mut T53 pub fn get_mut(&mut self) -> &mut T { 54 &mut self.0 55 } 56 57 /// Consumes self and returns the contained IO object. into_inner(self) -> T58 pub fn into_inner(self) -> T { 59 self.0 60 } 61 } 62 63 impl<T> io::Write for AllowStdIo<T> where T: io::Write { write(&mut self, buf: &[u8]) -> io::Result<usize>64 fn write(&mut self, buf: &[u8]) -> io::Result<usize> { 65 self.0.write(buf) 66 } write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize>67 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> { 68 self.0.write_vectored(bufs) 69 } flush(&mut self) -> io::Result<()>70 fn flush(&mut self) -> io::Result<()> { 71 self.0.flush() 72 } write_all(&mut self, buf: &[u8]) -> io::Result<()>73 fn write_all(&mut self, buf: &[u8]) -> io::Result<()> { 74 self.0.write_all(buf) 75 } write_fmt(&mut self, fmt: fmt::Arguments<'_>) -> io::Result<()>76 fn write_fmt(&mut self, fmt: fmt::Arguments<'_>) -> io::Result<()> { 77 self.0.write_fmt(fmt) 78 } 79 } 80 81 impl<T> AsyncWrite for AllowStdIo<T> where T: io::Write { poll_write(mut self: Pin<&mut Self>, _: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>>82 fn poll_write(mut self: Pin<&mut Self>, _: &mut Context<'_>, buf: &[u8]) 83 -> Poll<io::Result<usize>> 84 { 85 Poll::Ready(Ok(try_with_interrupt!(self.0.write(buf)))) 86 } 87 poll_write_vectored(mut self: Pin<&mut Self>, _: &mut Context<'_>, bufs: &[IoSlice<'_>]) -> Poll<io::Result<usize>>88 fn poll_write_vectored(mut self: Pin<&mut Self>, _: &mut Context<'_>, bufs: &[IoSlice<'_>]) 89 -> Poll<io::Result<usize>> 90 { 91 Poll::Ready(Ok(try_with_interrupt!(self.0.write_vectored(bufs)))) 92 } 93 poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>94 fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { 95 try_with_interrupt!(self.0.flush()); 96 Poll::Ready(Ok(())) 97 } 98 poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>99 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { 100 self.poll_flush(cx) 101 } 102 } 103 104 impl<T> io::Read for AllowStdIo<T> where T: io::Read { read(&mut self, buf: &mut [u8]) -> io::Result<usize>105 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { 106 self.0.read(buf) 107 } read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize>108 fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> { 109 self.0.read_vectored(bufs) 110 } 111 #[cfg(feature = "read-initializer")] initializer(&self) -> Initializer112 unsafe fn initializer(&self) -> Initializer { 113 self.0.initializer() 114 } read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize>115 fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> { 116 self.0.read_to_end(buf) 117 } read_to_string(&mut self, buf: &mut String) -> io::Result<usize>118 fn read_to_string(&mut self, buf: &mut String) -> io::Result<usize> { 119 self.0.read_to_string(buf) 120 } read_exact(&mut self, buf: &mut [u8]) -> io::Result<()>121 fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> { 122 self.0.read_exact(buf) 123 } 124 } 125 126 impl<T> AsyncRead for AllowStdIo<T> where T: io::Read { poll_read(mut self: Pin<&mut Self>, _: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>>127 fn poll_read(mut self: Pin<&mut Self>, _: &mut Context<'_>, buf: &mut [u8]) 128 -> Poll<io::Result<usize>> 129 { 130 Poll::Ready(Ok(try_with_interrupt!(self.0.read(buf)))) 131 } 132 poll_read_vectored(mut self: Pin<&mut Self>, _: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>]) -> Poll<io::Result<usize>>133 fn poll_read_vectored(mut self: Pin<&mut Self>, _: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>]) 134 -> Poll<io::Result<usize>> 135 { 136 Poll::Ready(Ok(try_with_interrupt!(self.0.read_vectored(bufs)))) 137 } 138 139 #[cfg(feature = "read-initializer")] initializer(&self) -> Initializer140 unsafe fn initializer(&self) -> Initializer { 141 self.0.initializer() 142 } 143 } 144 145 impl<T> io::Seek for AllowStdIo<T> where T: io::Seek { seek(&mut self, pos: SeekFrom) -> io::Result<u64>146 fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> { 147 self.0.seek(pos) 148 } 149 } 150 151 impl<T> AsyncSeek for AllowStdIo<T> where T: io::Seek { poll_seek(mut self: Pin<&mut Self>, _: &mut Context<'_>, pos: SeekFrom) -> Poll<io::Result<u64>>152 fn poll_seek(mut self: Pin<&mut Self>, _: &mut Context<'_>, pos: SeekFrom) 153 -> Poll<io::Result<u64>> 154 { 155 Poll::Ready(Ok(try_with_interrupt!(self.0.seek(pos)))) 156 } 157 } 158 159 impl<T> io::BufRead for AllowStdIo<T> where T: io::BufRead { fill_buf(&mut self) -> io::Result<&[u8]>160 fn fill_buf(&mut self) -> io::Result<&[u8]> { 161 self.0.fill_buf() 162 } consume(&mut self, amt: usize)163 fn consume(&mut self, amt: usize) { 164 self.0.consume(amt) 165 } 166 } 167 168 impl<T> AsyncBufRead for AllowStdIo<T> where T: io::BufRead { poll_fill_buf(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<&[u8]>>169 fn poll_fill_buf(mut self: Pin<&mut Self>, _: &mut Context<'_>) 170 -> Poll<io::Result<&[u8]>> 171 { 172 let this: *mut Self = &mut *self as *mut _; 173 Poll::Ready(Ok(try_with_interrupt!(unsafe { &mut *this }.0.fill_buf()))) 174 } 175 consume(mut self: Pin<&mut Self>, amt: usize)176 fn consume(mut self: Pin<&mut Self>, amt: usize) { 177 self.0.consume(amt) 178 } 179 } 180