#[cfg(feature = "std")]
mod maybe_pending {
    use futures::io::AsyncWrite;
    use futures::task::{Context, Poll};
    use std::io;
    use std::pin::Pin;

    /// In-memory sink whose `poll_write` alternates between reporting
    /// `Poll::Pending` and forwarding the write to the wrapped `Vec<u8>`.
    ///
    /// A freshly constructed value answers its first `poll_write` with
    /// `Poll::Pending`; the following call goes through to `inner`, and the
    /// cycle repeats. `poll_flush` and `poll_close` are always forwarded.
    pub struct MaybePending {
        /// Bytes that have been written through to the underlying vector.
        pub inner: Vec<u8>,
        /// `true` when the next `poll_write` should be forwarded to `inner`.
        ready: bool,
    }

    impl MaybePending {
        /// Wraps `inner`, starting in the "not ready" state.
        pub fn new(inner: Vec<u8>) -> Self {
            Self { inner, ready: false }
        }
    }

    impl AsyncWrite for MaybePending {
        /// Forwards to `inner` on every second call; otherwise returns
        /// `Poll::Pending` after requesting an immediate wake-up.
        fn poll_write(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<io::Result<usize>> {
            if self.ready {
                self.ready = false;
                Pin::new(&mut self.inner).poll_write(cx, buf)
            } else {
                self.ready = true;
                // Returning `Pending` obliges us to arrange for the task to be
                // woken. We are ready on the very next poll, so wake right away;
                // without this a real executor would park the task forever.
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }

        /// Delegates directly to the wrapped vector.
        fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            Pin::new(&mut self.inner).poll_flush(cx)
        }

        /// Delegates directly to the wrapped vector.
        fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            Pin::new(&mut self.inner).poll_close(cx)
        }
    }
}
43 
#[cfg(any(feature = "std", feature = "executor"))]
mod util {
    use futures::future::Future;

    /// Polls `f` in a tight loop with a no-op task context until it yields
    /// `Poll::Ready`, then returns the produced value.
    ///
    /// `Poll::Pending` results are simply retried; nothing sleeps or parks
    /// between polls.
    pub fn run<F: Future + Unpin>(mut f: F) -> F::Output {
        use futures::future::FutureExt;
        use futures::task::Poll;
        use futures_test::task::noop_context;

        let mut context = noop_context();
        loop {
            match f.poll_unpin(&mut context) {
                Poll::Ready(output) => break output,
                // Not finished yet: poll again immediately.
                Poll::Pending => continue,
            }
        }
    }
}
61 
#[cfg(feature = "executor")]
#[test]
fn buf_writer() {
    use futures::executor::block_on;
    use futures::io::{AsyncWriteExt, BufWriter};

    // Capacity of two bytes on top of a plain `Vec<u8>` sink.
    let mut w = BufWriter::with_capacity(2, Vec::new());

    // Checks what is still buffered, then what has reached the sink.
    macro_rules! expect_state {
        (buffered: $buffered:expr, sink: $sink:expr) => {{
            assert_eq!(w.buffer(), $buffered);
            assert_eq!(*w.get_ref(), $sink);
        }};
    }

    // A write as large as the capacity is expected to land in the sink directly.
    block_on(w.write(&[0, 1])).unwrap();
    expect_state!(buffered: [], sink: [0, 1]);

    // Small writes accumulate in the buffer.
    block_on(w.write(&[2])).unwrap();
    expect_state!(buffered: [2], sink: [0, 1]);

    block_on(w.write(&[3])).unwrap();
    expect_state!(buffered: [2, 3], sink: [0, 1]);

    // Flushing empties the buffer into the sink.
    block_on(w.flush()).unwrap();
    expect_state!(buffered: [], sink: [0, 1, 2, 3]);

    block_on(w.write(&[4])).unwrap();
    block_on(w.write(&[5])).unwrap();
    expect_state!(buffered: [4, 5], sink: [0, 1, 2, 3]);

    // The buffer is full, so the pending bytes are expected to move to the sink first.
    block_on(w.write(&[6])).unwrap();
    expect_state!(buffered: [6], sink: [0, 1, 2, 3, 4, 5]);

    block_on(w.write(&[7, 8])).unwrap();
    expect_state!(buffered: [], sink: [0, 1, 2, 3, 4, 5, 6, 7, 8]);

    // A write larger than the capacity.
    block_on(w.write(&[9, 10, 11])).unwrap();
    expect_state!(buffered: [], sink: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]);

    // Flushing with nothing buffered leaves everything unchanged.
    block_on(w.flush()).unwrap();
    expect_state!(buffered: [], sink: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]);
}
107 
#[cfg(feature = "executor")]
#[test]
fn buf_writer_inner_flushes() {
    use futures::executor::block_on;
    use futures::io::{AsyncWriteExt, BufWriter};

    let mut writer = BufWriter::with_capacity(3, Vec::new());

    // Two bytes fit in the three-byte buffer, so the sink must still be empty.
    block_on(writer.write(&[0, 1])).unwrap();
    assert_eq!(*writer.get_ref(), []);

    // After an explicit flush the unwrapped sink holds the written bytes.
    block_on(writer.flush()).unwrap();
    assert_eq!(writer.into_inner(), [0, 1]);
}
121 
#[cfg(feature = "executor")]
#[test]
fn buf_writer_seek() {
    use futures::executor::block_on;
    use futures::io::{AsyncSeekExt, AsyncWriteExt, BufWriter, Cursor, SeekFrom};

    // FIXME: when https://github.com/rust-lang/futures-rs/issues/1510 fixed,
    // use `Vec::new` instead of `vec![0; 8]`.
    let mut writer = BufWriter::with_capacity(3, Cursor::new(vec![0; 8]));

    // Fill all eight bytes of the backing vector in two writes.
    block_on(writer.write_all(&[0, 1, 2, 3, 4, 5])).unwrap();
    block_on(writer.write_all(&[6, 7])).unwrap();

    // Querying the current position is expected to report the end and to
    // leave every written byte visible in the cursor's vector.
    let end = block_on(writer.seek(SeekFrom::Current(0))).ok();
    assert_eq!(end, Some(8));
    assert_eq!(&writer.get_ref().get_ref()[..], &[0, 1, 2, 3, 4, 5, 6, 7][..]);

    // Jump back to offset 2 and overwrite two bytes there.
    let rewound = block_on(writer.seek(SeekFrom::Start(2))).ok();
    assert_eq!(rewound, Some(2));
    block_on(writer.write_all(&[8, 9])).unwrap();
    block_on(writer.flush()).unwrap();

    let bytes = writer.into_inner().into_inner();
    assert_eq!(&bytes[..], &[0, 1, 8, 9, 4, 5, 6, 7]);
}
140 
#[cfg(feature = "std")]
#[test]
fn maybe_pending_buf_writer() {
    use futures::io::{AsyncWriteExt, BufWriter};

    use maybe_pending::MaybePending;
    use util::run;

    // Capacity of two bytes on top of a sink that is only intermittently ready.
    let mut w = BufWriter::with_capacity(2, MaybePending::new(Vec::new()));

    // Checks what is still buffered, then what has reached the sink's vector.
    macro_rules! expect_state {
        (buffered: $buffered:expr, sink: $sink:expr) => {{
            assert_eq!(w.buffer(), $buffered);
            assert_eq!(&w.get_ref().inner, &$sink);
        }};
    }

    // A write as large as the capacity is expected to land in the sink directly.
    run(w.write(&[0, 1])).unwrap();
    expect_state!(buffered: [], sink: [0, 1]);

    // Small writes accumulate in the buffer.
    run(w.write(&[2])).unwrap();
    expect_state!(buffered: [2], sink: [0, 1]);

    run(w.write(&[3])).unwrap();
    expect_state!(buffered: [2, 3], sink: [0, 1]);

    // Flushing empties the buffer into the sink.
    run(w.flush()).unwrap();
    expect_state!(buffered: [], sink: [0, 1, 2, 3]);

    run(w.write(&[4])).unwrap();
    run(w.write(&[5])).unwrap();
    expect_state!(buffered: [4, 5], sink: [0, 1, 2, 3]);

    // The buffer is full, so the pending bytes are expected to move to the sink first.
    run(w.write(&[6])).unwrap();
    expect_state!(buffered: [6], sink: [0, 1, 2, 3, 4, 5]);

    run(w.write(&[7, 8])).unwrap();
    expect_state!(buffered: [], sink: [0, 1, 2, 3, 4, 5, 6, 7, 8]);

    // A write larger than the capacity.
    run(w.write(&[9, 10, 11])).unwrap();
    expect_state!(buffered: [], sink: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]);

    // Flushing with nothing buffered leaves everything unchanged.
    run(w.flush()).unwrap();
    expect_state!(buffered: [], sink: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]);
}
188 
#[cfg(feature = "std")]
#[test]
fn maybe_pending_buf_writer_inner_flushes() {
    use futures::io::{AsyncWriteExt, BufWriter};

    use maybe_pending::MaybePending;
    use util::run;

    let mut writer = BufWriter::with_capacity(3, MaybePending::new(Vec::new()));

    // Two bytes fit in the three-byte buffer, so the sink must still be empty.
    run(writer.write(&[0, 1])).unwrap();
    assert_eq!(&writer.get_ref().inner, &[]);

    // After an explicit flush the unwrapped sink holds the written bytes.
    run(writer.flush()).unwrap();
    assert_eq!(writer.into_inner().inner, [0, 1]);
}
204 
#[cfg(feature = "std")]
#[test]
fn maybe_pending_buf_writer_seek() {
    use futures::io::{AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufWriter, Cursor, SeekFrom};
    use futures::task::{Context, Poll};
    use std::io;
    use std::pin::Pin;

    use util::run;

    /// Seekable in-memory sink whose `poll_write` and `poll_seek` each
    /// alternate, independently, between `Poll::Pending` and forwarding to the
    /// wrapped cursor.
    struct MaybePendingSeek {
        inner: Cursor<Vec<u8>>,
        /// `true` when the next `poll_write` should be forwarded.
        ready_write: bool,
        /// `true` when the next `poll_seek` should be forwarded.
        ready_seek: bool,
    }

    impl MaybePendingSeek {
        /// Wraps `inner` in a cursor; both operations start "not ready".
        fn new(inner: Vec<u8>) -> Self {
            Self { inner: Cursor::new(inner), ready_write: false, ready_seek: false }
        }
    }

    impl AsyncWrite for MaybePendingSeek {
        fn poll_write(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<io::Result<usize>> {
            if self.ready_write {
                self.ready_write = false;
                Pin::new(&mut self.inner).poll_write(cx, buf)
            } else {
                self.ready_write = true;
                // Returning `Pending` requires scheduling a wake-up; we are
                // ready on the next poll, so request it immediately.
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }

        fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            Pin::new(&mut self.inner).poll_flush(cx)
        }

        fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            Pin::new(&mut self.inner).poll_close(cx)
        }
    }

    impl AsyncSeek for MaybePendingSeek {
        fn poll_seek(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            pos: SeekFrom,
        ) -> Poll<io::Result<u64>> {
            if self.ready_seek {
                self.ready_seek = false;
                Pin::new(&mut self.inner).poll_seek(cx, pos)
            } else {
                self.ready_seek = true;
                // Same contract as `poll_write`: never return `Pending`
                // without arranging for the task to be polled again.
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }
    }

    // FIXME: when https://github.com/rust-lang/futures-rs/issues/1510 fixed,
    // use `Vec::new` instead of `vec![0; 8]`.
    let mut w = BufWriter::with_capacity(3, MaybePendingSeek::new(vec![0; 8]));

    // Fill all eight bytes of the backing vector in two writes.
    run(w.write_all(&[0, 1, 2, 3, 4, 5])).unwrap();
    run(w.write_all(&[6, 7])).unwrap();

    // Querying the current position is expected to report the end and to
    // leave every written byte visible in the cursor's vector.
    assert_eq!(run(w.seek(SeekFrom::Current(0))).ok(), Some(8));
    assert_eq!(&w.get_ref().inner.get_ref()[..], &[0, 1, 2, 3, 4, 5, 6, 7][..]);

    // Jump back to offset 2 and overwrite two bytes there.
    assert_eq!(run(w.seek(SeekFrom::Start(2))).ok(), Some(2));
    run(w.write_all(&[8, 9])).unwrap();
    run(w.flush()).unwrap();
    assert_eq!(&w.into_inner().inner.into_inner()[..], &[0, 1, 8, 9, 4, 5, 6, 7]);
}
277