1 use core::fmt; 2 use core::pin::Pin; 3 use futures_core::future::{FusedFuture, Future, TryFuture}; 4 use futures_core::stream::TryStream; 5 use futures_core::task::{Context, Poll}; 6 use pin_project::{pin_project, project}; 7 8 /// Future for the [`try_fold`](super::TryStreamExt::try_fold) method. 9 #[pin_project] 10 #[must_use = "futures do nothing unless you `.await` or poll them"] 11 pub struct TryFold<St, Fut, T, F> { 12 #[pin] 13 stream: St, 14 f: F, 15 accum: Option<T>, 16 #[pin] 17 future: Option<Fut>, 18 } 19 20 impl<St, Fut, T, F> fmt::Debug for TryFold<St, Fut, T, F> 21 where 22 St: fmt::Debug, 23 Fut: fmt::Debug, 24 T: fmt::Debug, 25 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result26 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 27 f.debug_struct("TryFold") 28 .field("stream", &self.stream) 29 .field("accum", &self.accum) 30 .field("future", &self.future) 31 .finish() 32 } 33 } 34 35 impl<St, Fut, T, F> TryFold<St, Fut, T, F> 36 where St: TryStream, 37 F: FnMut(T, St::Ok) -> Fut, 38 Fut: TryFuture<Ok = T, Error = St::Error>, 39 { new(stream: St, f: F, t: T) -> TryFold<St, Fut, T, F>40 pub(super) fn new(stream: St, f: F, t: T) -> TryFold<St, Fut, T, F> { 41 TryFold { 42 stream, 43 f, 44 accum: Some(t), 45 future: None, 46 } 47 } 48 } 49 50 impl<St, Fut, T, F> FusedFuture for TryFold<St, Fut, T, F> 51 where St: TryStream, 52 F: FnMut(T, St::Ok) -> Fut, 53 Fut: TryFuture<Ok = T, Error = St::Error>, 54 { is_terminated(&self) -> bool55 fn is_terminated(&self) -> bool { 56 self.accum.is_none() && self.future.is_none() 57 } 58 } 59 60 impl<St, Fut, T, F> Future for TryFold<St, Fut, T, F> 61 where St: TryStream, 62 F: FnMut(T, St::Ok) -> Fut, 63 Fut: TryFuture<Ok = T, Error = St::Error>, 64 { 65 type Output = Result<T, St::Error>; 66 67 #[project] poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>68 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 69 #[project] 70 let TryFold { mut stream, f, accum, mut future } = self.project(); 71 Poll::Ready(loop { 72 if let Some(fut) = future.as_mut().as_pin_mut() { 73 // we're currently processing a future to produce a new accum value 74 let res = ready!(fut.try_poll(cx)); 75 future.set(None); 76 match res { 77 Ok(a) => *accum = Some(a), 78 Err(e) => break Err(e), 79 } 80 } else if accum.is_some() { 81 // we're waiting on a new item from the stream 82 let res = ready!(stream.as_mut().try_poll_next(cx)); 83 let a = accum.take().unwrap(); 84 match res { 85 Some(Ok(item)) => future.set(Some(f(a, item))), 86 Some(Err(e)) => break Err(e), 87 None => break Ok(a), 88 } 89 } else { 90 panic!("Fold polled after completion") 91 } 92 }) 93 } 94 } 95