1 use core::fmt; 2 use core::pin::Pin; 3 use futures_core::future::{FusedFuture, Future, TryFuture}; 4 use futures_core::ready; 5 use futures_core::stream::TryStream; 6 use futures_core::task::{Context, Poll}; 7 use pin_project_lite::pin_project; 8 9 pin_project! { 10 /// Future for the [`try_fold`](super::TryStreamExt::try_fold) method. 11 #[must_use = "futures do nothing unless you `.await` or poll them"] 12 pub struct TryFold<St, Fut, T, F> { 13 #[pin] 14 stream: St, 15 f: F, 16 accum: Option<T>, 17 #[pin] 18 future: Option<Fut>, 19 } 20 } 21 22 impl<St, Fut, T, F> fmt::Debug for TryFold<St, Fut, T, F> 23 where 24 St: fmt::Debug, 25 Fut: fmt::Debug, 26 T: fmt::Debug, 27 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result28 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 29 f.debug_struct("TryFold") 30 .field("stream", &self.stream) 31 .field("accum", &self.accum) 32 .field("future", &self.future) 33 .finish() 34 } 35 } 36 37 impl<St, Fut, T, F> TryFold<St, Fut, T, F> 38 where 39 St: TryStream, 40 F: FnMut(T, St::Ok) -> Fut, 41 Fut: TryFuture<Ok = T, Error = St::Error>, 42 { new(stream: St, f: F, t: T) -> Self43 pub(super) fn new(stream: St, f: F, t: T) -> Self { 44 Self { stream, f, accum: Some(t), future: None } 45 } 46 } 47 48 impl<St, Fut, T, F> FusedFuture for TryFold<St, Fut, T, F> 49 where 50 St: TryStream, 51 F: FnMut(T, St::Ok) -> Fut, 52 Fut: TryFuture<Ok = T, Error = St::Error>, 53 { is_terminated(&self) -> bool54 fn is_terminated(&self) -> bool { 55 self.accum.is_none() && self.future.is_none() 56 } 57 } 58 59 impl<St, Fut, T, F> Future for TryFold<St, Fut, T, F> 60 where 61 St: TryStream, 62 F: FnMut(T, St::Ok) -> Fut, 63 Fut: TryFuture<Ok = T, Error = St::Error>, 64 { 65 type Output = Result<T, St::Error>; 66 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>67 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 68 let mut this = self.project(); 69 70 Poll::Ready(loop { 71 if let Some(fut) = this.future.as_mut().as_pin_mut() { 72 // we're currently processing a future to produce a new accum value 73 let res = ready!(fut.try_poll(cx)); 74 this.future.set(None); 75 match res { 76 Ok(a) => *this.accum = Some(a), 77 Err(e) => break Err(e), 78 } 79 } else if this.accum.is_some() { 80 // we're waiting on a new item from the stream 81 let res = ready!(this.stream.as_mut().try_poll_next(cx)); 82 let a = this.accum.take().unwrap(); 83 match res { 84 Some(Ok(item)) => this.future.set(Some((this.f)(a, item))), 85 Some(Err(e)) => break Err(e), 86 None => break Ok(a), 87 } 88 } else { 89 panic!("Fold polled after completion") 90 } 91 }) 92 } 93 } 94