1 use std::io;
2
3 use futures::{Future, Poll};
4
5 use {AsyncRead, AsyncWrite};
6
/// A future which will copy all data from a reader into a writer.
///
/// Created by the [`copy`] function, this future resolves to a tuple of the
/// number of bytes copied, the reader and the writer, or to an error if one
/// happens while reading, writing or flushing.
///
/// [`copy`]: fn.copy.html
#[derive(Debug)]
pub struct Copy<R, W> {
    // Source of the data. Holds `Some` until the future completes, at which
    // point the reader is taken out and handed back to the caller.
    reader: Option<R>,
    // Set once the reader has reported a zero-length read (EOF); no further
    // reads are attempted afterwards.
    read_done: bool,
    // Destination of the data. Holds `Some` until the future completes, at
    // which point the writer is taken out and handed back to the caller.
    writer: Option<W>,
    // Index into `buf` of the next byte that still has to be written.
    pos: usize,
    // Number of valid bytes in `buf` from the most recent read; the pending
    // data is `buf[pos..cap]`.
    cap: usize,
    // Running total of bytes accepted by the writer so far.
    amt: u64,
    // Intermediate buffer that each read fills and the writes then drain.
    buf: Box<[u8]>,
}
23
24 /// Creates a future which represents copying all the bytes from one object to
25 /// another.
26 ///
27 /// The returned future will copy all the bytes read from `reader` into the
28 /// `writer` specified. This future will only complete once the `reader` has hit
29 /// EOF and all bytes have been written to and flushed from the `writer`
30 /// provided.
31 ///
32 /// On success the number of bytes is returned and the `reader` and `writer` are
33 /// consumed. On error the error is returned and the I/O objects are consumed as
34 /// well.
copy<R, W>(reader: R, writer: W) -> Copy<R, W> where R: AsyncRead, W: AsyncWrite,35 pub fn copy<R, W>(reader: R, writer: W) -> Copy<R, W>
36 where
37 R: AsyncRead,
38 W: AsyncWrite,
39 {
40 Copy {
41 reader: Some(reader),
42 read_done: false,
43 writer: Some(writer),
44 amt: 0,
45 pos: 0,
46 cap: 0,
47 buf: Box::new([0; 2048]),
48 }
49 }
50
51 impl<R, W> Future for Copy<R, W>
52 where
53 R: AsyncRead,
54 W: AsyncWrite,
55 {
56 type Item = (u64, R, W);
57 type Error = io::Error;
58
poll(&mut self) -> Poll<(u64, R, W), io::Error>59 fn poll(&mut self) -> Poll<(u64, R, W), io::Error> {
60 loop {
61 // If our buffer is empty, then we need to read some data to
62 // continue.
63 if self.pos == self.cap && !self.read_done {
64 let reader = self.reader.as_mut().unwrap();
65 let n = try_ready!(reader.poll_read(&mut self.buf));
66 if n == 0 {
67 self.read_done = true;
68 } else {
69 self.pos = 0;
70 self.cap = n;
71 }
72 }
73
74 // If our buffer has some data, let's write it out!
75 while self.pos < self.cap {
76 let writer = self.writer.as_mut().unwrap();
77 let i = try_ready!(writer.poll_write(&self.buf[self.pos..self.cap]));
78 if i == 0 {
79 return Err(io::Error::new(
80 io::ErrorKind::WriteZero,
81 "write zero byte into writer",
82 ));
83 } else {
84 self.pos += i;
85 self.amt += i as u64;
86 }
87 }
88
89 // If we've written al the data and we've seen EOF, flush out the
90 // data and finish the transfer.
91 // done with the entire transfer.
92 if self.pos == self.cap && self.read_done {
93 try_ready!(self.writer.as_mut().unwrap().poll_flush());
94 let reader = self.reader.take().unwrap();
95 let writer = self.writer.take().unwrap();
96 return Ok((self.amt, reader, writer).into());
97 }
98 }
99 }
100 }
101