1 //! Asynchronous sinks
2 //!
3 //! This module contains the `Sink` trait, along with a number of adapter types
4 //! for it. An overview is available in the documentation for the trait itself.
5 //!
6 //! You can find more information/tutorials about streams [online at
7 //! https://tokio.rs][online]
8 //!
9 //! [online]: https://tokio.rs/docs/getting-started/streams-and-sinks/
10 
11 use {IntoFuture, Poll, StartSend};
12 use stream::Stream;
13 
14 mod with;
15 mod with_flat_map;
16 // mod with_map;
17 // mod with_filter;
18 // mod with_filter_map;
19 mod flush;
20 mod from_err;
21 mod send;
22 mod send_all;
23 mod map_err;
24 mod fanout;
25 
26 if_std! {
27     mod buffer;
28     mod wait;
29 
30     pub use self::buffer::Buffer;
31     pub use self::wait::Wait;
32 
33     // TODO: consider expanding this via e.g. FromIterator
34     impl<T> Sink for ::std::vec::Vec<T> {
35         type SinkItem = T;
36         type SinkError = (); // Change this to ! once it stabilizes
37 
38         fn start_send(&mut self, item: Self::SinkItem)
39                       -> StartSend<Self::SinkItem, Self::SinkError>
40         {
41             self.push(item);
42             Ok(::AsyncSink::Ready)
43         }
44 
45         fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
46             Ok(::Async::Ready(()))
47         }
48 
49         fn close(&mut self) -> Poll<(), Self::SinkError> {
50             Ok(::Async::Ready(()))
51         }
52     }
53 
54     /// A type alias for `Box<Sink + Send>`
55     pub type BoxSink<T, E> = ::std::boxed::Box<Sink<SinkItem = T, SinkError = E> +
56                                                ::core::marker::Send>;
57 
58     impl<S: ?Sized + Sink> Sink for ::std::boxed::Box<S> {
59         type SinkItem = S::SinkItem;
60         type SinkError = S::SinkError;
61 
62         fn start_send(&mut self, item: Self::SinkItem)
63                       -> StartSend<Self::SinkItem, Self::SinkError> {
64             (**self).start_send(item)
65         }
66 
67         fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
68             (**self).poll_complete()
69         }
70 
71         fn close(&mut self) -> Poll<(), Self::SinkError> {
72             (**self).close()
73         }
74     }
75 }
76 
77 pub use self::with::With;
78 pub use self::with_flat_map::WithFlatMap;
79 pub use self::flush::Flush;
80 pub use self::send::Send;
81 pub use self::send_all::SendAll;
82 pub use self::map_err::SinkMapErr;
83 pub use self::from_err::SinkFromErr;
84 pub use self::fanout::Fanout;
85 
86 /// A `Sink` is a value into which other values can be sent, asynchronously.
87 ///
88 /// Basic examples of sinks include the sending side of:
89 ///
90 /// - Channels
91 /// - Sockets
92 /// - Pipes
93 ///
94 /// In addition to such "primitive" sinks, it's typical to layer additional
95 /// functionality, such as buffering, on top of an existing sink.
96 ///
97 /// Sending to a sink is "asynchronous" in the sense that the value may not be
98 /// sent in its entirety immediately. Instead, values are sent in a two-phase
99 /// way: first by initiating a send, and then by polling for completion. This
100 /// two-phase setup is analogous to buffered writing in synchronous code, where
101 /// writes often succeed immediately, but internally are buffered and are
102 /// *actually* written only upon flushing.
103 ///
104 /// In addition, the `Sink` may be *full*, in which case it is not even possible
105 /// to start the sending process.
106 ///
107 /// As with `Future` and `Stream`, the `Sink` trait is built from a few core
108 /// required methods, and a host of default methods for working in a
109 /// higher-level way. The `Sink::send_all` combinator is of particular
110 /// importance: you can use it to send an entire stream to a sink, which is
111 /// the simplest way to ultimately consume a sink.
112 ///
113 /// You can find more information/tutorials about streams [online at
114 /// https://tokio.rs][online]
115 ///
116 /// [online]: https://tokio.rs/docs/getting-started/streams-and-sinks/
117 pub trait Sink {
118     /// The type of value that the sink accepts.
119     type SinkItem;
120 
121     /// The type of value produced by the sink when an error occurs.
122     type SinkError;
123 
124     /// Begin the process of sending a value to the sink.
125     ///
126     /// As the name suggests, this method only *begins* the process of sending
127     /// the item. If the sink employs buffering, the item isn't fully processed
128     /// until the buffer is fully flushed. Since sinks are designed to work with
129     /// asynchronous I/O, the process of actually writing out the data to an
130     /// underlying object takes place asynchronously. **You *must* use
131     /// `poll_complete` in order to drive completion of a send**. In particular,
132     /// `start_send` does not begin the flushing process
133     ///
134     /// # Return value
135     ///
136     /// This method returns `AsyncSink::Ready` if the sink was able to start
137     /// sending `item`. In that case, you *must* ensure that you call
138     /// `poll_complete` to process the sent item to completion. Note, however,
139     /// that several calls to `start_send` can be made prior to calling
140     /// `poll_complete`, which will work on completing all pending items.
141     ///
142     /// The method returns `AsyncSink::NotReady` if the sink was unable to begin
143     /// sending, usually due to being full. The sink must have attempted to
144     /// complete processing any outstanding requests (equivalent to
145     /// `poll_complete`) before yielding this result. The current task will be
146     /// automatically scheduled for notification when the sink may be ready to
147     /// receive new values.
148     ///
149     /// # Errors
150     ///
151     /// If the sink encounters an error other than being temporarily full, it
152     /// uses the `Err` variant to signal that error. In most cases, such errors
153     /// mean that the sink will permanently be unable to receive items.
154     ///
155     /// # Panics
156     ///
157     /// This method may panic in a few situations, depending on the specific
158     /// sink:
159     ///
160     /// - It is called outside of the context of a task.
161     /// - A previous call to `start_send` or `poll_complete` yielded an error.
start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError>162     fn start_send(&mut self, item: Self::SinkItem)
163                   -> StartSend<Self::SinkItem, Self::SinkError>;
164 
165     /// Flush all output from this sink, if necessary.
166     ///
167     /// Some sinks may buffer intermediate data as an optimization to improve
168     /// throughput. In other words, if a sink has a corresponding receiver then
169     /// a successful `start_send` above may not guarantee that the value is
170     /// actually ready to be received by the receiver. This function is intended
171     /// to be used to ensure that values do indeed make their way to the
172     /// receiver.
173     ///
174     /// This function will attempt to process any pending requests on behalf of
175     /// the sink and drive it to completion.
176     ///
177     /// # Return value
178     ///
179     /// Returns `Ok(Async::Ready(()))` when no buffered items remain. If this
180     /// value is returned then it is guaranteed that all previous values sent
181     /// via `start_send` will be guaranteed to be available to a listening
182     /// receiver.
183     ///
184     /// Returns `Ok(Async::NotReady)` if there is more work left to do, in which
185     /// case the current task is scheduled to wake up when more progress may be
186     /// possible.
187     ///
188     /// # Errors
189     ///
190     /// Returns `Err` if the sink encounters an error while processing one of
191     /// its pending requests. Due to the buffered nature of requests, it is not
192     /// generally possible to correlate the error with a particular request. As
193     /// with `start_send`, these errors are generally "fatal" for continued use
194     /// of the sink.
195     ///
196     /// # Panics
197     ///
198     /// This method may panic in a few situations, depending on the specific sink:
199     ///
200     /// - It is called outside of the context of a task.
201     /// - A previous call to `start_send` or `poll_complete` yielded an error.
202     ///
203     /// # Compatibility nodes
204     ///
205     /// The name of this method may be slightly misleading as the original
206     /// intention was to have this method be more general than just flushing
207     /// requests. Over time though it was decided to trim back the ambitions of
208     /// this method to what it's always done, just flushing.
209     ///
210     /// In the 0.2 release series of futures this method will be renamed to
211     /// `poll_flush`. For 0.1, however, the breaking change is not happening
212     /// yet.
poll_complete(&mut self) -> Poll<(), Self::SinkError>213     fn poll_complete(&mut self) -> Poll<(), Self::SinkError>;
214 
215     /// A method to indicate that no more values will ever be pushed into this
216     /// sink.
217     ///
218     /// This method is used to indicate that a sink will no longer even be given
219     /// another value by the caller. That is, the `start_send` method above will
220     /// be called no longer (nor `poll_complete`). This method is intended to
221     /// model "graceful shutdown" in various protocols where the intent to shut
222     /// down is followed by a little more blocking work.
223     ///
224     /// Callers of this function should work it it in a similar fashion to
225     /// `poll_complete`. Once called it may return `NotReady` which indicates
226     /// that more external work needs to happen to make progress. The current
227     /// task will be scheduled to receive a notification in such an event,
228     /// however.
229     ///
230     /// Note that this function will imply `poll_complete` above. That is, if a
231     /// sink has buffered data, then it'll be flushed out during a `close`
232     /// operation. It is not necessary to have `poll_complete` return `Ready`
233     /// before a `close` is called. Once a `close` is called, though,
234     /// `poll_complete` cannot be called.
235     ///
236     /// # Return value
237     ///
238     /// This function, like `poll_complete`, returns a `Poll`. The value is
239     /// `Ready` once the close operation has completed. At that point it should
240     /// be safe to drop the sink and deallocate associated resources.
241     ///
242     /// If the value returned is `NotReady` then the sink is not yet closed and
243     /// work needs to be done to close it. The work has been scheduled and the
244     /// current task will receive a notification when it's next ready to call
245     /// this method again.
246     ///
247     /// Finally, this function may also return an error.
248     ///
249     /// # Errors
250     ///
251     /// This function will return an `Err` if any operation along the way during
252     /// the close operation fails. An error typically is fatal for a sink and is
253     /// unable to be recovered from, but in specific situations this may not
254     /// always be true.
255     ///
256     /// Note that it's also typically an error to call `start_send` or
257     /// `poll_complete` after the `close` function is called. This method will
258     /// *initiate* a close, and continuing to send values after that (or attempt
259     /// to flush) may result in strange behavior, panics, errors, etc. Once this
260     /// method is called, it must be the only method called on this `Sink`.
261     ///
262     /// # Panics
263     ///
264     /// This method may panic or cause panics if:
265     ///
266     /// * It is called outside the context of a future's task
267     /// * It is called and then `start_send` or `poll_complete` is called
268     ///
269     /// # Compatibility notes
270     ///
271     /// Note that this function is currently by default a provided function,
272     /// defaulted to calling `poll_complete` above. This function was added
273     /// in the 0.1 series of the crate as a backwards-compatible addition. It
274     /// is intended that in the 0.2 series the method will no longer be a
275     /// default method.
276     ///
277     /// It is highly recommended to consider this method a required method and
278     /// to implement it whenever you implement `Sink` locally. It is especially
279     /// crucial to be sure to close inner sinks, if applicable.
280     #[cfg(feature = "with-deprecated")]
close(&mut self) -> Poll<(), Self::SinkError>281     fn close(&mut self) -> Poll<(), Self::SinkError> {
282         self.poll_complete()
283     }
284 
285     /// dox (you should see the above, not this)
286     #[cfg(not(feature = "with-deprecated"))]
close(&mut self) -> Poll<(), Self::SinkError>287     fn close(&mut self) -> Poll<(), Self::SinkError>;
288 
289     /// Creates a new object which will produce a synchronous sink.
290     ///
291     /// The sink returned does **not** implement the `Sink` trait, and instead
292     /// only has two methods: `send` and `flush`. These two methods correspond
293     /// to `start_send` and `poll_complete` above except are executed in a
294     /// blocking fashion.
295     #[cfg(feature = "use_std")]
wait(self) -> Wait<Self> where Self: Sized296     fn wait(self) -> Wait<Self>
297         where Self: Sized
298     {
299         wait::new(self)
300     }
301 
302     /// Composes a function *in front of* the sink.
303     ///
304     /// This adapter produces a new sink that passes each value through the
305     /// given function `f` before sending it to `self`.
306     ///
307     /// To process each value, `f` produces a *future*, which is then polled to
308     /// completion before passing its result down to the underlying sink. If the
309     /// future produces an error, that error is returned by the new sink.
310     ///
311     /// Note that this function consumes the given sink, returning a wrapped
312     /// version, much like `Iterator::map`.
with<U, F, Fut>(self, f: F) -> With<Self, U, F, Fut> where F: FnMut(U) -> Fut, Fut: IntoFuture<Item = Self::SinkItem>, Fut::Error: From<Self::SinkError>, Self: Sized313     fn with<U, F, Fut>(self, f: F) -> With<Self, U, F, Fut>
314         where F: FnMut(U) -> Fut,
315               Fut: IntoFuture<Item = Self::SinkItem>,
316               Fut::Error: From<Self::SinkError>,
317               Self: Sized
318     {
319         with::new(self, f)
320     }
321 
322     /// Composes a function *in front of* the sink.
323     ///
324     /// This adapter produces a new sink that passes each value through the
325     /// given function `f` before sending it to `self`.
326     ///
327     /// To process each value, `f` produces a *stream*, of which each value
328     /// is passed to the underlying sink. A new value will not be accepted until
329     /// the stream has been drained
330     ///
331     /// Note that this function consumes the given sink, returning a wrapped
332     /// version, much like `Iterator::flat_map`.
333     ///
334     /// # Examples
335     /// ---
336     /// Using this function with an iterator through use of the `stream::iter_ok()`
337     /// function
338     ///
339     /// ```
340     /// use futures::prelude::*;
341     /// use futures::stream;
342     /// use futures::sync::mpsc;
343     ///
344     /// let (tx, rx) = mpsc::channel::<i32>(5);
345     ///
346     /// let tx = tx.with_flat_map(|x| {
347     ///     stream::iter_ok(vec![42; x].into_iter().map(|y| y))
348     /// });
349     /// tx.send(5).wait().unwrap();
350     /// assert_eq!(rx.collect().wait(), Ok(vec![42, 42, 42, 42, 42]))
351     /// ```
with_flat_map<U, F, St>(self, f: F) -> WithFlatMap<Self, U, F, St> where F: FnMut(U) -> St, St: Stream<Item = Self::SinkItem, Error=Self::SinkError>, Self: Sized352     fn with_flat_map<U, F, St>(self, f: F) -> WithFlatMap<Self, U, F, St>
353         where F: FnMut(U) -> St,
354               St: Stream<Item = Self::SinkItem, Error=Self::SinkError>,
355               Self: Sized
356         {
357             with_flat_map::new(self, f)
358         }
359 
360     /*
361     fn with_map<U, F>(self, f: F) -> WithMap<Self, U, F>
362         where F: FnMut(U) -> Self::SinkItem,
363               Self: Sized;
364 
365     fn with_filter<F>(self, f: F) -> WithFilter<Self, F>
366         where F: FnMut(Self::SinkItem) -> bool,
367               Self: Sized;
368 
369     fn with_filter_map<U, F>(self, f: F) -> WithFilterMap<Self, U, F>
370         where F: FnMut(U) -> Option<Self::SinkItem>,
371               Self: Sized;
372      */
373 
374     /// Transforms the error returned by the sink.
sink_map_err<F, E>(self, f: F) -> SinkMapErr<Self, F> where F: FnOnce(Self::SinkError) -> E, Self: Sized,375     fn sink_map_err<F, E>(self, f: F) -> SinkMapErr<Self, F>
376         where F: FnOnce(Self::SinkError) -> E,
377               Self: Sized,
378     {
379         map_err::new(self, f)
380     }
381 
382     /// Map this sink's error to any error implementing `From` for this sink's
383     /// `Error`, returning a new sink.
384     ///
385     /// If wanting to map errors of a `Sink + Stream`, use `.sink_from_err().from_err()`.
sink_from_err<E: From<Self::SinkError>>(self) -> from_err::SinkFromErr<Self, E> where Self: Sized,386     fn sink_from_err<E: From<Self::SinkError>>(self) -> from_err::SinkFromErr<Self, E>
387         where Self: Sized,
388     {
389         from_err::new(self)
390     }
391 
392 
393     /// Adds a fixed-size buffer to the current sink.
394     ///
395     /// The resulting sink will buffer up to `amt` items when the underlying
396     /// sink is unwilling to accept additional items. Calling `poll_complete` on
397     /// the buffered sink will attempt to both empty the buffer and complete
398     /// processing on the underlying sink.
399     ///
400     /// Note that this function consumes the given sink, returning a wrapped
401     /// version, much like `Iterator::map`.
402     ///
403     /// This method is only available when the `use_std` feature of this
404     /// library is activated, and it is activated by default.
405     #[cfg(feature = "use_std")]
buffer(self, amt: usize) -> Buffer<Self> where Self: Sized406     fn buffer(self, amt: usize) -> Buffer<Self>
407         where Self: Sized
408     {
409         buffer::new(self, amt)
410     }
411 
412     /// Fanout items to multiple sinks.
413     ///
414     /// This adapter clones each incoming item and forwards it to both this as well as
415     /// the other sink at the same time.
fanout<S>(self, other: S) -> Fanout<Self, S> where Self: Sized, Self::SinkItem: Clone, S: Sink<SinkItem=Self::SinkItem, SinkError=Self::SinkError>416     fn fanout<S>(self, other: S) -> Fanout<Self, S>
417         where Self: Sized,
418               Self::SinkItem: Clone,
419               S: Sink<SinkItem=Self::SinkItem, SinkError=Self::SinkError>
420     {
421         fanout::new(self, other)
422     }
423 
424     /// A future that completes when the sink has finished processing all
425     /// pending requests.
426     ///
427     /// The sink itself is returned after flushing is complete; this adapter is
428     /// intended to be used when you want to stop sending to the sink until
429     /// all current requests are processed.
flush(self) -> Flush<Self> where Self: Sized430     fn flush(self) -> Flush<Self>
431         where Self: Sized
432     {
433         flush::new(self)
434     }
435 
436     /// A future that completes after the given item has been fully processed
437     /// into the sink, including flushing.
438     ///
439     /// Note that, **because of the flushing requirement, it is usually better
440     /// to batch together items to send via `send_all`, rather than flushing
441     /// between each item.**
442     ///
443     /// On completion, the sink is returned.
send(self, item: Self::SinkItem) -> Send<Self> where Self: Sized444     fn send(self, item: Self::SinkItem) -> Send<Self>
445         where Self: Sized
446     {
447         send::new(self, item)
448     }
449 
450     /// A future that completes after the given stream has been fully processed
451     /// into the sink, including flushing.
452     ///
453     /// This future will drive the stream to keep producing items until it is
454     /// exhausted, sending each item to the sink. It will complete once both the
455     /// stream is exhausted, the sink has received all items, the sink has been
456     /// flushed, and the sink has been closed.
457     ///
458     /// Doing `sink.send_all(stream)` is roughly equivalent to
459     /// `stream.forward(sink)`. The returned future will exhaust all items from
460     /// `stream` and send them to `self`, closing `self` when all items have been
461     /// received.
462     ///
463     /// On completion, the pair `(sink, source)` is returned.
send_all<S>(self, stream: S) -> SendAll<Self, S> where S: Stream<Item = Self::SinkItem>, Self::SinkError: From<S::Error>, Self: Sized464     fn send_all<S>(self, stream: S) -> SendAll<Self, S>
465         where S: Stream<Item = Self::SinkItem>,
466               Self::SinkError: From<S::Error>,
467               Self: Sized
468     {
469         send_all::new(self, stream)
470     }
471 }
472 
473 impl<'a, S: ?Sized + Sink> Sink for &'a mut S {
474     type SinkItem = S::SinkItem;
475     type SinkError = S::SinkError;
476 
start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError>477     fn start_send(&mut self, item: Self::SinkItem)
478                   -> StartSend<Self::SinkItem, Self::SinkError> {
479         (**self).start_send(item)
480     }
481 
poll_complete(&mut self) -> Poll<(), Self::SinkError>482     fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
483         (**self).poll_complete()
484     }
485 
close(&mut self) -> Poll<(), Self::SinkError>486     fn close(&mut self) -> Poll<(), Self::SinkError> {
487         (**self).close()
488     }
489 }
490