1 #[macro_use]
2 extern crate futures;
3
4 use futures::prelude::*;
5 use futures::executor;
6 use futures::future::{err, ok};
7 use futures::stream::{empty, iter_ok, poll_fn, Peekable};
8 use futures::sync::oneshot;
9 use futures::sync::mpsc;
10
11 mod support;
12 use support::*;
13
14 pub struct Iter<I> {
15 iter: I,
16 }
17
iter<J, T, E>(i: J) -> Iter<J::IntoIter> where J: IntoIterator<Item=Result<T, E>>,18 pub fn iter<J, T, E>(i: J) -> Iter<J::IntoIter>
19 where J: IntoIterator<Item=Result<T, E>>,
20 {
21 Iter {
22 iter: i.into_iter(),
23 }
24 }
25
26 impl<I, T, E> Stream for Iter<I>
27 where I: Iterator<Item=Result<T, E>>,
28 {
29 type Item = T;
30 type Error = E;
31
poll(&mut self) -> Poll<Option<T>, E>32 fn poll(&mut self) -> Poll<Option<T>, E> {
33 match self.iter.next() {
34 Some(Ok(e)) => Ok(Async::Ready(Some(e))),
35 Some(Err(e)) => Err(e),
36 None => Ok(Async::Ready(None)),
37 }
38 }
39 }
40
list() -> Box<Stream<Item=i32, Error=u32> + Send>41 fn list() -> Box<Stream<Item=i32, Error=u32> + Send> {
42 let (tx, rx) = mpsc::channel(1);
43 tx.send(Ok(1))
44 .and_then(|tx| tx.send(Ok(2)))
45 .and_then(|tx| tx.send(Ok(3)))
46 .forget();
47 Box::new(rx.then(|r| r.unwrap()))
48 }
49
err_list() -> Box<Stream<Item=i32, Error=u32> + Send>50 fn err_list() -> Box<Stream<Item=i32, Error=u32> + Send> {
51 let (tx, rx) = mpsc::channel(1);
52 tx.send(Ok(1))
53 .and_then(|tx| tx.send(Ok(2)))
54 .and_then(|tx| tx.send(Err(3)))
55 .forget();
56 Box::new(rx.then(|r| r.unwrap()))
57 }
58
59 #[test]
map()60 fn map() {
61 assert_done(|| list().map(|a| a + 1).collect(), Ok(vec![2, 3, 4]));
62 }
63
64 #[test]
map_err()65 fn map_err() {
66 assert_done(|| err_list().map_err(|a| a + 1).collect(), Err(4));
67 }
68
69 #[derive(Copy, Clone, Debug, PartialEq, Eq)]
70 struct FromErrTest(u32);
71
72 impl From<u32> for FromErrTest {
from(i: u32) -> FromErrTest73 fn from(i: u32) -> FromErrTest {
74 FromErrTest(i)
75 }
76 }
77
78 #[test]
from_err()79 fn from_err() {
80 assert_done(|| err_list().from_err().collect(), Err(FromErrTest(3)));
81 }
82
83 #[test]
fold()84 fn fold() {
85 assert_done(|| list().fold(0, |a, b| ok::<i32, u32>(a + b)), Ok(6));
86 assert_done(|| err_list().fold(0, |a, b| ok::<i32, u32>(a + b)), Err(3));
87 }
88
89 #[test]
filter()90 fn filter() {
91 assert_done(|| list().filter(|a| *a % 2 == 0).collect(), Ok(vec![2]));
92 }
93
94 #[test]
filter_map()95 fn filter_map() {
96 assert_done(|| list().filter_map(|x| {
97 if x % 2 == 0 {
98 Some(x + 10)
99 } else {
100 None
101 }
102 }).collect(), Ok(vec![12]));
103 }
104
105 #[test]
and_then()106 fn and_then() {
107 assert_done(|| list().and_then(|a| Ok(a + 1)).collect(), Ok(vec![2, 3, 4]));
108 assert_done(|| list().and_then(|a| err::<i32, u32>(a as u32)).collect(),
109 Err(1));
110 }
111
112 #[test]
then()113 fn then() {
114 assert_done(|| list().then(|a| a.map(|e| e + 1)).collect(), Ok(vec![2, 3, 4]));
115
116 }
117
118 #[test]
or_else()119 fn or_else() {
120 assert_done(|| err_list().or_else(|a| {
121 ok::<i32, u32>(a as i32)
122 }).collect(), Ok(vec![1, 2, 3]));
123 }
124
125 #[test]
flatten()126 fn flatten() {
127 assert_done(|| list().map(|_| list()).flatten().collect(),
128 Ok(vec![1, 2, 3, 1, 2, 3, 1, 2, 3]));
129
130 }
131
132 #[test]
skip()133 fn skip() {
134 assert_done(|| list().skip(2).collect(), Ok(vec![3]));
135 }
136
137 #[test]
skip_passes_errors_through()138 fn skip_passes_errors_through() {
139 let mut s = iter(vec![Err(1), Err(2), Ok(3), Ok(4), Ok(5)])
140 .skip(1)
141 .wait();
142 assert_eq!(s.next(), Some(Err(1)));
143 assert_eq!(s.next(), Some(Err(2)));
144 assert_eq!(s.next(), Some(Ok(4)));
145 assert_eq!(s.next(), Some(Ok(5)));
146 assert_eq!(s.next(), None);
147 }
148
149 #[test]
skip_while()150 fn skip_while() {
151 assert_done(|| list().skip_while(|e| Ok(*e % 2 == 1)).collect(),
152 Ok(vec![2, 3]));
153 }
154 #[test]
take()155 fn take() {
156 assert_done(|| list().take(2).collect(), Ok(vec![1, 2]));
157 }
158
159 #[test]
take_while()160 fn take_while() {
161 assert_done(|| list().take_while(|e| Ok(*e < 3)).collect(),
162 Ok(vec![1, 2]));
163 }
164
165 #[test]
take_passes_errors_through()166 fn take_passes_errors_through() {
167 let mut s = iter(vec![Err(1), Err(2), Ok(3), Ok(4), Err(4)])
168 .take(1)
169 .wait();
170 assert_eq!(s.next(), Some(Err(1)));
171 assert_eq!(s.next(), Some(Err(2)));
172 assert_eq!(s.next(), Some(Ok(3)));
173 assert_eq!(s.next(), None);
174
175 let mut s = iter(vec![Ok(1), Err(2)]).take(1).wait();
176 assert_eq!(s.next(), Some(Ok(1)));
177 assert_eq!(s.next(), None);
178 }
179
180 #[test]
peekable()181 fn peekable() {
182 assert_done(|| list().peekable().collect(), Ok(vec![1, 2, 3]));
183 }
184
185 #[test]
fuse()186 fn fuse() {
187 let mut stream = list().fuse().wait();
188 assert_eq!(stream.next(), Some(Ok(1)));
189 assert_eq!(stream.next(), Some(Ok(2)));
190 assert_eq!(stream.next(), Some(Ok(3)));
191 assert_eq!(stream.next(), None);
192 assert_eq!(stream.next(), None);
193 assert_eq!(stream.next(), None);
194 }
195
196 #[test]
buffered()197 fn buffered() {
198 let (tx, rx) = mpsc::channel(1);
199 let (a, b) = oneshot::channel::<u32>();
200 let (c, d) = oneshot::channel::<u32>();
201
202 tx.send(Box::new(b.map_err(|_| ())) as Box<Future<Item = _, Error = _> + Send>)
203 .and_then(|tx| tx.send(Box::new(d.map_err(|_| ()))))
204 .forget();
205
206 let mut rx = rx.buffered(2);
207 sassert_empty(&mut rx);
208 c.send(3).unwrap();
209 sassert_empty(&mut rx);
210 a.send(5).unwrap();
211 let mut rx = rx.wait();
212 assert_eq!(rx.next(), Some(Ok(5)));
213 assert_eq!(rx.next(), Some(Ok(3)));
214 assert_eq!(rx.next(), None);
215
216 let (tx, rx) = mpsc::channel(1);
217 let (a, b) = oneshot::channel::<u32>();
218 let (c, d) = oneshot::channel::<u32>();
219
220 tx.send(Box::new(b.map_err(|_| ())) as Box<Future<Item = _, Error = _> + Send>)
221 .and_then(|tx| tx.send(Box::new(d.map_err(|_| ()))))
222 .forget();
223
224 let mut rx = rx.buffered(1);
225 sassert_empty(&mut rx);
226 c.send(3).unwrap();
227 sassert_empty(&mut rx);
228 a.send(5).unwrap();
229 let mut rx = rx.wait();
230 assert_eq!(rx.next(), Some(Ok(5)));
231 assert_eq!(rx.next(), Some(Ok(3)));
232 assert_eq!(rx.next(), None);
233 }
234
235 #[test]
unordered()236 fn unordered() {
237 let (tx, rx) = mpsc::channel(1);
238 let (a, b) = oneshot::channel::<u32>();
239 let (c, d) = oneshot::channel::<u32>();
240
241 tx.send(Box::new(b.map_err(|_| ())) as Box<Future<Item = _, Error = _> + Send>)
242 .and_then(|tx| tx.send(Box::new(d.map_err(|_| ()))))
243 .forget();
244
245 let mut rx = rx.buffer_unordered(2);
246 sassert_empty(&mut rx);
247 let mut rx = rx.wait();
248 c.send(3).unwrap();
249 assert_eq!(rx.next(), Some(Ok(3)));
250 a.send(5).unwrap();
251 assert_eq!(rx.next(), Some(Ok(5)));
252 assert_eq!(rx.next(), None);
253
254 let (tx, rx) = mpsc::channel(1);
255 let (a, b) = oneshot::channel::<u32>();
256 let (c, d) = oneshot::channel::<u32>();
257
258 tx.send(Box::new(b.map_err(|_| ())) as Box<Future<Item = _, Error = _> + Send>)
259 .and_then(|tx| tx.send(Box::new(d.map_err(|_| ()))))
260 .forget();
261
262 // We don't even get to see `c` until `a` completes.
263 let mut rx = rx.buffer_unordered(1);
264 sassert_empty(&mut rx);
265 c.send(3).unwrap();
266 sassert_empty(&mut rx);
267 a.send(5).unwrap();
268 let mut rx = rx.wait();
269 assert_eq!(rx.next(), Some(Ok(5)));
270 assert_eq!(rx.next(), Some(Ok(3)));
271 assert_eq!(rx.next(), None);
272 }
273
274 #[test]
zip()275 fn zip() {
276 assert_done(|| list().zip(list()).collect(),
277 Ok(vec![(1, 1), (2, 2), (3, 3)]));
278 assert_done(|| list().zip(list().take(2)).collect(),
279 Ok(vec![(1, 1), (2, 2)]));
280 assert_done(|| list().take(2).zip(list()).collect(),
281 Ok(vec![(1, 1), (2, 2)]));
282 assert_done(|| err_list().zip(list()).collect(), Err(3));
283 assert_done(|| list().zip(list().map(|x| x + 1)).collect(),
284 Ok(vec![(1, 2), (2, 3), (3, 4)]));
285 }
286
287 #[test]
peek()288 fn peek() {
289 struct Peek {
290 inner: Peekable<Box<Stream<Item = i32, Error =u32> + Send>>
291 }
292
293 impl Future for Peek {
294 type Item = ();
295 type Error = u32;
296
297 fn poll(&mut self) -> Poll<(), u32> {
298 {
299 let res = try_ready!(self.inner.peek());
300 assert_eq!(res, Some(&1));
301 }
302 assert_eq!(self.inner.peek().unwrap(), Some(&1).into());
303 assert_eq!(self.inner.poll().unwrap(), Some(1).into());
304 Ok(().into())
305 }
306 }
307
308 Peek {
309 inner: list().peekable(),
310 }.wait().unwrap()
311 }
312
313 #[test]
wait()314 fn wait() {
315 assert_eq!(list().wait().collect::<Result<Vec<_>, _>>(),
316 Ok(vec![1, 2, 3]));
317 }
318
319 #[test]
chunks()320 fn chunks() {
321 assert_done(|| list().chunks(3).collect(), Ok(vec![vec![1, 2, 3]]));
322 assert_done(|| list().chunks(1).collect(), Ok(vec![vec![1], vec![2], vec![3]]));
323 assert_done(|| list().chunks(2).collect(), Ok(vec![vec![1, 2], vec![3]]));
324 let mut list = executor::spawn(err_list().chunks(3));
325 let i = list.wait_stream().unwrap().unwrap();
326 assert_eq!(i, vec![1, 2]);
327 let i = list.wait_stream().unwrap().unwrap_err();
328 assert_eq!(i, 3);
329 }
330
331 #[test]
332 #[should_panic]
chunks_panic_on_cap_zero()333 fn chunks_panic_on_cap_zero() {
334 let _ = list().chunks(0);
335 }
336
337 #[test]
select()338 fn select() {
339 let a = iter_ok::<_, u32>(vec![1, 2, 3]);
340 let b = iter_ok(vec![4, 5, 6]);
341 assert_done(|| a.select(b).collect(), Ok(vec![1, 4, 2, 5, 3, 6]));
342
343 let a = iter_ok::<_, u32>(vec![1, 2, 3]);
344 let b = iter_ok(vec![1, 2]);
345 assert_done(|| a.select(b).collect(), Ok(vec![1, 1, 2, 2, 3]));
346
347 let a = iter_ok(vec![1, 2]);
348 let b = iter_ok::<_, u32>(vec![1, 2, 3]);
349 assert_done(|| a.select(b).collect(), Ok(vec![1, 1, 2, 2, 3]));
350 }
351
352 #[test]
forward()353 fn forward() {
354 let v = Vec::new();
355 let v = iter_ok::<_, ()>(vec![0, 1]).forward(v).wait().unwrap().1;
356 assert_eq!(v, vec![0, 1]);
357
358 let v = iter_ok::<_, ()>(vec![2, 3]).forward(v).wait().unwrap().1;
359 assert_eq!(v, vec![0, 1, 2, 3]);
360
361 assert_done(move || iter_ok(vec![4, 5]).forward(v).map(|(_, s)| s),
362 Ok::<_, ()>(vec![0, 1, 2, 3, 4, 5]));
363 }
364
365 #[test]
366 #[allow(deprecated)]
concat()367 fn concat() {
368 let a = iter_ok::<_, ()>(vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]);
369 assert_done(move || a.concat(), Ok(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]));
370
371 let b = iter(vec![Ok::<_, ()>(vec![1, 2, 3]), Err(()), Ok(vec![7, 8, 9])]);
372 assert_done(move || b.concat(), Err(()));
373 }
374
375 #[test]
concat2()376 fn concat2() {
377 let a = iter_ok::<_, ()>(vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]);
378 assert_done(move || a.concat2(), Ok(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]));
379
380 let b = iter(vec![Ok::<_, ()>(vec![1, 2, 3]), Err(()), Ok(vec![7, 8, 9])]);
381 assert_done(move || b.concat2(), Err(()));
382
383 let c = empty::<Vec<()>, ()>();
384 assert_done(move || c.concat2(), Ok(vec![]))
385 }
386
387 #[test]
stream_poll_fn()388 fn stream_poll_fn() {
389 let mut counter = 5usize;
390
391 let read_stream = poll_fn(move || -> Poll<Option<usize>, std::io::Error> {
392 if counter == 0 {
393 return Ok(Async::Ready(None));
394 }
395 counter -= 1;
396 Ok(Async::Ready(Some(counter)))
397 });
398
399 assert_eq!(read_stream.wait().count(), 5);
400 }
401
402 #[test]
inspect()403 fn inspect() {
404 let mut seen = vec![];
405 assert_done(|| list().inspect(|&a| seen.push(a)).collect(), Ok(vec![1, 2, 3]));
406 assert_eq!(seen, [1, 2, 3]);
407 }
408
409 #[test]
inspect_err()410 fn inspect_err() {
411 let mut seen = vec![];
412 assert_done(|| err_list().inspect_err(|&a| seen.push(a)).collect(), Err(3));
413 assert_eq!(seen, [3]);
414 }
415