1 //! Asynchronous sinks 2 //! 3 //! This module contains the `Sink` trait, along with a number of adapter types 4 //! for it. An overview is available in the documentation for the trait itself. 5 //! 6 //! You can find more information/tutorials about streams [online at 7 //! https://tokio.rs][online] 8 //! 9 //! [online]: https://tokio.rs/docs/getting-started/streams-and-sinks/ 10 11 use {IntoFuture, Poll, StartSend}; 12 use stream::Stream; 13 14 mod with; 15 mod with_flat_map; 16 // mod with_map; 17 // mod with_filter; 18 // mod with_filter_map; 19 mod flush; 20 mod from_err; 21 mod send; 22 mod send_all; 23 mod map_err; 24 mod fanout; 25 26 if_std! { 27 mod buffer; 28 mod wait; 29 30 pub use self::buffer::Buffer; 31 pub use self::wait::Wait; 32 33 // TODO: consider expanding this via e.g. FromIterator 34 impl<T> Sink for ::std::vec::Vec<T> { 35 type SinkItem = T; 36 type SinkError = (); // Change this to ! once it stabilizes 37 38 fn start_send(&mut self, item: Self::SinkItem) 39 -> StartSend<Self::SinkItem, Self::SinkError> 40 { 41 self.push(item); 42 Ok(::AsyncSink::Ready) 43 } 44 45 fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { 46 Ok(::Async::Ready(())) 47 } 48 49 fn close(&mut self) -> Poll<(), Self::SinkError> { 50 Ok(::Async::Ready(())) 51 } 52 } 53 54 /// A type alias for `Box<Sink + Send>` 55 pub type BoxSink<T, E> = ::std::boxed::Box<Sink<SinkItem = T, SinkError = E> + 56 ::core::marker::Send>; 57 58 impl<S: ?Sized + Sink> Sink for ::std::boxed::Box<S> { 59 type SinkItem = S::SinkItem; 60 type SinkError = S::SinkError; 61 62 fn start_send(&mut self, item: Self::SinkItem) 63 -> StartSend<Self::SinkItem, Self::SinkError> { 64 (**self).start_send(item) 65 } 66 67 fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { 68 (**self).poll_complete() 69 } 70 71 fn close(&mut self) -> Poll<(), Self::SinkError> { 72 (**self).close() 73 } 74 } 75 } 76 77 pub use self::with::With; 78 pub use self::with_flat_map::WithFlatMap; 79 pub use self::flush::Flush; 80 pub use self::send::Send; 81 pub use self::send_all::SendAll; 82 pub use self::map_err::SinkMapErr; 83 pub use self::from_err::SinkFromErr; 84 pub use self::fanout::Fanout; 85 86 /// A `Sink` is a value into which other values can be sent, asynchronously. 87 /// 88 /// Basic examples of sinks include the sending side of: 89 /// 90 /// - Channels 91 /// - Sockets 92 /// - Pipes 93 /// 94 /// In addition to such "primitive" sinks, it's typical to layer additional 95 /// functionality, such as buffering, on top of an existing sink. 96 /// 97 /// Sending to a sink is "asynchronous" in the sense that the value may not be 98 /// sent in its entirety immediately. Instead, values are sent in a two-phase 99 /// way: first by initiating a send, and then by polling for completion. This 100 /// two-phase setup is analogous to buffered writing in synchronous code, where 101 /// writes often succeed immediately, but internally are buffered and are 102 /// *actually* written only upon flushing. 103 /// 104 /// In addition, the `Sink` may be *full*, in which case it is not even possible 105 /// to start the sending process. 106 /// 107 /// As with `Future` and `Stream`, the `Sink` trait is built from a few core 108 /// required methods, and a host of default methods for working in a 109 /// higher-level way. The `Sink::send_all` combinator is of particular 110 /// importance: you can use it to send an entire stream to a sink, which is 111 /// the simplest way to ultimately consume a sink. 112 /// 113 /// You can find more information/tutorials about streams [online at 114 /// https://tokio.rs][online] 115 /// 116 /// [online]: https://tokio.rs/docs/getting-started/streams-and-sinks/ 117 pub trait Sink { 118 /// The type of value that the sink accepts. 119 type SinkItem; 120 121 /// The type of value produced by the sink when an error occurs. 122 type SinkError; 123 124 /// Begin the process of sending a value to the sink. 125 /// 126 /// As the name suggests, this method only *begins* the process of sending 127 /// the item. If the sink employs buffering, the item isn't fully processed 128 /// until the buffer is fully flushed. Since sinks are designed to work with 129 /// asynchronous I/O, the process of actually writing out the data to an 130 /// underlying object takes place asynchronously. **You *must* use 131 /// `poll_complete` in order to drive completion of a send**. In particular, 132 /// `start_send` does not begin the flushing process 133 /// 134 /// # Return value 135 /// 136 /// This method returns `AsyncSink::Ready` if the sink was able to start 137 /// sending `item`. In that case, you *must* ensure that you call 138 /// `poll_complete` to process the sent item to completion. Note, however, 139 /// that several calls to `start_send` can be made prior to calling 140 /// `poll_complete`, which will work on completing all pending items. 141 /// 142 /// The method returns `AsyncSink::NotReady` if the sink was unable to begin 143 /// sending, usually due to being full. The sink must have attempted to 144 /// complete processing any outstanding requests (equivalent to 145 /// `poll_complete`) before yielding this result. The current task will be 146 /// automatically scheduled for notification when the sink may be ready to 147 /// receive new values. 148 /// 149 /// # Errors 150 /// 151 /// If the sink encounters an error other than being temporarily full, it 152 /// uses the `Err` variant to signal that error. In most cases, such errors 153 /// mean that the sink will permanently be unable to receive items. 154 /// 155 /// # Panics 156 /// 157 /// This method may panic in a few situations, depending on the specific 158 /// sink: 159 /// 160 /// - It is called outside of the context of a task. 161 /// - A previous call to `start_send` or `poll_complete` yielded an error. start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError>162 fn start_send(&mut self, item: Self::SinkItem) 163 -> StartSend<Self::SinkItem, Self::SinkError>; 164 165 /// Flush all output from this sink, if necessary. 166 /// 167 /// Some sinks may buffer intermediate data as an optimization to improve 168 /// throughput. In other words, if a sink has a corresponding receiver then 169 /// a successful `start_send` above may not guarantee that the value is 170 /// actually ready to be received by the receiver. This function is intended 171 /// to be used to ensure that values do indeed make their way to the 172 /// receiver. 173 /// 174 /// This function will attempt to process any pending requests on behalf of 175 /// the sink and drive it to completion. 176 /// 177 /// # Return value 178 /// 179 /// Returns `Ok(Async::Ready(()))` when no buffered items remain. If this 180 /// value is returned then it is guaranteed that all previous values sent 181 /// via `start_send` will be guaranteed to be available to a listening 182 /// receiver. 183 /// 184 /// Returns `Ok(Async::NotReady)` if there is more work left to do, in which 185 /// case the current task is scheduled to wake up when more progress may be 186 /// possible. 187 /// 188 /// # Errors 189 /// 190 /// Returns `Err` if the sink encounters an error while processing one of 191 /// its pending requests. Due to the buffered nature of requests, it is not 192 /// generally possible to correlate the error with a particular request. As 193 /// with `start_send`, these errors are generally "fatal" for continued use 194 /// of the sink. 195 /// 196 /// # Panics 197 /// 198 /// This method may panic in a few situations, depending on the specific sink: 199 /// 200 /// - It is called outside of the context of a task. 201 /// - A previous call to `start_send` or `poll_complete` yielded an error. 202 /// 203 /// # Compatibility nodes 204 /// 205 /// The name of this method may be slightly misleading as the original 206 /// intention was to have this method be more general than just flushing 207 /// requests. Over time though it was decided to trim back the ambitions of 208 /// this method to what it's always done, just flushing. 209 /// 210 /// In the 0.2 release series of futures this method will be renamed to 211 /// `poll_flush`. For 0.1, however, the breaking change is not happening 212 /// yet. poll_complete(&mut self) -> Poll<(), Self::SinkError>213 fn poll_complete(&mut self) -> Poll<(), Self::SinkError>; 214 215 /// A method to indicate that no more values will ever be pushed into this 216 /// sink. 217 /// 218 /// This method is used to indicate that a sink will no longer even be given 219 /// another value by the caller. That is, the `start_send` method above will 220 /// be called no longer (nor `poll_complete`). This method is intended to 221 /// model "graceful shutdown" in various protocols where the intent to shut 222 /// down is followed by a little more blocking work. 223 /// 224 /// Callers of this function should work it it in a similar fashion to 225 /// `poll_complete`. Once called it may return `NotReady` which indicates 226 /// that more external work needs to happen to make progress. The current 227 /// task will be scheduled to receive a notification in such an event, 228 /// however. 229 /// 230 /// Note that this function will imply `poll_complete` above. That is, if a 231 /// sink has buffered data, then it'll be flushed out during a `close` 232 /// operation. It is not necessary to have `poll_complete` return `Ready` 233 /// before a `close` is called. Once a `close` is called, though, 234 /// `poll_complete` cannot be called. 235 /// 236 /// # Return value 237 /// 238 /// This function, like `poll_complete`, returns a `Poll`. The value is 239 /// `Ready` once the close operation has completed. At that point it should 240 /// be safe to drop the sink and deallocate associated resources. 241 /// 242 /// If the value returned is `NotReady` then the sink is not yet closed and 243 /// work needs to be done to close it. The work has been scheduled and the 244 /// current task will receive a notification when it's next ready to call 245 /// this method again. 246 /// 247 /// Finally, this function may also return an error. 248 /// 249 /// # Errors 250 /// 251 /// This function will return an `Err` if any operation along the way during 252 /// the close operation fails. An error typically is fatal for a sink and is 253 /// unable to be recovered from, but in specific situations this may not 254 /// always be true. 255 /// 256 /// Note that it's also typically an error to call `start_send` or 257 /// `poll_complete` after the `close` function is called. This method will 258 /// *initiate* a close, and continuing to send values after that (or attempt 259 /// to flush) may result in strange behavior, panics, errors, etc. Once this 260 /// method is called, it must be the only method called on this `Sink`. 261 /// 262 /// # Panics 263 /// 264 /// This method may panic or cause panics if: 265 /// 266 /// * It is called outside the context of a future's task 267 /// * It is called and then `start_send` or `poll_complete` is called 268 /// 269 /// # Compatibility notes 270 /// 271 /// Note that this function is currently by default a provided function, 272 /// defaulted to calling `poll_complete` above. This function was added 273 /// in the 0.1 series of the crate as a backwards-compatible addition. It 274 /// is intended that in the 0.2 series the method will no longer be a 275 /// default method. 276 /// 277 /// It is highly recommended to consider this method a required method and 278 /// to implement it whenever you implement `Sink` locally. It is especially 279 /// crucial to be sure to close inner sinks, if applicable. 280 #[cfg(feature = "with-deprecated")] close(&mut self) -> Poll<(), Self::SinkError>281 fn close(&mut self) -> Poll<(), Self::SinkError> { 282 self.poll_complete() 283 } 284 285 /// dox (you should see the above, not this) 286 #[cfg(not(feature = "with-deprecated"))] close(&mut self) -> Poll<(), Self::SinkError>287 fn close(&mut self) -> Poll<(), Self::SinkError>; 288 289 /// Creates a new object which will produce a synchronous sink. 290 /// 291 /// The sink returned does **not** implement the `Sink` trait, and instead 292 /// only has two methods: `send` and `flush`. These two methods correspond 293 /// to `start_send` and `poll_complete` above except are executed in a 294 /// blocking fashion. 295 #[cfg(feature = "use_std")] wait(self) -> Wait<Self> where Self: Sized296 fn wait(self) -> Wait<Self> 297 where Self: Sized 298 { 299 wait::new(self) 300 } 301 302 /// Composes a function *in front of* the sink. 303 /// 304 /// This adapter produces a new sink that passes each value through the 305 /// given function `f` before sending it to `self`. 306 /// 307 /// To process each value, `f` produces a *future*, which is then polled to 308 /// completion before passing its result down to the underlying sink. If the 309 /// future produces an error, that error is returned by the new sink. 310 /// 311 /// Note that this function consumes the given sink, returning a wrapped 312 /// version, much like `Iterator::map`. with<U, F, Fut>(self, f: F) -> With<Self, U, F, Fut> where F: FnMut(U) -> Fut, Fut: IntoFuture<Item = Self::SinkItem>, Fut::Error: From<Self::SinkError>, Self: Sized313 fn with<U, F, Fut>(self, f: F) -> With<Self, U, F, Fut> 314 where F: FnMut(U) -> Fut, 315 Fut: IntoFuture<Item = Self::SinkItem>, 316 Fut::Error: From<Self::SinkError>, 317 Self: Sized 318 { 319 with::new(self, f) 320 } 321 322 /// Composes a function *in front of* the sink. 323 /// 324 /// This adapter produces a new sink that passes each value through the 325 /// given function `f` before sending it to `self`. 326 /// 327 /// To process each value, `f` produces a *stream*, of which each value 328 /// is passed to the underlying sink. A new value will not be accepted until 329 /// the stream has been drained 330 /// 331 /// Note that this function consumes the given sink, returning a wrapped 332 /// version, much like `Iterator::flat_map`. 333 /// 334 /// # Examples 335 /// --- 336 /// Using this function with an iterator through use of the `stream::iter_ok()` 337 /// function 338 /// 339 /// ``` 340 /// use futures::prelude::*; 341 /// use futures::stream; 342 /// use futures::sync::mpsc; 343 /// 344 /// let (tx, rx) = mpsc::channel::<i32>(5); 345 /// 346 /// let tx = tx.with_flat_map(|x| { 347 /// stream::iter_ok(vec![42; x].into_iter().map(|y| y)) 348 /// }); 349 /// tx.send(5).wait().unwrap(); 350 /// assert_eq!(rx.collect().wait(), Ok(vec![42, 42, 42, 42, 42])) 351 /// ``` with_flat_map<U, F, St>(self, f: F) -> WithFlatMap<Self, U, F, St> where F: FnMut(U) -> St, St: Stream<Item = Self::SinkItem, Error=Self::SinkError>, Self: Sized352 fn with_flat_map<U, F, St>(self, f: F) -> WithFlatMap<Self, U, F, St> 353 where F: FnMut(U) -> St, 354 St: Stream<Item = Self::SinkItem, Error=Self::SinkError>, 355 Self: Sized 356 { 357 with_flat_map::new(self, f) 358 } 359 360 /* 361 fn with_map<U, F>(self, f: F) -> WithMap<Self, U, F> 362 where F: FnMut(U) -> Self::SinkItem, 363 Self: Sized; 364 365 fn with_filter<F>(self, f: F) -> WithFilter<Self, F> 366 where F: FnMut(Self::SinkItem) -> bool, 367 Self: Sized; 368 369 fn with_filter_map<U, F>(self, f: F) -> WithFilterMap<Self, U, F> 370 where F: FnMut(U) -> Option<Self::SinkItem>, 371 Self: Sized; 372 */ 373 374 /// Transforms the error returned by the sink. sink_map_err<F, E>(self, f: F) -> SinkMapErr<Self, F> where F: FnOnce(Self::SinkError) -> E, Self: Sized,375 fn sink_map_err<F, E>(self, f: F) -> SinkMapErr<Self, F> 376 where F: FnOnce(Self::SinkError) -> E, 377 Self: Sized, 378 { 379 map_err::new(self, f) 380 } 381 382 /// Map this sink's error to any error implementing `From` for this sink's 383 /// `Error`, returning a new sink. 384 /// 385 /// If wanting to map errors of a `Sink + Stream`, use `.sink_from_err().from_err()`. sink_from_err<E: From<Self::SinkError>>(self) -> from_err::SinkFromErr<Self, E> where Self: Sized,386 fn sink_from_err<E: From<Self::SinkError>>(self) -> from_err::SinkFromErr<Self, E> 387 where Self: Sized, 388 { 389 from_err::new(self) 390 } 391 392 393 /// Adds a fixed-size buffer to the current sink. 394 /// 395 /// The resulting sink will buffer up to `amt` items when the underlying 396 /// sink is unwilling to accept additional items. Calling `poll_complete` on 397 /// the buffered sink will attempt to both empty the buffer and complete 398 /// processing on the underlying sink. 399 /// 400 /// Note that this function consumes the given sink, returning a wrapped 401 /// version, much like `Iterator::map`. 402 /// 403 /// This method is only available when the `use_std` feature of this 404 /// library is activated, and it is activated by default. 405 #[cfg(feature = "use_std")] buffer(self, amt: usize) -> Buffer<Self> where Self: Sized406 fn buffer(self, amt: usize) -> Buffer<Self> 407 where Self: Sized 408 { 409 buffer::new(self, amt) 410 } 411 412 /// Fanout items to multiple sinks. 413 /// 414 /// This adapter clones each incoming item and forwards it to both this as well as 415 /// the other sink at the same time. fanout<S>(self, other: S) -> Fanout<Self, S> where Self: Sized, Self::SinkItem: Clone, S: Sink<SinkItem=Self::SinkItem, SinkError=Self::SinkError>416 fn fanout<S>(self, other: S) -> Fanout<Self, S> 417 where Self: Sized, 418 Self::SinkItem: Clone, 419 S: Sink<SinkItem=Self::SinkItem, SinkError=Self::SinkError> 420 { 421 fanout::new(self, other) 422 } 423 424 /// A future that completes when the sink has finished processing all 425 /// pending requests. 426 /// 427 /// The sink itself is returned after flushing is complete; this adapter is 428 /// intended to be used when you want to stop sending to the sink until 429 /// all current requests are processed. flush(self) -> Flush<Self> where Self: Sized430 fn flush(self) -> Flush<Self> 431 where Self: Sized 432 { 433 flush::new(self) 434 } 435 436 /// A future that completes after the given item has been fully processed 437 /// into the sink, including flushing. 438 /// 439 /// Note that, **because of the flushing requirement, it is usually better 440 /// to batch together items to send via `send_all`, rather than flushing 441 /// between each item.** 442 /// 443 /// On completion, the sink is returned. send(self, item: Self::SinkItem) -> Send<Self> where Self: Sized444 fn send(self, item: Self::SinkItem) -> Send<Self> 445 where Self: Sized 446 { 447 send::new(self, item) 448 } 449 450 /// A future that completes after the given stream has been fully processed 451 /// into the sink, including flushing. 452 /// 453 /// This future will drive the stream to keep producing items until it is 454 /// exhausted, sending each item to the sink. It will complete once both the 455 /// stream is exhausted, the sink has received all items, the sink has been 456 /// flushed, and the sink has been closed. 457 /// 458 /// Doing `sink.send_all(stream)` is roughly equivalent to 459 /// `stream.forward(sink)`. The returned future will exhaust all items from 460 /// `stream` and send them to `self`, closing `self` when all items have been 461 /// received. 462 /// 463 /// On completion, the pair `(sink, source)` is returned. send_all<S>(self, stream: S) -> SendAll<Self, S> where S: Stream<Item = Self::SinkItem>, Self::SinkError: From<S::Error>, Self: Sized464 fn send_all<S>(self, stream: S) -> SendAll<Self, S> 465 where S: Stream<Item = Self::SinkItem>, 466 Self::SinkError: From<S::Error>, 467 Self: Sized 468 { 469 send_all::new(self, stream) 470 } 471 } 472 473 impl<'a, S: ?Sized + Sink> Sink for &'a mut S { 474 type SinkItem = S::SinkItem; 475 type SinkError = S::SinkError; 476 start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError>477 fn start_send(&mut self, item: Self::SinkItem) 478 -> StartSend<Self::SinkItem, Self::SinkError> { 479 (**self).start_send(item) 480 } 481 poll_complete(&mut self) -> Poll<(), Self::SinkError>482 fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { 483 (**self).poll_complete() 484 } 485 close(&mut self) -> Poll<(), Self::SinkError>486 fn close(&mut self) -> Poll<(), Self::SinkError> { 487 (**self).close() 488 } 489 } 490