1 use super::assert_sink;
2 use crate::unfold_state::UnfoldState;
3 use core::{future::Future, pin::Pin};
4 use futures_core::ready;
5 use futures_core::task::{Context, Poll};
6 use futures_sink::Sink;
7 use pin_project_lite::pin_project;
8 
9 pin_project! {
10     /// Sink for the [`unfold`] function.
11     #[derive(Debug)]
12     #[must_use = "sinks do nothing unless polled"]
13     pub struct Unfold<T, F, R> {
14         function: F,
15         #[pin]
16         state: UnfoldState<T, R>,
17     }
18 }
19 
20 /// Create a sink from a function which processes one item at a time.
21 ///
22 /// # Examples
23 ///
24 /// ```
25 /// # futures::executor::block_on(async {
26 /// use futures::sink::{self, SinkExt};
27 ///
28 /// let unfold = sink::unfold(0, |mut sum, i: i32| {
29 ///     async move {
30 ///         sum += i;
31 ///         eprintln!("{}", i);
32 ///         Ok::<_, futures::never::Never>(sum)
33 ///     }
34 /// });
35 /// futures::pin_mut!(unfold);
36 /// unfold.send(5).await?;
37 /// # Ok::<(), futures::never::Never>(()) }).unwrap();
38 /// ```
unfold<T, F, R, Item, E>(init: T, function: F) -> Unfold<T, F, R> where F: FnMut(T, Item) -> R, R: Future<Output = Result<T, E>>,39 pub fn unfold<T, F, R, Item, E>(init: T, function: F) -> Unfold<T, F, R>
40 where
41     F: FnMut(T, Item) -> R,
42     R: Future<Output = Result<T, E>>,
43 {
44     assert_sink::<Item, E, _>(Unfold { function, state: UnfoldState::Value { value: init } })
45 }
46 
47 impl<T, F, R, Item, E> Sink<Item> for Unfold<T, F, R>
48 where
49     F: FnMut(T, Item) -> R,
50     R: Future<Output = Result<T, E>>,
51 {
52     type Error = E;
53 
poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>54     fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
55         self.poll_flush(cx)
56     }
57 
start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>58     fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
59         let mut this = self.project();
60         let future = match this.state.as_mut().take_value() {
61             Some(value) => (this.function)(value, item),
62             None => panic!("start_send called without poll_ready being called first"),
63         };
64         this.state.set(UnfoldState::Future { future });
65         Ok(())
66     }
67 
poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>68     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
69         let mut this = self.project();
70         Poll::Ready(if let Some(future) = this.state.as_mut().project_future() {
71             match ready!(future.poll(cx)) {
72                 Ok(state) => {
73                     this.state.set(UnfoldState::Value { value: state });
74                     Ok(())
75                 }
76                 Err(err) => Err(err),
77             }
78         } else {
79             Ok(())
80         })
81     }
82 
poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>83     fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
84         self.poll_flush(cx)
85     }
86 }
87