/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

use std::io;
use std::sync::{Arc, Mutex, Weak};
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread::{Builder, JoinHandle};
use std::time::{Duration, Instant};

/// State shared between a `RunLoop` handle and its worker thread.
///
/// The worker thread's closure owns the strong `Arc<Canary>`; the `RunLoop`
/// only keeps a `Weak` reference, so the canary is dropped once the worker
/// closure has returned and no `cancel()`/`alive()` call is in flight.
struct Canary {
    /// `true` until `RunLoop::cancel()` asks the worker to stop.
    alive: AtomicBool,
    /// Join handle of the worker thread; taken (left as `None`) by `cancel()`.
    thread: Mutex<Option<JoinHandle<()>>>,
}

impl Canary {
    /// Creates a canary that is marked alive and has no thread handle yet.
    fn new() -> Self {
        Self {
            alive: AtomicBool::new(true),
            thread: Mutex::new(None),
        }
    }
}

/// Handle to a closure running on its own thread that can be asked to stop.
///
/// The closure receives a `still_alive` callback and is expected to poll it
/// and return once it yields `false`.
pub struct RunLoop {
    flag: Weak<Canary>,
}

impl RunLoop {
    /// Spawns `fun` on a new thread without a timeout.
    ///
    /// `fun` is handed a callback that returns `false` once `cancel()` has
    /// been called. The value returned by `fun` is discarded.
    ///
    /// # Errors
    ///
    /// Returns an error under the same conditions as `new_with_timeout`.
    pub fn new<F, T>(fun: F) -> io::Result<Self>
    where
        F: FnOnce(&dyn Fn() -> bool) -> T,
        F: Send + 'static,
    {
        Self::new_with_timeout(fun, 0 /* no timeout */)
    }

    /// Spawns `fun` on a new thread with an optional timeout.
    ///
    /// `fun` is handed a callback that returns `false` once `cancel()` has
    /// been called or, when `timeout_ms` is non-zero, once `timeout_ms`
    /// milliseconds have elapsed since the thread started running. A
    /// `timeout_ms` of `0` disables the timeout. The value returned by `fun`
    /// is discarded.
    ///
    /// # Errors
    ///
    /// Returns an error if the thread cannot be spawned, or if the internal
    /// lock guarding the thread handle cannot be acquired. In the latter case
    /// the already-spawned thread is told to stop.
    pub fn new_with_timeout<F, T>(fun: F, timeout_ms: u64) -> io::Result<Self>
    where
        F: FnOnce(&dyn Fn() -> bool) -> T,
        F: Send + 'static,
    {
        let flag = Arc::new(Canary::new());
        let flag_ = flag.clone();

        // Spawn the run loop thread. It takes ownership of `flag`, which keeps
        // the canary alive for as long as the closure is running.
        let thread = Builder::new().spawn(move || {
            let timeout = Duration::from_millis(timeout_ms);
            let start = Instant::now();

            // A callback to determine whether the thread should terminate.
            let still_alive = || {
                // `flag.alive` will be false after cancel() was called.
                flag.alive.load(Ordering::Relaxed) &&
                // If a timeout was provided, we'll check that too.
                (timeout_ms == 0 || start.elapsed() < timeout)
            };

            // Ignore return values.
            let _ = fun(&still_alive);
        })?;

        // Store the thread handle so we can join later.
        // We really should never fail to lock here.
        match flag_.thread.lock() {
            Ok(mut guard) => *guard = Some(thread),
            Err(_) => {
                // The caller gets no handle to cancel the thread with, so ask
                // it to stop instead of leaving it spinning forever.
                flag_.alive.store(false, Ordering::Relaxed);
                return Err(io::Error::new(io::ErrorKind::Other, "failed to lock"));
            }
        }

        Ok(Self { flag: Arc::downgrade(&flag_) })
    }

    /// Cancels the run loop and waits for the thread to terminate.
    ///
    /// This is a potentially BLOCKING operation: it joins the worker thread.
    /// It is a no-op if the thread has already finished. A panic on the
    /// worker thread is ignored.
    pub fn cancel(&self) {
        // If the thread still exists...
        if let Some(flag) = self.flag.upgrade() {
            // ...let the run loop terminate.
            flag.alive.store(false, Ordering::Relaxed);

            // Locking should never fail here either. The guard is held across
            // the join so that a concurrent cancel() also waits.
            if let Ok(mut guard) = flag.thread.lock() {
                // The handle is gone only if a previous cancel() took it.
                if let Some(handle) = guard.take() {
                    // This might fail (worker panicked), ignore.
                    let _ = handle.join();
                }
            }
        }
    }

    /// Tells whether the run loop is alive.
    ///
    /// Returns `false` once `cancel()` has been called or once the worker
    /// closure has returned and released the shared state. An expired timeout
    /// by itself is only reflected here after the closure has returned.
    pub fn alive(&self) -> bool {
        // If the thread still exists...
        if let Some(flag) = self.flag.upgrade() {
            flag.alive.load(Ordering::Relaxed)
        } else {
            false
        }
    }
}

#[cfg(test)]
mod tests {
    use std::sync::{Arc, Barrier};
    use std::sync::mpsc::channel;

    use super::RunLoop;

    #[test]
    fn test_empty() {
        // Create a runloop that exits right away.
        let rloop = RunLoop::new(|_| {}).unwrap();
        while rloop.alive() { /* wait */ }
        rloop.cancel(); // noop
    }

    #[test]
    fn test_cancel_early() {
        // Create a runloop and cancel it before the thread spawns.
        RunLoop::new(|alive| assert!(!alive())).unwrap().cancel();
    }

    #[test]
    fn test_cancel_endless_loop() {
        let barrier = Arc::new(Barrier::new(2));
        let b = barrier.clone();

        // Create a runloop that never exits.
        let rloop = RunLoop::new(move |alive| {
            b.wait();
            while alive() { /* loop */ }
        }).unwrap();

        barrier.wait();
        assert!(rloop.alive());
        rloop.cancel();
        assert!(!rloop.alive());
    }

    #[test]
    fn test_timeout() {
        // Create a runloop that never exits, but times out after 1ms.
        let rloop = RunLoop::new_with_timeout(|alive| while alive() {}, 1).unwrap();

        while rloop.alive() { /* wait */ }
        assert!(!rloop.alive());
        rloop.cancel(); // noop
    }

    #[test]
    fn test_channel() {
        let (tx, rx) = channel();

        // A runloop that sends data via a channel.
        let rloop = RunLoop::new(move |alive| while alive() {
            tx.send(0u8).unwrap();
        }).unwrap();

        // Wait until the data arrives.
        assert_eq!(rx.recv().unwrap(), 0u8);

        assert!(rloop.alive());
        rloop.cancel();
        assert!(!rloop.alive());
    }
}