//! Debug Logging
//!
//! To use in a debug build, set the env var `RAYON_LOG` as
//! described below. In a release build, logs are compiled out by
//! default unless Rayon is built with `--cfg rayon_rs_log` (try
//! `RUSTFLAGS="--cfg rayon_rs_log"`).
//!
//! Note that logs are an internally debugging tool and their format
//! is considered unstable, as are the details of how to enable them.
//!
//! # Valid values for RAYON_LOG
//!
//! The `RAYON_LOG` variable can take on the following values:
//!
//! * `tail:<file>` -- dumps the last 10,000 events into the given file;
//!   useful for tracking down deadlocks
//! * `profile:<file>` -- dumps only those events needed to reconstruct how
//!   many workers are active at a given time
//! * `all:<file>` -- dumps every event to the file; useful for debugging

use crossbeam_channel::{self, Receiver, Sender};
use std::collections::VecDeque;
use std::env;
use std::fs::File;
use std::io::{self, BufWriter, Write};

/// True if logs are compiled in.
///
/// Logs are always available in debug builds; release builds only get
/// them when built with `--cfg rayon_rs_log`.
pub(super) const LOG_ENABLED: bool = cfg!(any(rayon_rs_log, debug_assertions));

/// A single event in the life of the thread pool, as delivered to the
/// logger thread. Most variants drive the `SimulatorState` state
/// machine; a few are purely informational.
#[derive(Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Debug)]
pub(super) enum Event {
    /// Flushes events to disk, used to terminate benchmarking.
    Flush,

    /// Indicates that a worker thread started execution.
    ThreadStart {
        worker: usize,
        terminate_addr: usize,
    },

    /// Indicates that a worker thread terminated execution.
    ThreadTerminate { worker: usize },

    /// Indicates that a worker thread became idle, blocked on `latch_addr`.
    ThreadIdle { worker: usize, latch_addr: usize },

    /// Indicates that an idle worker thread found work to do, after
    /// yield rounds. It should no longer be considered idle.
    ThreadFoundWork { worker: usize, yields: u32 },

    /// Indicates that a worker blocked on a latch observed that it was set.
    ///
    /// Internal debugging event that does not affect the state
    /// machine.
    ThreadSawLatchSet { worker: usize, latch_addr: usize },

    /// Indicates that an idle worker is getting sleepy. `jobs_counter` is the
    /// internal sleep state that we saw at the time.
    ThreadSleepy { worker: usize, jobs_counter: usize },

    /// Indicates that the thread's attempt to fall asleep was
    /// interrupted because the latch was set. (This is not, in and of
    /// itself, a change to the thread state.)
    ThreadSleepInterruptedByLatch { worker: usize, latch_addr: usize },

    /// Indicates that the thread's attempt to fall asleep was
    /// interrupted because a job was posted. (This is not, in and of
    /// itself, a change to the thread state.)
    ThreadSleepInterruptedByJob { worker: usize },

    /// Indicates that an idle worker has gone to sleep.
    ThreadSleeping { worker: usize, latch_addr: usize },

    /// Indicates that a sleeping worker has awoken.
    ThreadAwoken { worker: usize, latch_addr: usize },

    /// Indicates that the given worker thread was notified it should
    /// awaken.
    ThreadNotify { worker: usize },

    /// The given worker has pushed a job to its local deque.
    JobPushed { worker: usize },

    /// The given worker has popped a job from its local deque.
    JobPopped { worker: usize },

    /// The given worker has stolen a job from the deque of another.
    JobStolen { worker: usize, victim: usize },

    /// N jobs were injected into the global queue.
    JobsInjected { count: usize },

    /// A job was removed from the global queue.
    JobUninjected { worker: usize },

    /// When announcing a job, this was the value of the counters we observed.
    ///
    /// No effect on thread state, just a debugging event.
    JobThreadCounts {
        worker: usize,
        num_idle: u16,
        num_sleepers: u16,
    },
}

/// Handle to the logging thread, if any. You can use this to deliver
/// logs.
You can also clone it freely. 108 #[derive(Clone)] 109 pub(super) struct Logger { 110 sender: Option<Sender<Event>>, 111 } 112 113 impl Logger { new(num_workers: usize) -> Logger114 pub(super) fn new(num_workers: usize) -> Logger { 115 if !LOG_ENABLED { 116 return Self::disabled(); 117 } 118 119 // see the doc comment for the format 120 let env_log = match env::var("RAYON_LOG") { 121 Ok(s) => s, 122 Err(_) => return Self::disabled(), 123 }; 124 125 let (sender, receiver) = crossbeam_channel::unbounded(); 126 127 if env_log.starts_with("tail:") { 128 let filename = env_log["tail:".len()..].to_string(); 129 ::std::thread::spawn(move || { 130 Self::tail_logger_thread(num_workers, filename, 10_000, receiver) 131 }); 132 } else if env_log == "all" { 133 ::std::thread::spawn(move || Self::all_logger_thread(num_workers, receiver)); 134 } else if env_log.starts_with("profile:") { 135 let filename = env_log["profile:".len()..].to_string(); 136 ::std::thread::spawn(move || { 137 Self::profile_logger_thread(num_workers, filename, 10_000, receiver) 138 }); 139 } else { 140 panic!("RAYON_LOG should be 'tail:<file>' or 'profile:<file>'"); 141 } 142 143 return Logger { 144 sender: Some(sender), 145 }; 146 } 147 disabled() -> Logger148 fn disabled() -> Logger { 149 Logger { sender: None } 150 } 151 152 #[inline] log(&self, event: impl FnOnce() -> Event)153 pub(super) fn log(&self, event: impl FnOnce() -> Event) { 154 if !LOG_ENABLED { 155 return; 156 } 157 158 if let Some(sender) = &self.sender { 159 sender.send(event()).unwrap(); 160 } 161 } 162 profile_logger_thread( num_workers: usize, log_filename: String, capacity: usize, receiver: Receiver<Event>, )163 fn profile_logger_thread( 164 num_workers: usize, 165 log_filename: String, 166 capacity: usize, 167 receiver: Receiver<Event>, 168 ) { 169 let file = File::create(&log_filename) 170 .unwrap_or_else(|err| panic!("failed to open `{}`: {}", log_filename, err)); 171 172 let mut writer = BufWriter::new(file); 173 let mut events = 
Vec::with_capacity(capacity); 174 let mut state = SimulatorState::new(num_workers); 175 let timeout = std::time::Duration::from_secs(30); 176 177 loop { 178 loop { 179 match receiver.recv_timeout(timeout) { 180 Ok(event) => { 181 if let Event::Flush = event { 182 break; 183 } else { 184 events.push(event); 185 } 186 } 187 188 Err(_) => break, 189 } 190 191 if events.len() == capacity { 192 break; 193 } 194 } 195 196 for event in events.drain(..) { 197 if state.simulate(&event) { 198 state.dump(&mut writer, &event).unwrap(); 199 } 200 } 201 202 writer.flush().unwrap(); 203 } 204 } 205 tail_logger_thread( num_workers: usize, log_filename: String, capacity: usize, receiver: Receiver<Event>, )206 fn tail_logger_thread( 207 num_workers: usize, 208 log_filename: String, 209 capacity: usize, 210 receiver: Receiver<Event>, 211 ) { 212 let file = File::create(&log_filename) 213 .unwrap_or_else(|err| panic!("failed to open `{}`: {}", log_filename, err)); 214 215 let mut writer = BufWriter::new(file); 216 let mut events: VecDeque<Event> = VecDeque::with_capacity(capacity); 217 let mut state = SimulatorState::new(num_workers); 218 let timeout = std::time::Duration::from_secs(30); 219 let mut skipped = false; 220 221 loop { 222 loop { 223 match receiver.recv_timeout(timeout) { 224 Ok(event) => { 225 if let Event::Flush = event { 226 // We ignore Flush events in tail mode -- 227 // we're really just looking for 228 // deadlocks. 229 continue; 230 } else { 231 if events.len() == capacity { 232 let event = events.pop_front().unwrap(); 233 state.simulate(&event); 234 skipped = true; 235 } 236 237 events.push_back(event); 238 } 239 } 240 241 Err(_) => break, 242 } 243 } 244 245 if skipped { 246 write!(writer, "...\n").unwrap(); 247 skipped = false; 248 } 249 250 for event in events.drain(..) { 251 // In tail mode, we dump *all* events out, whether or 252 // not they were 'interesting' to the state machine. 
253 state.simulate(&event); 254 state.dump(&mut writer, &event).unwrap(); 255 } 256 257 writer.flush().unwrap(); 258 } 259 } 260 all_logger_thread(num_workers: usize, receiver: Receiver<Event>)261 fn all_logger_thread(num_workers: usize, receiver: Receiver<Event>) { 262 let stderr = std::io::stderr(); 263 let mut state = SimulatorState::new(num_workers); 264 265 for event in receiver { 266 let mut writer = BufWriter::new(stderr.lock()); 267 state.simulate(&event); 268 state.dump(&mut writer, &event).unwrap(); 269 writer.flush().unwrap(); 270 } 271 } 272 } 273 274 #[derive(Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Debug)] 275 enum State { 276 Working, 277 Idle, 278 Notified, 279 Sleeping, 280 Terminated, 281 } 282 283 impl State { letter(&self) -> char284 fn letter(&self) -> char { 285 match self { 286 State::Working => 'W', 287 State::Idle => 'I', 288 State::Notified => 'N', 289 State::Sleeping => 'S', 290 State::Terminated => 'T', 291 } 292 } 293 } 294 295 struct SimulatorState { 296 local_queue_size: Vec<usize>, 297 thread_states: Vec<State>, 298 injector_size: usize, 299 } 300 301 impl SimulatorState { new(num_workers: usize) -> Self302 fn new(num_workers: usize) -> Self { 303 Self { 304 local_queue_size: (0..num_workers).map(|_| 0).collect(), 305 thread_states: (0..num_workers).map(|_| State::Working).collect(), 306 injector_size: 0, 307 } 308 } 309 simulate(&mut self, event: &Event) -> bool310 fn simulate(&mut self, event: &Event) -> bool { 311 match *event { 312 Event::ThreadIdle { worker, .. } => { 313 assert_eq!(self.thread_states[worker], State::Working); 314 self.thread_states[worker] = State::Idle; 315 true 316 } 317 318 Event::ThreadStart { worker, .. } | Event::ThreadFoundWork { worker, .. } => { 319 self.thread_states[worker] = State::Working; 320 true 321 } 322 323 Event::ThreadTerminate { worker, .. } => { 324 self.thread_states[worker] = State::Terminated; 325 true 326 } 327 328 Event::ThreadSleeping { worker, .. 
} => { 329 assert_eq!(self.thread_states[worker], State::Idle); 330 self.thread_states[worker] = State::Sleeping; 331 true 332 } 333 334 Event::ThreadAwoken { worker, .. } => { 335 assert_eq!(self.thread_states[worker], State::Notified); 336 self.thread_states[worker] = State::Idle; 337 true 338 } 339 340 Event::JobPushed { worker } => { 341 self.local_queue_size[worker] += 1; 342 true 343 } 344 345 Event::JobPopped { worker } => { 346 self.local_queue_size[worker] -= 1; 347 true 348 } 349 350 Event::JobStolen { victim, .. } => { 351 self.local_queue_size[victim] -= 1; 352 true 353 } 354 355 Event::JobsInjected { count } => { 356 self.injector_size += count; 357 true 358 } 359 360 Event::JobUninjected { .. } => { 361 self.injector_size -= 1; 362 true 363 } 364 365 Event::ThreadNotify { worker } => { 366 // Currently, this log event occurs while holding the 367 // thread lock, so we should *always* see it before 368 // the worker awakens. 369 assert_eq!(self.thread_states[worker], State::Sleeping); 370 self.thread_states[worker] = State::Notified; 371 true 372 } 373 374 // remaining events are no-ops from pov of simulating the 375 // thread state 376 _ => false, 377 } 378 } 379 dump(&mut self, w: &mut impl Write, event: &Event) -> io::Result<()>380 fn dump(&mut self, w: &mut impl Write, event: &Event) -> io::Result<()> { 381 let num_idle_threads = self 382 .thread_states 383 .iter() 384 .filter(|s| **s == State::Idle) 385 .count(); 386 387 let num_sleeping_threads = self 388 .thread_states 389 .iter() 390 .filter(|s| **s == State::Sleeping) 391 .count(); 392 393 let num_notified_threads = self 394 .thread_states 395 .iter() 396 .filter(|s| **s == State::Notified) 397 .count(); 398 399 let num_pending_jobs: usize = self.local_queue_size.iter().sum(); 400 401 write!(w, "{:2},", num_idle_threads)?; 402 write!(w, "{:2},", num_sleeping_threads)?; 403 write!(w, "{:2},", num_notified_threads)?; 404 write!(w, "{:4},", num_pending_jobs)?; 405 write!(w, "{:4},", 
self.injector_size)?; 406 407 let event_str = format!("{:?}", event); 408 write!(w, r#""{:60}","#, event_str)?; 409 410 for ((i, state), queue_size) in (0..).zip(&self.thread_states).zip(&self.local_queue_size) { 411 write!(w, " T{:02},{}", i, state.letter(),)?; 412 413 if *queue_size > 0 { 414 write!(w, ",{:03},", queue_size)?; 415 } else { 416 write!(w, ", ,")?; 417 } 418 } 419 420 write!(w, "\n")?; 421 Ok(()) 422 } 423 } 424