1 use core::pin::Pin; 2 use futures_core::stream::{FusedStream, Stream}; 3 use futures_core::task::{Context, Poll}; 4 #[cfg(feature = "sink")] 5 use futures_sink::Sink; 6 use pin_utils::{unsafe_pinned, unsafe_unpinned}; 7 8 /// Stream for the [`skip`](super::StreamExt::skip) method. 9 #[derive(Debug)] 10 #[must_use = "streams do nothing unless polled"] 11 pub struct Skip<St> { 12 stream: St, 13 remaining: u64, 14 } 15 16 impl<St: Unpin> Unpin for Skip<St> {} 17 18 impl<St: Stream> Skip<St> { 19 unsafe_pinned!(stream: St); 20 unsafe_unpinned!(remaining: u64); 21 new(stream: St, n: u64) -> Skip<St>22 pub(super) fn new(stream: St, n: u64) -> Skip<St> { 23 Skip { 24 stream, 25 remaining: n, 26 } 27 } 28 29 /// Acquires a reference to the underlying stream that this combinator is 30 /// pulling from. get_ref(&self) -> &St31 pub fn get_ref(&self) -> &St { 32 &self.stream 33 } 34 35 /// Acquires a mutable reference to the underlying stream that this 36 /// combinator is pulling from. 37 /// 38 /// Note that care must be taken to avoid tampering with the state of the 39 /// stream which may otherwise confuse this combinator. get_mut(&mut self) -> &mut St40 pub fn get_mut(&mut self) -> &mut St { 41 &mut self.stream 42 } 43 44 /// Acquires a pinned mutable reference to the underlying stream that this 45 /// combinator is pulling from. 46 /// 47 /// Note that care must be taken to avoid tampering with the state of the 48 /// stream which may otherwise confuse this combinator. get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St>49 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { 50 self.stream() 51 } 52 53 /// Consumes this combinator, returning the underlying stream. 54 /// 55 /// Note that this may discard intermediate state of this combinator, so 56 /// care should be taken to avoid losing resources when this is called. into_inner(self) -> St57 pub fn into_inner(self) -> St { 58 self.stream 59 } 60 } 61 62 impl<St: FusedStream> FusedStream for Skip<St> { is_terminated(&self) -> bool63 fn is_terminated(&self) -> bool { 64 self.stream.is_terminated() 65 } 66 } 67 68 impl<St: Stream> Stream for Skip<St> { 69 type Item = St::Item; 70 poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<St::Item>>71 fn poll_next( 72 mut self: Pin<&mut Self>, 73 cx: &mut Context<'_>, 74 ) -> Poll<Option<St::Item>> { 75 while self.remaining > 0 { 76 match ready!(self.as_mut().stream().poll_next(cx)) { 77 Some(_) => *self.as_mut().remaining() -= 1, 78 None => return Poll::Ready(None), 79 } 80 } 81 82 self.as_mut().stream().poll_next(cx) 83 } 84 size_hint(&self) -> (usize, Option<usize>)85 fn size_hint(&self) -> (usize, Option<usize>) { 86 let (lower, upper) = self.stream.size_hint(); 87 88 let lower = lower.saturating_sub(self.remaining as usize); 89 let upper = match upper { 90 Some(x) => Some(x.saturating_sub(self.remaining as usize)), 91 None => None, 92 }; 93 94 (lower, upper) 95 } 96 } 97 98 // Forwarding impl of Sink from the underlying stream 99 #[cfg(feature = "sink")] 100 impl<S, Item> Sink<Item> for Skip<S> 101 where 102 S: Stream + Sink<Item>, 103 { 104 type Error = S::Error; 105 106 delegate_sink!(stream, Item); 107 } 108