1 //! Run tests concurrently.
2 //!
3 //! This module provides the `ConcurrentRunner` struct which uses a pool of threads to run tests
4 //! concurrently.
5 
6 use crate::runone;
7 use crate::TestResult;
8 use cranelift_codegen::dbg::LOG_FILENAME_PREFIX;
9 use cranelift_codegen::timing;
10 use file_per_thread_logger;
11 use log::error;
12 use num_cpus;
13 use std::panic::catch_unwind;
14 use std::path::{Path, PathBuf};
15 use std::sync::mpsc::{channel, Receiver, Sender};
16 use std::sync::{Arc, Mutex};
17 use std::thread;
18 use std::time::Duration;
19 
/// Request sent to worker threads.
///
/// Field `0` is the job id, which the worker echoes back in the `Reply` messages it sends for
/// this job. Field `1` is the path of the test file that the worker passes to `runone::run`.
struct Request(usize, PathBuf);
22 
/// Reply sent down the reply channel by a worker thread or by the heartbeat thread.
pub enum Reply {
    /// A worker has picked up the job `jobid`; `thread_num` is the number of that worker.
    Starting { jobid: usize, thread_num: usize },
    /// The job `jobid` has finished with `result`.
    /// A panic while running the test is reported here as an `Err` carrying a message.
    Done { jobid: usize, result: TestResult },
    /// Heartbeat sent about once per second by the heartbeat thread; not tied to any job.
    Tick,
}
29 
/// Manage threads that run test jobs concurrently.
///
/// Jobs are queued with `put()` and their replies are read back with `get()` / `try_get()`.
/// Call `shutdown()` and then `join()` to stop the workers and collect their pass timings.
pub struct ConcurrentRunner {
    /// Channel for sending requests to the worker threads.
    /// The workers share the receiving end through an `Arc<Mutex<Receiver>>`.
    /// This is `None` when shutting down.
    request_tx: Option<Sender<Request>>,

    /// Channel for receiving replies from the workers.
    /// Each worker, and the heartbeat thread, has its own clone of the `Sender`.
    reply_rx: Receiver<Reply>,

    /// Join handles of the worker threads.
    /// Each worker returns the pass timings it accumulated; `join()` drains this vector.
    handles: Vec<thread::JoinHandle<timing::PassTimes>>,
}
43 
44 impl ConcurrentRunner {
45     /// Create a new `ConcurrentRunner` with threads spun up.
new() -> Self46     pub fn new() -> Self {
47         let (request_tx, request_rx) = channel();
48         let request_mutex = Arc::new(Mutex::new(request_rx));
49         let (reply_tx, reply_rx) = channel();
50 
51         heartbeat_thread(reply_tx.clone());
52 
53         let num_threads = std::env::var("CRANELIFT_FILETESTS_THREADS")
54             .ok()
55             .map(|s| {
56                 use std::str::FromStr;
57                 let n = usize::from_str(&s).unwrap();
58                 assert!(n > 0);
59                 n
60             })
61             .unwrap_or_else(|| num_cpus::get());
62         let handles = (0..num_threads)
63             .map(|num| worker_thread(num, request_mutex.clone(), reply_tx.clone()))
64             .collect();
65 
66         Self {
67             request_tx: Some(request_tx),
68             reply_rx,
69             handles,
70         }
71     }
72 
73     /// Shut down worker threads orderly. They will finish any queued jobs first.
shutdown(&mut self)74     pub fn shutdown(&mut self) {
75         self.request_tx = None;
76     }
77 
78     /// Join all the worker threads.
79     /// Transfer pass timings from the worker threads to the current thread.
join(&mut self)80     pub fn join(&mut self) {
81         assert!(self.request_tx.is_none(), "must shutdown before join");
82         for h in self.handles.drain(..) {
83             match h.join() {
84                 Ok(t) => timing::add_to_current(&t),
85                 Err(e) => println!("worker panicked: {:?}", e),
86             }
87         }
88     }
89 
90     /// Add a new job to the queues.
put(&mut self, jobid: usize, path: &Path)91     pub fn put(&mut self, jobid: usize, path: &Path) {
92         self.request_tx
93             .as_ref()
94             .expect("cannot push after shutdown")
95             .send(Request(jobid, path.to_owned()))
96             .expect("all the worker threads are gone");
97     }
98 
99     /// Get a job reply without blocking.
try_get(&mut self) -> Option<Reply>100     pub fn try_get(&mut self) -> Option<Reply> {
101         self.reply_rx.try_recv().ok()
102     }
103 
104     /// Get a job reply, blocking until one is available.
get(&mut self) -> Option<Reply>105     pub fn get(&mut self) -> Option<Reply> {
106         self.reply_rx.recv().ok()
107     }
108 }
109 
110 /// Spawn a heartbeat thread which sends ticks down the reply channel every second.
111 /// This lets us implement timeouts without the not yet stable `recv_timeout`.
heartbeat_thread(replies: Sender<Reply>) -> thread::JoinHandle<()>112 fn heartbeat_thread(replies: Sender<Reply>) -> thread::JoinHandle<()> {
113     thread::Builder::new()
114         .name("heartbeat".to_string())
115         .spawn(move || {
116             file_per_thread_logger::initialize(LOG_FILENAME_PREFIX);
117             while replies.send(Reply::Tick).is_ok() {
118                 thread::sleep(Duration::from_secs(1));
119             }
120         })
121         .unwrap()
122 }
123 
124 /// Spawn a worker thread running tests.
worker_thread( thread_num: usize, requests: Arc<Mutex<Receiver<Request>>>, replies: Sender<Reply>, ) -> thread::JoinHandle<timing::PassTimes>125 fn worker_thread(
126     thread_num: usize,
127     requests: Arc<Mutex<Receiver<Request>>>,
128     replies: Sender<Reply>,
129 ) -> thread::JoinHandle<timing::PassTimes> {
130     thread::Builder::new()
131         .name(format!("worker #{}", thread_num))
132         .spawn(move || {
133             file_per_thread_logger::initialize(LOG_FILENAME_PREFIX);
134             loop {
135                 // Lock the mutex only long enough to extract a request.
136                 let Request(jobid, path) = match requests.lock().unwrap().recv() {
137                     Err(..) => break, // TX end shut down. exit thread.
138                     Ok(req) => req,
139                 };
140 
141                 // Tell them we're starting this job.
142                 // The receiver should always be present for this as long as we have jobs.
143                 replies.send(Reply::Starting { jobid, thread_num }).unwrap();
144 
145                 let result = catch_unwind(|| runone::run(path.as_path(), None, None))
146                     .unwrap_or_else(|e| {
147                         // The test panicked, leaving us a `Box<Any>`.
148                         // Panics are usually strings.
149                         if let Some(msg) = e.downcast_ref::<String>() {
150                             Err(format!("panicked in worker #{}: {}", thread_num, msg))
151                         } else if let Some(msg) = e.downcast_ref::<&'static str>() {
152                             Err(format!("panicked in worker #{}: {}", thread_num, msg))
153                         } else {
154                             Err(format!("panicked in worker #{}", thread_num))
155                         }
156                     });
157 
158                 if let Err(ref msg) = result {
159                     error!("FAIL: {}", msg);
160                 }
161 
162                 replies.send(Reply::Done { jobid, result }).unwrap();
163             }
164 
165             // Timing is accumulated independently per thread.
166             // Timings from this worker thread will be aggregated by `ConcurrentRunner::join()`.
167             timing::take_current()
168         })
169         .unwrap()
170 }
171