1 use {Async, Poll, IntoFuture, Future};
2 use stream::Stream;
3
4 /// A stream combinator which takes elements from a stream while a predicate
5 /// holds.
6 ///
7 /// This structure is produced by the `Stream::take_while` method.
8 #[derive(Debug)]
9 #[must_use = "streams do nothing unless polled"]
10 pub struct TakeWhile<S, P, R> where S: Stream, R: IntoFuture {
11 stream: S,
12 pred: P,
13 pending: Option<(R::Future, S::Item)>,
14 done_taking: bool,
15 }
16
new<S, P, R>(s: S, p: P) -> TakeWhile<S, P, R> where S: Stream, P: FnMut(&S::Item) -> R, R: IntoFuture<Item=bool, Error=S::Error>,17 pub fn new<S, P, R>(s: S, p: P) -> TakeWhile<S, P, R>
18 where S: Stream,
19 P: FnMut(&S::Item) -> R,
20 R: IntoFuture<Item=bool, Error=S::Error>,
21 {
22 TakeWhile {
23 stream: s,
24 pred: p,
25 pending: None,
26 done_taking: false,
27 }
28 }
29
30 impl<S, P, R> TakeWhile<S, P, R> where S: Stream, R: IntoFuture {
31 /// Acquires a reference to the underlying stream that this combinator is
32 /// pulling from.
get_ref(&self) -> &S33 pub fn get_ref(&self) -> &S {
34 &self.stream
35 }
36
37 /// Acquires a mutable reference to the underlying stream that this
38 /// combinator is pulling from.
39 ///
40 /// Note that care must be taken to avoid tampering with the state of the
41 /// stream which may otherwise confuse this combinator.
get_mut(&mut self) -> &mut S42 pub fn get_mut(&mut self) -> &mut S {
43 &mut self.stream
44 }
45
46 /// Consumes this combinator, returning the underlying stream.
47 ///
48 /// Note that this may discard intermediate state of this combinator, so
49 /// care should be taken to avoid losing resources when this is called.
into_inner(self) -> S50 pub fn into_inner(self) -> S {
51 self.stream
52 }
53 }
54
55 // Forwarding impl of Sink from the underlying stream
56 impl<S, P, R> ::sink::Sink for TakeWhile<S, P, R>
57 where S: ::sink::Sink + Stream, R: IntoFuture
58 {
59 type SinkItem = S::SinkItem;
60 type SinkError = S::SinkError;
61
start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError>62 fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> {
63 self.stream.start_send(item)
64 }
65
poll_complete(&mut self) -> Poll<(), S::SinkError>66 fn poll_complete(&mut self) -> Poll<(), S::SinkError> {
67 self.stream.poll_complete()
68 }
69
close(&mut self) -> Poll<(), S::SinkError>70 fn close(&mut self) -> Poll<(), S::SinkError> {
71 self.stream.close()
72 }
73 }
74
75 impl<S, P, R> Stream for TakeWhile<S, P, R>
76 where S: Stream,
77 P: FnMut(&S::Item) -> R,
78 R: IntoFuture<Item=bool, Error=S::Error>,
79 {
80 type Item = S::Item;
81 type Error = S::Error;
82
poll(&mut self) -> Poll<Option<S::Item>, S::Error>83 fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> {
84 if self.done_taking {
85 return Ok(Async::Ready(None));
86 }
87
88 if self.pending.is_none() {
89 let item = match try_ready!(self.stream.poll()) {
90 Some(e) => e,
91 None => return Ok(Async::Ready(None)),
92 };
93 self.pending = Some(((self.pred)(&item).into_future(), item));
94 }
95
96 assert!(self.pending.is_some());
97 match self.pending.as_mut().unwrap().0.poll() {
98 Ok(Async::Ready(true)) => {
99 let (_, item) = self.pending.take().unwrap();
100 Ok(Async::Ready(Some(item)))
101 },
102 Ok(Async::Ready(false)) => {
103 self.done_taking = true;
104 Ok(Async::Ready(None))
105 }
106 Ok(Async::NotReady) => Ok(Async::NotReady),
107 Err(e) => {
108 self.pending = None;
109 Err(e)
110 }
111 }
112 }
113 }
114