1 use core::fmt; 2 use core::pin::Pin; 3 use futures_core::future::Future; 4 use futures_core::stream::{FusedStream, Stream}; 5 use futures_core::task::{Context, Poll}; 6 #[cfg(feature = "sink")] 7 use futures_sink::Sink; 8 use pin_utils::{unsafe_pinned, unsafe_unpinned}; 9 10 /// Stream for the [`filter`](super::StreamExt::filter) method. 11 #[must_use = "streams do nothing unless polled"] 12 pub struct Filter<St, Fut, F> 13 where St: Stream, 14 { 15 stream: St, 16 f: F, 17 pending_fut: Option<Fut>, 18 pending_item: Option<St::Item>, 19 } 20 21 impl<St, Fut, F> Unpin for Filter<St, Fut, F> 22 where 23 St: Stream + Unpin, 24 Fut: Unpin, 25 {} 26 27 impl<St, Fut, F> fmt::Debug for Filter<St, Fut, F> 28 where 29 St: Stream + fmt::Debug, 30 St::Item: fmt::Debug, 31 Fut: fmt::Debug, 32 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result33 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 34 f.debug_struct("Filter") 35 .field("stream", &self.stream) 36 .field("pending_fut", &self.pending_fut) 37 .field("pending_item", &self.pending_item) 38 .finish() 39 } 40 } 41 42 impl<St, Fut, F> Filter<St, Fut, F> 43 where St: Stream, 44 F: FnMut(&St::Item) -> Fut, 45 Fut: Future<Output = bool>, 46 { 47 unsafe_pinned!(stream: St); 48 unsafe_unpinned!(f: F); 49 unsafe_pinned!(pending_fut: Option<Fut>); 50 unsafe_unpinned!(pending_item: Option<St::Item>); 51 new(stream: St, f: F) -> Filter<St, Fut, F>52 pub(super) fn new(stream: St, f: F) -> Filter<St, Fut, F> { 53 Filter { 54 stream, 55 f, 56 pending_fut: None, 57 pending_item: None, 58 } 59 } 60 61 /// Acquires a reference to the underlying stream that this combinator is 62 /// pulling from. get_ref(&self) -> &St63 pub fn get_ref(&self) -> &St { 64 &self.stream 65 } 66 67 /// Acquires a mutable reference to the underlying stream that this 68 /// combinator is pulling from. 69 /// 70 /// Note that care must be taken to avoid tampering with the state of the 71 /// stream which may otherwise confuse this combinator. get_mut(&mut self) -> &mut St72 pub fn get_mut(&mut self) -> &mut St { 73 &mut self.stream 74 } 75 76 /// Acquires a pinned mutable reference to the underlying stream that this 77 /// combinator is pulling from. 78 /// 79 /// Note that care must be taken to avoid tampering with the state of the 80 /// stream which may otherwise confuse this combinator. get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St>81 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { 82 self.stream() 83 } 84 85 /// Consumes this combinator, returning the underlying stream. 86 /// 87 /// Note that this may discard intermediate state of this combinator, so 88 /// care should be taken to avoid losing resources when this is called. into_inner(self) -> St89 pub fn into_inner(self) -> St { 90 self.stream 91 } 92 } 93 94 impl<St, Fut, F> FusedStream for Filter<St, Fut, F> 95 where St: Stream + FusedStream, 96 F: FnMut(&St::Item) -> Fut, 97 Fut: Future<Output = bool>, 98 { is_terminated(&self) -> bool99 fn is_terminated(&self) -> bool { 100 self.pending_fut.is_none() && self.stream.is_terminated() 101 } 102 } 103 104 impl<St, Fut, F> Stream for Filter<St, Fut, F> 105 where St: Stream, 106 F: FnMut(&St::Item) -> Fut, 107 Fut: Future<Output = bool>, 108 { 109 type Item = St::Item; 110 poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<St::Item>>111 fn poll_next( 112 mut self: Pin<&mut Self>, 113 cx: &mut Context<'_>, 114 ) -> Poll<Option<St::Item>> { 115 loop { 116 if self.pending_fut.is_none() { 117 let item = match ready!(self.as_mut().stream().poll_next(cx)) { 118 Some(e) => e, 119 None => return Poll::Ready(None), 120 }; 121 let fut = (self.as_mut().f())(&item); 122 self.as_mut().pending_fut().set(Some(fut)); 123 *self.as_mut().pending_item() = Some(item); 124 } 125 126 let yield_item = ready!(self.as_mut().pending_fut().as_pin_mut().unwrap().poll(cx)); 127 self.as_mut().pending_fut().set(None); 128 let item = self.as_mut().pending_item().take().unwrap(); 129 130 if yield_item { 131 return Poll::Ready(Some(item)); 132 } 133 } 134 } 135 size_hint(&self) -> (usize, Option<usize>)136 fn size_hint(&self) -> (usize, Option<usize>) { 137 let pending_len = if self.pending_item.is_some() { 1 } else { 0 }; 138 let (_, upper) = self.stream.size_hint(); 139 let upper = match upper { 140 Some(x) => x.checked_add(pending_len), 141 None => None, 142 }; 143 (0, upper) // can't know a lower bound, due to the predicate 144 } 145 } 146 147 // Forwarding impl of Sink from the underlying stream 148 #[cfg(feature = "sink")] 149 impl<S, Fut, F, Item> Sink<Item> for Filter<S, Fut, F> 150 where S: Stream + Sink<Item>, 151 F: FnMut(&S::Item) -> Fut, 152 Fut: Future<Output = bool>, 153 { 154 type Error = S::Error; 155 156 delegate_sink!(stream, Item); 157 } 158