1 use {Async, Poll};
2 use stream::{Stream, Fuse};
3 
4 /// A `Stream` that implements a `peek` method.
5 ///
6 /// The `peek` method can be used to retrieve a reference
7 /// to the next `Stream::Item` if available. A subsequent
8 /// call to `poll` will return the owned item.
9 #[derive(Debug)]
10 #[must_use = "streams do nothing unless polled"]
11 pub struct Peekable<S: Stream> {
12     stream: Fuse<S>,
13     peeked: Option<S::Item>,
14 }
15 
16 
new<S: Stream>(stream: S) -> Peekable<S>17 pub fn new<S: Stream>(stream: S) -> Peekable<S> {
18     Peekable {
19         stream: stream.fuse(),
20         peeked: None
21     }
22 }
23 
24 // Forwarding impl of Sink from the underlying stream
25 impl<S> ::sink::Sink for Peekable<S>
26     where S: ::sink::Sink + Stream
27 {
28     type SinkItem = S::SinkItem;
29     type SinkError = S::SinkError;
30 
start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError>31     fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> {
32         self.stream.start_send(item)
33     }
34 
poll_complete(&mut self) -> Poll<(), S::SinkError>35     fn poll_complete(&mut self) -> Poll<(), S::SinkError> {
36         self.stream.poll_complete()
37     }
38 
close(&mut self) -> Poll<(), S::SinkError>39     fn close(&mut self) -> Poll<(), S::SinkError> {
40         self.stream.close()
41     }
42 }
43 
44 impl<S: Stream> Stream for Peekable<S> {
45     type Item = S::Item;
46     type Error = S::Error;
47 
poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>48     fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
49         if let Some(item) = self.peeked.take() {
50             return Ok(Async::Ready(Some(item)))
51         }
52         self.stream.poll()
53     }
54 }
55 
56 
57 impl<S: Stream> Peekable<S> {
58     /// Peek retrieves a reference to the next item in the stream.
59     ///
60     /// This method polls the underlying stream and return either a reference
61     /// to the next item if the stream is ready or passes through any errors.
peek(&mut self) -> Poll<Option<&S::Item>, S::Error>62     pub fn peek(&mut self) -> Poll<Option<&S::Item>, S::Error> {
63         if self.peeked.is_some() {
64             return Ok(Async::Ready(self.peeked.as_ref()))
65         }
66         match try_ready!(self.poll()) {
67             None => Ok(Async::Ready(None)),
68             Some(item) => {
69                 self.peeked = Some(item);
70                 Ok(Async::Ready(self.peeked.as_ref()))
71             }
72         }
73     }
74 }
75