1 /* This Source Code Form is subject to the terms of the Mozilla Public
2  * License, v. 2.0. If a copy of the MPL was not distributed with this
3  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4 
5 use std::io;
6 use std::sync::{Arc, Mutex, Weak};
7 use std::sync::atomic::{AtomicBool, Ordering};
8 use std::thread::{Builder, JoinHandle};
9 use std::time::{Duration, Instant};
10 
/// State shared between a `RunLoop` handle and the thread it spawned.
struct Canary {
    /// Starts out `true`; `RunLoop::cancel()` stores `false` to ask the
    /// run loop to stop.
    alive: AtomicBool,
    /// Join handle of the run-loop thread. `None` until the spawner stores
    /// it, and `None` again once `RunLoop::cancel()` has taken it to join.
    thread: Mutex<Option<JoinHandle<()>>>,
}

impl Canary {
    /// Builds a canary that reports "alive" and has no thread handle yet.
    fn new() -> Self {
        let alive = AtomicBool::new(true);
        let thread = Mutex::new(None);
        Canary { alive, thread }
    }
}
24 
25 pub struct RunLoop {
26     flag: Weak<Canary>,
27 }
28 
29 impl RunLoop {
new<F, T>(fun: F) -> io::Result<Self> where F: FnOnce(&Fn() -> bool) -> T, F: Send + 'static,30     pub fn new<F, T>(fun: F) -> io::Result<Self>
31     where
32         F: FnOnce(&Fn() -> bool) -> T,
33         F: Send + 'static,
34     {
35         Self::new_with_timeout(fun, 0 /* no timeout */)
36     }
37 
new_with_timeout<F, T>(fun: F, timeout_ms: u64) -> io::Result<Self> where F: FnOnce(&Fn() -> bool) -> T, F: Send + 'static,38     pub fn new_with_timeout<F, T>(fun: F, timeout_ms: u64) -> io::Result<Self>
39     where
40         F: FnOnce(&Fn() -> bool) -> T,
41         F: Send + 'static,
42     {
43         let flag = Arc::new(Canary::new());
44         let flag_ = flag.clone();
45 
46         // Spawn the run loop thread.
47         let thread = Builder::new().spawn(move || {
48             let timeout = Duration::from_millis(timeout_ms);
49             let start = Instant::now();
50 
51             // A callback to determine whether the thread should terminate.
52             let still_alive = || {
53                 // `flag.alive` will be false after cancel() was called.
54                 flag.alive.load(Ordering::Relaxed) &&
55                 // If a timeout was provided, we'll check that too.
56                 (timeout_ms == 0 || start.elapsed() < timeout)
57             };
58 
59             // Ignore return values.
60             let _ = fun(&still_alive);
61         })?;
62 
63         // We really should never fail to lock here.
64         let mut guard = (*flag_).thread.lock().map_err(|_| {
65             io::Error::new(io::ErrorKind::Other, "failed to lock")
66         })?;
67 
68         // Store the thread handle so we can join later.
69         *guard = Some(thread);
70 
71         Ok(Self { flag: Arc::downgrade(&flag_) })
72     }
73 
74     // Cancels the run loop and waits for the thread to terminate.
75     // This is a potentially BLOCKING operation.
cancel(&self)76     pub fn cancel(&self) {
77         // If the thread still exists...
78         if let Some(flag) = self.flag.upgrade() {
79             // ...let the run loop terminate.
80             flag.alive.store(false, Ordering::Relaxed);
81 
82             // Locking should never fail here either.
83             if let Ok(mut guard) = flag.thread.lock() {
84                 // This really can't fail.
85                 if let Some(handle) = (*guard).take() {
86                     // This might fail, ignore.
87                     let _ = handle.join();
88                 }
89             }
90         }
91     }
92 
93     // Tells whether the runloop is alive.
alive(&self) -> bool94     pub fn alive(&self) -> bool {
95         // If the thread still exists...
96         if let Some(flag) = self.flag.upgrade() {
97             flag.alive.load(Ordering::Relaxed)
98         } else {
99             false
100         }
101     }
102 }
103 
#[cfg(test)]
mod tests {
    use std::sync::mpsc::channel;
    use std::sync::{Arc, Barrier};
    use std::thread;

    use super::RunLoop;

    // Polls `rloop` until it reports that it is no longer alive, yielding the
    // CPU between polls instead of spinning in a tight loop.
    fn wait_until_dead(rloop: &RunLoop) {
        while rloop.alive() {
            thread::yield_now();
        }
    }

    #[test]
    fn test_empty() {
        // Create a runloop that exits right away.
        let rloop = RunLoop::new(|_| {}).unwrap();
        wait_until_dead(&rloop);
        rloop.cancel(); // noop
    }

    #[test]
    fn test_cancel_early() {
        // Create a runloop and cancel it right away, ideally before the
        // closure gets to run.
        //
        // NOTE: the assertion executes on the run-loop thread and `cancel()`
        // discards the join result, so a failing assertion there does not
        // fail this test; it mainly checks that an immediate cancel neither
        // hangs nor panics on the calling thread.
        RunLoop::new(|alive| assert!(!alive())).unwrap().cancel();
    }

    #[test]
    fn test_cancel_endless_loop() {
        let barrier = Arc::new(Barrier::new(2));
        let b = Arc::clone(&barrier);

        // Create a runloop that never exits on its own.
        let rloop = RunLoop::new(move |alive| {
            // Rendezvous with the test thread so the loop is known to run.
            b.wait();
            while alive() {
                thread::yield_now();
            }
        })
        .unwrap();

        barrier.wait();
        assert!(rloop.alive());
        rloop.cancel();
        assert!(!rloop.alive());
    }

    #[test]
    fn test_timeout() {
        // Create a runloop that never exits, but times out after 1ms.
        let rloop = RunLoop::new_with_timeout(
            |alive| {
                while alive() {
                    thread::yield_now();
                }
            },
            1,
        )
        .unwrap();

        wait_until_dead(&rloop);
        assert!(!rloop.alive());
        rloop.cancel(); // noop
    }

    #[test]
    fn test_channel() {
        let (tx, rx) = channel();

        // A runloop that sends data via a channel.
        let rloop = RunLoop::new(move |alive| {
            while alive() {
                tx.send(0u8).unwrap();
            }
        })
        .unwrap();

        // Wait until the data arrives.
        assert_eq!(rx.recv().unwrap(), 0u8);

        assert!(rloop.alive());
        rloop.cancel();
        assert!(!rloop.alive());
    }
}
169