1 use crate::io::{AsyncRead, AsyncWrite}; 2 3 use std::future::Future; 4 use std::io; 5 use std::pin::Pin; 6 use std::task::{Context, Poll}; 7 8 cfg_io_util! { 9 /// A future that asynchronously copies the entire contents of a reader into a 10 /// writer. 11 /// 12 /// This struct is generally created by calling [`copy`][copy]. Please 13 /// see the documentation of `copy()` for more details. 14 /// 15 /// [copy]: copy() 16 #[derive(Debug)] 17 #[must_use = "futures do nothing unless you `.await` or poll them"] 18 pub struct Copy<'a, R: ?Sized, W: ?Sized> { 19 reader: &'a mut R, 20 read_done: bool, 21 writer: &'a mut W, 22 pos: usize, 23 cap: usize, 24 amt: u64, 25 buf: Box<[u8]>, 26 } 27 28 /// Asynchronously copies the entire contents of a reader into a writer. 29 /// 30 /// This function returns a future that will continuously read data from 31 /// `reader` and then write it into `writer` in a streaming fashion until 32 /// `reader` returns EOF. 33 /// 34 /// On success, the total number of bytes that were copied from `reader` to 35 /// `writer` is returned. 36 /// 37 /// This is an asynchronous version of [`std::io::copy`][std]. 38 /// 39 /// [std]: std::io::copy 40 /// 41 /// # Errors 42 /// 43 /// The returned future will finish with an error will return an error 44 /// immediately if any call to `poll_read` or `poll_write` returns an error. 45 /// 46 /// # Examples 47 /// 48 /// ``` 49 /// use tokio::io; 50 /// 51 /// # async fn dox() -> std::io::Result<()> { 52 /// let mut reader: &[u8] = b"hello"; 53 /// let mut writer: Vec<u8> = vec![]; 54 /// 55 /// io::copy(&mut reader, &mut writer).await?; 56 /// 57 /// assert_eq!(&b"hello"[..], &writer[..]); 58 /// # Ok(()) 59 /// # } 60 /// ``` 61 pub fn copy<'a, R, W>(reader: &'a mut R, writer: &'a mut W) -> Copy<'a, R, W> 62 where 63 R: AsyncRead + Unpin + ?Sized, 64 W: AsyncWrite + Unpin + ?Sized, 65 { 66 Copy { 67 reader, 68 read_done: false, 69 writer, 70 amt: 0, 71 pos: 0, 72 cap: 0, 73 buf: Box::new([0; 2048]), 74 } 75 } 76 } 77 78 impl<R, W> Future for Copy<'_, R, W> 79 where 80 R: AsyncRead + Unpin + ?Sized, 81 W: AsyncWrite + Unpin + ?Sized, 82 { 83 type Output = io::Result<u64>; 84 poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>>85 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> { 86 loop { 87 // If our buffer is empty, then we need to read some data to 88 // continue. 89 if self.pos == self.cap && !self.read_done { 90 let me = &mut *self; 91 let n = ready!(Pin::new(&mut *me.reader).poll_read(cx, &mut me.buf))?; 92 if n == 0 { 93 self.read_done = true; 94 } else { 95 self.pos = 0; 96 self.cap = n; 97 } 98 } 99 100 // If our buffer has some data, let's write it out! 101 while self.pos < self.cap { 102 let me = &mut *self; 103 let i = ready!(Pin::new(&mut *me.writer).poll_write(cx, &me.buf[me.pos..me.cap]))?; 104 if i == 0 { 105 return Poll::Ready(Err(io::Error::new( 106 io::ErrorKind::WriteZero, 107 "write zero byte into writer", 108 ))); 109 } else { 110 self.pos += i; 111 self.amt += i as u64; 112 } 113 } 114 115 // If we've written all the data and we've seen EOF, flush out the 116 // data and finish the transfer. 117 if self.pos == self.cap && self.read_done { 118 let me = &mut *self; 119 ready!(Pin::new(&mut *me.writer).poll_flush(cx))?; 120 return Poll::Ready(Ok(self.amt)); 121 } 122 } 123 } 124 } 125 126 #[cfg(test)] 127 mod tests { 128 use super::*; 129 130 #[test] assert_unpin()131 fn assert_unpin() { 132 use std::marker::PhantomPinned; 133 crate::is_unpin::<Copy<'_, PhantomPinned, PhantomPinned>>(); 134 } 135 } 136