1 //! Asynchronous sinks.
2 //!
3 //! This module contains:
4 //!
5 //! - The [`Sink`] trait, which allows you to asynchronously write data.
6 //! - The [`SinkExt`] trait, which provides adapters for chaining and composing
7 //!   sinks.
8 
9 use crate::future::{assert_future, Either};
10 use core::pin::Pin;
11 use futures_core::future::Future;
12 use futures_core::stream::{Stream, TryStream};
13 use futures_core::task::{Context, Poll};
14 
15 #[cfg(feature = "compat")]
16 use crate::compat::CompatSink;
17 
18 pub use futures_sink::Sink;
19 
20 mod close;
21 pub use self::close::Close;
22 
23 mod drain;
24 pub use self::drain::{drain, Drain};
25 
26 mod fanout;
27 pub use self::fanout::Fanout;
28 
29 mod feed;
30 pub use self::feed::Feed;
31 
32 mod flush;
33 pub use self::flush::Flush;
34 
35 mod err_into;
36 pub use self::err_into::SinkErrInto;
37 
38 mod map_err;
39 pub use self::map_err::SinkMapErr;
40 
41 mod send;
42 pub use self::send::Send;
43 
44 mod send_all;
45 pub use self::send_all::SendAll;
46 
47 mod unfold;
48 pub use self::unfold::{unfold, Unfold};
49 
50 mod with;
51 pub use self::with::With;
52 
53 mod with_flat_map;
54 pub use self::with_flat_map::WithFlatMap;
55 
56 #[cfg(feature = "alloc")]
57 mod buffer;
58 #[cfg(feature = "alloc")]
59 pub use self::buffer::Buffer;
60 
61 impl<T: ?Sized, Item> SinkExt<Item> for T where T: Sink<Item> {}
62 
63 /// An extension trait for `Sink`s that provides a variety of convenient
64 /// combinator functions.
65 pub trait SinkExt<Item>: Sink<Item> {
66     /// Composes a function *in front of* the sink.
67     ///
68     /// This adapter produces a new sink that passes each value through the
69     /// given function `f` before sending it to `self`.
70     ///
71     /// To process each value, `f` produces a *future*, which is then polled to
72     /// completion before passing its result down to the underlying sink. If the
73     /// future produces an error, that error is returned by the new sink.
74     ///
75     /// Note that this function consumes the given sink, returning a wrapped
76     /// version, much like `Iterator::map`.
with<U, Fut, F, E>(self, f: F) -> With<Self, Item, U, Fut, F> where F: FnMut(U) -> Fut, Fut: Future<Output = Result<Item, E>>, E: From<Self::Error>, Self: Sized,77     fn with<U, Fut, F, E>(self, f: F) -> With<Self, Item, U, Fut, F>
78     where
79         F: FnMut(U) -> Fut,
80         Fut: Future<Output = Result<Item, E>>,
81         E: From<Self::Error>,
82         Self: Sized,
83     {
84         assert_sink::<U, E, _>(With::new(self, f))
85     }
86 
87     /// Composes a function *in front of* the sink.
88     ///
89     /// This adapter produces a new sink that passes each value through the
90     /// given function `f` before sending it to `self`.
91     ///
92     /// To process each value, `f` produces a *stream*, of which each value
93     /// is passed to the underlying sink. A new value will not be accepted until
94     /// the stream has been drained
95     ///
96     /// Note that this function consumes the given sink, returning a wrapped
97     /// version, much like `Iterator::flat_map`.
98     ///
99     /// # Examples
100     ///
101     /// ```
102     /// # futures::executor::block_on(async {
103     /// use futures::channel::mpsc;
104     /// use futures::sink::SinkExt;
105     /// use futures::stream::{self, StreamExt};
106     ///
107     /// let (tx, rx) = mpsc::channel(5);
108     ///
109     /// let mut tx = tx.with_flat_map(|x| {
110     ///     stream::iter(vec![Ok(42); x])
111     /// });
112     ///
113     /// tx.send(5).await.unwrap();
114     /// drop(tx);
115     /// let received: Vec<i32> = rx.collect().await;
116     /// assert_eq!(received, vec![42, 42, 42, 42, 42]);
117     /// # });
118     /// ```
with_flat_map<U, St, F>(self, f: F) -> WithFlatMap<Self, Item, U, St, F> where F: FnMut(U) -> St, St: Stream<Item = Result<Item, Self::Error>>, Self: Sized,119     fn with_flat_map<U, St, F>(self, f: F) -> WithFlatMap<Self, Item, U, St, F>
120     where
121         F: FnMut(U) -> St,
122         St: Stream<Item = Result<Item, Self::Error>>,
123         Self: Sized,
124     {
125         assert_sink::<U, Self::Error, _>(WithFlatMap::new(self, f))
126     }
127 
128     /*
129     fn with_map<U, F>(self, f: F) -> WithMap<Self, U, F>
130         where F: FnMut(U) -> Self::SinkItem,
131               Self: Sized;
132 
133     fn with_filter<F>(self, f: F) -> WithFilter<Self, F>
134         where F: FnMut(Self::SinkItem) -> bool,
135               Self: Sized;
136 
137     fn with_filter_map<U, F>(self, f: F) -> WithFilterMap<Self, U, F>
138         where F: FnMut(U) -> Option<Self::SinkItem>,
139               Self: Sized;
140      */
141 
142     /// Transforms the error returned by the sink.
sink_map_err<E, F>(self, f: F) -> SinkMapErr<Self, F> where F: FnOnce(Self::Error) -> E, Self: Sized,143     fn sink_map_err<E, F>(self, f: F) -> SinkMapErr<Self, F>
144     where
145         F: FnOnce(Self::Error) -> E,
146         Self: Sized,
147     {
148         assert_sink::<Item, E, _>(SinkMapErr::new(self, f))
149     }
150 
151     /// Map this sink's error to a different error type using the `Into` trait.
152     ///
153     /// If wanting to map errors of a `Sink + Stream`, use `.sink_err_into().err_into()`.
sink_err_into<E>(self) -> err_into::SinkErrInto<Self, Item, E> where Self: Sized, Self::Error: Into<E>,154     fn sink_err_into<E>(self) -> err_into::SinkErrInto<Self, Item, E>
155     where
156         Self: Sized,
157         Self::Error: Into<E>,
158     {
159         assert_sink::<Item, E, _>(SinkErrInto::new(self))
160     }
161 
162     /// Adds a fixed-size buffer to the current sink.
163     ///
164     /// The resulting sink will buffer up to `capacity` items when the
165     /// underlying sink is unwilling to accept additional items. Calling `flush`
166     /// on the buffered sink will attempt to both empty the buffer and complete
167     /// processing on the underlying sink.
168     ///
169     /// Note that this function consumes the given sink, returning a wrapped
170     /// version, much like `Iterator::map`.
171     ///
172     /// This method is only available when the `std` or `alloc` feature of this
173     /// library is activated, and it is activated by default.
174     #[cfg(feature = "alloc")]
buffer(self, capacity: usize) -> Buffer<Self, Item> where Self: Sized,175     fn buffer(self, capacity: usize) -> Buffer<Self, Item>
176     where
177         Self: Sized,
178     {
179         assert_sink::<Item, Self::Error, _>(Buffer::new(self, capacity))
180     }
181 
182     /// Close the sink.
close(&mut self) -> Close<'_, Self, Item> where Self: Unpin,183     fn close(&mut self) -> Close<'_, Self, Item>
184     where
185         Self: Unpin,
186     {
187         assert_future::<Result<(), Self::Error>, _>(Close::new(self))
188     }
189 
190     /// Fanout items to multiple sinks.
191     ///
192     /// This adapter clones each incoming item and forwards it to both this as well as
193     /// the other sink at the same time.
fanout<Si>(self, other: Si) -> Fanout<Self, Si> where Self: Sized, Item: Clone, Si: Sink<Item, Error = Self::Error>,194     fn fanout<Si>(self, other: Si) -> Fanout<Self, Si>
195     where
196         Self: Sized,
197         Item: Clone,
198         Si: Sink<Item, Error = Self::Error>,
199     {
200         assert_sink::<Item, Self::Error, _>(Fanout::new(self, other))
201     }
202 
203     /// Flush the sink, processing all pending items.
204     ///
205     /// This adapter is intended to be used when you want to stop sending to the sink
206     /// until all current requests are processed.
flush(&mut self) -> Flush<'_, Self, Item> where Self: Unpin,207     fn flush(&mut self) -> Flush<'_, Self, Item>
208     where
209         Self: Unpin,
210     {
211         assert_future::<Result<(), Self::Error>, _>(Flush::new(self))
212     }
213 
214     /// A future that completes after the given item has been fully processed
215     /// into the sink, including flushing.
216     ///
217     /// Note that, **because of the flushing requirement, it is usually better
218     /// to batch together items to send via `feed` or `send_all`,
219     /// rather than flushing between each item.**
send(&mut self, item: Item) -> Send<'_, Self, Item> where Self: Unpin,220     fn send(&mut self, item: Item) -> Send<'_, Self, Item>
221     where
222         Self: Unpin,
223     {
224         assert_future::<Result<(), Self::Error>, _>(Send::new(self, item))
225     }
226 
227     /// A future that completes after the given item has been received
228     /// by the sink.
229     ///
230     /// Unlike `send`, the returned future does not flush the sink.
231     /// It is the caller's responsibility to ensure all pending items
232     /// are processed, which can be done via `flush` or `close`.
feed(&mut self, item: Item) -> Feed<'_, Self, Item> where Self: Unpin,233     fn feed(&mut self, item: Item) -> Feed<'_, Self, Item>
234     where
235         Self: Unpin,
236     {
237         assert_future::<Result<(), Self::Error>, _>(Feed::new(self, item))
238     }
239 
240     /// A future that completes after the given stream has been fully processed
241     /// into the sink, including flushing.
242     ///
243     /// This future will drive the stream to keep producing items until it is
244     /// exhausted, sending each item to the sink. It will complete once both the
245     /// stream is exhausted, the sink has received all items, and the sink has
246     /// been flushed. Note that the sink is **not** closed. If the stream produces
247     /// an error, that error will be returned by this future without flushing the sink.
248     ///
249     /// Doing `sink.send_all(stream)` is roughly equivalent to
250     /// `stream.forward(sink)`. The returned future will exhaust all items from
251     /// `stream` and send them to `self`.
send_all<'a, St>(&'a mut self, stream: &'a mut St) -> SendAll<'a, Self, St> where St: TryStream<Ok = Item, Error = Self::Error> + Stream + Unpin + ?Sized, Self: Unpin,252     fn send_all<'a, St>(&'a mut self, stream: &'a mut St) -> SendAll<'a, Self, St>
253     where
254         St: TryStream<Ok = Item, Error = Self::Error> + Stream + Unpin + ?Sized,
255         // St: Stream<Item = Result<Item, Self::Error>> + Unpin + ?Sized,
256         Self: Unpin,
257     {
258         // TODO: type mismatch resolving `<St as Stream>::Item == std::result::Result<Item, <Self as futures_sink::Sink<Item>>::Error>`
259         // assert_future::<Result<(), Self::Error>, _>(SendAll::new(self, stream))
260         SendAll::new(self, stream)
261     }
262 
263     /// Wrap this sink in an `Either` sink, making it the left-hand variant
264     /// of that `Either`.
265     ///
266     /// This can be used in combination with the `right_sink` method to write `if`
267     /// statements that evaluate to different streams in different branches.
left_sink<Si2>(self) -> Either<Self, Si2> where Si2: Sink<Item, Error = Self::Error>, Self: Sized,268     fn left_sink<Si2>(self) -> Either<Self, Si2>
269     where
270         Si2: Sink<Item, Error = Self::Error>,
271         Self: Sized,
272     {
273         assert_sink::<Item, Self::Error, _>(Either::Left(self))
274     }
275 
276     /// Wrap this stream in an `Either` stream, making it the right-hand variant
277     /// of that `Either`.
278     ///
279     /// This can be used in combination with the `left_sink` method to write `if`
280     /// statements that evaluate to different streams in different branches.
right_sink<Si1>(self) -> Either<Si1, Self> where Si1: Sink<Item, Error = Self::Error>, Self: Sized,281     fn right_sink<Si1>(self) -> Either<Si1, Self>
282     where
283         Si1: Sink<Item, Error = Self::Error>,
284         Self: Sized,
285     {
286         assert_sink::<Item, Self::Error, _>(Either::Right(self))
287     }
288 
289     /// Wraps a [`Sink`] into a sink compatible with libraries using
290     /// futures 0.1 `Sink`. Requires the `compat` feature to be enabled.
291     #[cfg(feature = "compat")]
292     #[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
compat(self) -> CompatSink<Self, Item> where Self: Sized + Unpin,293     fn compat(self) -> CompatSink<Self, Item>
294     where
295         Self: Sized + Unpin,
296     {
297         CompatSink::new(self)
298     }
299 
300     /// A convenience method for calling [`Sink::poll_ready`] on [`Unpin`]
301     /// sink types.
poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> where Self: Unpin,302     fn poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
303     where
304         Self: Unpin,
305     {
306         Pin::new(self).poll_ready(cx)
307     }
308 
309     /// A convenience method for calling [`Sink::start_send`] on [`Unpin`]
310     /// sink types.
start_send_unpin(&mut self, item: Item) -> Result<(), Self::Error> where Self: Unpin,311     fn start_send_unpin(&mut self, item: Item) -> Result<(), Self::Error>
312     where
313         Self: Unpin,
314     {
315         Pin::new(self).start_send(item)
316     }
317 
318     /// A convenience method for calling [`Sink::poll_flush`] on [`Unpin`]
319     /// sink types.
poll_flush_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> where Self: Unpin,320     fn poll_flush_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
321     where
322         Self: Unpin,
323     {
324         Pin::new(self).poll_flush(cx)
325     }
326 
327     /// A convenience method for calling [`Sink::poll_close`] on [`Unpin`]
328     /// sink types.
poll_close_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> where Self: Unpin,329     fn poll_close_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
330     where
331         Self: Unpin,
332     {
333         Pin::new(self).poll_close(cx)
334     }
335 }
336 
337 // Just a helper function to ensure the sinks we're returning all have the
338 // right implementations.
assert_sink<T, E, S>(sink: S) -> S where S: Sink<T, Error = E>,339 pub(crate) fn assert_sink<T, E, S>(sink: S) -> S
340 where
341     S: Sink<T, Error = E>,
342 {
343     sink
344 }
345