1 use core::fmt; 2 use core::marker::PhantomData; 3 use core::pin::Pin; 4 use futures_core::future::Future; 5 use futures_core::ready; 6 use futures_core::stream::Stream; 7 use futures_core::task::{Context, Poll}; 8 use futures_sink::Sink; 9 use pin_project_lite::pin_project; 10 11 pin_project! { 12 /// Sink for the [`with`](super::SinkExt::with) method. 13 #[must_use = "sinks do nothing unless polled"] 14 pub struct With<Si, Item, U, Fut, F> { 15 #[pin] 16 sink: Si, 17 f: F, 18 #[pin] 19 state: Option<Fut>, 20 _phantom: PhantomData<fn(U) -> Item>, 21 } 22 } 23 24 impl<Si, Item, U, Fut, F> fmt::Debug for With<Si, Item, U, Fut, F> 25 where 26 Si: fmt::Debug, 27 Fut: fmt::Debug, 28 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result29 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 30 f.debug_struct("With").field("sink", &self.sink).field("state", &self.state).finish() 31 } 32 } 33 34 impl<Si, Item, U, Fut, F> With<Si, Item, U, Fut, F> 35 where 36 Si: Sink<Item>, 37 F: FnMut(U) -> Fut, 38 Fut: Future, 39 { new<E>(sink: Si, f: F) -> Self where Fut: Future<Output = Result<Item, E>>, E: From<Si::Error>,40 pub(super) fn new<E>(sink: Si, f: F) -> Self 41 where 42 Fut: Future<Output = Result<Item, E>>, 43 E: From<Si::Error>, 44 { 45 Self { state: None, sink, f, _phantom: PhantomData } 46 } 47 } 48 49 impl<Si, Item, U, Fut, F> Clone for With<Si, Item, U, Fut, F> 50 where 51 Si: Clone, 52 F: Clone, 53 Fut: Clone, 54 { clone(&self) -> Self55 fn clone(&self) -> Self { 56 Self { 57 state: self.state.clone(), 58 sink: self.sink.clone(), 59 f: self.f.clone(), 60 _phantom: PhantomData, 61 } 62 } 63 } 64 65 // Forwarding impl of Stream from the underlying sink 66 impl<S, Item, U, Fut, F> Stream for With<S, Item, U, Fut, F> 67 where 68 S: Stream + Sink<Item>, 69 F: FnMut(U) -> Fut, 70 Fut: Future, 71 { 72 type Item = S::Item; 73 74 delegate_stream!(sink); 75 } 76 77 impl<Si, Item, U, Fut, F, E> With<Si, Item, U, Fut, F> 78 where 79 Si: Sink<Item>, 80 F: FnMut(U) -> Fut, 81 Fut: Future<Output = Result<Item, E>>, 82 E: From<Si::Error>, 83 { 84 delegate_access_inner!(sink, Si, ()); 85 86 /// Completes the processing of previous item if any. poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), E>>87 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), E>> { 88 let mut this = self.project(); 89 90 let item = match this.state.as_mut().as_pin_mut() { 91 None => return Poll::Ready(Ok(())), 92 Some(fut) => ready!(fut.poll(cx))?, 93 }; 94 this.state.set(None); 95 this.sink.start_send(item)?; 96 Poll::Ready(Ok(())) 97 } 98 } 99 100 impl<Si, Item, U, Fut, F, E> Sink<U> for With<Si, Item, U, Fut, F> 101 where 102 Si: Sink<Item>, 103 F: FnMut(U) -> Fut, 104 Fut: Future<Output = Result<Item, E>>, 105 E: From<Si::Error>, 106 { 107 type Error = E; 108 poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>109 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 110 ready!(self.as_mut().poll(cx))?; 111 ready!(self.project().sink.poll_ready(cx)?); 112 Poll::Ready(Ok(())) 113 } 114 start_send(self: Pin<&mut Self>, item: U) -> Result<(), Self::Error>115 fn start_send(self: Pin<&mut Self>, item: U) -> Result<(), Self::Error> { 116 let mut this = self.project(); 117 118 assert!(this.state.is_none()); 119 this.state.set(Some((this.f)(item))); 120 Ok(()) 121 } 122 poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>123 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 124 ready!(self.as_mut().poll(cx))?; 125 ready!(self.project().sink.poll_flush(cx)?); 126 Poll::Ready(Ok(())) 127 } 128 poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>129 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 130 ready!(self.as_mut().poll(cx))?; 131 ready!(self.project().sink.poll_close(cx)?); 132 Poll::Ready(Ok(())) 133 } 134 } 135