1 use {Async, Poll};
2 use stream::{Stream, Fuse};
3
4 /// A `Stream` that implements a `peek` method.
5 ///
6 /// The `peek` method can be used to retrieve a reference
7 /// to the next `Stream::Item` if available. A subsequent
8 /// call to `poll` will return the owned item.
9 #[derive(Debug)]
10 #[must_use = "streams do nothing unless polled"]
11 pub struct Peekable<S: Stream> {
12 stream: Fuse<S>,
13 peeked: Option<S::Item>,
14 }
15
16
new<S: Stream>(stream: S) -> Peekable<S>17 pub fn new<S: Stream>(stream: S) -> Peekable<S> {
18 Peekable {
19 stream: stream.fuse(),
20 peeked: None
21 }
22 }
23
24 // Forwarding impl of Sink from the underlying stream
25 impl<S> ::sink::Sink for Peekable<S>
26 where S: ::sink::Sink + Stream
27 {
28 type SinkItem = S::SinkItem;
29 type SinkError = S::SinkError;
30
start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError>31 fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> {
32 self.stream.start_send(item)
33 }
34
poll_complete(&mut self) -> Poll<(), S::SinkError>35 fn poll_complete(&mut self) -> Poll<(), S::SinkError> {
36 self.stream.poll_complete()
37 }
38
close(&mut self) -> Poll<(), S::SinkError>39 fn close(&mut self) -> Poll<(), S::SinkError> {
40 self.stream.close()
41 }
42 }
43
44 impl<S: Stream> Stream for Peekable<S> {
45 type Item = S::Item;
46 type Error = S::Error;
47
poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>48 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
49 if let Some(item) = self.peeked.take() {
50 return Ok(Async::Ready(Some(item)))
51 }
52 self.stream.poll()
53 }
54 }
55
56
57 impl<S: Stream> Peekable<S> {
58 /// Peek retrieves a reference to the next item in the stream.
59 ///
60 /// This method polls the underlying stream and return either a reference
61 /// to the next item if the stream is ready or passes through any errors.
peek(&mut self) -> Poll<Option<&S::Item>, S::Error>62 pub fn peek(&mut self) -> Poll<Option<&S::Item>, S::Error> {
63 if self.peeked.is_some() {
64 return Ok(Async::Ready(self.peeked.as_ref()))
65 }
66 match try_ready!(self.poll()) {
67 None => Ok(Async::Ready(None)),
68 Some(item) => {
69 self.peeked = Some(item);
70 Ok(Async::Ready(self.peeked.as_ref()))
71 }
72 }
73 }
74 }
75