///
/// It is created from JobQueue when we have fully assembled the crate graph
/// (i.e., all package dependencies are known).
+///
+/// # Message queue
+///
+/// Each thread running a process uses the message queue to send messages back
+/// to the main thread. The main thread coordinates everything, and handles
+/// printing output.
+///
+/// It is important to be careful which messages use `push` vs `push_bounded`.
+/// `push` is for priority messages (like tokens, or "finished") where the
+/// sender shouldn't block. We want to handle those so real work can proceed
+/// ASAP.
+///
+/// `push_bounded` is only for messages being printed to stdout/stderr. Being
+/// bounded prevents a flood of messages from consuming a large amount of
+/// memory.
+///
+/// `push` also avoids blocking, which helps avoid deadlocks. For example, when
+/// the diagnostic server thread is dropped, it waits for the thread to exit.
+/// But if the thread is blocked on a full queue, and there is a critical
+/// error, the drop will deadlock. This should be fixed at some point in the
+/// future. The jobserver thread has a similar problem, though it will time
+/// out after 1 second.
/// Coordinator-side state tracked while the job queue is drained.
// NOTE(review): this excerpt elides most fields — the initializer later in
// this file also fills `queue`, `messages`, `active`, `compiled`, and
// `documented`; confirm against the full file.
struct DrainState<'a, 'cfg> {
// This is the length of the DependencyQueue when starting out
total_units: usize,
}
/// Forwards a line of captured stdout to the coordinator thread for printing.
pub fn stdout(&self, stdout: String) {
- self.messages.push(Message::Stdout(stdout));
// Bounded on purpose: output can flood in, so apply back-pressure instead of
// letting the message queue grow without limit (see the module docs above).
+ self.messages.push_bounded(Message::Stdout(stdout));
}
/// Forwards a line of captured stderr to the coordinator thread for printing.
pub fn stderr(&self, stderr: String) {
- self.messages.push(Message::Stderr(stderr));
// Bounded on purpose: stderr output is display-only, so a full queue should
// block the sender rather than consume unbounded memory.
+ self.messages.push_bounded(Message::Stderr(stderr));
}
/// A method used to signal to the coordinator thread that the rmeta file
// Build the coordinator's drain state; the shared message queue is handed to
// worker threads via `Arc` clones.
let state = DrainState {
total_units: self.queue.len(),
queue: self.queue,
- messages: Arc::new(Queue::new()),
+ // 100 here is somewhat arbitrary. It is a few screenfuls of
+ // output, and hopefully at most a few megabytes of memory for
+ // typical messages.
+ messages: Arc::new(Queue::new(100)),
active: HashMap::new(),
compiled: HashSet::new(),
documented: HashSet::new(),
// Create a helper thread to manage the diagnostics for rustfix if
// necessary.
let messages = state.messages.clone();
+ // It is important that this uses `push` instead of `push_bounded` for
+ // now. If someone wants to fix this to be bounded, the `drop`
+ // implementation needs to be changed to avoid possible deadlocks.
// `_diagnostic_server` is bound (leading underscore) so the helper thread is
// kept alive for the duration of the drain; the builder chain continues past
// this excerpt.
let _diagnostic_server = cx
.bcx
.build_config
// to run above to calculate CPU usage over time. To do this we
// listen for a message with a timeout, and on timeout we run the
// previous parts of the loop again.
- let mut events = Vec::new();
- while let Some(event) = self.messages.try_pop() {
- events.push(event);
- }
// Drain every pending message in one locked pass instead of popping one at a
// time; `try_pop_all` also wakes any producers blocked in `push_bounded`.
+ let mut events = self.messages.try_pop_all();
info!(
"tokens in use: {}, rustc_tokens: {:?}, waiting_rustcs: {:?} (events this tick: {})",
self.tokens.len(),
};
// Record timing by freshness, then run the job.
match fresh {
- Freshness::Fresh => {
- self.timings.add_fresh();
- doit();
- }
- Freshness::Dirty => {
- self.timings.add_dirty();
- scope.spawn(move |_| doit());
- }
+ Freshness::Fresh => self.timings.add_fresh(),
+ Freshness::Dirty => self.timings.add_dirty(),
}
// The job now always runs on a spawned scope thread, where previously a
// fresh job ran inline on the coordinator thread.
// NOTE(review): presumably this keeps the coordinator free to drain the
// bounded queue (an inline job calling `push_bounded` could block the only
// consumer) — confirm against the upstream change description.
+ scope.spawn(move |_| doit());
Ok(())
}
/// A simple, threadsafe, queue of items of type `T`
///
/// This is a sort of channel where any thread can push to a queue and any
-/// thread can pop from a queue. Currently queues have infinite capacity where
-/// `push` will never block but `pop` will block.
+/// thread can pop from a queue.
+///
+/// This supports both bounded and unbounded operations. `push` will never block,
+/// and allows the queue to grow without bounds. `push_bounded` will block if the
+/// queue is over capacity, and will resume once there is enough capacity.
pub struct Queue<T> {
// The queue contents; every operation takes this lock.
state: Mutex<State<T>>,
- condvar: Condvar,
// Signaled when an item arrives, waking threads blocked in `pop`.
+ popper_cv: Condvar,
// Signaled when capacity frees up, waking threads blocked in `push_bounded`.
+ bounded_cv: Condvar,
// Capacity threshold enforced only by `push_bounded`; `push` ignores it.
+ bound: usize,
}
/// The mutex-guarded interior of a [`Queue`].
// NOTE(review): the fields are elided in this excerpt; the methods below use
// an `items: VecDeque<T>` field.
struct State<T> {
}
impl<T> Queue<T> {
- pub fn new() -> Queue<T> {
/// Creates an empty queue. `push_bounded` blocks once `bound` items are
/// queued; `push` is unaffected by `bound`.
+ pub fn new(bound: usize) -> Queue<T> {
Queue {
state: Mutex::new(State {
items: VecDeque::new(),
}),
- condvar: Condvar::new(),
+ popper_cv: Condvar::new(),
+ bounded_cv: Condvar::new(),
+ bound,
}
}
/// Pushes an item onto the queue without blocking, letting the queue grow
/// past `bound`; reserved for priority messages per the module docs.
pub fn push(&self, item: T) {
self.state.lock().unwrap().items.push_back(item);
// Wake one thread blocked in `pop`.
- self.condvar.notify_one();
+ self.popper_cv.notify_one();
}
+
+ /// Pushes an item onto the queue, blocking if the queue is full.
+ pub fn push_bounded(&self, item: T) {
+ let mut state = self.state.lock().unwrap();
+ loop {
+ if state.items.len() >= self.bound {
// Re-check capacity after every wakeup: condvar waits can wake
// spuriously, and another producer may have refilled the queue first.
+ state = self.bounded_cv.wait(state).unwrap();
+ } else {
+ state.items.push_back(item);
// An item is now available; wake one thread blocked in `pop`.
+ self.popper_cv.notify_one();
+ break;
+ }
+ }
+ }
/// Pops one item, waiting up to `timeout` for one to arrive.
/// Returns `None` if the queue is still empty when the timeout elapses.
pub fn pop(&self, timeout: Duration) -> Option<T> {
// NOTE(review): the start of this wait loop (taking the lock, tracking
// `elapsed` against a start `Instant`) is elided in this excerpt.
if elapsed >= timeout {
break;
}
- let (lock, result) = self.condvar.wait_timeout(state, timeout - elapsed).unwrap();
+ let (lock, result) = self
+ .popper_cv
+ .wait_timeout(state, timeout - elapsed)
+ .unwrap();
state = lock;
if result.timed_out() {
break;
}
}
// `?` returns `None` when nothing arrived within the timeout.
- state.items.pop_front()
+ let value = state.items.pop_front()?;
// Removing an item may have freed capacity; wake one producer blocked
// in `push_bounded`.
+ if state.items.len() < self.bound {
+ // Assumes threads cannot be canceled.
+ self.bounded_cv.notify_one();
+ }
+ Some(value)
}
- pub fn try_pop(&self) -> Option<T> {
- self.state.lock().unwrap().items.pop_front()
/// Non-blocking drain: takes every queued item under a single lock
/// acquisition and returns them in FIFO order.
+ pub fn try_pop_all(&self) -> Vec<T> {
+ let mut state = self.state.lock().unwrap();
+ let result = state.items.drain(..).collect();
// The queue is now empty, so every producer blocked in `push_bounded`
// can make progress.
+ self.bounded_cv.notify_all();
+ result
}
}