1 use crate::stream::Stream; 2 3 use pin_project_lite::pin_project; 4 use std::pin::Pin; 5 use std::task::{Context, Poll}; 6 7 pin_project! { 8 /// Stream returned by [`fuse()`][super::StreamExt::fuse]. 9 #[derive(Debug)] 10 pub struct Fuse<T> { 11 #[pin] 12 stream: Option<T>, 13 } 14 } 15 16 impl<T> Fuse<T> 17 where 18 T: Stream, 19 { new(stream: T) -> Fuse<T>20 pub(crate) fn new(stream: T) -> Fuse<T> { 21 Fuse { 22 stream: Some(stream), 23 } 24 } 25 } 26 27 impl<T> Stream for Fuse<T> 28 where 29 T: Stream, 30 { 31 type Item = T::Item; 32 poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>>33 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> { 34 let res = match Option::as_pin_mut(self.as_mut().project().stream) { 35 Some(stream) => ready!(stream.poll_next(cx)), 36 None => return Poll::Ready(None), 37 }; 38 39 if res.is_none() { 40 // Do not poll the stream anymore 41 self.as_mut().project().stream.set(None); 42 } 43 44 Poll::Ready(res) 45 } 46 size_hint(&self) -> (usize, Option<usize>)47 fn size_hint(&self) -> (usize, Option<usize>) { 48 match self.stream { 49 Some(ref stream) => stream.size_hint(), 50 None => (0, Some(0)), 51 } 52 } 53 } 54