1 #![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]
2 
3 //! A channel for sending a single message between asynchronous tasks.
4 
5 use crate::loom::cell::UnsafeCell;
6 use crate::loom::sync::atomic::AtomicUsize;
7 use crate::loom::sync::Arc;
8 
9 use std::fmt;
10 use std::future::Future;
11 use std::mem::MaybeUninit;
12 use std::pin::Pin;
13 use std::sync::atomic::Ordering::{self, AcqRel, Acquire};
14 use std::task::Poll::{Pending, Ready};
15 use std::task::{Context, Poll, Waker};
16 
17 /// Sends a value to the associated `Receiver`.
18 ///
19 /// Instances are created by the [`channel`](fn@channel) function.
20 #[derive(Debug)]
21 pub struct Sender<T> {
22     inner: Option<Arc<Inner<T>>>,
23 }
24 
25 /// Receive a value from the associated `Sender`.
26 ///
27 /// Instances are created by the [`channel`](fn@channel) function.
28 #[derive(Debug)]
29 pub struct Receiver<T> {
30     inner: Option<Arc<Inner<T>>>,
31 }
32 
pub mod error {
    //! Oneshot error types.

    use std::fmt;

    /// Error returned by the `Future` implementation for `Receiver`.
    ///
    /// The private unit field keeps the type constructible only from the
    /// parent module. Its `Display` text is `"channel closed"`.
    #[derive(Debug, Eq, PartialEq, Clone)]
    pub struct RecvError(pub(super) ());

    /// Error returned by the `try_recv` function on `Receiver`.
    #[derive(Debug, Eq, PartialEq, Clone)]
    pub enum TryRecvError {
        /// The send half of the channel has not yet sent a value.
        Empty,

        /// The send half of the channel was dropped without sending a value.
        Closed,
    }

    // ===== impl RecvError =====

    impl fmt::Display for RecvError {
        /// Writes the fixed message `channel closed`.
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.write_str("channel closed")
        }
    }

    impl std::error::Error for RecvError {}

    // ===== impl TryRecvError =====

    impl fmt::Display for TryRecvError {
        /// Writes `channel empty` or `channel closed` depending on the variant.
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.write_str(match self {
                TryRecvError::Empty => "channel empty",
                TryRecvError::Closed => "channel closed",
            })
        }
    }

    impl std::error::Error for TryRecvError {}
}
75 
76 use self::error::*;
77 
/// State shared between the `Sender` and `Receiver` halves through an `Arc`.
struct Inner<T> {
    /// Bit flags (`RX_TASK_SET`, `VALUE_SENT`, `CLOSED`, `TX_TASK_SET`)
    /// describing what the other fields currently hold.
    state: AtomicUsize,

    /// Slot for the message. `Sender::send` stores into it and the receiver
    /// takes from it; `state` tracks when it may be read.
    value: UnsafeCell<Option<T>>,

    /// Waker for the sending task, registered by `Sender::poll_closed` and
    /// woken when the receiver closes the channel. Only initialized while
    /// `TX_TASK_SET` is set in `state`.
    tx_task: UnsafeCell<MaybeUninit<Waker>>,

    /// Waker for the receiving task, registered by `poll_recv` and woken when
    /// the sender completes. Only initialized while `RX_TASK_SET` is set in
    /// `state`.
    rx_task: UnsafeCell<MaybeUninit<Waker>>,
}
92 
/// A copy of the flag bits stored in `Inner::state`, with typed accessors for
/// each flag.
#[derive(Clone, Copy)]
struct State(usize);
95 
96 /// Create a new one-shot channel for sending single values across asynchronous
97 /// tasks.
98 ///
99 /// The function returns separate "send" and "receive" handles. The `Sender`
100 /// handle is used by the producer to send the value. The `Receiver` handle is
101 /// used by the consumer to receive the value.
102 ///
103 /// Each handle can be used on separate tasks.
104 ///
105 /// # Examples
106 ///
107 /// ```
108 /// use tokio::sync::oneshot;
109 ///
110 /// #[tokio::main]
111 /// async fn main() {
112 ///     let (tx, rx) = oneshot::channel();
113 ///
114 ///     tokio::spawn(async move {
115 ///         if let Err(_) = tx.send(3) {
116 ///             println!("the receiver dropped");
117 ///         }
118 ///     });
119 ///
120 ///     match rx.await {
121 ///         Ok(v) => println!("got = {:?}", v),
122 ///         Err(_) => println!("the sender dropped"),
123 ///     }
124 /// }
125 /// ```
channel<T>() -> (Sender<T>, Receiver<T>)126 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
127     #[allow(deprecated)]
128     let inner = Arc::new(Inner {
129         state: AtomicUsize::new(State::new().as_usize()),
130         value: UnsafeCell::new(None),
131         tx_task: UnsafeCell::new(MaybeUninit::uninit()),
132         rx_task: UnsafeCell::new(MaybeUninit::uninit()),
133     });
134 
135     let tx = Sender {
136         inner: Some(inner.clone()),
137     };
138     let rx = Receiver { inner: Some(inner) };
139 
140     (tx, rx)
141 }
142 
143 impl<T> Sender<T> {
144     /// Attempts to send a value on this channel, returning it back if it could
145     /// not be sent.
146     ///
147     /// This method consumes `self` as only one value may ever be sent on a oneshot
148     /// channel. It is not marked async because sending a message to an oneshot
149     /// channel never requires any form of waiting.  Because of this, the `send`
150     /// method can be used in both synchronous and asynchronous code without
151     /// problems.
152     ///
153     /// A successful send occurs when it is determined that the other end of the
154     /// channel has not hung up already. An unsuccessful send would be one where
155     /// the corresponding receiver has already been deallocated. Note that a
156     /// return value of `Err` means that the data will never be received, but
157     /// a return value of `Ok` does *not* mean that the data will be received.
158     /// It is possible for the corresponding receiver to hang up immediately
159     /// after this function returns `Ok`.
160     ///
161     /// # Examples
162     ///
163     /// Send a value to another task
164     ///
165     /// ```
166     /// use tokio::sync::oneshot;
167     ///
168     /// #[tokio::main]
169     /// async fn main() {
170     ///     let (tx, rx) = oneshot::channel();
171     ///
172     ///     tokio::spawn(async move {
173     ///         if let Err(_) = tx.send(3) {
174     ///             println!("the receiver dropped");
175     ///         }
176     ///     });
177     ///
178     ///     match rx.await {
179     ///         Ok(v) => println!("got = {:?}", v),
180     ///         Err(_) => println!("the sender dropped"),
181     ///     }
182     /// }
183     /// ```
send(mut self, t: T) -> Result<(), T>184     pub fn send(mut self, t: T) -> Result<(), T> {
185         let inner = self.inner.take().unwrap();
186 
187         inner.value.with_mut(|ptr| unsafe {
188             *ptr = Some(t);
189         });
190 
191         if !inner.complete() {
192             return Err(inner
193                 .value
194                 .with_mut(|ptr| unsafe { (*ptr).take() }.unwrap()));
195         }
196 
197         Ok(())
198     }
199 
200     #[doc(hidden)] // TODO: remove
poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()>201     pub fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> {
202         // Keep track of task budget
203         let coop = ready!(crate::coop::poll_proceed(cx));
204 
205         let inner = self.inner.as_ref().unwrap();
206 
207         let mut state = State::load(&inner.state, Acquire);
208 
209         if state.is_closed() {
210             coop.made_progress();
211             return Poll::Ready(());
212         }
213 
214         if state.is_tx_task_set() {
215             let will_notify = unsafe { inner.with_tx_task(|w| w.will_wake(cx.waker())) };
216 
217             if !will_notify {
218                 state = State::unset_tx_task(&inner.state);
219 
220                 if state.is_closed() {
221                     // Set the flag again so that the waker is released in drop
222                     State::set_tx_task(&inner.state);
223                     coop.made_progress();
224                     return Ready(());
225                 } else {
226                     unsafe { inner.drop_tx_task() };
227                 }
228             }
229         }
230 
231         if !state.is_tx_task_set() {
232             // Attempt to set the task
233             unsafe {
234                 inner.set_tx_task(cx);
235             }
236 
237             // Update the state
238             state = State::set_tx_task(&inner.state);
239 
240             if state.is_closed() {
241                 coop.made_progress();
242                 return Ready(());
243             }
244         }
245 
246         Pending
247     }
248 
249     /// Waits for the associated [`Receiver`] handle to close.
250     ///
251     /// A [`Receiver`] is closed by either calling [`close`] explicitly or the
252     /// [`Receiver`] value is dropped.
253     ///
254     /// This function is useful when paired with `select!` to abort a
255     /// computation when the receiver is no longer interested in the result.
256     ///
257     /// # Return
258     ///
259     /// Returns a `Future` which must be awaited on.
260     ///
261     /// [`Receiver`]: Receiver
262     /// [`close`]: Receiver::close
263     ///
264     /// # Examples
265     ///
266     /// Basic usage
267     ///
268     /// ```
269     /// use tokio::sync::oneshot;
270     ///
271     /// #[tokio::main]
272     /// async fn main() {
273     ///     let (mut tx, rx) = oneshot::channel::<()>();
274     ///
275     ///     tokio::spawn(async move {
276     ///         drop(rx);
277     ///     });
278     ///
279     ///     tx.closed().await;
280     ///     println!("the receiver dropped");
281     /// }
282     /// ```
283     ///
284     /// Paired with select
285     ///
286     /// ```
287     /// use tokio::sync::oneshot;
288     /// use tokio::time::{self, Duration};
289     ///
290     /// use futures::{select, FutureExt};
291     ///
292     /// async fn compute() -> String {
293     ///     // Complex computation returning a `String`
294     /// # "hello".to_string()
295     /// }
296     ///
297     /// #[tokio::main]
298     /// async fn main() {
299     ///     let (mut tx, rx) = oneshot::channel();
300     ///
301     ///     tokio::spawn(async move {
302     ///         select! {
303     ///             _ = tx.closed().fuse() => {
304     ///                 // The receiver dropped, no need to do any further work
305     ///             }
306     ///             value = compute().fuse() => {
307     ///                 tx.send(value).unwrap()
308     ///             }
309     ///         }
310     ///     });
311     ///
312     ///     // Wait for up to 10 seconds
313     ///     let _ = time::timeout(Duration::from_secs(10), rx).await;
314     /// }
315     /// ```
closed(&mut self)316     pub async fn closed(&mut self) {
317         use crate::future::poll_fn;
318 
319         poll_fn(|cx| self.poll_closed(cx)).await
320     }
321 
322     /// Returns `true` if the associated [`Receiver`] handle has been dropped.
323     ///
324     /// A [`Receiver`] is closed by either calling [`close`] explicitly or the
325     /// [`Receiver`] value is dropped.
326     ///
327     /// If `true` is returned, a call to `send` will always result in an error.
328     ///
329     /// [`Receiver`]: Receiver
330     /// [`close`]: Receiver::close
331     ///
332     /// # Examples
333     ///
334     /// ```
335     /// use tokio::sync::oneshot;
336     ///
337     /// #[tokio::main]
338     /// async fn main() {
339     ///     let (tx, rx) = oneshot::channel();
340     ///
341     ///     assert!(!tx.is_closed());
342     ///
343     ///     drop(rx);
344     ///
345     ///     assert!(tx.is_closed());
346     ///     assert!(tx.send("never received").is_err());
347     /// }
348     /// ```
is_closed(&self) -> bool349     pub fn is_closed(&self) -> bool {
350         let inner = self.inner.as_ref().unwrap();
351 
352         let state = State::load(&inner.state, Acquire);
353         state.is_closed()
354     }
355 }
356 
357 impl<T> Drop for Sender<T> {
drop(&mut self)358     fn drop(&mut self) {
359         if let Some(inner) = self.inner.as_ref() {
360             inner.complete();
361         }
362     }
363 }
364 
365 impl<T> Receiver<T> {
366     /// Prevents the associated [`Sender`] handle from sending a value.
367     ///
368     /// Any `send` operation which happens after calling `close` is guaranteed
369     /// to fail. After calling `close`, [`try_recv`] should be called to
370     /// receive a value if one was sent **before** the call to `close`
371     /// completed.
372     ///
373     /// This function is useful to perform a graceful shutdown and ensure that a
374     /// value will not be sent into the channel and never received.
375     ///
376     /// [`Sender`]: Sender
377     /// [`try_recv`]: Receiver::try_recv
378     ///
379     /// # Examples
380     ///
381     /// Prevent a value from being sent
382     ///
383     /// ```
384     /// use tokio::sync::oneshot;
385     /// use tokio::sync::oneshot::error::TryRecvError;
386     ///
387     /// #[tokio::main]
388     /// async fn main() {
389     ///     let (tx, mut rx) = oneshot::channel();
390     ///
391     ///     assert!(!tx.is_closed());
392     ///
393     ///     rx.close();
394     ///
395     ///     assert!(tx.is_closed());
396     ///     assert!(tx.send("never received").is_err());
397     ///
398     ///     match rx.try_recv() {
399     ///         Err(TryRecvError::Closed) => {}
400     ///         _ => unreachable!(),
401     ///     }
402     /// }
403     /// ```
404     ///
405     /// Receive a value sent **before** calling `close`
406     ///
407     /// ```
408     /// use tokio::sync::oneshot;
409     ///
410     /// #[tokio::main]
411     /// async fn main() {
412     ///     let (tx, mut rx) = oneshot::channel();
413     ///
414     ///     assert!(tx.send("will receive").is_ok());
415     ///
416     ///     rx.close();
417     ///
418     ///     let msg = rx.try_recv().unwrap();
419     ///     assert_eq!(msg, "will receive");
420     /// }
421     /// ```
close(&mut self)422     pub fn close(&mut self) {
423         let inner = self.inner.as_ref().unwrap();
424         inner.close();
425     }
426 
427     /// Attempts to receive a value.
428     ///
429     /// If a pending value exists in the channel, it is returned. If no value
430     /// has been sent, the current task **will not** be registered for
431     /// future notification.
432     ///
433     /// This function is useful to call from outside the context of an
434     /// asynchronous task.
435     ///
436     /// # Return
437     ///
438     /// - `Ok(T)` if a value is pending in the channel.
439     /// - `Err(TryRecvError::Empty)` if no value has been sent yet.
440     /// - `Err(TryRecvError::Closed)` if the sender has dropped without sending
441     ///   a value.
442     ///
443     /// # Examples
444     ///
445     /// `try_recv` before a value is sent, then after.
446     ///
447     /// ```
448     /// use tokio::sync::oneshot;
449     /// use tokio::sync::oneshot::error::TryRecvError;
450     ///
451     /// #[tokio::main]
452     /// async fn main() {
453     ///     let (tx, mut rx) = oneshot::channel();
454     ///
455     ///     match rx.try_recv() {
456     ///         // The channel is currently empty
457     ///         Err(TryRecvError::Empty) => {}
458     ///         _ => unreachable!(),
459     ///     }
460     ///
461     ///     // Send a value
462     ///     tx.send("hello").unwrap();
463     ///
464     ///     match rx.try_recv() {
465     ///         Ok(value) => assert_eq!(value, "hello"),
466     ///         _ => unreachable!(),
467     ///     }
468     /// }
469     /// ```
470     ///
471     /// `try_recv` when the sender dropped before sending a value
472     ///
473     /// ```
474     /// use tokio::sync::oneshot;
475     /// use tokio::sync::oneshot::error::TryRecvError;
476     ///
477     /// #[tokio::main]
478     /// async fn main() {
479     ///     let (tx, mut rx) = oneshot::channel::<()>();
480     ///
481     ///     drop(tx);
482     ///
483     ///     match rx.try_recv() {
484     ///         // The channel will never receive a value.
485     ///         Err(TryRecvError::Closed) => {}
486     ///         _ => unreachable!(),
487     ///     }
488     /// }
489     /// ```
try_recv(&mut self) -> Result<T, TryRecvError>490     pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
491         let result = if let Some(inner) = self.inner.as_ref() {
492             let state = State::load(&inner.state, Acquire);
493 
494             if state.is_complete() {
495                 match unsafe { inner.consume_value() } {
496                     Some(value) => Ok(value),
497                     None => Err(TryRecvError::Closed),
498                 }
499             } else if state.is_closed() {
500                 Err(TryRecvError::Closed)
501             } else {
502                 // Not ready, this does not clear `inner`
503                 return Err(TryRecvError::Empty);
504             }
505         } else {
506             panic!("called after complete");
507         };
508 
509         self.inner = None;
510         result
511     }
512 }
513 
514 impl<T> Drop for Receiver<T> {
drop(&mut self)515     fn drop(&mut self) {
516         if let Some(inner) = self.inner.as_ref() {
517             inner.close();
518         }
519     }
520 }
521 
522 impl<T> Future for Receiver<T> {
523     type Output = Result<T, RecvError>;
524 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>525     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
526         // If `inner` is `None`, then `poll()` has already completed.
527         let ret = if let Some(inner) = self.as_ref().get_ref().inner.as_ref() {
528             ready!(inner.poll_recv(cx))?
529         } else {
530             panic!("called after complete");
531         };
532 
533         self.inner = None;
534         Ready(Ok(ret))
535     }
536 }
537 
538 impl<T> Inner<T> {
complete(&self) -> bool539     fn complete(&self) -> bool {
540         let prev = State::set_complete(&self.state);
541 
542         if prev.is_closed() {
543             return false;
544         }
545 
546         if prev.is_rx_task_set() {
547             // TODO: Consume waker?
548             unsafe {
549                 self.with_rx_task(Waker::wake_by_ref);
550             }
551         }
552 
553         true
554     }
555 
poll_recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>>556     fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
557         // Keep track of task budget
558         let coop = ready!(crate::coop::poll_proceed(cx));
559 
560         // Load the state
561         let mut state = State::load(&self.state, Acquire);
562 
563         if state.is_complete() {
564             coop.made_progress();
565             match unsafe { self.consume_value() } {
566                 Some(value) => Ready(Ok(value)),
567                 None => Ready(Err(RecvError(()))),
568             }
569         } else if state.is_closed() {
570             coop.made_progress();
571             Ready(Err(RecvError(())))
572         } else {
573             if state.is_rx_task_set() {
574                 let will_notify = unsafe { self.with_rx_task(|w| w.will_wake(cx.waker())) };
575 
576                 // Check if the task is still the same
577                 if !will_notify {
578                     // Unset the task
579                     state = State::unset_rx_task(&self.state);
580                     if state.is_complete() {
581                         // Set the flag again so that the waker is released in drop
582                         State::set_rx_task(&self.state);
583 
584                         coop.made_progress();
585                         return match unsafe { self.consume_value() } {
586                             Some(value) => Ready(Ok(value)),
587                             None => Ready(Err(RecvError(()))),
588                         };
589                     } else {
590                         unsafe { self.drop_rx_task() };
591                     }
592                 }
593             }
594 
595             if !state.is_rx_task_set() {
596                 // Attempt to set the task
597                 unsafe {
598                     self.set_rx_task(cx);
599                 }
600 
601                 // Update the state
602                 state = State::set_rx_task(&self.state);
603 
604                 if state.is_complete() {
605                     coop.made_progress();
606                     match unsafe { self.consume_value() } {
607                         Some(value) => Ready(Ok(value)),
608                         None => Ready(Err(RecvError(()))),
609                     }
610                 } else {
611                     Pending
612                 }
613             } else {
614                 Pending
615             }
616         }
617     }
618 
619     /// Called by `Receiver` to indicate that the value will never be received.
close(&self)620     fn close(&self) {
621         let prev = State::set_closed(&self.state);
622 
623         if prev.is_tx_task_set() && !prev.is_complete() {
624             unsafe {
625                 self.with_tx_task(Waker::wake_by_ref);
626             }
627         }
628     }
629 
630     /// Consumes the value. This function does not check `state`.
consume_value(&self) -> Option<T>631     unsafe fn consume_value(&self) -> Option<T> {
632         self.value.with_mut(|ptr| (*ptr).take())
633     }
634 
with_rx_task<F, R>(&self, f: F) -> R where F: FnOnce(&Waker) -> R,635     unsafe fn with_rx_task<F, R>(&self, f: F) -> R
636     where
637         F: FnOnce(&Waker) -> R,
638     {
639         self.rx_task.with(|ptr| {
640             let waker: *const Waker = (&*ptr).as_ptr();
641             f(&*waker)
642         })
643     }
644 
with_tx_task<F, R>(&self, f: F) -> R where F: FnOnce(&Waker) -> R,645     unsafe fn with_tx_task<F, R>(&self, f: F) -> R
646     where
647         F: FnOnce(&Waker) -> R,
648     {
649         self.tx_task.with(|ptr| {
650             let waker: *const Waker = (&*ptr).as_ptr();
651             f(&*waker)
652         })
653     }
654 
drop_rx_task(&self)655     unsafe fn drop_rx_task(&self) {
656         self.rx_task.with_mut(|ptr| {
657             let ptr: *mut Waker = (&mut *ptr).as_mut_ptr();
658             ptr.drop_in_place();
659         });
660     }
661 
drop_tx_task(&self)662     unsafe fn drop_tx_task(&self) {
663         self.tx_task.with_mut(|ptr| {
664             let ptr: *mut Waker = (&mut *ptr).as_mut_ptr();
665             ptr.drop_in_place();
666         });
667     }
668 
set_rx_task(&self, cx: &mut Context<'_>)669     unsafe fn set_rx_task(&self, cx: &mut Context<'_>) {
670         self.rx_task.with_mut(|ptr| {
671             let ptr: *mut Waker = (&mut *ptr).as_mut_ptr();
672             ptr.write(cx.waker().clone());
673         });
674     }
675 
set_tx_task(&self, cx: &mut Context<'_>)676     unsafe fn set_tx_task(&self, cx: &mut Context<'_>) {
677         self.tx_task.with_mut(|ptr| {
678             let ptr: *mut Waker = (&mut *ptr).as_mut_ptr();
679             ptr.write(cx.waker().clone());
680         });
681     }
682 }
683 
684 unsafe impl<T: Send> Send for Inner<T> {}
685 unsafe impl<T: Send> Sync for Inner<T> {}
686 
687 impl<T> Drop for Inner<T> {
drop(&mut self)688     fn drop(&mut self) {
689         let state = State(self.state.with_mut(|v| *v));
690 
691         if state.is_rx_task_set() {
692             unsafe {
693                 self.drop_rx_task();
694             }
695         }
696 
697         if state.is_tx_task_set() {
698             unsafe {
699                 self.drop_tx_task();
700             }
701         }
702     }
703 }
704 
705 impl<T: fmt::Debug> fmt::Debug for Inner<T> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result706     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
707         use std::sync::atomic::Ordering::Relaxed;
708 
709         fmt.debug_struct("Inner")
710             .field("state", &State::load(&self.state, Relaxed))
711             .finish()
712     }
713 }
714 
715 const RX_TASK_SET: usize = 0b00001;
716 const VALUE_SENT: usize = 0b00010;
717 const CLOSED: usize = 0b00100;
718 const TX_TASK_SET: usize = 0b01000;
719 
720 impl State {
new() -> State721     fn new() -> State {
722         State(0)
723     }
724 
is_complete(self) -> bool725     fn is_complete(self) -> bool {
726         self.0 & VALUE_SENT == VALUE_SENT
727     }
728 
set_complete(cell: &AtomicUsize) -> State729     fn set_complete(cell: &AtomicUsize) -> State {
730         // TODO: This could be `Release`, followed by an `Acquire` fence *if*
731         // the `RX_TASK_SET` flag is set. However, `loom` does not support
732         // fences yet.
733         let val = cell.fetch_or(VALUE_SENT, AcqRel);
734         State(val)
735     }
736 
is_rx_task_set(self) -> bool737     fn is_rx_task_set(self) -> bool {
738         self.0 & RX_TASK_SET == RX_TASK_SET
739     }
740 
set_rx_task(cell: &AtomicUsize) -> State741     fn set_rx_task(cell: &AtomicUsize) -> State {
742         let val = cell.fetch_or(RX_TASK_SET, AcqRel);
743         State(val | RX_TASK_SET)
744     }
745 
unset_rx_task(cell: &AtomicUsize) -> State746     fn unset_rx_task(cell: &AtomicUsize) -> State {
747         let val = cell.fetch_and(!RX_TASK_SET, AcqRel);
748         State(val & !RX_TASK_SET)
749     }
750 
is_closed(self) -> bool751     fn is_closed(self) -> bool {
752         self.0 & CLOSED == CLOSED
753     }
754 
set_closed(cell: &AtomicUsize) -> State755     fn set_closed(cell: &AtomicUsize) -> State {
756         // Acquire because we want all later writes (attempting to poll) to be
757         // ordered after this.
758         let val = cell.fetch_or(CLOSED, Acquire);
759         State(val)
760     }
761 
set_tx_task(cell: &AtomicUsize) -> State762     fn set_tx_task(cell: &AtomicUsize) -> State {
763         let val = cell.fetch_or(TX_TASK_SET, AcqRel);
764         State(val | TX_TASK_SET)
765     }
766 
unset_tx_task(cell: &AtomicUsize) -> State767     fn unset_tx_task(cell: &AtomicUsize) -> State {
768         let val = cell.fetch_and(!TX_TASK_SET, AcqRel);
769         State(val & !TX_TASK_SET)
770     }
771 
is_tx_task_set(self) -> bool772     fn is_tx_task_set(self) -> bool {
773         self.0 & TX_TASK_SET == TX_TASK_SET
774     }
775 
as_usize(self) -> usize776     fn as_usize(self) -> usize {
777         self.0
778     }
779 
load(cell: &AtomicUsize, order: Ordering) -> State780     fn load(cell: &AtomicUsize, order: Ordering) -> State {
781         let val = cell.load(order);
782         State(val)
783     }
784 }
785 
786 impl fmt::Debug for State {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result787     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
788         fmt.debug_struct("State")
789             .field("is_complete", &self.is_complete())
790             .field("is_closed", &self.is_closed())
791             .field("is_rx_task_set", &self.is_rx_task_set())
792             .field("is_tx_task_set", &self.is_tx_task_set())
793             .finish()
794     }
795 }
796