#[cfg(feature = "std")]
mod maybe_pending {
    use futures::io::AsyncWrite;
    use futures::task::{Context, Poll};
    use std::io;
    use std::pin::Pin;

    /// Test writer around a `Vec<u8>` whose `poll_write` alternates between
    /// `Poll::Pending` and forwarding the write to the vector.
    pub struct MaybePending {
        /// Destination of every write that was actually forwarded.
        pub inner: Vec<u8>,
        /// `true` when the next `poll_write` call will be forwarded to `inner`.
        ready: bool,
    }

    impl MaybePending {
        /// Wraps `inner`. The first `poll_write` on the returned value yields
        /// `Poll::Pending`.
        pub fn new(inner: Vec<u8>) -> Self {
            Self { inner, ready: false }
        }
    }

    impl AsyncWrite for MaybePending {
        /// Returns `Poll::Pending` on the first call and on every call that
        /// follows a forwarded write; otherwise forwards `buf` to `inner`.
        fn poll_write(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<io::Result<usize>> {
            if self.ready {
                self.ready = false;
                Pin::new(&mut self.inner).poll_write(cx, buf)
            } else {
                self.ready = true;
                // The very next poll can make progress, so request it right
                // away. Returning `Pending` without waking would stall any
                // executor that only re-polls after a wake-up.
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }

        /// Forwards the flush to `inner` unchanged.
        fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            Pin::new(&mut self.inner).poll_flush(cx)
        }

        /// Forwards the close to `inner` unchanged.
        fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            Pin::new(&mut self.inner).poll_close(cx)
        }
    }
}
43
#[cfg(any(feature = "std", feature = "executor"))]
mod util {
    use futures::future::Future;

    /// Polls `f` in a tight loop, using a context built from a no-op waker,
    /// until it yields `Poll::Ready`, and returns that output.
    pub fn run<F: Future + Unpin>(mut f: F) -> F::Output {
        use futures::future::FutureExt;
        use futures::task::Poll;
        use futures_test::task::noop_context;

        let mut cx = noop_context();
        loop {
            match f.poll_unpin(&mut cx) {
                Poll::Ready(output) => break output,
                // Not finished yet: poll again immediately.
                Poll::Pending => {}
            }
        }
    }
}
61
#[cfg(feature = "executor")]
#[test]
fn buf_writer() {
    use futures::executor::block_on;
    use futures::io::{AsyncWriteExt, BufWriter};

    // A two-byte buffer in front of a plain `Vec<u8>` sink.
    let mut buffered = BufWriter::with_capacity(2, Vec::new());

    // Two bytes into an empty buffer: expected to land in the sink directly.
    block_on(buffered.write(&[0, 1])).unwrap();
    assert!(buffered.buffer().is_empty());
    assert_eq!(buffered.get_ref().as_slice(), &[0, 1]);

    // Single bytes are expected to accumulate in the buffer.
    block_on(buffered.write(&[2])).unwrap();
    assert_eq!(buffered.buffer(), [2]);
    assert_eq!(buffered.get_ref().as_slice(), &[0, 1]);

    block_on(buffered.write(&[3])).unwrap();
    assert_eq!(buffered.buffer(), [2, 3]);
    assert_eq!(buffered.get_ref().as_slice(), &[0, 1]);

    // An explicit flush is expected to empty the buffer into the sink.
    block_on(buffered.flush()).unwrap();
    assert!(buffered.buffer().is_empty());
    assert_eq!(buffered.get_ref().as_slice(), &[0, 1, 2, 3]);

    // Fill the buffer again, one byte at a time.
    block_on(buffered.write(&[4])).unwrap();
    block_on(buffered.write(&[5])).unwrap();
    assert_eq!(buffered.buffer(), [4, 5]);
    assert_eq!(buffered.get_ref().as_slice(), &[0, 1, 2, 3]);

    // Writing into a full buffer: the old contents are expected in the sink
    // and the new byte in the buffer.
    block_on(buffered.write(&[6])).unwrap();
    assert_eq!(buffered.buffer(), [6]);
    assert_eq!(buffered.get_ref().as_slice(), &[0, 1, 2, 3, 4, 5]);

    // A write that does not fit next to the buffered byte.
    block_on(buffered.write(&[7, 8])).unwrap();
    assert!(buffered.buffer().is_empty());
    assert_eq!(buffered.get_ref().as_slice(), &[0, 1, 2, 3, 4, 5, 6, 7, 8]);

    // A write larger than the whole buffer.
    block_on(buffered.write(&[9, 10, 11])).unwrap();
    assert!(buffered.buffer().is_empty());
    assert_eq!(buffered.get_ref().as_slice(), &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]);

    // Flushing with nothing buffered must leave everything untouched.
    block_on(buffered.flush()).unwrap();
    assert!(buffered.buffer().is_empty());
    assert_eq!(buffered.get_ref().as_slice(), &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]);
}
107
#[cfg(feature = "executor")]
#[test]
fn buf_writer_inner_flushes() {
    use futures::executor::block_on;
    use futures::io::{AsyncWriteExt, BufWriter};

    let mut buffered = BufWriter::with_capacity(3, Vec::new());

    // Two bytes fit in the three-byte buffer, so the sink must still be empty.
    block_on(buffered.write(&[0, 1])).unwrap();
    assert!(buffered.get_ref().is_empty());

    // After the flush the bytes must be present in the recovered sink.
    block_on(buffered.flush()).unwrap();
    let sink = buffered.into_inner();
    assert_eq!(sink.as_slice(), &[0, 1]);
}
121
#[cfg(feature = "executor")]
#[test]
fn buf_writer_seek() {
    use futures::executor::block_on;
    use futures::io::{AsyncSeekExt, AsyncWriteExt, BufWriter, Cursor, SeekFrom};

    // FIXME: when https://github.com/rust-lang/futures-rs/issues/1510 fixed,
    // use `Vec::new` instead of `vec![0; 8]`.
    let mut buffered = BufWriter::with_capacity(3, Cursor::new(vec![0; 8]));

    // Eight bytes in total, written in two calls.
    block_on(buffered.write_all(&[0, 1, 2, 3, 4, 5])).unwrap();
    block_on(buffered.write_all(&[6, 7])).unwrap();

    // Asking for the current position must report 8, and all eight bytes
    // must be visible in the cursor's vector afterwards.
    let position = block_on(buffered.seek(SeekFrom::Current(0))).ok();
    assert_eq!(position, Some(8));
    assert_eq!(buffered.get_ref().get_ref().as_slice(), &[0, 1, 2, 3, 4, 5, 6, 7]);

    // Jump back to offset 2 and overwrite two bytes there.
    let position = block_on(buffered.seek(SeekFrom::Start(2))).ok();
    assert_eq!(position, Some(2));
    block_on(buffered.write_all(&[8, 9])).unwrap();
    block_on(buffered.flush()).unwrap();

    let bytes = buffered.into_inner().into_inner();
    assert_eq!(bytes.as_slice(), &[0, 1, 8, 9, 4, 5, 6, 7]);
}
140
#[cfg(feature = "std")]
#[test]
fn maybe_pending_buf_writer() {
    use futures::io::{AsyncWriteExt, BufWriter};

    use maybe_pending::MaybePending;
    use util::run;

    // A two-byte buffer in front of a sink that is pending on every other write.
    let mut buffered = BufWriter::with_capacity(2, MaybePending::new(Vec::new()));

    // Two bytes into an empty buffer: expected to land in the sink directly.
    run(buffered.write(&[0, 1])).unwrap();
    assert!(buffered.buffer().is_empty());
    assert_eq!(buffered.get_ref().inner.as_slice(), &[0, 1]);

    // Single bytes are expected to accumulate in the buffer.
    run(buffered.write(&[2])).unwrap();
    assert_eq!(buffered.buffer(), [2]);
    assert_eq!(buffered.get_ref().inner.as_slice(), &[0, 1]);

    run(buffered.write(&[3])).unwrap();
    assert_eq!(buffered.buffer(), [2, 3]);
    assert_eq!(buffered.get_ref().inner.as_slice(), &[0, 1]);

    // An explicit flush is expected to empty the buffer into the sink.
    run(buffered.flush()).unwrap();
    assert!(buffered.buffer().is_empty());
    assert_eq!(buffered.get_ref().inner.as_slice(), &[0, 1, 2, 3]);

    // Fill the buffer again, one byte at a time.
    run(buffered.write(&[4])).unwrap();
    run(buffered.write(&[5])).unwrap();
    assert_eq!(buffered.buffer(), [4, 5]);
    assert_eq!(buffered.get_ref().inner.as_slice(), &[0, 1, 2, 3]);

    // Writing into a full buffer: the old contents are expected in the sink
    // and the new byte in the buffer.
    run(buffered.write(&[6])).unwrap();
    assert_eq!(buffered.buffer(), [6]);
    assert_eq!(buffered.get_ref().inner.as_slice(), &[0, 1, 2, 3, 4, 5]);

    // A write that does not fit next to the buffered byte.
    run(buffered.write(&[7, 8])).unwrap();
    assert!(buffered.buffer().is_empty());
    assert_eq!(buffered.get_ref().inner.as_slice(), &[0, 1, 2, 3, 4, 5, 6, 7, 8]);

    // A write larger than the whole buffer.
    run(buffered.write(&[9, 10, 11])).unwrap();
    assert!(buffered.buffer().is_empty());
    assert_eq!(buffered.get_ref().inner.as_slice(), &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]);

    // Flushing with nothing buffered must leave everything untouched.
    run(buffered.flush()).unwrap();
    assert!(buffered.buffer().is_empty());
    assert_eq!(buffered.get_ref().inner.as_slice(), &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]);
}
188
#[cfg(feature = "std")]
#[test]
fn maybe_pending_buf_writer_inner_flushes() {
    use futures::io::{AsyncWriteExt, BufWriter};

    use maybe_pending::MaybePending;
    use util::run;

    let mut buffered = BufWriter::with_capacity(3, MaybePending::new(Vec::new()));

    // Two bytes fit in the three-byte buffer, so the sink must still be empty.
    run(buffered.write(&[0, 1])).unwrap();
    assert!(buffered.get_ref().inner.is_empty());

    // After the flush the bytes must be present in the recovered sink.
    run(buffered.flush()).unwrap();
    let sink = buffered.into_inner().inner;
    assert_eq!(sink.as_slice(), &[0, 1]);
}
204
#[cfg(feature = "std")]
#[test]
fn maybe_pending_buf_writer_seek() {
    use futures::io::{
        AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufWriter, Cursor, SeekFrom,
    };
    use futures::task::{Context, Poll};
    use std::io;
    use std::pin::Pin;

    use util::run;

    /// Seekable in-memory writer whose `poll_write` and `poll_seek` each
    /// alternate between `Poll::Pending` and forwarding to the cursor.
    struct MaybePendingSeek {
        inner: Cursor<Vec<u8>>,
        /// `true` when the next `poll_write` will be forwarded to `inner`.
        ready_write: bool,
        /// `true` when the next `poll_seek` will be forwarded to `inner`.
        ready_seek: bool,
    }

    impl MaybePendingSeek {
        /// Wraps `inner` in a cursor; the first write and the first seek
        /// are both pending.
        fn new(inner: Vec<u8>) -> Self {
            Self { inner: Cursor::new(inner), ready_write: false, ready_seek: false }
        }
    }

    impl AsyncWrite for MaybePendingSeek {
        fn poll_write(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<io::Result<usize>> {
            if self.ready_write {
                self.ready_write = false;
                Pin::new(&mut self.inner).poll_write(cx, buf)
            } else {
                self.ready_write = true;
                // The next poll can make progress, so request it right away
                // instead of returning `Pending` with no wake-up scheduled.
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }

        fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            Pin::new(&mut self.inner).poll_flush(cx)
        }

        fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            Pin::new(&mut self.inner).poll_close(cx)
        }
    }

    impl AsyncSeek for MaybePendingSeek {
        fn poll_seek(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            pos: SeekFrom,
        ) -> Poll<io::Result<u64>> {
            if self.ready_seek {
                self.ready_seek = false;
                Pin::new(&mut self.inner).poll_seek(cx, pos)
            } else {
                self.ready_seek = true;
                // Same reasoning as in `poll_write`: wake before `Pending`.
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }
    }

    // FIXME: when https://github.com/rust-lang/futures-rs/issues/1510 fixed,
    // use `Vec::new` instead of `vec![0; 8]`.
    let mut w = BufWriter::with_capacity(3, MaybePendingSeek::new(vec![0; 8]));
    run(w.write_all(&[0, 1, 2, 3, 4, 5])).unwrap();
    run(w.write_all(&[6, 7])).unwrap();
    // Asking for the current position must report 8, and all eight bytes
    // must be visible in the cursor's vector afterwards.
    assert_eq!(run(w.seek(SeekFrom::Current(0))).ok(), Some(8));
    assert_eq!(&w.get_ref().inner.get_ref()[..], &[0, 1, 2, 3, 4, 5, 6, 7][..]);
    // Jump back to offset 2 and overwrite two bytes there.
    assert_eq!(run(w.seek(SeekFrom::Start(2))).ok(), Some(2));
    run(w.write_all(&[8, 9])).unwrap();
    run(w.flush()).unwrap();
    assert_eq!(&w.into_inner().inner.into_inner()[..], &[0, 1, 8, 9, 4, 5, 6, 7]);
}
277