1 //! A channel for sending a single message between asynchronous tasks.
2 //!
3 //! This is a single-producer, single-consumer channel.
4 
5 use alloc::sync::Arc;
6 use core::fmt;
7 use core::pin::Pin;
8 use core::sync::atomic::AtomicBool;
9 use core::sync::atomic::Ordering::SeqCst;
10 use futures_core::future::{FusedFuture, Future};
11 use futures_core::task::{Context, Poll, Waker};
12 
13 use crate::lock::Lock;
14 
/// A future for a value that will be provided by another asynchronous task.
///
/// Polling resolves to `Ok(value)` once a value is available, or to
/// `Err(Canceled)` if the channel completes without a value being available.
///
/// This is created by the [`channel`](channel) function.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Receiver<T> {
    // Channel state shared with the paired `Sender`.
    inner: Arc<Inner<T>>,
}
22 
/// A means of transmitting a single value to another task.
///
/// Sending consumes the `Sender`, so at most one value can ever be sent.
///
/// This is created by the [`channel`](channel) function.
pub struct Sender<T> {
    // Channel state shared with the paired `Receiver`.
    inner: Arc<Inner<T>>,
}
29 
// The channels do not ever project Pin to the inner T: each half only holds
// an `Arc<Inner<T>>`, so both are `Unpin` regardless of whether `T` is.
impl<T> Unpin for Receiver<T> {}
impl<T> Unpin for Sender<T> {}
33 
/// Internal state of the `Receiver`/`Sender` pair above. This is all used as
/// the internal synchronization between the two for send/recv operations.
struct Inner<T> {
    /// Indicates whether this oneshot is complete yet. This is set to `true`
    /// when the `Sender` is dropped, when the `Receiver` is dropped, and by
    /// `Receiver::close`; both sides interpret it appropriately.
    ///
    /// For `Receiver`, if this is `true`, then `data` may be inspected. A
    /// `send` racing with `Receiver::close` can still briefly hold the `data`
    /// lock, which is why readers use `try_lock` rather than assuming the
    /// slot is free.
    ///
    /// For `Sender` if this is `true` then the receiving side has gone away
    /// (or was closed) and it can return ready from `poll_canceled`.
    complete: AtomicBool,

    /// The actual data being transferred as part of this `Receiver`. This is
    /// filled in by `Sender::send` and read by `Receiver::poll` and
    /// `Receiver::try_recv`.
    ///
    /// Note that this is protected by `Lock`, but it is in theory safe to
    /// replace with an `UnsafeCell` as it's actually protected by `complete`
    /// above. I wouldn't recommend doing this, however, unless someone is
    /// supremely confident in the various atomic orderings here and there.
    data: Lock<Option<T>>,

    /// Field to store the task which is blocked in `Receiver::poll`.
    ///
    /// This is filled in when a oneshot is polled but not ready yet. Note that
    /// the `Lock` here, unlike in `data` above, is important to resolve races.
    /// Both the `Receiver` and the `Sender` halves understand that if they
    /// can't acquire the lock then some important interference is happening.
    rx_task: Lock<Option<Waker>>,

    /// Like `rx_task` above, except for the task blocked in
    /// `Sender::poll_canceled`. Additionally, `Lock` cannot be `UnsafeCell`.
    tx_task: Lock<Option<Waker>>,
}
69 
70 /// Creates a new one-shot channel for sending a single value across asynchronous tasks.
71 ///
72 /// The channel works for a spsc (single-producer, single-consumer) scheme.
73 ///
74 /// This function is similar to Rust's channel constructor found in the standard
75 /// library. Two halves are returned, the first of which is a `Sender` handle,
76 /// used to signal the end of a computation and provide its value. The second
77 /// half is a `Receiver` which implements the `Future` trait, resolving to the
78 /// value that was given to the `Sender` handle.
79 ///
80 /// Each half can be separately owned and sent across tasks.
81 ///
82 /// # Examples
83 ///
84 /// ```
85 /// use futures::channel::oneshot;
86 /// use std::{thread, time::Duration};
87 ///
88 /// let (sender, receiver) = oneshot::channel::<i32>();
89 ///
90 /// thread::spawn(|| {
91 ///     println!("THREAD: sleeping zzz...");
92 ///     thread::sleep(Duration::from_millis(1000));
93 ///     println!("THREAD: i'm awake! sending.");
94 ///     sender.send(3).unwrap();
95 /// });
96 ///
97 /// println!("MAIN: doing some useful stuff");
98 ///
99 /// futures::executor::block_on(async {
100 ///     println!("MAIN: waiting for msg...");
101 ///     println!("MAIN: got: {:?}", receiver.await)
102 /// });
103 /// ```
channel<T>() -> (Sender<T>, Receiver<T>)104 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
105     let inner = Arc::new(Inner::new());
106     let receiver = Receiver { inner: inner.clone() };
107     let sender = Sender { inner };
108     (sender, receiver)
109 }
110 
111 impl<T> Inner<T> {
new() -> Self112     fn new() -> Self {
113         Self {
114             complete: AtomicBool::new(false),
115             data: Lock::new(None),
116             rx_task: Lock::new(None),
117             tx_task: Lock::new(None),
118         }
119     }
120 
    /// Tries to place `t` into the `data` slot for the receiver.
    ///
    /// Returns `Ok(())` if the value was stored and not taken back out.
    /// Returns `Err(t)`, handing the value back to the caller, when:
    ///
    /// * `complete` is already set on entry,
    /// * the `data` lock cannot be acquired, or
    /// * `complete` became set while storing and the value could still be
    ///   pulled back out of the slot.
    ///
    /// # Panics
    ///
    /// Panics if the `data` slot already holds a value.
    fn send(&self, t: T) -> Result<(), T> {
        if self.complete.load(SeqCst) {
            return Err(t);
        }

        // Note that this lock acquisition may fail if the receiver
        // is closed and sets the `complete` flag to `true`, whereupon
        // the receiver may call `poll()`.
        if let Some(mut slot) = self.data.try_lock() {
            assert!(slot.is_none());
            *slot = Some(t);
            // Release the lock before re-checking `complete` so the receiver
            // can get at the value.
            drop(slot);

            // If the receiver called `close()` between the check at the
            // start of the function, and the lock being released, then
            // the receiver may not be around to receive it, so try to
            // pull it back out.
            if self.complete.load(SeqCst) {
                // If lock acquisition fails, then receiver is actually
                // receiving it, so we're good.
                if let Some(mut slot) = self.data.try_lock() {
                    // An empty slot here means the receiver already took the
                    // value, which also counts as a successful send.
                    if let Some(t) = slot.take() {
                        return Err(t);
                    }
                }
            }
            Ok(())
        } else {
            // Must have been closed
            Err(t)
        }
    }
153 
    /// Polls for the `complete` flag on behalf of the `Sender`.
    ///
    /// Returns `Poll::Ready(())` if `complete` is set (checked both before and
    /// after registering the waker) or if the `tx_task` lock cannot be
    /// acquired. Otherwise stores a clone of the current waker in `tx_task`
    /// and returns `Poll::Pending`.
    fn poll_canceled(&self, cx: &mut Context<'_>) -> Poll<()> {
        // Fast path up first, just read the flag and see if our other half is
        // gone. This flag is set both in our destructor and by the `Receiver`
        // (on drop or `close`), but our destructor hasn't run yet so if it's
        // set then the `Receiver` is gone or closed.
        if self.complete.load(SeqCst) {
            return Poll::Ready(());
        }

        // If our other half is not gone then we need to park our current task
        // and move it into the `tx_task` slot to get notified when it's
        // actually gone.
        //
        // If `try_lock` fails, then the `Receiver` is in the process of using
        // it, so we can deduce that it's now in the process of going away and
        // hence we're canceled. If it succeeds then we just store our handle.
        //
        // Crucially we then check `complete` *again* before we return.
        // While we were storing our handle inside `tx_task` the
        // `Receiver` may have been dropped. The first thing it does is set the
        // flag, and if it fails to acquire the lock it assumes that we'll see
        // the flag later on. So... we then try to see the flag later on!
        let handle = cx.waker().clone();
        match self.tx_task.try_lock() {
            Some(mut p) => *p = Some(handle),
            None => return Poll::Ready(()),
        }
        if self.complete.load(SeqCst) {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
187 
    /// Returns the current value of the `complete` flag without registering
    /// any waker.
    fn is_canceled(&self) -> bool {
        self.complete.load(SeqCst)
    }
191 
    /// Sender-side teardown, invoked from `Sender`'s `Drop` impl.
    ///
    /// Sets `complete`, wakes the task stored in `rx_task` (if any and if the
    /// lock can be taken), and discards any waker left in `tx_task`. Because
    /// `Sender::send` takes the sender by value, this also runs once a send
    /// has finished.
    fn drop_tx(&self) {
        // Flag that we're a completed `Sender` and try to wake up a receiver.
        // Whether or not we actually stored any data will get picked up and
        // translated to either an item or cancellation.
        //
        // Note that if we fail to acquire the `rx_task` lock then that means
        // we're in one of two situations:
        //
        // 1. The receiver is trying to block in `poll`
        // 2. The receiver is being dropped
        //
        // In the first case it'll check the `complete` flag after it's done
        // blocking to see if it succeeded. In the latter case we don't need to
        // wake up anyone anyway. So in both cases it's ok to ignore the `None`
        // case of `try_lock` and bail out.
        //
        // The first case crucially depends on `Lock` using `SeqCst` ordering
        // under the hood. If it instead used `Release` / `Acquire` ordering,
        // then it would not necessarily synchronize with `inner.complete`
        // and deadlock might be possible, as was observed in
        // https://github.com/rust-lang/futures-rs/pull/219.
        self.complete.store(true, SeqCst);

        if let Some(mut slot) = self.rx_task.try_lock() {
            if let Some(task) = slot.take() {
                // Release the lock before waking so the woken task never
                // finds `rx_task` still locked by us.
                drop(slot);
                task.wake();
            }
        }

        // If we registered a task for cancel notification drop it to reduce
        // spurious wakeups
        if let Some(mut slot) = self.tx_task.try_lock() {
            drop(slot.take());
        }
    }
228 
    /// Receiver-side close, invoked from `Receiver::close`.
    ///
    /// Sets `complete` and wakes the task stored in `tx_task`, if there is one
    /// and the lock can be acquired. Unlike `drop_rx`, the `rx_task` slot is
    /// left untouched.
    fn close_rx(&self) {
        // Flag our completion and then attempt to wake up the sender if it's
        // blocked. See comments in `drop_rx` below for more info
        self.complete.store(true, SeqCst);
        if let Some(mut handle) = self.tx_task.try_lock() {
            if let Some(task) = handle.take() {
                // Unlock before waking the sender's task.
                drop(handle);
                task.wake()
            }
        }
    }
240 
try_recv(&self) -> Result<Option<T>, Canceled>241     fn try_recv(&self) -> Result<Option<T>, Canceled> {
242         // If we're complete, either `::close_rx` or `::drop_tx` was called.
243         // We can assume a successful send if data is present.
244         if self.complete.load(SeqCst) {
245             if let Some(mut slot) = self.data.try_lock() {
246                 if let Some(data) = slot.take() {
247                     return Ok(Some(data));
248                 }
249             }
250             Err(Canceled)
251         } else {
252             Ok(None)
253         }
254     }
255 
    /// Poll implementation backing `Receiver`'s `Future` impl.
    ///
    /// Returns `Poll::Ready(Ok(value))` if the channel is complete (or the
    /// `rx_task` lock is contended) and a value can be taken from `data`,
    /// `Poll::Ready(Err(Canceled))` if it is complete but no value can be
    /// taken, and `Poll::Pending` after storing the current waker in
    /// `rx_task` otherwise.
    fn recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, Canceled>> {
        // Check to see if some data has arrived. If it hasn't then we need to
        // block our task.
        //
        // Note that the acquisition of the `rx_task` lock might fail below, but
        // the only situation where this can happen is during `Sender::drop`
        // when we are indeed completed already. If that's happening then we
        // know we're completed so keep going.
        let done = if self.complete.load(SeqCst) {
            true
        } else {
            let task = cx.waker().clone();
            match self.rx_task.try_lock() {
                Some(mut slot) => {
                    *slot = Some(task);
                    false
                }
                None => true,
            }
        };

        // If we're `done` via one of the paths above, then look at the data and
        // figure out what the answer is. If, however, we stored `rx_task`
        // successfully above we need to check again if we're completed in case
        // a message was sent while `rx_task` was locked and couldn't notify us
        // otherwise.
        //
        // If we're not done, and we're not complete, though, then we've
        // successfully blocked our task and we return `Pending`.
        if done || self.complete.load(SeqCst) {
            // If taking the lock fails, the sender will realise that we're
            // `done` when it checks the `complete` flag on the way out, and
            // will treat the send as a failure.
            if let Some(mut slot) = self.data.try_lock() {
                if let Some(data) = slot.take() {
                    return Poll::Ready(Ok(data));
                }
            }
            Poll::Ready(Err(Canceled))
        } else {
            Poll::Pending
        }
    }
299 
    /// Receiver-side teardown, invoked from `Receiver`'s `Drop` impl.
    ///
    /// Sets `complete`, discards any waker stored in `rx_task`, and wakes the
    /// task stored in `tx_task` (if any). Both lock acquisitions are
    /// best-effort: a failed `try_lock` is skipped.
    fn drop_rx(&self) {
        // Indicate to the `Sender` that we're done, so any future calls to
        // `poll_canceled` are weeded out.
        self.complete.store(true, SeqCst);

        // If we've blocked a task then there's no need for it to stick around,
        // so we need to drop it. If this lock acquisition fails, though, then
        // it's just because our `Sender` is trying to take the task, so we
        // let them take care of that.
        if let Some(mut slot) = self.rx_task.try_lock() {
            let task = slot.take();
            // Release the lock first so the waker is dropped outside of it.
            drop(slot);
            drop(task);
        }

        // Finally, if our `Sender` wants to get notified of us going away, it
        // would have stored something in `tx_task`. Here we try to peel that
        // out and unpark it.
        //
        // Note that the `try_lock` here may fail, but only if the `Sender` is
        // in the process of filling in the task. If that happens then we
        // already flagged `complete` and they'll pick that up above.
        if let Some(mut handle) = self.tx_task.try_lock() {
            if let Some(task) = handle.take() {
                // Unlock before waking the sender's task.
                drop(handle);
                task.wake()
            }
        }
    }
329 }
330 
impl<T> Sender<T> {
    /// Completes this oneshot with a successful result.
    ///
    /// This function will consume `self` and indicate to the other end, the
    /// [`Receiver`](Receiver), that the value provided is the result of the
    /// computation this represents.
    ///
    /// If the value is successfully enqueued for the remote end to receive,
    /// then `Ok(())` is returned. If the receiving end was dropped before
    /// this function was called, however, then `Err(t)` is returned.
    ///
    /// # Errors
    ///
    /// Returns `Err(t)`, giving the value back, when the channel is already
    /// complete (for example because the `Receiver` was dropped or closed)
    /// and the value could not be handed over.
    pub fn send(self, t: T) -> Result<(), T> {
        self.inner.send(t)
    }

    /// Polls this `Sender` half to detect whether its associated
    /// [`Receiver`](Receiver) has been dropped.
    ///
    /// # Return values
    ///
    /// If `Ready(())` is returned then the associated `Receiver` has been
    /// dropped, which means any work required for sending should be canceled.
    ///
    /// If `Pending` is returned then the associated `Receiver` is still
    /// alive and may be able to receive a message if sent. The current task,
    /// however, is scheduled to receive a notification if the corresponding
    /// `Receiver` goes away.
    pub fn poll_canceled(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        self.inner.poll_canceled(cx)
    }

    /// Creates a future that resolves when this `Sender`'s corresponding
    /// [`Receiver`](Receiver) half has hung up.
    ///
    /// This is a utility wrapping [`poll_canceled`](Sender::poll_canceled)
    /// to expose a [`Future`](core::future::Future).
    ///
    /// The returned future mutably borrows this `Sender` for as long as it
    /// lives.
    pub fn cancellation(&mut self) -> Cancellation<'_, T> {
        Cancellation { inner: self }
    }

    /// Tests to see whether this `Sender`'s corresponding `Receiver`
    /// has been dropped.
    ///
    /// Unlike [`poll_canceled`](Sender::poll_canceled), this function does not
    /// enqueue a task for wakeup upon cancellation, but merely reports the
    /// current state, which may be subject to concurrent modification.
    pub fn is_canceled(&self) -> bool {
        self.inner.is_canceled()
    }

    /// Tests to see whether this `Sender` is connected to the given `Receiver`. That is, whether
    /// they were created by the same call to `channel`.
    ///
    /// The check compares the two halves' shared-state pointers.
    pub fn is_connected_to(&self, receiver: &Receiver<T>) -> bool {
        Arc::ptr_eq(&self.inner, &receiver.inner)
    }
}
386 
impl<T> Drop for Sender<T> {
    // Runs the sender-side teardown in `Inner::drop_tx`, which marks the
    // channel complete and wakes a receiver task if one is registered.
    fn drop(&mut self) {
        self.inner.drop_tx()
    }
}
392 
393 impl<T: fmt::Debug> fmt::Debug for Sender<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result394     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
395         f.debug_struct("Sender").field("complete", &self.inner.complete).finish()
396     }
397 }
398 
/// A future that resolves when the receiving end of a channel has hung up.
///
/// This is an `.await`-friendly interface around [`poll_canceled`](Sender::poll_canceled).
/// It is created by [`Sender::cancellation`].
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Debug)]
pub struct Cancellation<'a, T> {
    // The sender being watched, mutably borrowed while this future exists.
    inner: &'a mut Sender<T>,
}
407 
impl<T> Future for Cancellation<'_, T> {
    // Readiness itself is the signal; no value is carried.
    type Output = ();

    // Forwards to `Sender::poll_canceled` on the borrowed sender.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        self.inner.poll_canceled(cx)
    }
}
415 
/// The error a [`Receiver`](Receiver) resolves to when its channel completes
/// without a value, for instance because the [`Sender`](Sender) was dropped
/// before sending.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub struct Canceled;

impl fmt::Display for Canceled {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Fixed message, emitted verbatim.
        f.write_str("oneshot canceled")
    }
}

#[cfg(feature = "std")]
impl std::error::Error for Canceled {}
429 
impl<T> Receiver<T> {
    /// Gracefully close this receiver, preventing any subsequent attempts to
    /// send to it.
    ///
    /// Any `send` operation which happens after this method returns is
    /// guaranteed to fail. After calling this method, you can use
    /// [`Receiver::poll`](core::future::Future::poll) to determine whether a
    /// message had previously been sent.
    ///
    /// A sender task waiting for cancellation is woken, if one is registered.
    pub fn close(&mut self) {
        self.inner.close_rx()
    }

    /// Attempts to receive a message outside of the context of a task.
    ///
    /// Does not schedule a task wakeup or have any other side effects.
    ///
    /// A return value of `None` must be considered immediately stale (out of
    /// date) unless [`close`](Receiver::close) has been called first.
    ///
    /// # Errors
    ///
    /// Returns `Err(Canceled)` if the channel is complete (the sender was
    /// dropped, or this receiver was closed) and no message is available.
    pub fn try_recv(&mut self) -> Result<Option<T>, Canceled> {
        self.inner.try_recv()
    }
}
454 
impl<T> Future for Receiver<T> {
    // `Ok(value)` when a message was received, `Err(Canceled)` when the
    // channel completed without one.
    type Output = Result<T, Canceled>;

    // Forwards to `Inner::recv`, which registers the task's waker when it
    // returns `Pending`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Canceled>> {
        self.inner.recv(cx)
    }
}
462 
463 impl<T> FusedFuture for Receiver<T> {
is_terminated(&self) -> bool464     fn is_terminated(&self) -> bool {
465         if self.inner.complete.load(SeqCst) {
466             if let Some(slot) = self.inner.data.try_lock() {
467                 if slot.is_some() {
468                     return false;
469                 }
470             }
471             true
472         } else {
473             false
474         }
475     }
476 }
477 
impl<T> Drop for Receiver<T> {
    // Runs the receiver-side teardown in `Inner::drop_rx`, which marks the
    // channel complete and wakes a sender task if one is registered.
    fn drop(&mut self) {
        self.inner.drop_rx()
    }
}
483 
484 impl<T: fmt::Debug> fmt::Debug for Receiver<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result485     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
486         f.debug_struct("Receiver").field("complete", &self.inner.complete).finish()
487     }
488 }
489