//! Debug Logging
//!
//! To use in a debug build, set the env var `RAYON_LOG` as
//! described below. In a release build, logs are compiled out by
//! default unless Rayon is built with `--cfg rayon_rs_log` (try
//! `RUSTFLAGS="--cfg rayon_rs_log"`).
//!
//! Note that logs are an internal debugging tool and their format
//! is considered unstable, as are the details of how to enable them.
//!
//! # Valid values for RAYON_LOG
//!
//! The `RAYON_LOG` variable can take on the following values:
//!
//! * `tail:<file>` -- dumps the last 10,000 events into the given file;
//!   useful for tracking down deadlocks
//! * `profile:<file>` -- dumps only those events needed to reconstruct how
//!   many workers are active at a given time
//! * `all` -- dumps every event to stderr; useful for debugging
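//!
//! For example, in a debug build you might write a profile log with
//! something along these lines (a sketch; the program name and file
//! name are placeholders):
//!
//! ```text
//! RAYON_LOG=profile:rayon.csv ./my-rayon-program
//! ```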

use crossbeam_channel::{self, Receiver, Sender};
use std::collections::VecDeque;
use std::env;
use std::fs::File;
use std::io::{self, BufWriter, Write};

/// True if logs are compiled in.
pub(super) const LOG_ENABLED: bool = cfg!(any(rayon_rs_log, debug_assertions));

#[derive(Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Debug)]
pub(super) enum Event {
    /// Flushes events to disk, used to terminate benchmarking.
    Flush,

    /// Indicates that a worker thread started execution.
    ThreadStart {
        worker: usize,
        terminate_addr: usize,
    },

    /// Indicates that a worker thread terminated execution.
    ThreadTerminate { worker: usize },

    /// Indicates that a worker thread became idle, blocked on `latch_addr`.
    ThreadIdle { worker: usize, latch_addr: usize },

    /// Indicates that an idle worker thread found work to do, after
    /// `yields` rounds of yielding. It should no longer be considered idle.
    ThreadFoundWork { worker: usize, yields: u32 },

    /// Indicates that a worker blocked on a latch observed that it was set.
    ///
    /// Internal debugging event that does not affect the state
    /// machine.
    ThreadSawLatchSet { worker: usize, latch_addr: usize },

    /// Indicates that an idle worker is getting sleepy. `jobs_counter` is the internal
    /// sleep state that we saw at the time.
    ThreadSleepy { worker: usize, jobs_counter: usize },

    /// Indicates that the thread's attempt to fall asleep was
    /// interrupted because the latch was set. (This is not, in and of
    /// itself, a change to the thread state.)
    ThreadSleepInterruptedByLatch { worker: usize, latch_addr: usize },

    /// Indicates that the thread's attempt to fall asleep was
    /// interrupted because a job was posted. (This is not, in and of
    /// itself, a change to the thread state.)
    ThreadSleepInterruptedByJob { worker: usize },

    /// Indicates that an idle worker has gone to sleep.
    ThreadSleeping { worker: usize, latch_addr: usize },

    /// Indicates that a sleeping worker has awoken.
    ThreadAwoken { worker: usize, latch_addr: usize },

    /// Indicates that the given worker thread was notified it should
    /// awaken.
    ThreadNotify { worker: usize },

    /// The given worker has pushed a job to its local deque.
    JobPushed { worker: usize },

    /// The given worker has popped a job from its local deque.
    JobPopped { worker: usize },

    /// The given worker has stolen a job from the deque of another.
    JobStolen { worker: usize, victim: usize },

    /// N jobs were injected into the global queue.
    JobsInjected { count: usize },

    /// A job was removed from the global queue.
    JobUninjected { worker: usize },

    /// When announcing a job, this was the value of the counters we observed.
    ///
    /// No effect on thread state, just a debugging event.
    JobThreadCounts {
        worker: usize,
        num_idle: u16,
        num_sleepers: u16,
    },
}

/// Handle to the logging thread, if any. You can use this to deliver
/// logs. You can also clone it freely.
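///
/// A minimal usage sketch (the worker index here is arbitrary; note that
/// the closure keeps event construction lazy, so it is never run when
/// logging is disabled):
///
/// ```ignore
/// let logger = Logger::new(8);
/// logger.log(|| Event::JobPushed { worker: 0 });
/// ```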
#[derive(Clone)]
pub(super) struct Logger {
    sender: Option<Sender<Event>>,
}

impl Logger {
    pub(super) fn new(num_workers: usize) -> Logger {
        if !LOG_ENABLED {
            return Self::disabled();
        }

        // see the doc comment for the format
        let env_log = match env::var("RAYON_LOG") {
            Ok(s) => s,
            Err(_) => return Self::disabled(),
        };

        let (sender, receiver) = crossbeam_channel::unbounded();

        if env_log.starts_with("tail:") {
            let filename = env_log["tail:".len()..].to_string();
            ::std::thread::spawn(move || {
                Self::tail_logger_thread(num_workers, filename, 10_000, receiver)
            });
        } else if env_log == "all" {
            ::std::thread::spawn(move || Self::all_logger_thread(num_workers, receiver));
        } else if env_log.starts_with("profile:") {
            let filename = env_log["profile:".len()..].to_string();
            ::std::thread::spawn(move || {
                Self::profile_logger_thread(num_workers, filename, 10_000, receiver)
            });
        } else {
            panic!("RAYON_LOG should be 'all', 'tail:<file>' or 'profile:<file>'");
        }

        Logger {
            sender: Some(sender),
        }
    }

    fn disabled() -> Logger {
        Logger { sender: None }
    }

    #[inline]
    pub(super) fn log(&self, event: impl FnOnce() -> Event) {
        if !LOG_ENABLED {
            return;
        }

        if let Some(sender) = &self.sender {
            sender.send(event()).unwrap();
        }
    }

    fn profile_logger_thread(
        num_workers: usize,
        log_filename: String,
        capacity: usize,
        receiver: Receiver<Event>,
    ) {
        let file = File::create(&log_filename)
            .unwrap_or_else(|err| panic!("failed to open `{}`: {}", log_filename, err));

        let mut writer = BufWriter::new(file);
        let mut events = Vec::with_capacity(capacity);
        let mut state = SimulatorState::new(num_workers);
        let timeout = std::time::Duration::from_secs(30);

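        // Batch events until we see a `Flush`, reach `capacity`, or the
        // 30-second receive timeout elapses; then replay the batch through
        // the simulator and write out the events it considers interesting.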
        loop {
            loop {
                match receiver.recv_timeout(timeout) {
                    Ok(event) => {
                        if let Event::Flush = event {
                            break;
                        } else {
                            events.push(event);
                        }
                    }

                    Err(_) => break,
                }

                if events.len() == capacity {
                    break;
                }
            }

            for event in events.drain(..) {
                if state.simulate(&event) {
                    state.dump(&mut writer, &event).unwrap();
                }
            }

            writer.flush().unwrap();
        }
    }

    fn tail_logger_thread(
        num_workers: usize,
        log_filename: String,
        capacity: usize,
        receiver: Receiver<Event>,
    ) {
        let file = File::create(&log_filename)
            .unwrap_or_else(|err| panic!("failed to open `{}`: {}", log_filename, err));

        let mut writer = BufWriter::new(file);
        let mut events: VecDeque<Event> = VecDeque::with_capacity(capacity);
        let mut state = SimulatorState::new(num_workers);
        let timeout = std::time::Duration::from_secs(30);
        let mut skipped = false;

        loop {
            loop {
                match receiver.recv_timeout(timeout) {
                    Ok(event) => {
                        if let Event::Flush = event {
                            // We ignore Flush events in tail mode --
                            // we're really just looking for
                            // deadlocks.
                            continue;
                        } else {
                            if events.len() == capacity {
                                let event = events.pop_front().unwrap();
                                state.simulate(&event);
                                skipped = true;
                            }

                            events.push_back(event);
                        }
                    }

                    Err(_) => break,
                }
            }

            if skipped {
                writeln!(writer, "...").unwrap();
                skipped = false;
            }

            for event in events.drain(..) {
                // In tail mode, we dump *all* events out, whether or
                // not they were 'interesting' to the state machine.
                state.simulate(&event);
                state.dump(&mut writer, &event).unwrap();
            }

            writer.flush().unwrap();
        }
    }

    fn all_logger_thread(num_workers: usize, receiver: Receiver<Event>) {
        let stderr = std::io::stderr();
        let mut state = SimulatorState::new(num_workers);

        for event in receiver {
            let mut writer = BufWriter::new(stderr.lock());
            state.simulate(&event);
            state.dump(&mut writer, &event).unwrap();
            writer.flush().unwrap();
        }
    }
}

#[derive(Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Debug)]
enum State {
    Working,
    Idle,
    Notified,
    Sleeping,
    Terminated,
}

impl State {
    fn letter(&self) -> char {
        match self {
            State::Working => 'W',
            State::Idle => 'I',
            State::Notified => 'N',
            State::Sleeping => 'S',
            State::Terminated => 'T',
        }
    }
}

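/// Replays the event stream to reconstruct an approximate view of each
/// worker's state and local queue depth. The logger threads use it to
/// annotate every event they write out.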
struct SimulatorState {
    local_queue_size: Vec<usize>,
    thread_states: Vec<State>,
    injector_size: usize,
}

impl SimulatorState {
    fn new(num_workers: usize) -> Self {
        Self {
            local_queue_size: (0..num_workers).map(|_| 0).collect(),
            thread_states: (0..num_workers).map(|_| State::Working).collect(),
            injector_size: 0,
        }
    }

    fn simulate(&mut self, event: &Event) -> bool {
        match *event {
            Event::ThreadIdle { worker, .. } => {
                assert_eq!(self.thread_states[worker], State::Working);
                self.thread_states[worker] = State::Idle;
                true
            }

            Event::ThreadStart { worker, .. } | Event::ThreadFoundWork { worker, .. } => {
                self.thread_states[worker] = State::Working;
                true
            }

            Event::ThreadTerminate { worker, .. } => {
                self.thread_states[worker] = State::Terminated;
                true
            }

            Event::ThreadSleeping { worker, .. } => {
                assert_eq!(self.thread_states[worker], State::Idle);
                self.thread_states[worker] = State::Sleeping;
                true
            }

            Event::ThreadAwoken { worker, .. } => {
                assert_eq!(self.thread_states[worker], State::Notified);
                self.thread_states[worker] = State::Idle;
                true
            }

            Event::JobPushed { worker } => {
                self.local_queue_size[worker] += 1;
                true
            }

            Event::JobPopped { worker } => {
                self.local_queue_size[worker] -= 1;
                true
            }

            Event::JobStolen { victim, .. } => {
                self.local_queue_size[victim] -= 1;
                true
            }

            Event::JobsInjected { count } => {
                self.injector_size += count;
                true
            }

            Event::JobUninjected { .. } => {
                self.injector_size -= 1;
                true
            }

            Event::ThreadNotify { worker } => {
                // Currently, this log event occurs while holding the
                // thread lock, so we should *always* see it before
                // the worker awakens.
                assert_eq!(self.thread_states[worker], State::Sleeping);
                self.thread_states[worker] = State::Notified;
                true
            }

            // remaining events are no-ops from pov of simulating the
            // thread state
            _ => false,
        }
    }

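    /// Writes one line describing the state after `event`: the counts of
    /// idle, sleeping, and notified threads, the total number of locally
    /// queued jobs, the injector size, the event itself, and then one
    /// column per thread with its state letter and local queue depth.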
    fn dump(&mut self, w: &mut impl Write, event: &Event) -> io::Result<()> {
        let num_idle_threads = self
            .thread_states
            .iter()
            .filter(|s| **s == State::Idle)
            .count();

        let num_sleeping_threads = self
            .thread_states
            .iter()
            .filter(|s| **s == State::Sleeping)
            .count();

        let num_notified_threads = self
            .thread_states
            .iter()
            .filter(|s| **s == State::Notified)
            .count();

        let num_pending_jobs: usize = self.local_queue_size.iter().sum();

        write!(w, "{:2},", num_idle_threads)?;
        write!(w, "{:2},", num_sleeping_threads)?;
        write!(w, "{:2},", num_notified_threads)?;
        write!(w, "{:4},", num_pending_jobs)?;
        write!(w, "{:4},", self.injector_size)?;

        let event_str = format!("{:?}", event);
        write!(w, r#""{:60}","#, event_str)?;

        for ((i, state), queue_size) in (0..).zip(&self.thread_states).zip(&self.local_queue_size) {
            write!(w, " T{:02},{}", i, state.letter())?;

            if *queue_size > 0 {
                write!(w, ",{:03},", queue_size)?;
            } else {
                write!(w, ",   ,")?;
            }
        }

        writeln!(w)?;
        Ok(())
    }
}