1 //! A channel for sending a single message between asynchronous tasks.
2
3 use alloc::sync::Arc;
4 use core::fmt;
5 use core::pin::Pin;
6 use core::sync::atomic::AtomicBool;
7 use core::sync::atomic::Ordering::SeqCst;
8 use futures_core::future::Future;
9 use futures_core::task::{Context, Poll, Waker};
10
11 use crate::lock::Lock;
12
13 /// A future for a value that will be provided by another asynchronous task.
14 ///
15 /// This is created by the [`channel`] function.
16 #[must_use = "futures do nothing unless you `.await` or poll them"]
17 #[derive(Debug)]
18 pub struct Receiver<T> {
19 inner: Arc<Inner<T>>,
20 }
21
22 /// A means of transmitting a single value to another task.
23 ///
24 /// This is created by the [`channel`] function.
25 #[derive(Debug)]
26 pub struct Sender<T> {
27 inner: Arc<Inner<T>>,
28 }
29
30 // The channels do not ever project Pin to the inner T
31 impl<T> Unpin for Receiver<T> {}
32 impl<T> Unpin for Sender<T> {}
33
34 /// Internal state of the `Receiver`/`Sender` pair above. This is all used as
35 /// the internal synchronization between the two for send/recv operations.
36 #[derive(Debug)]
37 struct Inner<T> {
38 /// Indicates whether this oneshot is complete yet. This is filled in both
39 /// by `Sender::drop` and by `Receiver::drop`, and both sides interpret it
40 /// appropriately.
41 ///
42 /// For `Receiver`, if this is `true`, then it's guaranteed that `data` is
43 /// unlocked and ready to be inspected.
44 ///
45 /// For `Sender` if this is `true` then the oneshot has gone away and it
46 /// can return ready from `poll_canceled`.
47 complete: AtomicBool,
48
49 /// The actual data being transferred as part of this `Receiver`. This is
50 /// filled in by `Sender::complete` and read by `Receiver::poll`.
51 ///
52 /// Note that this is protected by `Lock`, but it is in theory safe to
53 /// replace with an `UnsafeCell` as it's actually protected by `complete`
54 /// above. I wouldn't recommend doing this, however, unless someone is
55 /// supremely confident in the various atomic orderings here and there.
56 data: Lock<Option<T>>,
57
58 /// Field to store the task which is blocked in `Receiver::poll`.
59 ///
60 /// This is filled in when a oneshot is polled but not ready yet. Note that
61 /// the `Lock` here, unlike in `data` above, is important to resolve races.
62 /// Both the `Receiver` and the `Sender` halves understand that if they
63 /// can't acquire the lock then some important interference is happening.
64 rx_task: Lock<Option<Waker>>,
65
66 /// Like `rx_task` above, except for the task blocked in
67 /// `Sender::poll_canceled`. Additionally, `Lock` cannot be `UnsafeCell`.
68 tx_task: Lock<Option<Waker>>,
69 }
70
71 /// Creates a new one-shot channel for sending values across asynchronous tasks.
72 ///
73 /// This function is similar to Rust's channel constructor found in the standard
74 /// library. Two halves are returned, the first of which is a `Sender` handle,
75 /// used to signal the end of a computation and provide its value. The second
76 /// half is a `Receiver` which implements the `Future` trait, resolving to the
77 /// value that was given to the `Sender` handle.
78 ///
79 /// Each half can be separately owned and sent across tasks.
80 ///
81 /// # Examples
82 ///
83 /// ```
84 /// use futures::channel::oneshot;
85 /// use std::{thread, time::Duration};
86 ///
87 /// let (sender, receiver) = oneshot::channel::<i32>();
88 ///
89 /// thread::spawn(|| {
90 /// println!("THREAD: sleeping zzz...");
91 /// thread::sleep(Duration::from_millis(1000));
92 /// println!("THREAD: i'm awake! sending.");
93 /// sender.send(3).unwrap();
94 /// });
95 ///
96 /// println!("MAIN: doing some useful stuff");
97 ///
98 /// futures::executor::block_on(async {
99 /// println!("MAIN: waiting for msg...");
100 /// println!("MAIN: got: {:?}", receiver.await)
101 /// });
102 /// ```
channel<T>() -> (Sender<T>, Receiver<T>)103 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
104 let inner = Arc::new(Inner::new());
105 let receiver = Receiver {
106 inner: inner.clone(),
107 };
108 let sender = Sender {
109 inner,
110 };
111 (sender, receiver)
112 }
113
114 impl<T> Inner<T> {
new() -> Inner<T>115 fn new() -> Inner<T> {
116 Inner {
117 complete: AtomicBool::new(false),
118 data: Lock::new(None),
119 rx_task: Lock::new(None),
120 tx_task: Lock::new(None),
121 }
122 }
123
send(&self, t: T) -> Result<(), T>124 fn send(&self, t: T) -> Result<(), T> {
125 if self.complete.load(SeqCst) {
126 return Err(t)
127 }
128
129 // Note that this lock acquisition may fail if the receiver
130 // is closed and sets the `complete` flag to `true`, whereupon
131 // the receiver may call `poll()`.
132 if let Some(mut slot) = self.data.try_lock() {
133 assert!(slot.is_none());
134 *slot = Some(t);
135 drop(slot);
136
137 // If the receiver called `close()` between the check at the
138 // start of the function, and the lock being released, then
139 // the receiver may not be around to receive it, so try to
140 // pull it back out.
141 if self.complete.load(SeqCst) {
142 // If lock acquisition fails, then receiver is actually
143 // receiving it, so we're good.
144 if let Some(mut slot) = self.data.try_lock() {
145 if let Some(t) = slot.take() {
146 return Err(t);
147 }
148 }
149 }
150 Ok(())
151 } else {
152 // Must have been closed
153 Err(t)
154 }
155 }
156
poll_canceled(&self, cx: &mut Context<'_>) -> Poll<()>157 fn poll_canceled(&self, cx: &mut Context<'_>) -> Poll<()> {
158 // Fast path up first, just read the flag and see if our other half is
159 // gone. This flag is set both in our destructor and the oneshot
160 // destructor, but our destructor hasn't run yet so if it's set then the
161 // oneshot is gone.
162 if self.complete.load(SeqCst) {
163 return Poll::Ready(())
164 }
165
166 // If our other half is not gone then we need to park our current task
167 // and move it into the `tx_task` slot to get notified when it's
168 // actually gone.
169 //
170 // If `try_lock` fails, then the `Receiver` is in the process of using
171 // it, so we can deduce that it's now in the process of going away and
172 // hence we're canceled. If it succeeds then we just store our handle.
173 //
174 // Crucially we then check `complete` *again* before we return.
175 // While we were storing our handle inside `tx_task` the
176 // `Receiver` may have been dropped. The first thing it does is set the
177 // flag, and if it fails to acquire the lock it assumes that we'll see
178 // the flag later on. So... we then try to see the flag later on!
179 let handle = cx.waker().clone();
180 match self.tx_task.try_lock() {
181 Some(mut p) => *p = Some(handle),
182 None => return Poll::Ready(()),
183 }
184 if self.complete.load(SeqCst) {
185 Poll::Ready(())
186 } else {
187 Poll::Pending
188 }
189 }
190
is_canceled(&self) -> bool191 fn is_canceled(&self) -> bool {
192 self.complete.load(SeqCst)
193 }
194
drop_tx(&self)195 fn drop_tx(&self) {
196 // Flag that we're a completed `Sender` and try to wake up a receiver.
197 // Whether or not we actually stored any data will get picked up and
198 // translated to either an item or cancellation.
199 //
200 // Note that if we fail to acquire the `rx_task` lock then that means
201 // we're in one of two situations:
202 //
203 // 1. The receiver is trying to block in `poll`
204 // 2. The receiver is being dropped
205 //
206 // In the first case it'll check the `complete` flag after it's done
207 // blocking to see if it succeeded. In the latter case we don't need to
208 // wake up anyone anyway. So in both cases it's ok to ignore the `None`
209 // case of `try_lock` and bail out.
210 //
211 // The first case crucially depends on `Lock` using `SeqCst` ordering
212 // under the hood. If it instead used `Release` / `Acquire` ordering,
213 // then it would not necessarily synchronize with `inner.complete`
214 // and deadlock might be possible, as was observed in
215 // https://github.com/rust-lang/futures-rs/pull/219.
216 self.complete.store(true, SeqCst);
217
218 if let Some(mut slot) = self.rx_task.try_lock() {
219 if let Some(task) = slot.take() {
220 drop(slot);
221 task.wake();
222 }
223 }
224
225 // If we registered a task for cancel notification drop it to reduce
226 // spurious wakeups
227 if let Some(mut slot) = self.tx_task.try_lock() {
228 drop(slot.take());
229 }
230 }
231
close_rx(&self)232 fn close_rx(&self) {
233 // Flag our completion and then attempt to wake up the sender if it's
234 // blocked. See comments in `drop` below for more info
235 self.complete.store(true, SeqCst);
236 if let Some(mut handle) = self.tx_task.try_lock() {
237 if let Some(task) = handle.take() {
238 drop(handle);
239 task.wake()
240 }
241 }
242 }
243
try_recv(&self) -> Result<Option<T>, Canceled>244 fn try_recv(&self) -> Result<Option<T>, Canceled> {
245 // If we're complete, either `::close_rx` or `::drop_tx` was called.
246 // We can assume a successful send if data is present.
247 if self.complete.load(SeqCst) {
248 if let Some(mut slot) = self.data.try_lock() {
249 if let Some(data) = slot.take() {
250 return Ok(Some(data));
251 }
252 }
253 Err(Canceled)
254 } else {
255 Ok(None)
256 }
257 }
258
recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, Canceled>>259 fn recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, Canceled>> {
260 // Check to see if some data has arrived. If it hasn't then we need to
261 // block our task.
262 //
263 // Note that the acquisition of the `rx_task` lock might fail below, but
264 // the only situation where this can happen is during `Sender::drop`
265 // when we are indeed completed already. If that's happening then we
266 // know we're completed so keep going.
267 let done = if self.complete.load(SeqCst) {
268 true
269 } else {
270 let task = cx.waker().clone();
271 match self.rx_task.try_lock() {
272 Some(mut slot) => { *slot = Some(task); false },
273 None => true,
274 }
275 };
276
277 // If we're `done` via one of the paths above, then look at the data and
278 // figure out what the answer is. If, however, we stored `rx_task`
279 // successfully above we need to check again if we're completed in case
280 // a message was sent while `rx_task` was locked and couldn't notify us
281 // otherwise.
282 //
283 // If we're not done, and we're not complete, though, then we've
284 // successfully blocked our task and we return `Pending`.
285 if done || self.complete.load(SeqCst) {
286 // If taking the lock fails, the sender will realise that the we're
287 // `done` when it checks the `complete` flag on the way out, and
288 // will treat the send as a failure.
289 if let Some(mut slot) = self.data.try_lock() {
290 if let Some(data) = slot.take() {
291 return Poll::Ready(Ok(data));
292 }
293 }
294 Poll::Ready(Err(Canceled))
295 } else {
296 Poll::Pending
297 }
298 }
299
drop_rx(&self)300 fn drop_rx(&self) {
301 // Indicate to the `Sender` that we're done, so any future calls to
302 // `poll_canceled` are weeded out.
303 self.complete.store(true, SeqCst);
304
305 // If we've blocked a task then there's no need for it to stick around,
306 // so we need to drop it. If this lock acquisition fails, though, then
307 // it's just because our `Sender` is trying to take the task, so we
308 // let them take care of that.
309 if let Some(mut slot) = self.rx_task.try_lock() {
310 let task = slot.take();
311 drop(slot);
312 drop(task);
313 }
314
315 // Finally, if our `Sender` wants to get notified of us going away, it
316 // would have stored something in `tx_task`. Here we try to peel that
317 // out and unpark it.
318 //
319 // Note that the `try_lock` here may fail, but only if the `Sender` is
320 // in the process of filling in the task. If that happens then we
321 // already flagged `complete` and they'll pick that up above.
322 if let Some(mut handle) = self.tx_task.try_lock() {
323 if let Some(task) = handle.take() {
324 drop(handle);
325 task.wake()
326 }
327 }
328 }
329 }
330
331 impl<T> Sender<T> {
332 /// Completes this oneshot with a successful result.
333 ///
334 /// This function will consume `self` and indicate to the other end, the
335 /// [`Receiver`](Receiver), that the value provided is the result of the
336 /// computation this represents.
337 ///
338 /// If the value is successfully enqueued for the remote end to receive,
339 /// then `Ok(())` is returned. If the receiving end was dropped before
340 /// this function was called, however, then `Err` is returned with the value
341 /// provided.
send(self, t: T) -> Result<(), T>342 pub fn send(self, t: T) -> Result<(), T> {
343 self.inner.send(t)
344 }
345
346 /// Polls this `Sender` half to detect whether its associated
347 /// [`Receiver`](Receiver) with has been dropped.
348 ///
349 /// # Return values
350 ///
351 /// If `Ready(())` is returned then the associated `Receiver` has been
352 /// dropped, which means any work required for sending should be canceled.
353 ///
354 /// If `Pending` is returned then the associated `Receiver` is still
355 /// alive and may be able to receive a message if sent. The current task,
356 /// however, is scheduled to receive a notification if the corresponding
357 /// `Receiver` goes away.
poll_canceled(&mut self, cx: &mut Context<'_>) -> Poll<()>358 pub fn poll_canceled(&mut self, cx: &mut Context<'_>) -> Poll<()> {
359 self.inner.poll_canceled(cx)
360 }
361
362 /// Creates a future that resolves when this `Sender`'s corresponding
363 /// [`Receiver`](Receiver) half has hung up.
364 ///
365 /// This is a utility wrapping [`poll_canceled`](Sender::poll_canceled)
366 /// to expose a [`Future`](core::future::Future).
cancellation(&mut self) -> Cancellation<'_, T>367 pub fn cancellation(&mut self) -> Cancellation<'_, T> {
368 Cancellation { inner: self }
369 }
370
371 /// Tests to see whether this `Sender`'s corresponding `Receiver`
372 /// has been dropped.
373 ///
374 /// Unlike [`poll_canceled`](Sender::poll_canceled), this function does not
375 /// enqueue a task for wakeup upon cancellation, but merely reports the
376 /// current state, which may be subject to concurrent modification.
is_canceled(&self) -> bool377 pub fn is_canceled(&self) -> bool {
378 self.inner.is_canceled()
379 }
380 }
381
382 impl<T> Drop for Sender<T> {
drop(&mut self)383 fn drop(&mut self) {
384 self.inner.drop_tx()
385 }
386 }
387
388 /// A future that resolves when the receiving end of a channel has hung up.
389 ///
390 /// This is an `.await`-friendly interface around [`poll_canceled`](Sender::poll_canceled).
391 #[must_use = "futures do nothing unless you `.await` or poll them"]
392 #[derive(Debug)]
393 pub struct Cancellation<'a, T> {
394 inner: &'a mut Sender<T>,
395 }
396
397 impl<T> Future for Cancellation<'_, T> {
398 type Output = ();
399
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>400 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
401 self.inner.poll_canceled(cx)
402 }
403 }
404
405 /// Error returned from a [`Receiver`](Receiver) when the corresponding
406 /// [`Sender`](Sender) is dropped.
407 #[derive(Clone, Copy, PartialEq, Eq, Debug)]
408 pub struct Canceled;
409
410 impl fmt::Display for Canceled {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result411 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
412 write!(f, "oneshot canceled")
413 }
414 }
415
416 #[cfg(feature = "std")]
417 impl std::error::Error for Canceled {}
418
419 impl<T> Receiver<T> {
420 /// Gracefully close this receiver, preventing any subsequent attempts to
421 /// send to it.
422 ///
423 /// Any `send` operation which happens after this method returns is
424 /// guaranteed to fail. After calling this method, you can use
425 /// [`Receiver::poll`](core::future::Future::poll) to determine whether a
426 /// message had previously been sent.
close(&mut self)427 pub fn close(&mut self) {
428 self.inner.close_rx()
429 }
430
431 /// Attempts to receive a message outside of the context of a task.
432 ///
433 /// Does not schedule a task wakeup or have any other side effects.
434 ///
435 /// A return value of `None` must be considered immediately stale (out of
436 /// date) unless [`close`](Receiver::close) has been called first.
437 ///
438 /// Returns an error if the sender was dropped.
try_recv(&mut self) -> Result<Option<T>, Canceled>439 pub fn try_recv(&mut self) -> Result<Option<T>, Canceled> {
440 self.inner.try_recv()
441 }
442 }
443
444 impl<T> Future for Receiver<T> {
445 type Output = Result<T, Canceled>;
446
poll( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<T, Canceled>>447 fn poll(
448 self: Pin<&mut Self>,
449 cx: &mut Context<'_>,
450 ) -> Poll<Result<T, Canceled>> {
451 self.inner.recv(cx)
452 }
453 }
454
455 impl<T> Drop for Receiver<T> {
drop(&mut self)456 fn drop(&mut self) {
457 self.inner.drop_rx()
458 }
459 }
460