1 //! A multi-producer, single-consumer, futures-aware, FIFO queue with back pressure.
2 //!
3 //! A channel can be used as a communication primitive between tasks running on
4 //! `futures-rs` executors. Channel creation provides `Receiver` and `Sender`
5 //! handles. `Receiver` implements `Stream` and allows a task to read values
6 //! out of the channel. If there is no message to read from the channel, the
7 //! current task will be notified when a new value is sent. `Sender` implements
8 //! the `Sink` trait and allows a task to send messages into the channel. If
9 //! the channel is at capacity, then send will be rejected and the task will be
10 //! notified when additional capacity is available.
11 //!
12 //! # Disconnection
13 //!
14 //! When all `Sender` handles have been dropped, it is no longer possible to
15 //! send values into the channel. This is considered the termination event of
16 //! the stream. As such, `Sender::poll` will return `Ok(Ready(None))`.
17 //!
18 //! If the receiver handle is dropped, then messages can no longer be read out
19 //! of the channel. In this case, a `send` will result in an error.
20 //!
21 //! # Clean Shutdown
22 //!
23 //! If the `Receiver` is simply dropped, then it is possible for there to be
24 //! messages still in the channel that will not be processed. As such, it is
25 //! usually desirable to perform a "clean" shutdown. To do this, the receiver
26 //! will first call `close`, which will prevent any further messages to be sent
27 //! into the channel. Then, the receiver consumes the channel to completion, at
28 //! which point the receiver can be dropped.
29
30 // At the core, the channel uses an atomic FIFO queue for message passing. This
31 // queue is used as the primary coordination primitive. In order to enforce
32 // capacity limits and handle back pressure, a secondary FIFO queue is used to
33 // send parked task handles.
34 //
35 // The general idea is that the channel is created with a `buffer` size of `n`.
36 // The channel capacity is `n + num-senders`. Each sender gets one "guaranteed"
37 // slot to hold a message. This allows `Sender` to know for a fact that a send
38 // will succeed *before* starting to do the actual work of sending the value.
39 // Since most of this work is lock-free, once the work starts, it is impossible
40 // to safely revert.
41 //
42 // If the sender is unable to process a send operation, then the current
43 // task is parked and the handle is sent on the parked task queue.
44 //
45 // Note that the implementation guarantees that the channel capacity will never
46 // exceed the configured limit, however there is no *strict* guarantee that the
47 // receiver will wake up a parked task *immediately* when a slot becomes
48 // available. However, it will almost always unpark a task when a slot becomes
49 // available and it is *guaranteed* that a sender will be unparked when the
50 // message that caused the sender to become parked is read out of the channel.
51 //
52 // The steps for sending a message are roughly:
53 //
54 // 1) Increment the channel message count
55 // 2) If the channel is at capacity, push the task handle onto the wait queue
56 // 3) Push the message onto the message queue.
57 //
58 // The steps for receiving a message are roughly:
59 //
60 // 1) Pop a message from the message queue
61 // 2) Pop a task handle from the wait queue
62 // 3) Decrement the channel message count.
63 //
64 // It's important for the order of operations on lock-free structures to happen
65 // in reverse order between the sender and receiver. This makes the message
66 // queue the primary coordination structure and establishes the necessary
67 // happens-before semantics required for the acquire / release semantics used
68 // by the queue structure.
69
70 use std::fmt;
71 use std::error::Error;
72 use std::any::Any;
73 use std::sync::atomic::AtomicUsize;
74 use std::sync::atomic::Ordering::SeqCst;
75 use std::sync::{Arc, Mutex};
76 use std::thread;
77 use std::usize;
78
79 use sync::mpsc::queue::{Queue, PopResult};
80 use sync::oneshot;
81 use task::{self, Task};
82 use future::Executor;
83 use sink::SendAll;
84 use resultstream::{self, Results};
85 use {Async, AsyncSink, Future, Poll, StartSend, Sink, Stream};
86
87 mod queue;
88
89 /// The transmission end of a channel which is used to send values.
90 ///
91 /// This is created by the `channel` method.
92 #[derive(Debug)]
93 pub struct Sender<T> {
94 // Channel state shared between the sender and receiver.
95 inner: Arc<Inner<T>>,
96
97 // Handle to the task that is blocked on this sender. This handle is sent
98 // to the receiver half in order to be notified when the sender becomes
99 // unblocked.
100 sender_task: Arc<Mutex<SenderTask>>,
101
102 // True if the sender might be blocked. This is an optimization to avoid
103 // having to lock the mutex most of the time.
104 maybe_parked: bool,
105 }
106
107 /// The transmission end of a channel which is used to send values.
108 ///
109 /// This is created by the `unbounded` method.
110 #[derive(Debug)]
111 pub struct UnboundedSender<T>(Sender<T>);
112
113 trait AssertKinds: Send + Sync + Clone {}
114 impl AssertKinds for UnboundedSender<u32> {}
115
116
117 /// The receiving end of a channel which implements the `Stream` trait.
118 ///
119 /// This is a concrete implementation of a stream which can be used to represent
120 /// a stream of values being computed elsewhere. This is created by the
121 /// `channel` method.
122 #[derive(Debug)]
123 pub struct Receiver<T> {
124 inner: Arc<Inner<T>>,
125 }
126
127 /// The receiving end of a channel which implements the `Stream` trait.
128 ///
129 /// This is a concrete implementation of a stream which can be used to represent
130 /// a stream of values being computed elsewhere. This is created by the
131 /// `unbounded` method.
132 #[derive(Debug)]
133 pub struct UnboundedReceiver<T>(Receiver<T>);
134
135 /// Error type for sending, used when the receiving end of a channel is
136 /// dropped
137 #[derive(Clone, PartialEq, Eq)]
138 pub struct SendError<T>(T);
139
140 /// Error type returned from `try_send`
141 #[derive(Clone, PartialEq, Eq)]
142 pub struct TrySendError<T> {
143 kind: TrySendErrorKind<T>,
144 }
145
146 #[derive(Clone, PartialEq, Eq)]
147 enum TrySendErrorKind<T> {
148 Full(T),
149 Disconnected(T),
150 }
151
152 impl<T> fmt::Debug for SendError<T> {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result153 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
154 fmt.debug_tuple("SendError")
155 .field(&"...")
156 .finish()
157 }
158 }
159
160 impl<T> fmt::Display for SendError<T> {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result161 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
162 write!(fmt, "send failed because receiver is gone")
163 }
164 }
165
166 impl<T: Any> Error for SendError<T>
167 {
description(&self) -> &str168 fn description(&self) -> &str {
169 "send failed because receiver is gone"
170 }
171 }
172
173 impl<T> SendError<T> {
174 /// Returns the message that was attempted to be sent but failed.
into_inner(self) -> T175 pub fn into_inner(self) -> T {
176 self.0
177 }
178 }
179
180 impl<T> fmt::Debug for TrySendError<T> {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result181 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
182 fmt.debug_tuple("TrySendError")
183 .field(&"...")
184 .finish()
185 }
186 }
187
188 impl<T> fmt::Display for TrySendError<T> {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result189 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
190 if self.is_full() {
191 write!(fmt, "send failed because channel is full")
192 } else {
193 write!(fmt, "send failed because receiver is gone")
194 }
195 }
196 }
197
198 impl<T: Any> Error for TrySendError<T> {
description(&self) -> &str199 fn description(&self) -> &str {
200 if self.is_full() {
201 "send failed because channel is full"
202 } else {
203 "send failed because receiver is gone"
204 }
205 }
206 }
207
208 impl<T> TrySendError<T> {
209 /// Returns true if this error is a result of the channel being full
is_full(&self) -> bool210 pub fn is_full(&self) -> bool {
211 use self::TrySendErrorKind::*;
212
213 match self.kind {
214 Full(_) => true,
215 _ => false,
216 }
217 }
218
219 /// Returns true if this error is a result of the receiver being dropped
is_disconnected(&self) -> bool220 pub fn is_disconnected(&self) -> bool {
221 use self::TrySendErrorKind::*;
222
223 match self.kind {
224 Disconnected(_) => true,
225 _ => false,
226 }
227 }
228
229 /// Returns the message that was attempted to be sent but failed.
into_inner(self) -> T230 pub fn into_inner(self) -> T {
231 use self::TrySendErrorKind::*;
232
233 match self.kind {
234 Full(v) | Disconnected(v) => v,
235 }
236 }
237 }
238
239 #[derive(Debug)]
240 struct Inner<T> {
241 // Max buffer size of the channel. If `None` then the channel is unbounded.
242 buffer: Option<usize>,
243
244 // Internal channel state. Consists of the number of messages stored in the
245 // channel as well as a flag signalling that the channel is closed.
246 state: AtomicUsize,
247
248 // Atomic, FIFO queue used to send messages to the receiver
249 message_queue: Queue<Option<T>>,
250
251 // Atomic, FIFO queue used to send parked task handles to the receiver.
252 parked_queue: Queue<Arc<Mutex<SenderTask>>>,
253
254 // Number of senders in existence
255 num_senders: AtomicUsize,
256
257 // Handle to the receiver's task.
258 recv_task: Mutex<ReceiverTask>,
259 }
260
261 // Struct representation of `Inner::state`.
262 #[derive(Debug, Clone, Copy)]
263 struct State {
264 // `true` when the channel is open
265 is_open: bool,
266
267 // Number of messages in the channel
268 num_messages: usize,
269 }
270
271 #[derive(Debug)]
272 struct ReceiverTask {
273 unparked: bool,
274 task: Option<Task>,
275 }
276
277 // Returned from Receiver::try_park()
278 enum TryPark {
279 Parked,
280 Closed,
281 NotEmpty,
282 }
283
284 // The `is_open` flag is stored in the left-most bit of `Inner::state`
285 const OPEN_MASK: usize = usize::MAX - (usize::MAX >> 1);
286
287 // When a new channel is created, it is created in the open state with no
288 // pending messages.
289 const INIT_STATE: usize = OPEN_MASK;
290
291 // The maximum number of messages that a channel can track is `usize::MAX >> 1`
292 const MAX_CAPACITY: usize = !(OPEN_MASK);
293
294 // The maximum requested buffer size must be less than the maximum capacity of
295 // a channel. This is because each sender gets a guaranteed slot.
296 const MAX_BUFFER: usize = MAX_CAPACITY >> 1;
297
298 // Sent to the consumer to wake up blocked producers
299 #[derive(Debug)]
300 struct SenderTask {
301 task: Option<Task>,
302 is_parked: bool,
303 }
304
305 impl SenderTask {
new() -> Self306 fn new() -> Self {
307 SenderTask {
308 task: None,
309 is_parked: false,
310 }
311 }
312
notify(&mut self)313 fn notify(&mut self) {
314 self.is_parked = false;
315
316 if let Some(task) = self.task.take() {
317 task.notify();
318 }
319 }
320 }
321
322 /// Creates an in-memory channel implementation of the `Stream` trait with
323 /// bounded capacity.
324 ///
325 /// This method creates a concrete implementation of the `Stream` trait which
326 /// can be used to send values across threads in a streaming fashion. This
327 /// channel is unique in that it implements back pressure to ensure that the
328 /// sender never outpaces the receiver. The channel capacity is equal to
329 /// `buffer + num-senders`. In other words, each sender gets a guaranteed slot
330 /// in the channel capacity, and on top of that there are `buffer` "first come,
331 /// first serve" slots available to all senders.
332 ///
333 /// The `Receiver` returned implements the `Stream` trait and has access to any
334 /// number of the associated combinators for transforming the result.
channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>)335 pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
336 // Check that the requested buffer size does not exceed the maximum buffer
337 // size permitted by the system.
338 assert!(buffer < MAX_BUFFER, "requested buffer size too large");
339 channel2(Some(buffer))
340 }
341
342 /// Creates an in-memory channel implementation of the `Stream` trait with
343 /// unbounded capacity.
344 ///
345 /// This method creates a concrete implementation of the `Stream` trait which
346 /// can be used to send values across threads in a streaming fashion. A `send`
347 /// on this channel will always succeed as long as the receive half has not
348 /// been closed. If the receiver falls behind, messages will be buffered
349 /// internally.
350 ///
351 /// **Note** that the amount of available system memory is an implicit bound to
352 /// the channel. Using an `unbounded` channel has the ability of causing the
353 /// process to run out of memory. In this case, the process will be aborted.
unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>)354 pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
355 let (tx, rx) = channel2(None);
356 (UnboundedSender(tx), UnboundedReceiver(rx))
357 }
358
channel2<T>(buffer: Option<usize>) -> (Sender<T>, Receiver<T>)359 fn channel2<T>(buffer: Option<usize>) -> (Sender<T>, Receiver<T>) {
360 let inner = Arc::new(Inner {
361 buffer: buffer,
362 state: AtomicUsize::new(INIT_STATE),
363 message_queue: Queue::new(),
364 parked_queue: Queue::new(),
365 num_senders: AtomicUsize::new(1),
366 recv_task: Mutex::new(ReceiverTask {
367 unparked: false,
368 task: None,
369 }),
370 });
371
372 let tx = Sender {
373 inner: inner.clone(),
374 sender_task: Arc::new(Mutex::new(SenderTask::new())),
375 maybe_parked: false,
376 };
377
378 let rx = Receiver {
379 inner: inner,
380 };
381
382 (tx, rx)
383 }
384
385 /*
386 *
387 * ===== impl Sender =====
388 *
389 */
390
391 impl<T> Sender<T> {
392 /// Attempts to send a message on this `Sender<T>` without blocking.
393 ///
394 /// This function, unlike `start_send`, is safe to call whether it's being
395 /// called on a task or not. Note that this function, however, will *not*
396 /// attempt to block the current task if the message cannot be sent.
397 ///
398 /// It is not recommended to call this function from inside of a future,
399 /// only from an external thread where you've otherwise arranged to be
400 /// notified when the channel is no longer full.
try_send(&mut self, msg: T) -> Result<(), TrySendError<T>>401 pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
402 // If the sender is currently blocked, reject the message
403 if !self.poll_unparked(false).is_ready() {
404 return Err(TrySendError {
405 kind: TrySendErrorKind::Full(msg),
406 });
407 }
408
409 // The channel has capacity to accept the message, so send it
410 self.do_send(Some(msg), false)
411 .map_err(|SendError(v)| {
412 TrySendError {
413 kind: TrySendErrorKind::Disconnected(v),
414 }
415 })
416 }
417
418 // Do the send without failing
419 // None means close
do_send(&mut self, msg: Option<T>, do_park: bool) -> Result<(), SendError<T>>420 fn do_send(&mut self, msg: Option<T>, do_park: bool) -> Result<(), SendError<T>> {
421 // First, increment the number of messages contained by the channel.
422 // This operation will also atomically determine if the sender task
423 // should be parked.
424 //
425 // None is returned in the case that the channel has been closed by the
426 // receiver. This happens when `Receiver::close` is called or the
427 // receiver is dropped.
428 let park_self = match self.inc_num_messages(msg.is_none()) {
429 Some(park_self) => park_self,
430 None => {
431 // The receiver has closed the channel. Only abort if actually
432 // sending a message. It is important that the stream
433 // termination (None) is always sent. This technically means
434 // that it is possible for the queue to contain the following
435 // number of messages:
436 //
437 // num-senders + buffer + 1
438 //
439 if let Some(msg) = msg {
440 return Err(SendError(msg));
441 } else {
442 return Ok(());
443 }
444 }
445 };
446
447 // If the channel has reached capacity, then the sender task needs to
448 // be parked. This will send the task handle on the parked task queue.
449 //
450 // However, when `do_send` is called while dropping the `Sender`,
451 // `task::current()` can't be called safely. In this case, in order to
452 // maintain internal consistency, a blank message is pushed onto the
453 // parked task queue.
454 if park_self {
455 self.park(do_park);
456 }
457
458 self.queue_push_and_signal(msg);
459
460 Ok(())
461 }
462
463 // Do the send without parking current task.
464 //
465 // To be called from unbounded sender.
do_send_nb(&self, msg: T) -> Result<(), SendError<T>>466 fn do_send_nb(&self, msg: T) -> Result<(), SendError<T>> {
467 match self.inc_num_messages(false) {
468 Some(park_self) => assert!(!park_self),
469 None => return Err(SendError(msg)),
470 };
471
472 self.queue_push_and_signal(Some(msg));
473
474 Ok(())
475 }
476
477 // Push message to the queue and signal to the receiver
queue_push_and_signal(&self, msg: Option<T>)478 fn queue_push_and_signal(&self, msg: Option<T>) {
479 // Push the message onto the message queue
480 self.inner.message_queue.push(msg);
481
482 // Signal to the receiver that a message has been enqueued. If the
483 // receiver is parked, this will unpark the task.
484 self.signal();
485 }
486
487 // Increment the number of queued messages. Returns if the sender should
488 // block.
inc_num_messages(&self, close: bool) -> Option<bool>489 fn inc_num_messages(&self, close: bool) -> Option<bool> {
490 let mut curr = self.inner.state.load(SeqCst);
491
492 loop {
493 let mut state = decode_state(curr);
494
495 // The receiver end closed the channel.
496 if !state.is_open {
497 return None;
498 }
499
500 // This probably is never hit? Odds are the process will run out of
501 // memory first. It may be worth to return something else in this
502 // case?
503 assert!(state.num_messages < MAX_CAPACITY, "buffer space exhausted; \
504 sending this messages would overflow the state");
505
506 state.num_messages += 1;
507
508 // The channel is closed by all sender handles being dropped.
509 if close {
510 state.is_open = false;
511 }
512
513 let next = encode_state(&state);
514 match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
515 Ok(_) => {
516 // Block if the current number of pending messages has exceeded
517 // the configured buffer size
518 let park_self = match self.inner.buffer {
519 Some(buffer) => state.num_messages > buffer,
520 None => false,
521 };
522
523 return Some(park_self)
524 }
525 Err(actual) => curr = actual,
526 }
527 }
528 }
529
530 // Signal to the receiver task that a message has been enqueued
signal(&self)531 fn signal(&self) {
532 // TODO
533 // This logic can probably be improved by guarding the lock with an
534 // atomic.
535 //
536 // Do this step first so that the lock is dropped when
537 // `unpark` is called
538 let task = {
539 let mut recv_task = self.inner.recv_task.lock().unwrap();
540
541 // If the receiver has already been unparked, then there is nothing
542 // more to do
543 if recv_task.unparked {
544 return;
545 }
546
547 // Setting this flag enables the receiving end to detect that
548 // an unpark event happened in order to avoid unnecessarily
549 // parking.
550 recv_task.unparked = true;
551 recv_task.task.take()
552 };
553
554 if let Some(task) = task {
555 task.notify();
556 }
557 }
558
park(&mut self, can_park: bool)559 fn park(&mut self, can_park: bool) {
560 // TODO: clean up internal state if the task::current will fail
561
562 let task = if can_park {
563 Some(task::current())
564 } else {
565 None
566 };
567
568 {
569 let mut sender = self.sender_task.lock().unwrap();
570 sender.task = task;
571 sender.is_parked = true;
572 }
573
574 // Send handle over queue
575 let t = self.sender_task.clone();
576 self.inner.parked_queue.push(t);
577
578 // Check to make sure we weren't closed after we sent our task on the
579 // queue
580 let state = decode_state(self.inner.state.load(SeqCst));
581 self.maybe_parked = state.is_open;
582 }
583
584 /// Polls the channel to determine if there is guaranteed to be capacity to send at least one
585 /// item without waiting.
586 ///
587 /// Returns `Ok(Async::Ready(_))` if there is sufficient capacity, or returns
588 /// `Ok(Async::NotReady)` if the channel is not guaranteed to have capacity. Returns
589 /// `Err(SendError(_))` if the receiver has been dropped.
590 ///
591 /// # Panics
592 ///
593 /// This method will panic if called from outside the context of a task or future.
poll_ready(&mut self) -> Poll<(), SendError<()>>594 pub fn poll_ready(&mut self) -> Poll<(), SendError<()>> {
595 let state = decode_state(self.inner.state.load(SeqCst));
596 if !state.is_open {
597 return Err(SendError(()));
598 }
599
600 Ok(self.poll_unparked(true))
601 }
602
603 /// Returns whether this channel is closed without needing a context.
is_closed(&self) -> bool604 pub fn is_closed(&self) -> bool {
605 !decode_state(self.inner.state.load(SeqCst)).is_open
606 }
607
poll_unparked(&mut self, do_park: bool) -> Async<()>608 fn poll_unparked(&mut self, do_park: bool) -> Async<()> {
609 // First check the `maybe_parked` variable. This avoids acquiring the
610 // lock in most cases
611 if self.maybe_parked {
612 // Get a lock on the task handle
613 let mut task = self.sender_task.lock().unwrap();
614
615 if !task.is_parked {
616 self.maybe_parked = false;
617 return Async::Ready(())
618 }
619
620 // At this point, an unpark request is pending, so there will be an
621 // unpark sometime in the future. We just need to make sure that
622 // the correct task will be notified.
623 //
624 // Update the task in case the `Sender` has been moved to another
625 // task
626 task.task = if do_park {
627 Some(task::current())
628 } else {
629 None
630 };
631
632 Async::NotReady
633 } else {
634 Async::Ready(())
635 }
636 }
637 }
638
639 impl<T> Sink for Sender<T> {
640 type SinkItem = T;
641 type SinkError = SendError<T>;
642
start_send(&mut self, msg: T) -> StartSend<T, SendError<T>>643 fn start_send(&mut self, msg: T) -> StartSend<T, SendError<T>> {
644 // If the sender is currently blocked, reject the message before doing
645 // any work.
646 if !self.poll_unparked(true).is_ready() {
647 return Ok(AsyncSink::NotReady(msg));
648 }
649
650 // The channel has capacity to accept the message, so send it.
651 self.do_send(Some(msg), true)?;
652
653 Ok(AsyncSink::Ready)
654 }
655
poll_complete(&mut self) -> Poll<(), SendError<T>>656 fn poll_complete(&mut self) -> Poll<(), SendError<T>> {
657 self.poll_ready()
658 // At this point, the value cannot be returned and `SendError`
659 // cannot be created with a `T` without breaking backwards
660 // comptibility. This means we cannot return an error.
661 //
662 // That said, there is also no guarantee that a `poll_complete`
663 // returning `Ok` implies the receiver sees the message.
664 .or_else(|_| Ok(().into()))
665 }
666
close(&mut self) -> Poll<(), SendError<T>>667 fn close(&mut self) -> Poll<(), SendError<T>> {
668 Ok(Async::Ready(()))
669 }
670 }
671
672 impl<T> UnboundedSender<T> {
673 /// Returns whether this channel is closed without needing a context.
is_closed(&self) -> bool674 pub fn is_closed(&self) -> bool {
675 self.0.is_closed()
676 }
677
678 /// Sends the provided message along this channel.
679 ///
680 /// This is an unbounded sender, so this function differs from `Sink::send`
681 /// by ensuring the return type reflects that the channel is always ready to
682 /// receive messages.
683 #[deprecated(note = "renamed to `unbounded_send`")]
684 #[doc(hidden)]
send(&self, msg: T) -> Result<(), SendError<T>>685 pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
686 self.unbounded_send(msg)
687 }
688
689 /// Sends the provided message along this channel.
690 ///
691 /// This is an unbounded sender, so this function differs from `Sink::send`
692 /// by ensuring the return type reflects that the channel is always ready to
693 /// receive messages.
unbounded_send(&self, msg: T) -> Result<(), SendError<T>>694 pub fn unbounded_send(&self, msg: T) -> Result<(), SendError<T>> {
695 self.0.do_send_nb(msg)
696 }
697 }
698
699 impl<T> Sink for UnboundedSender<T> {
700 type SinkItem = T;
701 type SinkError = SendError<T>;
702
start_send(&mut self, msg: T) -> StartSend<T, SendError<T>>703 fn start_send(&mut self, msg: T) -> StartSend<T, SendError<T>> {
704 self.0.start_send(msg)
705 }
706
poll_complete(&mut self) -> Poll<(), SendError<T>>707 fn poll_complete(&mut self) -> Poll<(), SendError<T>> {
708 self.0.poll_complete()
709 }
710
close(&mut self) -> Poll<(), SendError<T>>711 fn close(&mut self) -> Poll<(), SendError<T>> {
712 Ok(Async::Ready(()))
713 }
714 }
715
716 impl<'a, T> Sink for &'a UnboundedSender<T> {
717 type SinkItem = T;
718 type SinkError = SendError<T>;
719
start_send(&mut self, msg: T) -> StartSend<T, SendError<T>>720 fn start_send(&mut self, msg: T) -> StartSend<T, SendError<T>> {
721 self.0.do_send_nb(msg)?;
722 Ok(AsyncSink::Ready)
723 }
724
poll_complete(&mut self) -> Poll<(), SendError<T>>725 fn poll_complete(&mut self) -> Poll<(), SendError<T>> {
726 Ok(Async::Ready(()))
727 }
728
close(&mut self) -> Poll<(), SendError<T>>729 fn close(&mut self) -> Poll<(), SendError<T>> {
730 Ok(Async::Ready(()))
731 }
732 }
733
734 impl<T> Clone for UnboundedSender<T> {
clone(&self) -> UnboundedSender<T>735 fn clone(&self) -> UnboundedSender<T> {
736 UnboundedSender(self.0.clone())
737 }
738 }
739
740
741 impl<T> Clone for Sender<T> {
clone(&self) -> Sender<T>742 fn clone(&self) -> Sender<T> {
743 // Since this atomic op isn't actually guarding any memory and we don't
744 // care about any orderings besides the ordering on the single atomic
745 // variable, a relaxed ordering is acceptable.
746 let mut curr = self.inner.num_senders.load(SeqCst);
747
748 loop {
749 // If the maximum number of senders has been reached, then fail
750 if curr == self.inner.max_senders() {
751 panic!("cannot clone `Sender` -- too many outstanding senders");
752 }
753
754 debug_assert!(curr < self.inner.max_senders());
755
756 let next = curr + 1;
757 let actual = self.inner.num_senders.compare_and_swap(curr, next, SeqCst);
758
759 // The ABA problem doesn't matter here. We only care that the
760 // number of senders never exceeds the maximum.
761 if actual == curr {
762 return Sender {
763 inner: self.inner.clone(),
764 sender_task: Arc::new(Mutex::new(SenderTask::new())),
765 maybe_parked: false,
766 };
767 }
768
769 curr = actual;
770 }
771 }
772 }
773
774 impl<T> Drop for Sender<T> {
drop(&mut self)775 fn drop(&mut self) {
776 // Ordering between variables don't matter here
777 let prev = self.inner.num_senders.fetch_sub(1, SeqCst);
778
779 if prev == 1 {
780 let _ = self.do_send(None, false);
781 }
782 }
783 }
784
785 /*
786 *
787 * ===== impl Receiver =====
788 *
789 */
790
791 impl<T> Receiver<T> {
792 /// Closes the receiving half
793 ///
794 /// This prevents any further messages from being sent on the channel while
795 /// still enabling the receiver to drain messages that are buffered.
close(&mut self)796 pub fn close(&mut self) {
797 let mut curr = self.inner.state.load(SeqCst);
798
799 loop {
800 let mut state = decode_state(curr);
801
802 if !state.is_open {
803 break
804 }
805
806 state.is_open = false;
807
808 let next = encode_state(&state);
809 match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
810 Ok(_) => break,
811 Err(actual) => curr = actual,
812 }
813 }
814
815 // Wake up any threads waiting as they'll see that we've closed the
816 // channel and will continue on their merry way.
817 loop {
818 match unsafe { self.inner.parked_queue.pop() } {
819 PopResult::Data(task) => {
820 task.lock().unwrap().notify();
821 }
822 PopResult::Empty => break,
823 PopResult::Inconsistent => thread::yield_now(),
824 }
825 }
826 }
827
next_message(&mut self) -> Async<Option<T>>828 fn next_message(&mut self) -> Async<Option<T>> {
829 // Pop off a message
830 loop {
831 match unsafe { self.inner.message_queue.pop() } {
832 PopResult::Data(msg) => {
833 // If there are any parked task handles in the parked queue,
834 // pop one and unpark it.
835 self.unpark_one();
836 // Decrement number of messages
837 self.dec_num_messages();
838
839 return Async::Ready(msg);
840 }
841 PopResult::Empty => {
842 // The queue is empty, return NotReady
843 return Async::NotReady;
844 }
845 PopResult::Inconsistent => {
846 // Inconsistent means that there will be a message to pop
847 // in a short time. This branch can only be reached if
848 // values are being produced from another thread, so there
849 // are a few ways that we can deal with this:
850 //
851 // 1) Spin
852 // 2) thread::yield_now()
853 // 3) task::current().unwrap() & return NotReady
854 //
855 // For now, thread::yield_now() is used, but it would
856 // probably be better to spin a few times then yield.
857 thread::yield_now();
858 }
859 }
860 }
861 }
862
863 // Unpark a single task handle if there is one pending in the parked queue
unpark_one(&mut self)864 fn unpark_one(&mut self) {
865 loop {
866 match unsafe { self.inner.parked_queue.pop() } {
867 PopResult::Data(task) => {
868 task.lock().unwrap().notify();
869 return;
870 }
871 PopResult::Empty => {
872 // Queue empty, no task to wake up.
873 return;
874 }
875 PopResult::Inconsistent => {
876 // Same as above
877 thread::yield_now();
878 }
879 }
880 }
881 }
882
883 // Try to park the receiver task
try_park(&self) -> TryPark884 fn try_park(&self) -> TryPark {
885 let curr = self.inner.state.load(SeqCst);
886 let state = decode_state(curr);
887
888 // If the channel is closed, then there is no need to park.
889 if state.is_closed() {
890 return TryPark::Closed;
891 }
892
893 // First, track the task in the `recv_task` slot
894 let mut recv_task = self.inner.recv_task.lock().unwrap();
895
896 if recv_task.unparked {
897 // Consume the `unpark` signal without actually parking
898 recv_task.unparked = false;
899 return TryPark::NotEmpty;
900 }
901
902 recv_task.task = Some(task::current());
903 TryPark::Parked
904 }
905
dec_num_messages(&self)906 fn dec_num_messages(&self) {
907 let mut curr = self.inner.state.load(SeqCst);
908
909 loop {
910 let mut state = decode_state(curr);
911
912 state.num_messages -= 1;
913
914 let next = encode_state(&state);
915 match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
916 Ok(_) => break,
917 Err(actual) => curr = actual,
918 }
919 }
920 }
921 }
922
923 impl<T> Stream for Receiver<T> {
924 type Item = T;
925 type Error = ();
926
poll(&mut self) -> Poll<Option<T>, ()>927 fn poll(&mut self) -> Poll<Option<T>, ()> {
928 loop {
929 // Try to read a message off of the message queue.
930 match self.next_message() {
931 Async::Ready(msg) => return Ok(Async::Ready(msg)),
932 Async::NotReady => {
933 // There are no messages to read, in this case, attempt to
934 // park. The act of parking will verify that the channel is
935 // still empty after the park operation has completed.
936 match self.try_park() {
937 TryPark::Parked => {
938 // The task was parked, and the channel is still
939 // empty, return NotReady.
940 return Ok(Async::NotReady);
941 }
942 TryPark::Closed => {
943 // The channel is closed, there will be no further
944 // messages.
945 return Ok(Async::Ready(None));
946 }
947 TryPark::NotEmpty => {
948 // A message has been sent while attempting to
949 // park. Loop again, the next iteration is
950 // guaranteed to get the message.
951 continue;
952 }
953 }
954 }
955 }
956 }
957 }
958 }
959
960 impl<T> Drop for Receiver<T> {
drop(&mut self)961 fn drop(&mut self) {
962 // Drain the channel of all pending messages
963 self.close();
964
965 loop {
966 match self.next_message() {
967 Async::Ready(_) => {}
968 Async::NotReady => {
969 let curr = self.inner.state.load(SeqCst);
970 let state = decode_state(curr);
971
972 // If the channel is closed, then there is no need to park.
973 if state.is_closed() {
974 return;
975 }
976
977 // TODO: Spinning isn't ideal, it might be worth
978 // investigating using a condvar or some other strategy
979 // here. That said, if this case is hit, then another thread
980 // is about to push the value into the queue and this isn't
981 // the only spinlock in the impl right now.
982 thread::yield_now();
983 }
984 }
985 }
986 }
987 }
988
989 impl<T> UnboundedReceiver<T> {
990 /// Closes the receiving half
991 ///
992 /// This prevents any further messages from being sent on the channel while
993 /// still enabling the receiver to drain messages that are buffered.
close(&mut self)994 pub fn close(&mut self) {
995 self.0.close();
996 }
997 }
998
999 impl<T> Stream for UnboundedReceiver<T> {
1000 type Item = T;
1001 type Error = ();
1002
poll(&mut self) -> Poll<Option<T>, ()>1003 fn poll(&mut self) -> Poll<Option<T>, ()> {
1004 self.0.poll()
1005 }
1006 }
1007
1008 /// Handle returned from the `spawn` function.
1009 ///
1010 /// This handle is a stream that proxies a stream on a separate `Executor`.
1011 /// Created through the `mpsc::spawn` function, this handle will produce
1012 /// the same values as the proxied stream, as they are produced in the executor,
1013 /// and uses a limited buffer to exert back-pressure on the remote stream.
1014 ///
1015 /// If this handle is dropped, then the stream will no longer be polled and is
1016 /// scheduled to be dropped.
1017 pub struct SpawnHandle<Item, Error> {
1018 rx: Receiver<Result<Item, Error>>,
1019 _cancel_tx: oneshot::Sender<()>,
1020 }
1021
1022 /// Type of future which `Executor` instances must be able to execute for `spawn`.
1023 pub struct Execute<S: Stream> {
1024 inner: SendAll<Sender<Result<S::Item, S::Error>>, Results<S, SendError<Result<S::Item, S::Error>>>>,
1025 cancel_rx: oneshot::Receiver<()>,
1026 }
1027
1028 /// Spawns a `stream` onto the instance of `Executor` provided, `executor`,
1029 /// returning a handle representing the remote stream.
1030 ///
1031 /// The `stream` will be canceled if the `SpawnHandle` is dropped.
1032 ///
1033 /// The `SpawnHandle` returned is a stream that is a proxy for `stream` itself.
1034 /// When `stream` has additional items available, then the `SpawnHandle`
1035 /// will have those same items available.
1036 ///
1037 /// At most `buffer + 1` elements will be buffered at a time. If the buffer
1038 /// is full, then `stream` will stop progressing until more space is available.
1039 /// This allows the `SpawnHandle` to exert backpressure on the `stream`.
1040 ///
1041 /// # Panics
1042 ///
1043 /// This function will panic if `executor` is unable spawn a `Future` containing
1044 /// the entirety of the `stream`.
spawn<S, E>(stream: S, executor: &E, buffer: usize) -> SpawnHandle<S::Item, S::Error> where S: Stream, E: Executor<Execute<S>>1045 pub fn spawn<S, E>(stream: S, executor: &E, buffer: usize) -> SpawnHandle<S::Item, S::Error>
1046 where S: Stream,
1047 E: Executor<Execute<S>>
1048 {
1049 let (cancel_tx, cancel_rx) = oneshot::channel();
1050 let (tx, rx) = channel(buffer);
1051 executor.execute(Execute {
1052 inner: tx.send_all(resultstream::new(stream)),
1053 cancel_rx: cancel_rx,
1054 }).expect("failed to spawn stream");
1055 SpawnHandle {
1056 rx: rx,
1057 _cancel_tx: cancel_tx,
1058 }
1059 }
1060
1061 /// Spawns a `stream` onto the instance of `Executor` provided, `executor`,
1062 /// returning a handle representing the remote stream, with unbounded buffering.
1063 ///
1064 /// The `stream` will be canceled if the `SpawnHandle` is dropped.
1065 ///
1066 /// The `SpawnHandle` returned is a stream that is a proxy for `stream` itself.
1067 /// When `stream` has additional items available, then the `SpawnHandle`
1068 /// will have those same items available.
1069 ///
1070 /// An unbounded buffer is used, which means that values will be buffered as
1071 /// fast as `stream` can produce them, without any backpressure. Therefore, if
1072 /// `stream` is an infinite stream, it can use an unbounded amount of memory, and
1073 /// potentially hog CPU resources.
1074 ///
1075 /// # Panics
1076 ///
1077 /// This function will panic if `executor` is unable spawn a `Future` containing
1078 /// the entirety of the `stream`.
spawn_unbounded<S, E>(stream: S, executor: &E) -> SpawnHandle<S::Item, S::Error> where S: Stream, E: Executor<Execute<S>>1079 pub fn spawn_unbounded<S, E>(stream: S, executor: &E) -> SpawnHandle<S::Item, S::Error>
1080 where S: Stream,
1081 E: Executor<Execute<S>>
1082 {
1083 let (cancel_tx, cancel_rx) = oneshot::channel();
1084 let (tx, rx) = channel2(None);
1085 executor.execute(Execute {
1086 inner: tx.send_all(resultstream::new(stream)),
1087 cancel_rx: cancel_rx,
1088 }).expect("failed to spawn stream");
1089 SpawnHandle {
1090 rx: rx,
1091 _cancel_tx: cancel_tx,
1092 }
1093 }
1094
1095 impl<I, E> Stream for SpawnHandle<I, E> {
1096 type Item = I;
1097 type Error = E;
1098
poll(&mut self) -> Poll<Option<I>, E>1099 fn poll(&mut self) -> Poll<Option<I>, E> {
1100 match self.rx.poll() {
1101 Ok(Async::Ready(Some(Ok(t)))) => Ok(Async::Ready(Some(t.into()))),
1102 Ok(Async::Ready(Some(Err(e)))) => Err(e),
1103 Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
1104 Ok(Async::NotReady) => Ok(Async::NotReady),
1105 Err(_) => unreachable!("mpsc::Receiver should never return Err"),
1106 }
1107 }
1108 }
1109
1110 impl<I, E> fmt::Debug for SpawnHandle<I, E> {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result1111 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1112 f.debug_struct("SpawnHandle")
1113 .finish()
1114 }
1115 }
1116
1117 impl<S: Stream> Future for Execute<S> {
1118 type Item = ();
1119 type Error = ();
1120
poll(&mut self) -> Poll<(), ()>1121 fn poll(&mut self) -> Poll<(), ()> {
1122 match self.cancel_rx.poll() {
1123 Ok(Async::NotReady) => (),
1124 _ => return Ok(Async::Ready(())),
1125 }
1126 match self.inner.poll() {
1127 Ok(Async::NotReady) => Ok(Async::NotReady),
1128 _ => Ok(Async::Ready(()))
1129 }
1130 }
1131 }
1132
1133 impl<S: Stream> fmt::Debug for Execute<S> {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result1134 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1135 f.debug_struct("Execute")
1136 .finish()
1137 }
1138 }
1139
1140 /*
1141 *
1142 * ===== impl Inner =====
1143 *
1144 */
1145
1146 impl<T> Inner<T> {
1147 // The return value is such that the total number of messages that can be
1148 // enqueued into the channel will never exceed MAX_CAPACITY
max_senders(&self) -> usize1149 fn max_senders(&self) -> usize {
1150 match self.buffer {
1151 Some(buffer) => MAX_CAPACITY - buffer,
1152 None => MAX_BUFFER,
1153 }
1154 }
1155 }
1156
1157 unsafe impl<T: Send> Send for Inner<T> {}
1158 unsafe impl<T: Send> Sync for Inner<T> {}
1159
1160 impl State {
is_closed(&self) -> bool1161 fn is_closed(&self) -> bool {
1162 !self.is_open && self.num_messages == 0
1163 }
1164 }
1165
1166 /*
1167 *
1168 * ===== Helpers =====
1169 *
1170 */
1171
decode_state(num: usize) -> State1172 fn decode_state(num: usize) -> State {
1173 State {
1174 is_open: num & OPEN_MASK == OPEN_MASK,
1175 num_messages: num & MAX_CAPACITY,
1176 }
1177 }
1178
encode_state(state: &State) -> usize1179 fn encode_state(state: &State) -> usize {
1180 let mut num = state.num_messages;
1181
1182 if state.is_open {
1183 num |= OPEN_MASK;
1184 }
1185
1186 num
1187 }
1188