1 use std::mem;
2 use std::prelude::v1::*;
3
4 use {Async, Poll};
5 use stream::{Stream, Fuse};
6
7 /// An adaptor that chunks up elements in a vector.
8 ///
9 /// This adaptor will buffer up a list of items in the stream and pass on the
10 /// vector used for buffering when a specified capacity has been reached. This
11 /// is created by the `Stream::chunks` method.
12 #[derive(Debug)]
13 #[must_use = "streams do nothing unless polled"]
14 pub struct Chunks<S>
15 where S: Stream
16 {
17 items: Vec<S::Item>,
18 err: Option<S::Error>,
19 stream: Fuse<S>,
20 cap: usize, // https://github.com/rust-lang-nursery/futures-rs/issues/1475
21 }
22
new<S>(s: S, capacity: usize) -> Chunks<S> where S: Stream23 pub fn new<S>(s: S, capacity: usize) -> Chunks<S>
24 where S: Stream
25 {
26 assert!(capacity > 0);
27
28 Chunks {
29 items: Vec::with_capacity(capacity),
30 err: None,
31 stream: super::fuse::new(s),
32 cap: capacity,
33 }
34 }
35
36 // Forwarding impl of Sink from the underlying stream
37 impl<S> ::sink::Sink for Chunks<S>
38 where S: ::sink::Sink + Stream
39 {
40 type SinkItem = S::SinkItem;
41 type SinkError = S::SinkError;
42
start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError>43 fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> {
44 self.stream.start_send(item)
45 }
46
poll_complete(&mut self) -> Poll<(), S::SinkError>47 fn poll_complete(&mut self) -> Poll<(), S::SinkError> {
48 self.stream.poll_complete()
49 }
50
close(&mut self) -> Poll<(), S::SinkError>51 fn close(&mut self) -> Poll<(), S::SinkError> {
52 self.stream.close()
53 }
54 }
55
56
57 impl<S> Chunks<S> where S: Stream {
take(&mut self) -> Vec<S::Item>58 fn take(&mut self) -> Vec<S::Item> {
59 let cap = self.cap;
60 mem::replace(&mut self.items, Vec::with_capacity(cap))
61 }
62
63 /// Acquires a reference to the underlying stream that this combinator is
64 /// pulling from.
get_ref(&self) -> &S65 pub fn get_ref(&self) -> &S {
66 self.stream.get_ref()
67 }
68
69 /// Acquires a mutable reference to the underlying stream that this
70 /// combinator is pulling from.
71 ///
72 /// Note that care must be taken to avoid tampering with the state of the
73 /// stream which may otherwise confuse this combinator.
get_mut(&mut self) -> &mut S74 pub fn get_mut(&mut self) -> &mut S {
75 self.stream.get_mut()
76 }
77
78 /// Consumes this combinator, returning the underlying stream.
79 ///
80 /// Note that this may discard intermediate state of this combinator, so
81 /// care should be taken to avoid losing resources when this is called.
into_inner(self) -> S82 pub fn into_inner(self) -> S {
83 self.stream.into_inner()
84 }
85 }
86
87 impl<S> Stream for Chunks<S>
88 where S: Stream
89 {
90 type Item = Vec<<S as Stream>::Item>;
91 type Error = <S as Stream>::Error;
92
poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>93 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
94 if let Some(err) = self.err.take() {
95 return Err(err)
96 }
97
98 loop {
99 match self.stream.poll() {
100 Ok(Async::NotReady) => return Ok(Async::NotReady),
101
102 // Push the item into the buffer and check whether it is full.
103 // If so, replace our buffer with a new and empty one and return
104 // the full one.
105 Ok(Async::Ready(Some(item))) => {
106 self.items.push(item);
107 if self.items.len() >= self.cap {
108 return Ok(Some(self.take()).into())
109 }
110 }
111
112 // Since the underlying stream ran out of values, return what we
113 // have buffered, if we have anything.
114 Ok(Async::Ready(None)) => {
115 return if self.items.len() > 0 {
116 let full_buf = mem::replace(&mut self.items, Vec::new());
117 Ok(Some(full_buf).into())
118 } else {
119 Ok(Async::Ready(None))
120 }
121 }
122
123 // If we've got buffered items be sure to return them first,
124 // we'll defer our error for later.
125 Err(e) => {
126 if self.items.len() == 0 {
127 return Err(e)
128 } else {
129 self.err = Some(e);
130 return Ok(Some(self.take()).into())
131 }
132 }
133 }
134 }
135 }
136 }
137