1 #![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]
2
3 //! A channel for sending a single message between asynchronous tasks.
4
5 use crate::loom::cell::UnsafeCell;
6 use crate::loom::sync::atomic::AtomicUsize;
7 use crate::loom::sync::Arc;
8
9 use std::fmt;
10 use std::future::Future;
11 use std::mem::MaybeUninit;
12 use std::pin::Pin;
13 use std::sync::atomic::Ordering::{self, AcqRel, Acquire};
14 use std::task::Poll::{Pending, Ready};
15 use std::task::{Context, Poll, Waker};
16
17 /// Sends a value to the associated `Receiver`.
18 ///
19 /// Instances are created by the [`channel`](fn@channel) function.
20 #[derive(Debug)]
21 pub struct Sender<T> {
22 inner: Option<Arc<Inner<T>>>,
23 }
24
25 /// Receive a value from the associated `Sender`.
26 ///
27 /// Instances are created by the [`channel`](fn@channel) function.
28 #[derive(Debug)]
29 pub struct Receiver<T> {
30 inner: Option<Arc<Inner<T>>>,
31 }
32
33 pub mod error {
34 //! Oneshot error types
35
36 use std::fmt;
37
38 /// Error returned by the `Future` implementation for `Receiver`.
39 #[derive(Debug, Eq, PartialEq)]
40 pub struct RecvError(pub(super) ());
41
42 /// Error returned by the `try_recv` function on `Receiver`.
43 #[derive(Debug, Eq, PartialEq)]
44 pub enum TryRecvError {
45 /// The send half of the channel has not yet sent a value.
46 Empty,
47
48 /// The send half of the channel was dropped without sending a value.
49 Closed,
50 }
51
52 // ===== impl RecvError =====
53
54 impl fmt::Display for RecvError {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result55 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
56 write!(fmt, "channel closed")
57 }
58 }
59
60 impl std::error::Error for RecvError {}
61
62 // ===== impl TryRecvError =====
63
64 impl fmt::Display for TryRecvError {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result65 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
66 match self {
67 TryRecvError::Empty => write!(fmt, "channel empty"),
68 TryRecvError::Closed => write!(fmt, "channel closed"),
69 }
70 }
71 }
72
73 impl std::error::Error for TryRecvError {}
74 }
75
76 use self::error::*;
77
78 struct Inner<T> {
79 /// Manages the state of the inner cell
80 state: AtomicUsize,
81
82 /// The value. This is set by `Sender` and read by `Receiver`. The state of
83 /// the cell is tracked by `state`.
84 value: UnsafeCell<Option<T>>,
85
86 /// The task to notify when the receiver drops without consuming the value.
87 tx_task: UnsafeCell<MaybeUninit<Waker>>,
88
89 /// The task to notify when the value is sent.
90 rx_task: UnsafeCell<MaybeUninit<Waker>>,
91 }
92
93 #[derive(Clone, Copy)]
94 struct State(usize);
95
96 /// Create a new one-shot channel for sending single values across asynchronous
97 /// tasks.
98 ///
99 /// The function returns separate "send" and "receive" handles. The `Sender`
100 /// handle is used by the producer to send the value. The `Receiver` handle is
101 /// used by the consumer to receive the value.
102 ///
103 /// Each handle can be used on separate tasks.
104 ///
105 /// # Examples
106 ///
107 /// ```
108 /// use tokio::sync::oneshot;
109 ///
110 /// #[tokio::main]
111 /// async fn main() {
112 /// let (tx, rx) = oneshot::channel();
113 ///
114 /// tokio::spawn(async move {
115 /// if let Err(_) = tx.send(3) {
116 /// println!("the receiver dropped");
117 /// }
118 /// });
119 ///
120 /// match rx.await {
121 /// Ok(v) => println!("got = {:?}", v),
122 /// Err(_) => println!("the sender dropped"),
123 /// }
124 /// }
125 /// ```
channel<T>() -> (Sender<T>, Receiver<T>)126 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
127 #[allow(deprecated)]
128 let inner = Arc::new(Inner {
129 state: AtomicUsize::new(State::new().as_usize()),
130 value: UnsafeCell::new(None),
131 tx_task: UnsafeCell::new(MaybeUninit::uninit()),
132 rx_task: UnsafeCell::new(MaybeUninit::uninit()),
133 });
134
135 let tx = Sender {
136 inner: Some(inner.clone()),
137 };
138 let rx = Receiver { inner: Some(inner) };
139
140 (tx, rx)
141 }
142
143 impl<T> Sender<T> {
144 /// Attempts to send a value on this channel, returning it back if it could
145 /// not be sent.
146 ///
147 /// This method consumes `self` as only one value may ever be sent on a oneshot
148 /// channel. It is not marked async because sending a message to an oneshot
149 /// channel never requires any form of waiting. Because of this, the `send`
150 /// method can be used in both synchronous and asynchronous code without
151 /// problems.
152 ///
153 /// A successful send occurs when it is determined that the other end of the
154 /// channel has not hung up already. An unsuccessful send would be one where
155 /// the corresponding receiver has already been deallocated. Note that a
156 /// return value of `Err` means that the data will never be received, but
157 /// a return value of `Ok` does *not* mean that the data will be received.
158 /// It is possible for the corresponding receiver to hang up immediately
159 /// after this function returns `Ok`.
160 ///
161 /// # Examples
162 ///
163 /// Send a value to another task
164 ///
165 /// ```
166 /// use tokio::sync::oneshot;
167 ///
168 /// #[tokio::main]
169 /// async fn main() {
170 /// let (tx, rx) = oneshot::channel();
171 ///
172 /// tokio::spawn(async move {
173 /// if let Err(_) = tx.send(3) {
174 /// println!("the receiver dropped");
175 /// }
176 /// });
177 ///
178 /// match rx.await {
179 /// Ok(v) => println!("got = {:?}", v),
180 /// Err(_) => println!("the sender dropped"),
181 /// }
182 /// }
183 /// ```
send(mut self, t: T) -> Result<(), T>184 pub fn send(mut self, t: T) -> Result<(), T> {
185 let inner = self.inner.take().unwrap();
186
187 inner.value.with_mut(|ptr| unsafe {
188 *ptr = Some(t);
189 });
190
191 if !inner.complete() {
192 return Err(inner
193 .value
194 .with_mut(|ptr| unsafe { (*ptr).take() }.unwrap()));
195 }
196
197 Ok(())
198 }
199
200 #[doc(hidden)] // TODO: remove
poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()>201 pub fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> {
202 // Keep track of task budget
203 let coop = ready!(crate::coop::poll_proceed(cx));
204
205 let inner = self.inner.as_ref().unwrap();
206
207 let mut state = State::load(&inner.state, Acquire);
208
209 if state.is_closed() {
210 coop.made_progress();
211 return Poll::Ready(());
212 }
213
214 if state.is_tx_task_set() {
215 let will_notify = unsafe { inner.with_tx_task(|w| w.will_wake(cx.waker())) };
216
217 if !will_notify {
218 state = State::unset_tx_task(&inner.state);
219
220 if state.is_closed() {
221 // Set the flag again so that the waker is released in drop
222 State::set_tx_task(&inner.state);
223 coop.made_progress();
224 return Ready(());
225 } else {
226 unsafe { inner.drop_tx_task() };
227 }
228 }
229 }
230
231 if !state.is_tx_task_set() {
232 // Attempt to set the task
233 unsafe {
234 inner.set_tx_task(cx);
235 }
236
237 // Update the state
238 state = State::set_tx_task(&inner.state);
239
240 if state.is_closed() {
241 coop.made_progress();
242 return Ready(());
243 }
244 }
245
246 Pending
247 }
248
249 /// Waits for the associated [`Receiver`] handle to close.
250 ///
251 /// A [`Receiver`] is closed by either calling [`close`] explicitly or the
252 /// [`Receiver`] value is dropped.
253 ///
254 /// This function is useful when paired with `select!` to abort a
255 /// computation when the receiver is no longer interested in the result.
256 ///
257 /// # Return
258 ///
259 /// Returns a `Future` which must be awaited on.
260 ///
261 /// [`Receiver`]: Receiver
262 /// [`close`]: Receiver::close
263 ///
264 /// # Examples
265 ///
266 /// Basic usage
267 ///
268 /// ```
269 /// use tokio::sync::oneshot;
270 ///
271 /// #[tokio::main]
272 /// async fn main() {
273 /// let (mut tx, rx) = oneshot::channel::<()>();
274 ///
275 /// tokio::spawn(async move {
276 /// drop(rx);
277 /// });
278 ///
279 /// tx.closed().await;
280 /// println!("the receiver dropped");
281 /// }
282 /// ```
283 ///
284 /// Paired with select
285 ///
286 /// ```
287 /// use tokio::sync::oneshot;
288 /// use tokio::time::{self, Duration};
289 ///
290 /// use futures::{select, FutureExt};
291 ///
292 /// async fn compute() -> String {
293 /// // Complex computation returning a `String`
294 /// # "hello".to_string()
295 /// }
296 ///
297 /// #[tokio::main]
298 /// async fn main() {
299 /// let (mut tx, rx) = oneshot::channel();
300 ///
301 /// tokio::spawn(async move {
302 /// select! {
303 /// _ = tx.closed().fuse() => {
304 /// // The receiver dropped, no need to do any further work
305 /// }
306 /// value = compute().fuse() => {
307 /// tx.send(value).unwrap()
308 /// }
309 /// }
310 /// });
311 ///
312 /// // Wait for up to 10 seconds
313 /// let _ = time::timeout(Duration::from_secs(10), rx).await;
314 /// }
315 /// ```
closed(&mut self)316 pub async fn closed(&mut self) {
317 use crate::future::poll_fn;
318
319 poll_fn(|cx| self.poll_closed(cx)).await
320 }
321
322 /// Returns `true` if the associated [`Receiver`] handle has been dropped.
323 ///
324 /// A [`Receiver`] is closed by either calling [`close`] explicitly or the
325 /// [`Receiver`] value is dropped.
326 ///
327 /// If `true` is returned, a call to `send` will always result in an error.
328 ///
329 /// [`Receiver`]: Receiver
330 /// [`close`]: Receiver::close
331 ///
332 /// # Examples
333 ///
334 /// ```
335 /// use tokio::sync::oneshot;
336 ///
337 /// #[tokio::main]
338 /// async fn main() {
339 /// let (tx, rx) = oneshot::channel();
340 ///
341 /// assert!(!tx.is_closed());
342 ///
343 /// drop(rx);
344 ///
345 /// assert!(tx.is_closed());
346 /// assert!(tx.send("never received").is_err());
347 /// }
348 /// ```
is_closed(&self) -> bool349 pub fn is_closed(&self) -> bool {
350 let inner = self.inner.as_ref().unwrap();
351
352 let state = State::load(&inner.state, Acquire);
353 state.is_closed()
354 }
355 }
356
357 impl<T> Drop for Sender<T> {
drop(&mut self)358 fn drop(&mut self) {
359 if let Some(inner) = self.inner.as_ref() {
360 inner.complete();
361 }
362 }
363 }
364
365 impl<T> Receiver<T> {
366 /// Prevents the associated [`Sender`] handle from sending a value.
367 ///
368 /// Any `send` operation which happens after calling `close` is guaranteed
369 /// to fail. After calling `close`, [`try_recv`] should be called to
370 /// receive a value if one was sent **before** the call to `close`
371 /// completed.
372 ///
373 /// This function is useful to perform a graceful shutdown and ensure that a
374 /// value will not be sent into the channel and never received.
375 ///
376 /// [`Sender`]: Sender
377 /// [`try_recv`]: Receiver::try_recv
378 ///
379 /// # Examples
380 ///
381 /// Prevent a value from being sent
382 ///
383 /// ```
384 /// use tokio::sync::oneshot;
385 /// use tokio::sync::oneshot::error::TryRecvError;
386 ///
387 /// #[tokio::main]
388 /// async fn main() {
389 /// let (tx, mut rx) = oneshot::channel();
390 ///
391 /// assert!(!tx.is_closed());
392 ///
393 /// rx.close();
394 ///
395 /// assert!(tx.is_closed());
396 /// assert!(tx.send("never received").is_err());
397 ///
398 /// match rx.try_recv() {
399 /// Err(TryRecvError::Closed) => {}
400 /// _ => unreachable!(),
401 /// }
402 /// }
403 /// ```
404 ///
405 /// Receive a value sent **before** calling `close`
406 ///
407 /// ```
408 /// use tokio::sync::oneshot;
409 ///
410 /// #[tokio::main]
411 /// async fn main() {
412 /// let (tx, mut rx) = oneshot::channel();
413 ///
414 /// assert!(tx.send("will receive").is_ok());
415 ///
416 /// rx.close();
417 ///
418 /// let msg = rx.try_recv().unwrap();
419 /// assert_eq!(msg, "will receive");
420 /// }
421 /// ```
close(&mut self)422 pub fn close(&mut self) {
423 let inner = self.inner.as_ref().unwrap();
424 inner.close();
425 }
426
427 /// Attempts to receive a value.
428 ///
429 /// If a pending value exists in the channel, it is returned. If no value
430 /// has been sent, the current task **will not** be registered for
431 /// future notification.
432 ///
433 /// This function is useful to call from outside the context of an
434 /// asynchronous task.
435 ///
436 /// # Return
437 ///
438 /// - `Ok(T)` if a value is pending in the channel.
439 /// - `Err(TryRecvError::Empty)` if no value has been sent yet.
440 /// - `Err(TryRecvError::Closed)` if the sender has dropped without sending
441 /// a value.
442 ///
443 /// # Examples
444 ///
445 /// `try_recv` before a value is sent, then after.
446 ///
447 /// ```
448 /// use tokio::sync::oneshot;
449 /// use tokio::sync::oneshot::error::TryRecvError;
450 ///
451 /// #[tokio::main]
452 /// async fn main() {
453 /// let (tx, mut rx) = oneshot::channel();
454 ///
455 /// match rx.try_recv() {
456 /// // The channel is currently empty
457 /// Err(TryRecvError::Empty) => {}
458 /// _ => unreachable!(),
459 /// }
460 ///
461 /// // Send a value
462 /// tx.send("hello").unwrap();
463 ///
464 /// match rx.try_recv() {
465 /// Ok(value) => assert_eq!(value, "hello"),
466 /// _ => unreachable!(),
467 /// }
468 /// }
469 /// ```
470 ///
471 /// `try_recv` when the sender dropped before sending a value
472 ///
473 /// ```
474 /// use tokio::sync::oneshot;
475 /// use tokio::sync::oneshot::error::TryRecvError;
476 ///
477 /// #[tokio::main]
478 /// async fn main() {
479 /// let (tx, mut rx) = oneshot::channel::<()>();
480 ///
481 /// drop(tx);
482 ///
483 /// match rx.try_recv() {
484 /// // The channel will never receive a value.
485 /// Err(TryRecvError::Closed) => {}
486 /// _ => unreachable!(),
487 /// }
488 /// }
489 /// ```
try_recv(&mut self) -> Result<T, TryRecvError>490 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
491 let result = if let Some(inner) = self.inner.as_ref() {
492 let state = State::load(&inner.state, Acquire);
493
494 if state.is_complete() {
495 match unsafe { inner.consume_value() } {
496 Some(value) => Ok(value),
497 None => Err(TryRecvError::Closed),
498 }
499 } else if state.is_closed() {
500 Err(TryRecvError::Closed)
501 } else {
502 // Not ready, this does not clear `inner`
503 return Err(TryRecvError::Empty);
504 }
505 } else {
506 panic!("called after complete");
507 };
508
509 self.inner = None;
510 result
511 }
512 }
513
514 impl<T> Drop for Receiver<T> {
drop(&mut self)515 fn drop(&mut self) {
516 if let Some(inner) = self.inner.as_ref() {
517 inner.close();
518 }
519 }
520 }
521
522 impl<T> Future for Receiver<T> {
523 type Output = Result<T, RecvError>;
524
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>525 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
526 // If `inner` is `None`, then `poll()` has already completed.
527 let ret = if let Some(inner) = self.as_ref().get_ref().inner.as_ref() {
528 ready!(inner.poll_recv(cx))?
529 } else {
530 panic!("called after complete");
531 };
532
533 self.inner = None;
534 Ready(Ok(ret))
535 }
536 }
537
538 impl<T> Inner<T> {
complete(&self) -> bool539 fn complete(&self) -> bool {
540 let prev = State::set_complete(&self.state);
541
542 if prev.is_closed() {
543 return false;
544 }
545
546 if prev.is_rx_task_set() {
547 // TODO: Consume waker?
548 unsafe {
549 self.with_rx_task(Waker::wake_by_ref);
550 }
551 }
552
553 true
554 }
555
poll_recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>>556 fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
557 // Keep track of task budget
558 let coop = ready!(crate::coop::poll_proceed(cx));
559
560 // Load the state
561 let mut state = State::load(&self.state, Acquire);
562
563 if state.is_complete() {
564 coop.made_progress();
565 match unsafe { self.consume_value() } {
566 Some(value) => Ready(Ok(value)),
567 None => Ready(Err(RecvError(()))),
568 }
569 } else if state.is_closed() {
570 coop.made_progress();
571 Ready(Err(RecvError(())))
572 } else {
573 if state.is_rx_task_set() {
574 let will_notify = unsafe { self.with_rx_task(|w| w.will_wake(cx.waker())) };
575
576 // Check if the task is still the same
577 if !will_notify {
578 // Unset the task
579 state = State::unset_rx_task(&self.state);
580 if state.is_complete() {
581 // Set the flag again so that the waker is released in drop
582 State::set_rx_task(&self.state);
583
584 coop.made_progress();
585 return match unsafe { self.consume_value() } {
586 Some(value) => Ready(Ok(value)),
587 None => Ready(Err(RecvError(()))),
588 };
589 } else {
590 unsafe { self.drop_rx_task() };
591 }
592 }
593 }
594
595 if !state.is_rx_task_set() {
596 // Attempt to set the task
597 unsafe {
598 self.set_rx_task(cx);
599 }
600
601 // Update the state
602 state = State::set_rx_task(&self.state);
603
604 if state.is_complete() {
605 coop.made_progress();
606 match unsafe { self.consume_value() } {
607 Some(value) => Ready(Ok(value)),
608 None => Ready(Err(RecvError(()))),
609 }
610 } else {
611 Pending
612 }
613 } else {
614 Pending
615 }
616 }
617 }
618
619 /// Called by `Receiver` to indicate that the value will never be received.
close(&self)620 fn close(&self) {
621 let prev = State::set_closed(&self.state);
622
623 if prev.is_tx_task_set() && !prev.is_complete() {
624 unsafe {
625 self.with_tx_task(Waker::wake_by_ref);
626 }
627 }
628 }
629
630 /// Consumes the value. This function does not check `state`.
consume_value(&self) -> Option<T>631 unsafe fn consume_value(&self) -> Option<T> {
632 self.value.with_mut(|ptr| (*ptr).take())
633 }
634
with_rx_task<F, R>(&self, f: F) -> R where F: FnOnce(&Waker) -> R,635 unsafe fn with_rx_task<F, R>(&self, f: F) -> R
636 where
637 F: FnOnce(&Waker) -> R,
638 {
639 self.rx_task.with(|ptr| {
640 let waker: *const Waker = (&*ptr).as_ptr();
641 f(&*waker)
642 })
643 }
644
with_tx_task<F, R>(&self, f: F) -> R where F: FnOnce(&Waker) -> R,645 unsafe fn with_tx_task<F, R>(&self, f: F) -> R
646 where
647 F: FnOnce(&Waker) -> R,
648 {
649 self.tx_task.with(|ptr| {
650 let waker: *const Waker = (&*ptr).as_ptr();
651 f(&*waker)
652 })
653 }
654
drop_rx_task(&self)655 unsafe fn drop_rx_task(&self) {
656 self.rx_task.with_mut(|ptr| {
657 let ptr: *mut Waker = (&mut *ptr).as_mut_ptr();
658 ptr.drop_in_place();
659 });
660 }
661
drop_tx_task(&self)662 unsafe fn drop_tx_task(&self) {
663 self.tx_task.with_mut(|ptr| {
664 let ptr: *mut Waker = (&mut *ptr).as_mut_ptr();
665 ptr.drop_in_place();
666 });
667 }
668
set_rx_task(&self, cx: &mut Context<'_>)669 unsafe fn set_rx_task(&self, cx: &mut Context<'_>) {
670 self.rx_task.with_mut(|ptr| {
671 let ptr: *mut Waker = (&mut *ptr).as_mut_ptr();
672 ptr.write(cx.waker().clone());
673 });
674 }
675
set_tx_task(&self, cx: &mut Context<'_>)676 unsafe fn set_tx_task(&self, cx: &mut Context<'_>) {
677 self.tx_task.with_mut(|ptr| {
678 let ptr: *mut Waker = (&mut *ptr).as_mut_ptr();
679 ptr.write(cx.waker().clone());
680 });
681 }
682 }
683
684 unsafe impl<T: Send> Send for Inner<T> {}
685 unsafe impl<T: Send> Sync for Inner<T> {}
686
687 impl<T> Drop for Inner<T> {
drop(&mut self)688 fn drop(&mut self) {
689 let state = State(self.state.with_mut(|v| *v));
690
691 if state.is_rx_task_set() {
692 unsafe {
693 self.drop_rx_task();
694 }
695 }
696
697 if state.is_tx_task_set() {
698 unsafe {
699 self.drop_tx_task();
700 }
701 }
702 }
703 }
704
705 impl<T: fmt::Debug> fmt::Debug for Inner<T> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result706 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
707 use std::sync::atomic::Ordering::Relaxed;
708
709 fmt.debug_struct("Inner")
710 .field("state", &State::load(&self.state, Relaxed))
711 .finish()
712 }
713 }
714
715 const RX_TASK_SET: usize = 0b00001;
716 const VALUE_SENT: usize = 0b00010;
717 const CLOSED: usize = 0b00100;
718 const TX_TASK_SET: usize = 0b01000;
719
720 impl State {
new() -> State721 fn new() -> State {
722 State(0)
723 }
724
is_complete(self) -> bool725 fn is_complete(self) -> bool {
726 self.0 & VALUE_SENT == VALUE_SENT
727 }
728
set_complete(cell: &AtomicUsize) -> State729 fn set_complete(cell: &AtomicUsize) -> State {
730 // TODO: This could be `Release`, followed by an `Acquire` fence *if*
731 // the `RX_TASK_SET` flag is set. However, `loom` does not support
732 // fences yet.
733 let val = cell.fetch_or(VALUE_SENT, AcqRel);
734 State(val)
735 }
736
is_rx_task_set(self) -> bool737 fn is_rx_task_set(self) -> bool {
738 self.0 & RX_TASK_SET == RX_TASK_SET
739 }
740
set_rx_task(cell: &AtomicUsize) -> State741 fn set_rx_task(cell: &AtomicUsize) -> State {
742 let val = cell.fetch_or(RX_TASK_SET, AcqRel);
743 State(val | RX_TASK_SET)
744 }
745
unset_rx_task(cell: &AtomicUsize) -> State746 fn unset_rx_task(cell: &AtomicUsize) -> State {
747 let val = cell.fetch_and(!RX_TASK_SET, AcqRel);
748 State(val & !RX_TASK_SET)
749 }
750
is_closed(self) -> bool751 fn is_closed(self) -> bool {
752 self.0 & CLOSED == CLOSED
753 }
754
set_closed(cell: &AtomicUsize) -> State755 fn set_closed(cell: &AtomicUsize) -> State {
756 // Acquire because we want all later writes (attempting to poll) to be
757 // ordered after this.
758 let val = cell.fetch_or(CLOSED, Acquire);
759 State(val)
760 }
761
set_tx_task(cell: &AtomicUsize) -> State762 fn set_tx_task(cell: &AtomicUsize) -> State {
763 let val = cell.fetch_or(TX_TASK_SET, AcqRel);
764 State(val | TX_TASK_SET)
765 }
766
unset_tx_task(cell: &AtomicUsize) -> State767 fn unset_tx_task(cell: &AtomicUsize) -> State {
768 let val = cell.fetch_and(!TX_TASK_SET, AcqRel);
769 State(val & !TX_TASK_SET)
770 }
771
is_tx_task_set(self) -> bool772 fn is_tx_task_set(self) -> bool {
773 self.0 & TX_TASK_SET == TX_TASK_SET
774 }
775
as_usize(self) -> usize776 fn as_usize(self) -> usize {
777 self.0
778 }
779
load(cell: &AtomicUsize, order: Ordering) -> State780 fn load(cell: &AtomicUsize, order: Ordering) -> State {
781 let val = cell.load(order);
782 State(val)
783 }
784 }
785
786 impl fmt::Debug for State {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result787 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
788 fmt.debug_struct("State")
789 .field("is_complete", &self.is_complete())
790 .field("is_closed", &self.is_closed())
791 .field("is_rx_task_set", &self.is_rx_task_set())
792 .field("is_tx_task_set", &self.is_tx_task_set())
793 .finish()
794 }
795 }
796