//! To use in a debug build, set the env var `RAYON_LOG` as
//! described below. In a release build, logs are compiled out by
//! default unless Rayon is built with `--cfg rayon_rs_log` (try
//! `RUSTFLAGS="--cfg rayon_rs_log"`).
//!
//! Note that logs are an internal debugging tool and their format
//! is considered unstable, as are the details of how to enable them.
//!
//! # Valid values for RAYON_LOG
//!
//! The `RAYON_LOG` variable can take on the following values:
//!
//! * `tail:<file>` -- dumps the last 10,000 events into the given file;
//!   useful for tracking down deadlocks
//! * `profile:<file>` -- dumps only those events needed to reconstruct how
//!   many workers are active at a given time
//! * `all` -- dumps every event to stderr; useful for debugging
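//!
//! For example, an illustrative way to capture a profile log from a
//! debug-build test run (the output file name and the `cargo test`
//! invocation are placeholders, not a prescribed workflow):
//!
//! ```text
//! RAYON_LOG=profile:rayon.csv cargo test
//! ```
//!
//! In a release build the cfg flag must be enabled as well, e.g.:
//!
//! ```text
//! RUSTFLAGS="--cfg rayon_rs_log" RAYON_LOG=tail:rayon.log cargo test --release
//! ```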
use crossbeam_channel::{self, Receiver, Sender};
use std::collections::VecDeque;
use std::env;
use std::fs::File;
use std::io::{self, BufWriter, Write};

/// True if logs are compiled in.
pub(super) const LOG_ENABLED: bool = cfg!(any(rayon_rs_log, debug_assertions));

#[derive(Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Debug)]
pub(super) enum Event {
    /// Flushes events to disk, used to terminate benchmarking.
    Flush,

    /// Indicates that a worker thread started execution.
    ThreadStart {
        worker: usize,
        terminate_addr: usize,
    },

    /// Indicates that a worker thread terminated execution.
    ThreadTerminate { worker: usize },

    /// Indicates that a worker thread became idle, blocked on `latch_addr`.
    ThreadIdle { worker: usize, latch_addr: usize },

    /// Indicates that an idle worker thread found work to do, after
    /// yield rounds. It should no longer be considered idle.
    ThreadFoundWork { worker: usize, yields: u32 },

    /// Indicates that a worker blocked on a latch observed that it was set.
    ///
    /// Internal debugging event that does not affect the state
    /// machine.
    ThreadSawLatchSet { worker: usize, latch_addr: usize },

    /// Indicates that an idle worker is getting sleepy. `jobs_counter` is the internal
    /// sleep state that we saw at the time.
    ThreadSleepy { worker: usize, jobs_counter: usize },

    /// Indicates that the thread's attempt to fall asleep was
    /// interrupted because the latch was set. (This is not, in and of
    /// itself, a change to the thread state.)
    ThreadSleepInterruptedByLatch { worker: usize, latch_addr: usize },

    /// Indicates that the thread's attempt to fall asleep was
    /// interrupted because a job was posted. (This is not, in and of
    /// itself, a change to the thread state.)
    ThreadSleepInterruptedByJob { worker: usize },

    /// Indicates that an idle worker has gone to sleep.
    ThreadSleeping { worker: usize, latch_addr: usize },

    /// Indicates that a sleeping worker has awoken.
    ThreadAwoken { worker: usize, latch_addr: usize },

    /// Indicates that the given worker thread was notified it should
    /// awaken.
    ThreadNotify { worker: usize },

    /// The given worker has pushed a job to its local deque.
    JobPushed { worker: usize },

    /// The given worker has popped a job from its local deque.
    JobPopped { worker: usize },

    /// The given worker has stolen a job from the deque of another.
    JobStolen { worker: usize, victim: usize },

    /// N jobs were injected into the global queue.
    JobsInjected { count: usize },

    /// A job was removed from the global queue.
    JobUninjected { worker: usize },

    /// When announcing a job, this was the value of the counters we observed.
    ///
    /// No effect on thread state, just a debugging event.
    JobThreadCounts {
        worker: usize,
        num_idle: u16,
        num_sleepers: u16,
    },
}

/// Handle to the logging thread, if any. You can use this to deliver
/// logs. You can also clone it freely.
#[derive(Clone)]
pub(super) struct Logger {
    sender: Option<Sender<Event>>,
}

impl Logger {
    pub(super) fn new(num_workers: usize) -> Logger {
        if !LOG_ENABLED {
            return Self::disabled();
        }

        // see the doc comment for the format
        let env_log = match env::var("RAYON_LOG") {
            Ok(s) => s,
            Err(_) => return Self::disabled(),
        };

        let (sender, receiver) = crossbeam_channel::unbounded();

        if env_log.starts_with("tail:") {
            let filename = env_log["tail:".len()..].to_string();
            ::std::thread::spawn(move || {
                Self::tail_logger_thread(num_workers, filename, 10_000, receiver)
            });
        } else if env_log == "all" {
            ::std::thread::spawn(move || Self::all_logger_thread(num_workers, receiver));
        } else if env_log.starts_with("profile:") {
            let filename = env_log["profile:".len()..].to_string();
            ::std::thread::spawn(move || {
                Self::profile_logger_thread(num_workers, filename, 10_000, receiver)
            });
        } else {
            panic!("RAYON_LOG should be 'tail:<file>' or 'profile:<file>'");
        }

        Logger {
            sender: Some(sender),
        }
    }

    fn disabled() -> Logger {
        Logger { sender: None }
    }

    pub(super) fn log(&self, event: impl FnOnce() -> Event) {
        if !LOG_ENABLED {
            return;
        }

        if let Some(sender) = &self.sender {
            sender.send(event()).unwrap();
        }
    }
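
    // A caller that holds a `Logger` delivers events through `log`. The event
    // is built inside a closure so that nothing is constructed when logging is
    // disabled. Illustrative sketch only -- `worker_index` is a placeholder for
    // whatever worker id the call site has at hand:
    //
    //     logger.log(|| Event::JobPushed { worker: worker_index });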

    fn profile_logger_thread(
        num_workers: usize,
        log_filename: String,
        capacity: usize,
        receiver: Receiver<Event>,
    ) {
        let file = File::create(&log_filename)
            .unwrap_or_else(|err| panic!("failed to open `{}`: {}", log_filename, err));

        let mut writer = BufWriter::new(file);
        let mut events = Vec::with_capacity(capacity);
        let mut state = SimulatorState::new(num_workers);
        let timeout = std::time::Duration::from_secs(30);

        loop {
            loop {
                match receiver.recv_timeout(timeout) {
                    Ok(event) => {
                        if let Event::Flush = event {
                            break;
                        } else {
                            events.push(event);
                        }
                    }

                    Err(_) => break,
                }

                if events.len() == capacity {
                    break;
                }
            }

            for event in events.drain(..) {
                if state.simulate(&event) {
                    state.dump(&mut writer, &event).unwrap();
                }
            }

            writer.flush().unwrap();
        }
    }

    fn tail_logger_thread(
        num_workers: usize,
        log_filename: String,
        capacity: usize,
        receiver: Receiver<Event>,
    ) {
        let file = File::create(&log_filename)
            .unwrap_or_else(|err| panic!("failed to open `{}`: {}", log_filename, err));

        let mut writer = BufWriter::new(file);
        let mut events: VecDeque<Event> = VecDeque::with_capacity(capacity);
        let mut state = SimulatorState::new(num_workers);
        let timeout = std::time::Duration::from_secs(30);
        let mut skipped = false;

        loop {
            loop {
                match receiver.recv_timeout(timeout) {
                    Ok(event) => {
                        if let Event::Flush = event {
                            // We ignore Flush events in tail mode --
                            // we're really just looking for
                            // deadlocks.
                            continue;
                        } else {
                            if events.len() == capacity {
                                let event = events.pop_front().unwrap();
                                state.simulate(&event);
                                skipped = true;
                            }

                            events.push_back(event);
                        }
                    }

                    Err(_) => break,
                }
            }

            if skipped {
                write!(writer, "...\n").unwrap();
                skipped = false;
            }

            for event in events.drain(..) {
                // In tail mode, we dump *all* events out, whether or
                // not they were 'interesting' to the state machine.
                state.simulate(&event);
                state.dump(&mut writer, &event).unwrap();
            }

            writer.flush().unwrap();
        }
    }

    fn all_logger_thread(num_workers: usize, receiver: Receiver<Event>) {
        let stderr = std::io::stderr();
        let mut state = SimulatorState::new(num_workers);

        for event in receiver {
            let mut writer = BufWriter::new(stderr.lock());
            state.simulate(&event);
            state.dump(&mut writer, &event).unwrap();
            writer.flush().unwrap();
        }
    }
}

#[derive(Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Debug)]
enum State {
    Working,
    Idle,
    Notified,
    Sleeping,
    Terminated,
}

impl State {
    fn letter(&self) -> char {
        match self {
            State::Working => 'W',
            State::Idle => 'I',
            State::Notified => 'N',
            State::Sleeping => 'S',
            State::Terminated => 'T',
        }
    }
}

struct SimulatorState {
    local_queue_size: Vec<usize>,
    thread_states: Vec<State>,
    injector_size: usize,
}

impl SimulatorState {
    fn new(num_workers: usize) -> Self {
        Self {
            local_queue_size: (0..num_workers).map(|_| 0).collect(),
            thread_states: (0..num_workers).map(|_| State::Working).collect(),
            injector_size: 0,
        }
    }

    fn simulate(&mut self, event: &Event) -> bool {
        match *event {
            Event::ThreadIdle { worker, .. } => {
                assert_eq!(self.thread_states[worker], State::Working);
                self.thread_states[worker] = State::Idle;
                true
            }

            Event::ThreadStart { worker, .. } | Event::ThreadFoundWork { worker, .. } => {
                self.thread_states[worker] = State::Working;
                true
            }

            Event::ThreadTerminate { worker, .. } => {
                self.thread_states[worker] = State::Terminated;
                true
            }

            Event::ThreadSleeping { worker, .. } => {
                assert_eq!(self.thread_states[worker], State::Idle);
                self.thread_states[worker] = State::Sleeping;
                true
            }

            Event::ThreadAwoken { worker, .. } => {
                assert_eq!(self.thread_states[worker], State::Notified);
                self.thread_states[worker] = State::Idle;
                true
            }

            Event::JobPushed { worker } => {
                self.local_queue_size[worker] += 1;
                true
            }

            Event::JobPopped { worker } => {
                self.local_queue_size[worker] -= 1;
                true
            }

            Event::JobStolen { victim, .. } => {
                self.local_queue_size[victim] -= 1;
                true
            }

            Event::JobsInjected { count } => {
                self.injector_size += count;
                true
            }

            Event::JobUninjected { .. } => {
                self.injector_size -= 1;
                true
            }

            Event::ThreadNotify { worker } => {
                // Currently, this log event occurs while holding the
                // thread lock, so we should *always* see it before
                // the worker awakens.
                assert_eq!(self.thread_states[worker], State::Sleeping);
                self.thread_states[worker] = State::Notified;
                true
            }

            // remaining events are no-ops from pov of simulating the
            // thread state
            _ => false,
        }
    }

    fn dump(&mut self, w: &mut impl Write, event: &Event) -> io::Result<()> {
        let num_idle_threads = self
            .thread_states
            .iter()
            .filter(|s| **s == State::Idle)
            .count();

        let num_sleeping_threads = self
            .thread_states
            .iter()
            .filter(|s| **s == State::Sleeping)
            .count();

        let num_notified_threads = self
            .thread_states
            .iter()
            .filter(|s| **s == State::Notified)
            .count();

        let num_pending_jobs: usize = self.local_queue_size.iter().sum();

        write!(w, "{:2},", num_idle_threads)?;
        write!(w, "{:2},", num_sleeping_threads)?;
        write!(w, "{:2},", num_notified_threads)?;
        write!(w, "{:4},", num_pending_jobs)?;
        write!(w, "{:4},", self.injector_size)?;

        let event_str = format!("{:?}", event);
        write!(w, r#""{:60}","#, event_str)?;

        for ((i, state), queue_size) in (0..).zip(&self.thread_states).zip(&self.local_queue_size)
        {
            write!(w, " T{:02},{}", i, state.letter())?;

            if *queue_size > 0 {
                write!(w, ",{:03},", queue_size)?;
            } else {
                write!(w, ",   ,")?;
            }
        }

        writeln!(w)?;
        Ok(())
    }
}
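
// For reference, a single line produced by `dump` looks roughly like the
// following; the counts, the event, and the per-thread columns are
// illustrative values, not captured output:
//
//  1, 0, 0,   1,   0,"JobPushed { worker: 0 }                                     ", T00,W,001, T01,I,   ,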