1 //! A shutdown channel.
2 //!
3 //! Each worker holds the `Sender` half. When all the `Sender` halves are
4 //! dropped, the `Receiver` receives a notification.
5 
6 use crate::loom::sync::Arc;
7 use crate::sync::oneshot;
8 
9 use std::time::Duration;
10 
11 #[derive(Debug, Clone)]
12 pub(super) struct Sender {
13     tx: Arc<oneshot::Sender<()>>,
14 }
15 
16 #[derive(Debug)]
17 pub(super) struct Receiver {
18     rx: oneshot::Receiver<()>,
19 }
20 
channel() -> (Sender, Receiver)21 pub(super) fn channel() -> (Sender, Receiver) {
22     let (tx, rx) = oneshot::channel();
23     let tx = Sender { tx: Arc::new(tx) };
24     let rx = Receiver { rx };
25 
26     (tx, rx)
27 }
28 
29 impl Receiver {
30     /// Blocks the current thread until all `Sender` handles drop.
31     ///
32     /// If `timeout` is `Some`, the thread is blocked for **at most** `timeout`
33     /// duration. If `timeout` is `None`, then the thread is blocked until the
34     /// shutdown signal is received.
35     ///
36     /// If the timeout has elapsed, it returns `false`, otherwise it returns `true`.
wait(&mut self, timeout: Option<Duration>) -> bool37     pub(crate) fn wait(&mut self, timeout: Option<Duration>) -> bool {
38         use crate::runtime::enter::try_enter;
39 
40         if timeout == Some(Duration::from_nanos(0)) {
41             return true;
42         }
43 
44         let mut e = match try_enter(false) {
45             Some(enter) => enter,
46             _ => {
47                 if std::thread::panicking() {
48                     // Don't panic in a panic
49                     return false;
50                 } else {
51                     panic!(
52                         "Cannot drop a runtime in a context where blocking is not allowed. \
53                         This happens when a runtime is dropped from within an asynchronous context."
54                     );
55                 }
56             }
57         };
58 
59         // The oneshot completes with an Err
60         //
61         // If blocking fails to wait, this indicates a problem parking the
62         // current thread (usually, shutting down a runtime stored in a
63         // thread-local).
64         if let Some(timeout) = timeout {
65             e.block_on_timeout(&mut self.rx, timeout).is_ok()
66         } else {
67             let _ = e.block_on(&mut self.rx);
68             true
69         }
70     }
71 }
72