// vendor/rustc-rayon-core/src/log.rs (rustc.git, upstream version 1.63.0+dfsg1)
//! Debug Logging
//!
//! To use in a debug build, set the env var `RAYON_LOG` as
//! described below. In a release build, logs are compiled out by
//! default unless Rayon is built with `--cfg rayon_rs_log` (try
//! `RUSTFLAGS="--cfg rayon_rs_log"`).
//!
//! Note that logs are an internal debugging tool and their format
//! is considered unstable, as are the details of how to enable them.
//!
//! # Valid values for RAYON_LOG
//!
//! The `RAYON_LOG` variable can take on the following values:
//!
//! * `tail:<file>` -- dumps the last 10,000 events into the given file;
//!   useful for tracking down deadlocks
//! * `profile:<file>` -- dumps only those events needed to reconstruct how
//!   many workers are active at a given time
//! * `all` -- dumps every event to stderr; useful for debugging
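//!
//! For example (an illustrative invocation; `my-program` stands in for any
//! binary that uses Rayon):
//!
//! ```text
//! RAYON_LOG=tail:rayon.log ./my-program
//! ```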

use crossbeam_channel::{self, Receiver, Sender};
use std::collections::VecDeque;
use std::env;
use std::fs::File;
use std::io::{self, BufWriter, Write};

/// True if logs are compiled in.
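/// (`debug_assertions` is set for debug builds; passing `--cfg rayon_rs_log`
/// opts release builds in as well.)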
pub(super) const LOG_ENABLED: bool = cfg!(any(rayon_rs_log, debug_assertions));

#[derive(Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Debug)]
pub(super) enum Event {
    /// Flushes events to disk, used to terminate benchmarking.
    Flush,

    /// Indicates that a worker thread started execution.
    ThreadStart {
        worker: usize,
        terminate_addr: usize,
    },

    /// Indicates that a worker thread terminated execution.
    ThreadTerminate { worker: usize },

    /// Indicates that a worker thread became idle, blocked on `latch_addr`.
    ThreadIdle { worker: usize, latch_addr: usize },

    /// Indicates that an idle worker thread found work to do, after
    /// `yields` rounds of yielding. It should no longer be considered idle.
    ThreadFoundWork { worker: usize, yields: u32 },

    /// Indicates that a worker blocked on a latch observed that it was set.
    ///
    /// Internal debugging event that does not affect the state
    /// machine.
    ThreadSawLatchSet { worker: usize, latch_addr: usize },

    /// Indicates that an idle worker is getting sleepy. `jobs_counter` is the
    /// internal sleep state that we saw at the time.
    ThreadSleepy { worker: usize, jobs_counter: usize },

    /// Indicates that the thread's attempt to fall asleep was
    /// interrupted because the latch was set. (This is not, in and of
    /// itself, a change to the thread state.)
    ThreadSleepInterruptedByLatch { worker: usize, latch_addr: usize },

    /// Indicates that the thread's attempt to fall asleep was
    /// interrupted because a job was posted. (This is not, in and of
    /// itself, a change to the thread state.)
    ThreadSleepInterruptedByJob { worker: usize },

    /// Indicates that an idle worker has gone to sleep.
    ThreadSleeping { worker: usize, latch_addr: usize },

    /// Indicates that a sleeping worker has awoken.
    ThreadAwoken { worker: usize, latch_addr: usize },

    /// Indicates that the given worker thread was notified it should
    /// awaken.
    ThreadNotify { worker: usize },

    /// The given worker has pushed a job to its local deque.
    JobPushed { worker: usize },

    /// The given worker has popped a job from its local deque.
    JobPopped { worker: usize },

    /// The given worker has stolen a job from the deque of another.
    JobStolen { worker: usize, victim: usize },

    /// N jobs were injected into the global queue.
    JobsInjected { count: usize },

    /// A job was removed from the global queue.
    JobUninjected { worker: usize },

    /// When announcing a job, this was the value of the counters we observed.
    ///
    /// No effect on thread state, just a debugging event.
    JobThreadCounts {
        worker: usize,
        num_idle: u16,
        num_sleepers: u16,
    },
}

/// Handle to the logging thread, if any. You can use this to deliver
/// logs. You can also clone it freely.
#[derive(Clone)]
pub(super) struct Logger {
    sender: Option<Sender<Event>>,
}

impl Logger {
    pub(super) fn new(num_workers: usize) -> Logger {
        if !LOG_ENABLED {
            return Self::disabled();
        }

        // see the doc comment for the format
        let env_log = match env::var("RAYON_LOG") {
            Ok(s) => s,
            Err(_) => return Self::disabled(),
        };

        let (sender, receiver) = crossbeam_channel::unbounded();

        if env_log.starts_with("tail:") {
            let filename = env_log["tail:".len()..].to_string();
            ::std::thread::spawn(move || {
                Self::tail_logger_thread(num_workers, filename, 10_000, receiver)
            });
        } else if env_log == "all" {
            ::std::thread::spawn(move || Self::all_logger_thread(num_workers, receiver));
        } else if env_log.starts_with("profile:") {
            let filename = env_log["profile:".len()..].to_string();
            ::std::thread::spawn(move || {
                Self::profile_logger_thread(num_workers, filename, 10_000, receiver)
            });
        } else {
            panic!("RAYON_LOG should be 'tail:<file>', 'profile:<file>', or 'all'");
        }

        Logger {
            sender: Some(sender),
        }
    }

    fn disabled() -> Logger {
        Logger { sender: None }
    }

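    // `log` takes a closure rather than an `Event` value so the event is only
    // constructed if a logging thread is actually attached; when `LOG_ENABLED`
    // is false, the entire call compiles away.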
    #[inline]
    pub(super) fn log(&self, event: impl FnOnce() -> Event) {
        if !LOG_ENABLED {
            return;
        }

        if let Some(sender) = &self.sender {
            sender.send(event()).unwrap();
        }
    }

    fn profile_logger_thread(
        num_workers: usize,
        log_filename: String,
        capacity: usize,
        receiver: Receiver<Event>,
    ) {
        let file = File::create(&log_filename)
            .unwrap_or_else(|err| panic!("failed to open `{}`: {}", log_filename, err));

        let mut writer = BufWriter::new(file);
        let mut events = Vec::with_capacity(capacity);
        let mut state = SimulatorState::new(num_workers);
        let timeout = std::time::Duration::from_secs(30);

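        // Accumulate events in batches: a batch ends when a `Flush` event
        // arrives, when the 30-second receive timeout expires, or when the
        // buffer reaches `capacity`. Only events that change the simulated
        // state get written out.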
        loop {
            loop {
                match receiver.recv_timeout(timeout) {
                    Ok(event) => {
                        if let Event::Flush = event {
                            break;
                        } else {
                            events.push(event);
                        }
                    }

                    Err(_) => break,
                }

                if events.len() == capacity {
                    break;
                }
            }

            for event in events.drain(..) {
                if state.simulate(&event) {
                    state.dump(&mut writer, &event).unwrap();
                }
            }

            writer.flush().unwrap();
        }
    }

    fn tail_logger_thread(
        num_workers: usize,
        log_filename: String,
        capacity: usize,
        receiver: Receiver<Event>,
    ) {
        let file = File::create(&log_filename)
            .unwrap_or_else(|err| panic!("failed to open `{}`: {}", log_filename, err));

        let mut writer = BufWriter::new(file);
        let mut events: VecDeque<Event> = VecDeque::with_capacity(capacity);
        let mut state = SimulatorState::new(num_workers);
        let timeout = std::time::Duration::from_secs(30);
        let mut skipped = false;

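        // The deque is used as a ring buffer holding the most recent
        // `capacity` events. Evicted events are still fed through the
        // simulator so the state snapshot stays accurate; `skipped` notes
        // that a gap marker ("...") must be written before the rest.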
        loop {
            loop {
                match receiver.recv_timeout(timeout) {
                    Ok(event) => {
                        if let Event::Flush = event {
                            // We ignore Flush events in tail mode --
                            // we're really just looking for
                            // deadlocks.
                            continue;
                        } else {
                            if events.len() == capacity {
                                let event = events.pop_front().unwrap();
                                state.simulate(&event);
                                skipped = true;
                            }

                            events.push_back(event);
                        }
                    }

                    Err(_) => break,
                }
            }

            if skipped {
                writeln!(writer, "...").unwrap();
                skipped = false;
            }

            for event in events.drain(..) {
                // In tail mode, we dump *all* events out, whether or
                // not they were 'interesting' to the state machine.
                state.simulate(&event);
                state.dump(&mut writer, &event).unwrap();
            }

            writer.flush().unwrap();
        }
    }

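    // In `all` mode, events stream straight to stderr; stderr is locked and
    // flushed per event so output appears immediately.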
    fn all_logger_thread(num_workers: usize, receiver: Receiver<Event>) {
        let stderr = std::io::stderr();
        let mut state = SimulatorState::new(num_workers);

        for event in receiver {
            let mut writer = BufWriter::new(stderr.lock());
            state.simulate(&event);
            state.dump(&mut writer, &event).unwrap();
            writer.flush().unwrap();
        }
    }
}

#[derive(Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Debug)]
enum State {
    Working,
    Idle,
    Notified,
    Sleeping,
    Terminated,
}
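
// `SimulatorState::simulate` below enforces a small state machine over these
// states: Working -> Idle (ThreadIdle), Idle -> Working (ThreadFoundWork),
// Idle -> Sleeping (ThreadSleeping), Sleeping -> Notified (ThreadNotify),
// Notified -> Idle (ThreadAwoken), and any state -> Terminated.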

impl State {
    fn letter(&self) -> char {
        match self {
            State::Working => 'W',
            State::Idle => 'I',
            State::Notified => 'N',
            State::Sleeping => 'S',
            State::Terminated => 'T',
        }
    }
}
294
295 struct SimulatorState {
296 local_queue_size: Vec<usize>,
297 thread_states: Vec<State>,
298 injector_size: usize,
299 }
300
301 impl SimulatorState {
302 fn new(num_workers: usize) -> Self {
303 Self {
304 local_queue_size: (0..num_workers).map(|_| 0).collect(),
305 thread_states: (0..num_workers).map(|_| State::Working).collect(),
306 injector_size: 0,
307 }
308 }
309
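    /// Applies `event` to the simulated state, returning true if it changed
    /// anything; profile mode uses this to decide whether an event is worth
    /// dumping.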
    fn simulate(&mut self, event: &Event) -> bool {
        match *event {
            Event::ThreadIdle { worker, .. } => {
                assert_eq!(self.thread_states[worker], State::Working);
                self.thread_states[worker] = State::Idle;
                true
            }

            Event::ThreadStart { worker, .. } | Event::ThreadFoundWork { worker, .. } => {
                self.thread_states[worker] = State::Working;
                true
            }

            Event::ThreadTerminate { worker, .. } => {
                self.thread_states[worker] = State::Terminated;
                true
            }

            Event::ThreadSleeping { worker, .. } => {
                assert_eq!(self.thread_states[worker], State::Idle);
                self.thread_states[worker] = State::Sleeping;
                true
            }

            Event::ThreadAwoken { worker, .. } => {
                assert_eq!(self.thread_states[worker], State::Notified);
                self.thread_states[worker] = State::Idle;
                true
            }

            Event::JobPushed { worker } => {
                self.local_queue_size[worker] += 1;
                true
            }

            Event::JobPopped { worker } => {
                self.local_queue_size[worker] -= 1;
                true
            }

            Event::JobStolen { victim, .. } => {
                self.local_queue_size[victim] -= 1;
                true
            }

            Event::JobsInjected { count } => {
                self.injector_size += count;
                true
            }

            Event::JobUninjected { .. } => {
                self.injector_size -= 1;
                true
            }

            Event::ThreadNotify { worker } => {
                // Currently, this log event occurs while holding the
                // thread lock, so we should *always* see it before
                // the worker awakens.
                assert_eq!(self.thread_states[worker], State::Sleeping);
                self.thread_states[worker] = State::Notified;
                true
            }

            // remaining events are no-ops from pov of simulating the
            // thread state
            _ => false,
        }
    }

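    /// Writes one CSV-style line describing the state after `event`: counts
    /// of idle, sleeping, and notified threads; total locally queued jobs;
    /// injector size; the event itself; then each thread's state letter and
    /// local queue depth.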
    fn dump(&mut self, w: &mut impl Write, event: &Event) -> io::Result<()> {
        let num_idle_threads = self
            .thread_states
            .iter()
            .filter(|s| **s == State::Idle)
            .count();

        let num_sleeping_threads = self
            .thread_states
            .iter()
            .filter(|s| **s == State::Sleeping)
            .count();

        let num_notified_threads = self
            .thread_states
            .iter()
            .filter(|s| **s == State::Notified)
            .count();

        let num_pending_jobs: usize = self.local_queue_size.iter().sum();

        write!(w, "{:2},", num_idle_threads)?;
        write!(w, "{:2},", num_sleeping_threads)?;
        write!(w, "{:2},", num_notified_threads)?;
        write!(w, "{:4},", num_pending_jobs)?;
        write!(w, "{:4},", self.injector_size)?;

        let event_str = format!("{:?}", event);
        write!(w, r#""{:60}","#, event_str)?;

        for ((i, state), queue_size) in (0..).zip(&self.thread_states).zip(&self.local_queue_size) {
            write!(w, " T{:02},{}", i, state.letter())?;

            if *queue_size > 0 {
                write!(w, ",{:03},", queue_size)?;
            } else {
                write!(w, ",   ,")?;
            }
        }

        writeln!(w)?;
        Ok(())
    }
}