1 //! A multi-producer, single-consumer, futures-aware, FIFO queue with back pressure.
2 //!
3 //! A channel can be used as a communication primitive between tasks running on
4 //! `futures-rs` executors. Channel creation provides `Receiver` and `Sender`
5 //! handles. `Receiver` implements `Stream` and allows a task to read values
6 //! out of the channel. If there is no message to read from the channel, the
7 //! current task will be notified when a new value is sent. `Sender` implements
8 //! the `Sink` trait and allows a task to send messages into the channel. If
9 //! the channel is at capacity, then send will be rejected and the task will be
10 //! notified when additional capacity is available.
11 //!
12 //! # Disconnection
13 //!
//! When all `Sender` handles have been dropped, it is no longer possible to
//! send values into the channel. This is considered the termination event of
//! the stream. As such, `Receiver::poll` will return `Ok(Ready(None))`.
17 //!
18 //! If the receiver handle is dropped, then messages can no longer be read out
19 //! of the channel. In this case, a `send` will result in an error.
20 //!
21 //! # Clean Shutdown
22 //!
23 //! If the `Receiver` is simply dropped, then it is possible for there to be
24 //! messages still in the channel that will not be processed. As such, it is
//! usually desirable to perform a "clean" shutdown. To do this, the receiver
//! will first call `close`, which will prevent any further messages from
//! being sent into the channel. Then, the receiver consumes the channel to
//! completion, at which point the receiver can be dropped.
29 
30 // At the core, the channel uses an atomic FIFO queue for message passing. This
31 // queue is used as the primary coordination primitive. In order to enforce
32 // capacity limits and handle back pressure, a secondary FIFO queue is used to
33 // send parked task handles.
34 //
35 // The general idea is that the channel is created with a `buffer` size of `n`.
36 // The channel capacity is `n + num-senders`. Each sender gets one "guaranteed"
37 // slot to hold a message. This allows `Sender` to know for a fact that a send
38 // will succeed *before* starting to do the actual work of sending the value.
39 // Since most of this work is lock-free, once the work starts, it is impossible
40 // to safely revert.
41 //
42 // If the sender is unable to process a send operation, then the current
43 // task is parked and the handle is sent on the parked task queue.
44 //
// Note that the implementation guarantees that the channel capacity will never
// exceed the configured limit. There is, however, no *strict* guarantee that
// the receiver will wake up a parked task *immediately* when a slot becomes
// available. It will almost always unpark a task when a slot becomes
// available, and it is *guaranteed* that a sender will be unparked when the
// message that caused the sender to become parked is read out of the channel.
51 //
52 // The steps for sending a message are roughly:
53 //
54 // 1) Increment the channel message count
55 // 2) If the channel is at capacity, push the task handle onto the wait queue
56 // 3) Push the message onto the message queue.
57 //
58 // The steps for receiving a message are roughly:
59 //
60 // 1) Pop a message from the message queue
61 // 2) Pop a task handle from the wait queue
62 // 3) Decrement the channel message count.
63 //
64 // It's important for the order of operations on lock-free structures to happen
65 // in reverse order between the sender and receiver. This makes the message
66 // queue the primary coordination structure and establishes the necessary
67 // happens-before semantics required for the acquire / release semantics used
68 // by the queue structure.
69 
70 use std::fmt;
71 use std::error::Error;
72 use std::any::Any;
73 use std::sync::atomic::AtomicUsize;
74 use std::sync::atomic::Ordering::SeqCst;
75 use std::sync::{Arc, Mutex};
76 use std::thread;
77 use std::usize;
78 
79 use sync::mpsc::queue::{Queue, PopResult};
80 use sync::oneshot;
81 use task::{self, Task};
82 use future::Executor;
83 use sink::SendAll;
84 use resultstream::{self, Results};
85 use {Async, AsyncSink, Future, Poll, StartSend, Sink, Stream};
86 
87 mod queue;
88 
/// The transmission end of a channel which is used to send values.
///
/// This is created by the `channel` function. The same type is also wrapped
/// by `UnboundedSender` for channels created with `unbounded`.
#[derive(Debug)]
pub struct Sender<T> {
    // Channel state shared between the sender and receiver.
    inner: Arc<Inner<T>>,

    // Handle to the task that is blocked on this sender. This handle is sent
    // to the receiver half (via the parked queue) in order to be notified
    // when the sender becomes unblocked. Each `Sender` clone gets its own.
    sender_task: Arc<Mutex<SenderTask>>,

    // True if the sender might be blocked. This is an optimization to avoid
    // having to lock the `sender_task` mutex most of the time; the mutex is
    // only consulted when this flag is set.
    maybe_parked: bool,
}
106 
/// The transmission end of an unbounded channel which is used to send values.
///
/// This is created by the `unbounded` function. It is a thin wrapper around a
/// `Sender` whose channel was created without a buffer limit.
#[derive(Debug)]
pub struct UnboundedSender<T>(Sender<T>);
112 
// Compile-time check only: the impl below fails to build unless
// `UnboundedSender<u32>` is `Send`, `Sync` and `Clone`. The trait declares no
// methods of its own.
trait AssertKinds: Send + Sync + Clone {}
impl AssertKinds for UnboundedSender<u32> {}
115 
116 
/// The receiving end of a channel which implements the `Stream` trait.
///
/// This is a concrete implementation of a stream which can be used to represent
/// a stream of values being computed elsewhere. This is created by the
/// `channel` function.
#[derive(Debug)]
pub struct Receiver<T> {
    // Channel state shared with every `Sender` handle.
    inner: Arc<Inner<T>>,
}
126 
/// The receiving end of an unbounded channel which implements the `Stream`
/// trait.
///
/// This is a concrete implementation of a stream which can be used to represent
/// a stream of values being computed elsewhere. This is created by the
/// `unbounded` function and wraps a plain `Receiver`.
#[derive(Debug)]
pub struct UnboundedReceiver<T>(Receiver<T>);
134 
/// Error type for sending, used when the receiving end of a channel is
/// dropped or closed.
///
/// The wrapped value is the message that could not be sent; it can be
/// recovered with `into_inner`.
#[derive(Clone, PartialEq, Eq)]
pub struct SendError<T>(T);
139 
/// Error type returned from `try_send`.
///
/// Use `is_full` / `is_disconnected` to find out why the send failed and
/// `into_inner` to recover the message.
#[derive(Clone, PartialEq, Eq)]
pub struct TrySendError<T> {
    // Reason for the failure, carrying the rejected message.
    kind: TrySendErrorKind<T>,
}
145 
// Private reason behind a `TrySendError`. Both variants carry the message
// that was not sent so the caller can get it back.
#[derive(Clone, PartialEq, Eq)]
enum TrySendErrorKind<T> {
    // The sender is currently parked, i.e. the channel has no room.
    Full(T),
    // The channel has been closed, so the message can never be delivered.
    Disconnected(T),
}
151 
152 impl<T> fmt::Debug for SendError<T> {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result153     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
154         fmt.debug_tuple("SendError")
155             .field(&"...")
156             .finish()
157     }
158 }
159 
160 impl<T> fmt::Display for SendError<T> {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result161     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
162         write!(fmt, "send failed because receiver is gone")
163     }
164 }
165 
166 impl<T: Any> Error for SendError<T>
167 {
description(&self) -> &str168     fn description(&self) -> &str {
169         "send failed because receiver is gone"
170     }
171 }
172 
173 impl<T> SendError<T> {
174     /// Returns the message that was attempted to be sent but failed.
into_inner(self) -> T175     pub fn into_inner(self) -> T {
176         self.0
177     }
178 }
179 
180 impl<T> fmt::Debug for TrySendError<T> {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result181     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
182         fmt.debug_tuple("TrySendError")
183             .field(&"...")
184             .finish()
185     }
186 }
187 
188 impl<T> fmt::Display for TrySendError<T> {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result189     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
190         if self.is_full() {
191             write!(fmt, "send failed because channel is full")
192         } else {
193             write!(fmt, "send failed because receiver is gone")
194         }
195     }
196 }
197 
198 impl<T: Any> Error for TrySendError<T> {
description(&self) -> &str199     fn description(&self) -> &str {
200         if self.is_full() {
201             "send failed because channel is full"
202         } else {
203             "send failed because receiver is gone"
204         }
205     }
206 }
207 
208 impl<T> TrySendError<T> {
209     /// Returns true if this error is a result of the channel being full
is_full(&self) -> bool210     pub fn is_full(&self) -> bool {
211         use self::TrySendErrorKind::*;
212 
213         match self.kind {
214             Full(_) => true,
215             _ => false,
216         }
217     }
218 
219     /// Returns true if this error is a result of the receiver being dropped
is_disconnected(&self) -> bool220     pub fn is_disconnected(&self) -> bool {
221         use self::TrySendErrorKind::*;
222 
223         match self.kind {
224             Disconnected(_) => true,
225             _ => false,
226         }
227     }
228 
229     /// Returns the message that was attempted to be sent but failed.
into_inner(self) -> T230     pub fn into_inner(self) -> T {
231         use self::TrySendErrorKind::*;
232 
233         match self.kind {
234             Full(v) | Disconnected(v) => v,
235         }
236     }
237 }
238 
// State shared (behind an `Arc`) by the `Receiver` and every `Sender` of one
// channel.
#[derive(Debug)]
struct Inner<T> {
    // Max buffer size of the channel. If `None` then the channel is unbounded.
    buffer: Option<usize>,

    // Internal channel state. Consists of the number of messages stored in the
    // channel as well as a flag signalling that the channel is closed. See
    // `State` for the decoded form.
    state: AtomicUsize,

    // Atomic, FIFO queue used to send messages to the receiver. A `None`
    // entry is the stream-termination marker pushed when the channel is
    // closed from the sending side.
    message_queue: Queue<Option<T>>,

    // Atomic, FIFO queue used to send parked task handles to the receiver.
    parked_queue: Queue<Arc<Mutex<SenderTask>>>,

    // Number of senders in existence
    num_senders: AtomicUsize,

    // Handle to the receiver's task.
    recv_task: Mutex<ReceiverTask>,
}
260 
// Struct representation of `Inner::state`. Values are converted to and from
// the packed `usize` with `encode_state` / `decode_state`.
#[derive(Debug, Clone, Copy)]
struct State {
    // `true` when the channel is open
    is_open: bool,

    // Number of messages in the channel
    num_messages: usize,
}
270 
// Receiver-side wakeup slot, guarded by `Inner::recv_task`.
#[derive(Debug)]
struct ReceiverTask {
    // Set to `true` by a sender when it signals that a message was enqueued,
    // so that the receiver can detect the wakeup and avoid parking.
    unparked: bool,
    // Task to notify on the next signal; taken (left as `None`) by the sender
    // that performs the notification.
    task: Option<Task>,
}
276 
// Returned from Receiver::try_park()
//
// NOTE(review): `try_park` is not visible in this part of the file; the
// per-variant notes below are inferred from the names and should be confirmed
// against its implementation.
enum TryPark {
    // Presumably: the receiver task was stored and will be woken by a sender.
    Parked,
    // Presumably: the channel is closed, so there is nothing to wait for.
    Closed,
    // Presumably: a message arrived in the meantime, so the receiver should
    // poll again instead of parking.
    NotEmpty,
}
283 
// Layout of `Inner::state`: the most significant bit is the `is_open` flag and
// all remaining bits hold the message count.
const OPEN_MASK: usize = !(usize::MAX >> 1);

// A freshly created channel is open and holds no messages.
const INIT_STATE: usize = OPEN_MASK;

// Largest message count representable next to the open flag, i.e.
// `usize::MAX >> 1`.
const MAX_CAPACITY: usize = usize::MAX ^ OPEN_MASK;

// Upper bound (exclusive) on the buffer size a caller may request. It is kept
// well below `MAX_CAPACITY` because every sender additionally owns one
// guaranteed slot.
const MAX_BUFFER: usize = MAX_CAPACITY / 2;
297 
// Sent to the consumer to wake up blocked producers
#[derive(Debug)]
struct SenderTask {
    // Task to notify when the sender is unparked. May be `None` even while
    // parked, e.g. when the sender parked without a task context.
    task: Option<Task>,
    // `true` from the moment the sender parks until `notify` is called.
    is_parked: bool,
}
304 
305 impl SenderTask {
new() -> Self306     fn new() -> Self {
307         SenderTask {
308             task: None,
309             is_parked: false,
310         }
311     }
312 
notify(&mut self)313     fn notify(&mut self) {
314         self.is_parked = false;
315 
316         if let Some(task) = self.task.take() {
317             task.notify();
318         }
319     }
320 }
321 
322 /// Creates an in-memory channel implementation of the `Stream` trait with
323 /// bounded capacity.
324 ///
325 /// This method creates a concrete implementation of the `Stream` trait which
326 /// can be used to send values across threads in a streaming fashion. This
327 /// channel is unique in that it implements back pressure to ensure that the
328 /// sender never outpaces the receiver. The channel capacity is equal to
329 /// `buffer + num-senders`. In other words, each sender gets a guaranteed slot
330 /// in the channel capacity, and on top of that there are `buffer` "first come,
331 /// first serve" slots available to all senders.
332 ///
333 /// The `Receiver` returned implements the `Stream` trait and has access to any
334 /// number of the associated combinators for transforming the result.
channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>)335 pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
336     // Check that the requested buffer size does not exceed the maximum buffer
337     // size permitted by the system.
338     assert!(buffer < MAX_BUFFER, "requested buffer size too large");
339     channel2(Some(buffer))
340 }
341 
342 /// Creates an in-memory channel implementation of the `Stream` trait with
343 /// unbounded capacity.
344 ///
345 /// This method creates a concrete implementation of the `Stream` trait which
346 /// can be used to send values across threads in a streaming fashion. A `send`
347 /// on this channel will always succeed as long as the receive half has not
348 /// been closed. If the receiver falls behind, messages will be buffered
349 /// internally.
350 ///
351 /// **Note** that the amount of available system memory is an implicit bound to
352 /// the channel. Using an `unbounded` channel has the ability of causing the
353 /// process to run out of memory. In this case, the process will be aborted.
unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>)354 pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
355     let (tx, rx) = channel2(None);
356     (UnboundedSender(tx), UnboundedReceiver(rx))
357 }
358 
channel2<T>(buffer: Option<usize>) -> (Sender<T>, Receiver<T>)359 fn channel2<T>(buffer: Option<usize>) -> (Sender<T>, Receiver<T>) {
360     let inner = Arc::new(Inner {
361         buffer: buffer,
362         state: AtomicUsize::new(INIT_STATE),
363         message_queue: Queue::new(),
364         parked_queue: Queue::new(),
365         num_senders: AtomicUsize::new(1),
366         recv_task: Mutex::new(ReceiverTask {
367             unparked: false,
368             task: None,
369         }),
370     });
371 
372     let tx = Sender {
373         inner: inner.clone(),
374         sender_task: Arc::new(Mutex::new(SenderTask::new())),
375         maybe_parked: false,
376     };
377 
378     let rx = Receiver {
379         inner: inner,
380     };
381 
382     (tx, rx)
383 }
384 
385 /*
386  *
387  * ===== impl Sender =====
388  *
389  */
390 
391 impl<T> Sender<T> {
392     /// Attempts to send a message on this `Sender<T>` without blocking.
393     ///
394     /// This function, unlike `start_send`, is safe to call whether it's being
395     /// called on a task or not. Note that this function, however, will *not*
396     /// attempt to block the current task if the message cannot be sent.
397     ///
398     /// It is not recommended to call this function from inside of a future,
399     /// only from an external thread where you've otherwise arranged to be
400     /// notified when the channel is no longer full.
try_send(&mut self, msg: T) -> Result<(), TrySendError<T>>401     pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
402         // If the sender is currently blocked, reject the message
403         if !self.poll_unparked(false).is_ready() {
404             return Err(TrySendError {
405                 kind: TrySendErrorKind::Full(msg),
406             });
407         }
408 
409         // The channel has capacity to accept the message, so send it
410         self.do_send(Some(msg), false)
411             .map_err(|SendError(v)| {
412                 TrySendError {
413                     kind: TrySendErrorKind::Disconnected(v),
414                 }
415             })
416     }
417 
418     // Do the send without failing
419     // None means close
do_send(&mut self, msg: Option<T>, do_park: bool) -> Result<(), SendError<T>>420     fn do_send(&mut self, msg: Option<T>, do_park: bool) -> Result<(), SendError<T>> {
421         // First, increment the number of messages contained by the channel.
422         // This operation will also atomically determine if the sender task
423         // should be parked.
424         //
425         // None is returned in the case that the channel has been closed by the
426         // receiver. This happens when `Receiver::close` is called or the
427         // receiver is dropped.
428         let park_self = match self.inc_num_messages(msg.is_none()) {
429             Some(park_self) => park_self,
430             None => {
431                 // The receiver has closed the channel. Only abort if actually
432                 // sending a message. It is important that the stream
433                 // termination (None) is always sent. This technically means
434                 // that it is possible for the queue to contain the following
435                 // number of messages:
436                 //
437                 //     num-senders + buffer + 1
438                 //
439                 if let Some(msg) = msg {
440                     return Err(SendError(msg));
441                 } else {
442                     return Ok(());
443                 }
444             }
445         };
446 
447         // If the channel has reached capacity, then the sender task needs to
448         // be parked. This will send the task handle on the parked task queue.
449         //
450         // However, when `do_send` is called while dropping the `Sender`,
451         // `task::current()` can't be called safely. In this case, in order to
452         // maintain internal consistency, a blank message is pushed onto the
453         // parked task queue.
454         if park_self {
455             self.park(do_park);
456         }
457 
458         self.queue_push_and_signal(msg);
459 
460         Ok(())
461     }
462 
463     // Do the send without parking current task.
464     //
465     // To be called from unbounded sender.
do_send_nb(&self, msg: T) -> Result<(), SendError<T>>466     fn do_send_nb(&self, msg: T) -> Result<(), SendError<T>> {
467         match self.inc_num_messages(false) {
468             Some(park_self) => assert!(!park_self),
469             None => return Err(SendError(msg)),
470         };
471 
472         self.queue_push_and_signal(Some(msg));
473 
474         Ok(())
475     }
476 
    // Push message to the queue and signal to the receiver.
    //
    // `msg` is `None` for the stream-termination marker. The push must come
    // first so that the message is already in the queue when the receiver is
    // woken.
    fn queue_push_and_signal(&self, msg: Option<T>) {
        // Push the message onto the message queue
        self.inner.message_queue.push(msg);

        // Signal to the receiver that a message has been enqueued. If the
        // receiver is parked, this will unpark the task.
        self.signal();
    }
486 
487     // Increment the number of queued messages. Returns if the sender should
488     // block.
inc_num_messages(&self, close: bool) -> Option<bool>489     fn inc_num_messages(&self, close: bool) -> Option<bool> {
490         let mut curr = self.inner.state.load(SeqCst);
491 
492         loop {
493             let mut state = decode_state(curr);
494 
495             // The receiver end closed the channel.
496             if !state.is_open {
497                 return None;
498             }
499 
500             // This probably is never hit? Odds are the process will run out of
501             // memory first. It may be worth to return something else in this
502             // case?
503             assert!(state.num_messages < MAX_CAPACITY, "buffer space exhausted; \
504                     sending this messages would overflow the state");
505 
506             state.num_messages += 1;
507 
508             // The channel is closed by all sender handles being dropped.
509             if close {
510                 state.is_open = false;
511             }
512 
513             let next = encode_state(&state);
514             match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
515                 Ok(_) => {
516                     // Block if the current number of pending messages has exceeded
517                     // the configured buffer size
518                     let park_self = match self.inner.buffer {
519                         Some(buffer) => state.num_messages > buffer,
520                         None => false,
521                     };
522 
523                     return Some(park_self)
524                 }
525                 Err(actual) => curr = actual,
526             }
527         }
528     }
529 
530     // Signal to the receiver task that a message has been enqueued
signal(&self)531     fn signal(&self) {
532         // TODO
533         // This logic can probably be improved by guarding the lock with an
534         // atomic.
535         //
536         // Do this step first so that the lock is dropped when
537         // `unpark` is called
538         let task = {
539             let mut recv_task = self.inner.recv_task.lock().unwrap();
540 
541             // If the receiver has already been unparked, then there is nothing
542             // more to do
543             if recv_task.unparked {
544                 return;
545             }
546 
547             // Setting this flag enables the receiving end to detect that
548             // an unpark event happened in order to avoid unnecessarily
549             // parking.
550             recv_task.unparked = true;
551             recv_task.task.take()
552         };
553 
554         if let Some(task) = task {
555             task.notify();
556         }
557     }
558 
park(&mut self, can_park: bool)559     fn park(&mut self, can_park: bool) {
560         // TODO: clean up internal state if the task::current will fail
561 
562         let task = if can_park {
563             Some(task::current())
564         } else {
565             None
566         };
567 
568         {
569             let mut sender = self.sender_task.lock().unwrap();
570             sender.task = task;
571             sender.is_parked = true;
572         }
573 
574         // Send handle over queue
575         let t = self.sender_task.clone();
576         self.inner.parked_queue.push(t);
577 
578         // Check to make sure we weren't closed after we sent our task on the
579         // queue
580         let state = decode_state(self.inner.state.load(SeqCst));
581         self.maybe_parked = state.is_open;
582     }
583 
584     /// Polls the channel to determine if there is guaranteed to be capacity to send at least one
585     /// item without waiting.
586     ///
587     /// Returns `Ok(Async::Ready(_))` if there is sufficient capacity, or returns
588     /// `Ok(Async::NotReady)` if the channel is not guaranteed to have capacity. Returns
589     /// `Err(SendError(_))` if the receiver has been dropped.
590     ///
591     /// # Panics
592     ///
593     /// This method will panic if called from outside the context of a task or future.
poll_ready(&mut self) -> Poll<(), SendError<()>>594     pub fn poll_ready(&mut self) -> Poll<(), SendError<()>> {
595         let state = decode_state(self.inner.state.load(SeqCst));
596         if !state.is_open {
597             return Err(SendError(()));
598         }
599 
600         Ok(self.poll_unparked(true))
601     }
602 
603     /// Returns whether this channel is closed without needing a context.
is_closed(&self) -> bool604     pub fn is_closed(&self) -> bool {
605         !decode_state(self.inner.state.load(SeqCst)).is_open
606     }
607 
poll_unparked(&mut self, do_park: bool) -> Async<()>608     fn poll_unparked(&mut self, do_park: bool) -> Async<()> {
609         // First check the `maybe_parked` variable. This avoids acquiring the
610         // lock in most cases
611         if self.maybe_parked {
612             // Get a lock on the task handle
613             let mut task = self.sender_task.lock().unwrap();
614 
615             if !task.is_parked {
616                 self.maybe_parked = false;
617                 return Async::Ready(())
618             }
619 
620             // At this point, an unpark request is pending, so there will be an
621             // unpark sometime in the future. We just need to make sure that
622             // the correct task will be notified.
623             //
624             // Update the task in case the `Sender` has been moved to another
625             // task
626             task.task = if do_park {
627                 Some(task::current())
628             } else {
629                 None
630             };
631 
632             Async::NotReady
633         } else {
634             Async::Ready(())
635         }
636     }
637 }
638 
639 impl<T> Sink for Sender<T> {
640     type SinkItem = T;
641     type SinkError = SendError<T>;
642 
start_send(&mut self, msg: T) -> StartSend<T, SendError<T>>643     fn start_send(&mut self, msg: T) -> StartSend<T, SendError<T>> {
644         // If the sender is currently blocked, reject the message before doing
645         // any work.
646         if !self.poll_unparked(true).is_ready() {
647             return Ok(AsyncSink::NotReady(msg));
648         }
649 
650         // The channel has capacity to accept the message, so send it.
651         self.do_send(Some(msg), true)?;
652 
653         Ok(AsyncSink::Ready)
654     }
655 
poll_complete(&mut self) -> Poll<(), SendError<T>>656     fn poll_complete(&mut self) -> Poll<(), SendError<T>> {
657         self.poll_ready()
658             // At this point, the value cannot be returned and `SendError`
659             // cannot be created with a `T` without breaking backwards
660             // comptibility. This means we cannot return an error.
661             //
662             // That said, there is also no guarantee that a `poll_complete`
663             // returning `Ok` implies the receiver sees the message.
664             .or_else(|_| Ok(().into()))
665     }
666 
close(&mut self) -> Poll<(), SendError<T>>667     fn close(&mut self) -> Poll<(), SendError<T>> {
668         Ok(Async::Ready(()))
669     }
670 }
671 
672 impl<T> UnboundedSender<T> {
673     /// Returns whether this channel is closed without needing a context.
is_closed(&self) -> bool674     pub fn is_closed(&self) -> bool {
675         self.0.is_closed()
676     }
677 
678     /// Sends the provided message along this channel.
679     ///
680     /// This is an unbounded sender, so this function differs from `Sink::send`
681     /// by ensuring the return type reflects that the channel is always ready to
682     /// receive messages.
683     #[deprecated(note = "renamed to `unbounded_send`")]
684     #[doc(hidden)]
send(&self, msg: T) -> Result<(), SendError<T>>685     pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
686         self.unbounded_send(msg)
687     }
688 
689     /// Sends the provided message along this channel.
690     ///
691     /// This is an unbounded sender, so this function differs from `Sink::send`
692     /// by ensuring the return type reflects that the channel is always ready to
693     /// receive messages.
unbounded_send(&self, msg: T) -> Result<(), SendError<T>>694     pub fn unbounded_send(&self, msg: T) -> Result<(), SendError<T>> {
695         self.0.do_send_nb(msg)
696     }
697 }
698 
699 impl<T> Sink for UnboundedSender<T> {
700     type SinkItem = T;
701     type SinkError = SendError<T>;
702 
start_send(&mut self, msg: T) -> StartSend<T, SendError<T>>703     fn start_send(&mut self, msg: T) -> StartSend<T, SendError<T>> {
704         self.0.start_send(msg)
705     }
706 
poll_complete(&mut self) -> Poll<(), SendError<T>>707     fn poll_complete(&mut self) -> Poll<(), SendError<T>> {
708         self.0.poll_complete()
709     }
710 
close(&mut self) -> Poll<(), SendError<T>>711     fn close(&mut self) -> Poll<(), SendError<T>> {
712         Ok(Async::Ready(()))
713     }
714 }
715 
716 impl<'a, T> Sink for &'a UnboundedSender<T> {
717     type SinkItem = T;
718     type SinkError = SendError<T>;
719 
start_send(&mut self, msg: T) -> StartSend<T, SendError<T>>720     fn start_send(&mut self, msg: T) -> StartSend<T, SendError<T>> {
721         self.0.do_send_nb(msg)?;
722         Ok(AsyncSink::Ready)
723     }
724 
poll_complete(&mut self) -> Poll<(), SendError<T>>725     fn poll_complete(&mut self) -> Poll<(), SendError<T>> {
726         Ok(Async::Ready(()))
727     }
728 
close(&mut self) -> Poll<(), SendError<T>>729     fn close(&mut self) -> Poll<(), SendError<T>> {
730         Ok(Async::Ready(()))
731     }
732 }
733 
734 impl<T> Clone for UnboundedSender<T> {
clone(&self) -> UnboundedSender<T>735     fn clone(&self) -> UnboundedSender<T> {
736         UnboundedSender(self.0.clone())
737     }
738 }
739 
740 
741 impl<T> Clone for Sender<T> {
clone(&self) -> Sender<T>742     fn clone(&self) -> Sender<T> {
743         // Since this atomic op isn't actually guarding any memory and we don't
744         // care about any orderings besides the ordering on the single atomic
745         // variable, a relaxed ordering is acceptable.
746         let mut curr = self.inner.num_senders.load(SeqCst);
747 
748         loop {
749             // If the maximum number of senders has been reached, then fail
750             if curr == self.inner.max_senders() {
751                 panic!("cannot clone `Sender` -- too many outstanding senders");
752             }
753 
754             debug_assert!(curr < self.inner.max_senders());
755 
756             let next = curr + 1;
757             let actual = self.inner.num_senders.compare_and_swap(curr, next, SeqCst);
758 
759             // The ABA problem doesn't matter here. We only care that the
760             // number of senders never exceeds the maximum.
761             if actual == curr {
762                 return Sender {
763                     inner: self.inner.clone(),
764                     sender_task: Arc::new(Mutex::new(SenderTask::new())),
765                     maybe_parked: false,
766                 };
767             }
768 
769             curr = actual;
770         }
771     }
772 }
773 
774 impl<T> Drop for Sender<T> {
drop(&mut self)775     fn drop(&mut self) {
776         // Ordering between variables don't matter here
777         let prev = self.inner.num_senders.fetch_sub(1, SeqCst);
778 
779         if prev == 1 {
780             let _ = self.do_send(None, false);
781         }
782     }
783 }
784 
785 /*
786  *
787  * ===== impl Receiver =====
788  *
789  */
790 
791 impl<T> Receiver<T> {
792     /// Closes the receiving half
793     ///
794     /// This prevents any further messages from being sent on the channel while
795     /// still enabling the receiver to drain messages that are buffered.
close(&mut self)796     pub fn close(&mut self) {
797         let mut curr = self.inner.state.load(SeqCst);
798 
799         loop {
800             let mut state = decode_state(curr);
801 
802             if !state.is_open {
803                 break
804             }
805 
806             state.is_open = false;
807 
808             let next = encode_state(&state);
809             match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
810                 Ok(_) => break,
811                 Err(actual) => curr = actual,
812             }
813         }
814 
815         // Wake up any threads waiting as they'll see that we've closed the
816         // channel and will continue on their merry way.
817         loop {
818             match unsafe { self.inner.parked_queue.pop() } {
819                 PopResult::Data(task) => {
820                     task.lock().unwrap().notify();
821                 }
822                 PopResult::Empty => break,
823                 PopResult::Inconsistent => thread::yield_now(),
824             }
825         }
826     }
827 
next_message(&mut self) -> Async<Option<T>>828     fn next_message(&mut self) -> Async<Option<T>> {
829         // Pop off a message
830         loop {
831             match unsafe { self.inner.message_queue.pop() } {
832                 PopResult::Data(msg) => {
833                     // If there are any parked task handles in the parked queue,
834                     // pop one and unpark it.
835                     self.unpark_one();
836                     // Decrement number of messages
837                     self.dec_num_messages();
838 
839                     return Async::Ready(msg);
840                 }
841                 PopResult::Empty => {
842                     // The queue is empty, return NotReady
843                     return Async::NotReady;
844                 }
845                 PopResult::Inconsistent => {
846                     // Inconsistent means that there will be a message to pop
847                     // in a short time. This branch can only be reached if
848                     // values are being produced from another thread, so there
849                     // are a few ways that we can deal with this:
850                     //
851                     // 1) Spin
852                     // 2) thread::yield_now()
853                     // 3) task::current().unwrap() & return NotReady
854                     //
855                     // For now, thread::yield_now() is used, but it would
856                     // probably be better to spin a few times then yield.
857                     thread::yield_now();
858                 }
859             }
860         }
861     }
862 
863     // Unpark a single task handle if there is one pending in the parked queue
unpark_one(&mut self)864     fn unpark_one(&mut self) {
865         loop {
866             match unsafe { self.inner.parked_queue.pop() } {
867                 PopResult::Data(task) => {
868                     task.lock().unwrap().notify();
869                     return;
870                 }
871                 PopResult::Empty => {
872                     // Queue empty, no task to wake up.
873                     return;
874                 }
875                 PopResult::Inconsistent => {
876                     // Same as above
877                     thread::yield_now();
878                 }
879             }
880         }
881     }
882 
883     // Try to park the receiver task
try_park(&self) -> TryPark884     fn try_park(&self) -> TryPark {
885         let curr = self.inner.state.load(SeqCst);
886         let state = decode_state(curr);
887 
888         // If the channel is closed, then there is no need to park.
889         if state.is_closed() {
890             return TryPark::Closed;
891         }
892 
893         // First, track the task in the `recv_task` slot
894         let mut recv_task = self.inner.recv_task.lock().unwrap();
895 
896         if recv_task.unparked {
897             // Consume the `unpark` signal without actually parking
898             recv_task.unparked = false;
899             return TryPark::NotEmpty;
900         }
901 
902         recv_task.task = Some(task::current());
903         TryPark::Parked
904     }
905 
dec_num_messages(&self)906     fn dec_num_messages(&self) {
907         let mut curr = self.inner.state.load(SeqCst);
908 
909         loop {
910             let mut state = decode_state(curr);
911 
912             state.num_messages -= 1;
913 
914             let next = encode_state(&state);
915             match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
916                 Ok(_) => break,
917                 Err(actual) => curr = actual,
918             }
919         }
920     }
921 }
922 
923 impl<T> Stream for Receiver<T> {
924     type Item = T;
925     type Error = ();
926 
poll(&mut self) -> Poll<Option<T>, ()>927     fn poll(&mut self) -> Poll<Option<T>, ()> {
928         loop {
929             // Try to read a message off of the message queue.
930             match self.next_message() {
931                 Async::Ready(msg) => return Ok(Async::Ready(msg)),
932                 Async::NotReady => {
933                     // There are no messages to read, in this case, attempt to
934                     // park. The act of parking will verify that the channel is
935                     // still empty after the park operation has completed.
936                     match self.try_park() {
937                         TryPark::Parked => {
938                             // The task was parked, and the channel is still
939                             // empty, return NotReady.
940                             return Ok(Async::NotReady);
941                         }
942                         TryPark::Closed => {
943                             // The channel is closed, there will be no further
944                             // messages.
945                             return Ok(Async::Ready(None));
946                         }
947                         TryPark::NotEmpty => {
948                             // A message has been sent while attempting to
949                             // park. Loop again, the next iteration is
950                             // guaranteed to get the message.
951                             continue;
952                         }
953                     }
954                 }
955             }
956         }
957     }
958 }
959 
960 impl<T> Drop for Receiver<T> {
drop(&mut self)961     fn drop(&mut self) {
962         // Drain the channel of all pending messages
963         self.close();
964 
965         loop {
966             match self.next_message() {
967                 Async::Ready(_) => {}
968                 Async::NotReady => {
969                     let curr = self.inner.state.load(SeqCst);
970                     let state = decode_state(curr);
971 
972                     // If the channel is closed, then there is no need to park.
973                     if state.is_closed() {
974                         return;
975                     }
976 
977                     // TODO: Spinning isn't ideal, it might be worth
978                     // investigating using a condvar or some other strategy
979                     // here. That said, if this case is hit, then another thread
980                     // is about to push the value into the queue and this isn't
981                     // the only spinlock in the impl right now.
982                     thread::yield_now();
983                 }
984             }
985         }
986     }
987 }
988 
989 impl<T> UnboundedReceiver<T> {
990     /// Closes the receiving half
991     ///
992     /// This prevents any further messages from being sent on the channel while
993     /// still enabling the receiver to drain messages that are buffered.
close(&mut self)994     pub fn close(&mut self) {
995         self.0.close();
996     }
997 }
998 
999 impl<T> Stream for UnboundedReceiver<T> {
1000     type Item = T;
1001     type Error = ();
1002 
poll(&mut self) -> Poll<Option<T>, ()>1003     fn poll(&mut self) -> Poll<Option<T>, ()> {
1004         self.0.poll()
1005     }
1006 }
1007 
/// Handle returned from the `spawn` and `spawn_unbounded` functions.
///
/// This handle is a stream that proxies a stream on a separate `Executor`.
/// Created through the `mpsc::spawn` or `mpsc::spawn_unbounded` function, this
/// handle will produce the same values as the proxied stream, as they are
/// produced in the executor. A handle created by `spawn` uses a limited buffer
/// to exert back-pressure on the remote stream; one created by
/// `spawn_unbounded` buffers without limit.
///
/// If this handle is dropped, then the stream will no longer be polled and is
/// scheduled to be dropped.
pub struct SpawnHandle<Item, Error> {
    // Receiving half of the channel into which the spawned `Execute` future
    // sends the proxied stream's results; polled by this handle's `Stream`
    // implementation.
    rx: Receiver<Result<Item, Error>>,
    // Sending half of the cancellation oneshot; it is only stored here.
    // `Execute::poll` finishes as soon as polling the paired receiver yields
    // anything other than `NotReady`, which presumably happens once this
    // field is dropped together with the handle -- TODO confirm against
    // `oneshot`.
    _cancel_tx: oneshot::Sender<()>,
}
1021 
/// Type of future which `Executor` instances must be able to execute for `spawn`.
///
/// `spawn` and `spawn_unbounded` build one of these around the stream being
/// proxied and hand it to the executor.
pub struct Execute<S: Stream> {
    // The `send_all` combinator built in `spawn`/`spawn_unbounded`: it feeds
    // the results of the wrapped stream into the channel's `Sender`.
    inner: SendAll<Sender<Result<S::Item, S::Error>>, Results<S, SendError<Result<S::Item, S::Error>>>>,
    // Receiving half of the cancellation oneshot whose sending half is kept by
    // the `SpawnHandle`; checked first on every poll.
    cancel_rx: oneshot::Receiver<()>,
}
1027 
1028 /// Spawns a `stream` onto the instance of `Executor` provided, `executor`,
1029 /// returning a handle representing the remote stream.
1030 ///
1031 /// The `stream` will be canceled if the `SpawnHandle` is dropped.
1032 ///
1033 /// The `SpawnHandle` returned is a stream that is a proxy for `stream` itself.
1034 /// When `stream` has additional items available, then the `SpawnHandle`
1035 /// will have those same items available.
1036 ///
1037 /// At most `buffer + 1` elements will be buffered at a time. If the buffer
1038 /// is full, then `stream` will stop progressing until more space is available.
1039 /// This allows the `SpawnHandle` to exert backpressure on the `stream`.
1040 ///
1041 /// # Panics
1042 ///
1043 /// This function will panic if `executor` is unable spawn a `Future` containing
1044 /// the entirety of the `stream`.
spawn<S, E>(stream: S, executor: &E, buffer: usize) -> SpawnHandle<S::Item, S::Error> where S: Stream, E: Executor<Execute<S>>1045 pub fn spawn<S, E>(stream: S, executor: &E, buffer: usize) -> SpawnHandle<S::Item, S::Error>
1046     where S: Stream,
1047           E: Executor<Execute<S>>
1048 {
1049     let (cancel_tx, cancel_rx) = oneshot::channel();
1050     let (tx, rx) = channel(buffer);
1051     executor.execute(Execute {
1052         inner: tx.send_all(resultstream::new(stream)),
1053         cancel_rx: cancel_rx,
1054     }).expect("failed to spawn stream");
1055     SpawnHandle {
1056         rx: rx,
1057         _cancel_tx: cancel_tx,
1058     }
1059 }
1060 
1061 /// Spawns a `stream` onto the instance of `Executor` provided, `executor`,
1062 /// returning a handle representing the remote stream, with unbounded buffering.
1063 ///
1064 /// The `stream` will be canceled if the `SpawnHandle` is dropped.
1065 ///
1066 /// The `SpawnHandle` returned is a stream that is a proxy for `stream` itself.
1067 /// When `stream` has additional items available, then the `SpawnHandle`
1068 /// will have those same items available.
1069 ///
1070 /// An unbounded buffer is used, which means that values will be buffered as
1071 /// fast as `stream` can produce them, without any backpressure. Therefore, if
1072 /// `stream` is an infinite stream, it can use an unbounded amount of memory, and
1073 /// potentially hog CPU resources.
1074 ///
1075 /// # Panics
1076 ///
1077 /// This function will panic if `executor` is unable spawn a `Future` containing
1078 /// the entirety of the `stream`.
spawn_unbounded<S, E>(stream: S, executor: &E) -> SpawnHandle<S::Item, S::Error> where S: Stream, E: Executor<Execute<S>>1079 pub fn spawn_unbounded<S, E>(stream: S, executor: &E) -> SpawnHandle<S::Item, S::Error>
1080     where S: Stream,
1081           E: Executor<Execute<S>>
1082 {
1083     let (cancel_tx, cancel_rx) = oneshot::channel();
1084     let (tx, rx) = channel2(None);
1085     executor.execute(Execute {
1086         inner: tx.send_all(resultstream::new(stream)),
1087         cancel_rx: cancel_rx,
1088     }).expect("failed to spawn stream");
1089     SpawnHandle {
1090         rx: rx,
1091         _cancel_tx: cancel_tx,
1092     }
1093 }
1094 
1095 impl<I, E> Stream for SpawnHandle<I, E> {
1096     type Item = I;
1097     type Error = E;
1098 
poll(&mut self) -> Poll<Option<I>, E>1099     fn poll(&mut self) -> Poll<Option<I>, E> {
1100         match self.rx.poll() {
1101             Ok(Async::Ready(Some(Ok(t)))) => Ok(Async::Ready(Some(t.into()))),
1102             Ok(Async::Ready(Some(Err(e)))) => Err(e),
1103             Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
1104             Ok(Async::NotReady) => Ok(Async::NotReady),
1105             Err(_) => unreachable!("mpsc::Receiver should never return Err"),
1106         }
1107     }
1108 }
1109 
1110 impl<I, E> fmt::Debug for SpawnHandle<I, E> {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result1111     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1112         f.debug_struct("SpawnHandle")
1113          .finish()
1114     }
1115 }
1116 
1117 impl<S: Stream> Future for Execute<S> {
1118     type Item = ();
1119     type Error = ();
1120 
poll(&mut self) -> Poll<(), ()>1121     fn poll(&mut self) -> Poll<(), ()> {
1122         match self.cancel_rx.poll() {
1123             Ok(Async::NotReady) => (),
1124             _ => return Ok(Async::Ready(())),
1125         }
1126         match self.inner.poll() {
1127             Ok(Async::NotReady) => Ok(Async::NotReady),
1128             _ => Ok(Async::Ready(()))
1129         }
1130     }
1131 }
1132 
1133 impl<S: Stream> fmt::Debug for Execute<S> {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result1134     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1135         f.debug_struct("Execute")
1136          .finish()
1137     }
1138 }
1139 
1140 /*
1141  *
1142  * ===== impl Inner =====
1143  *
1144  */
1145 
1146 impl<T> Inner<T> {
1147     // The return value is such that the total number of messages that can be
1148     // enqueued into the channel will never exceed MAX_CAPACITY
max_senders(&self) -> usize1149     fn max_senders(&self) -> usize {
1150         match self.buffer {
1151             Some(buffer) => MAX_CAPACITY - buffer,
1152             None => MAX_BUFFER,
1153         }
1154     }
1155 }
1156 
// NOTE(review): these manual impls assert that `Inner<T>` may be moved to and
// shared between threads whenever `T: Send`. Nothing visible in this part of
// the file proves that; it presumably relies on the queues, atomics and mutex
// inside `Inner` synchronizing every access to the `T` values -- confirm
// against the `Inner` and queue definitions before changing either.
unsafe impl<T: Send> Send for Inner<T> {}
unsafe impl<T: Send> Sync for Inner<T> {}
1159 
1160 impl State {
is_closed(&self) -> bool1161     fn is_closed(&self) -> bool {
1162         !self.is_open && self.num_messages == 0
1163     }
1164 }
1165 
1166 /*
1167  *
1168  * ===== Helpers =====
1169  *
1170  */
1171 
decode_state(num: usize) -> State1172 fn decode_state(num: usize) -> State {
1173     State {
1174         is_open: num & OPEN_MASK == OPEN_MASK,
1175         num_messages: num & MAX_CAPACITY,
1176     }
1177 }
1178 
encode_state(state: &State) -> usize1179 fn encode_state(state: &State) -> usize {
1180     let mut num = state.num_messages;
1181 
1182     if state.is_open {
1183         num |= OPEN_MASK;
1184     }
1185 
1186     num
1187 }
1188