]> git.proxmox.com Git - cargo.git/commitdiff
Avoid buffering large amounts of rustc output.
authorEric Huss <eric@huss.org>
Sun, 8 Mar 2020 03:58:37 +0000 (19:58 -0800)
committerEric Huss <eric@huss.org>
Sun, 8 Mar 2020 03:58:37 +0000 (19:58 -0800)
src/cargo/core/compiler/job_queue.rs
src/cargo/util/queue.rs

index 0b8855eea52c0a1051fef6c43520b1784f34bedd..57e6b835b6bcfe90398547664dd59953509b8d75 100644 (file)
@@ -93,6 +93,28 @@ pub struct JobQueue<'a, 'cfg> {
 ///
 /// It is created from JobQueue when we have fully assembled the crate graph
 /// (i.e., all package dependencies are known).
+///
+/// # Message queue
+///
+/// Each thread running a process uses the message queue to send messages back
+/// to the main thread. The main thread coordinates everything, and handles
+/// printing output.
+///
+/// It is important to be careful which messages use `push` vs `push_bounded`.
+/// `push` is for priority messages (like tokens, or "finished") where the
+/// sender shouldn't block. We want to handle those so real work can proceed
+/// ASAP.
+///
+/// `push_bounded` is only for messages being printed to stdout/stderr. Being
+/// bounded prevents a flood of messages causing a large amount of memory
+/// being used.
+///
+/// `push` also avoids blocking which helps avoid deadlocks. For example, when
+/// the diagnostic server thread is dropped, it waits for the thread to exit.
+/// But if the thread is blocked on a full queue, and there is a critical
+/// error, the drop will deadlock. This should be fixed at some point in the
+/// future. The jobserver thread has a similar problem, though it will time
+/// out after 1 second.
 struct DrainState<'a, 'cfg> {
     // This is the length of the DependencyQueue when starting out
     total_units: usize,
@@ -212,11 +234,11 @@ impl<'a> JobState<'a> {
     }
 
     pub fn stdout(&self, stdout: String) {
-        self.messages.push(Message::Stdout(stdout));
+        self.messages.push_bounded(Message::Stdout(stdout));
     }
 
     pub fn stderr(&self, stderr: String) {
-        self.messages.push(Message::Stderr(stderr));
+        self.messages.push_bounded(Message::Stderr(stderr));
     }
 
     /// A method used to signal to the coordinator thread that the rmeta file
@@ -341,7 +363,10 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
         let state = DrainState {
             total_units: self.queue.len(),
             queue: self.queue,
-            messages: Arc::new(Queue::new()),
+            // 100 here is somewhat arbitrary. It is a few screenfulls of
+            // output, and hopefully at most a few megabytes of memory for
+            // typical messages.
+            messages: Arc::new(Queue::new(100)),
             active: HashMap::new(),
             compiled: HashSet::new(),
             documented: HashSet::new(),
@@ -370,6 +395,9 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
         // Create a helper thread to manage the diagnostics for rustfix if
         // necessary.
         let messages = state.messages.clone();
+        // It is important that this uses `push` instead of `push_bounded` for
+        // now. If someone wants to fix this to be bounded, the `drop`
+        // implementation needs to be changed to avoid possible deadlocks.
         let _diagnostic_server = cx
             .bcx
             .build_config
@@ -578,10 +606,7 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
         // to run above to calculate CPU usage over time. To do this we
         // listen for a message with a timeout, and on timeout we run the
         // previous parts of the loop again.
-        let mut events = Vec::new();
-        while let Some(event) = self.messages.try_pop() {
-            events.push(event);
-        }
+        let mut events = self.messages.try_pop_all();
         info!(
             "tokens in use: {}, rustc_tokens: {:?}, waiting_rustcs: {:?} (events this tick: {})",
             self.tokens.len(),
@@ -815,15 +840,10 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
         };
 
         match fresh {
-            Freshness::Fresh => {
-                self.timings.add_fresh();
-                doit();
-            }
-            Freshness::Dirty => {
-                self.timings.add_dirty();
-                scope.spawn(move |_| doit());
-            }
+            Freshness::Fresh => self.timings.add_fresh(),
+            Freshness::Dirty => self.timings.add_dirty(),
         }
+        scope.spawn(move |_| doit());
 
         Ok(())
     }
index d9aefcc3b1c35c60102d1706407b157e1bb3a870..9adf1b88afe9854718d351f09fd78dd29a36be0e 100644 (file)
@@ -5,11 +5,16 @@ use std::time::{Duration, Instant};
 /// A simple, threadsafe, queue of items of type `T`
 ///
 /// This is a sort of channel where any thread can push to a queue and any
-/// thread can pop from a queue. Currently queues have infinite capacity where
-/// `push` will never block but `pop` will block.
+/// thread can pop from a queue.
+///
+/// This supports both bounded and unbounded operations. `push` will never block,
+/// and allows the queue to grow without bounds. `push_bounded` will block if the
+/// queue is over capacity, and will resume once there is enough capacity.
 pub struct Queue<T> {
     state: Mutex<State<T>>,
-    condvar: Condvar,
+    popper_cv: Condvar,
+    bounded_cv: Condvar,
+    bound: usize,
 }
 
 struct State<T> {
@@ -17,18 +22,34 @@ struct State<T> {
 }
 
 impl<T> Queue<T> {
-    pub fn new() -> Queue<T> {
+    pub fn new(bound: usize) -> Queue<T> {
         Queue {
             state: Mutex::new(State {
                 items: VecDeque::new(),
             }),
-            condvar: Condvar::new(),
+            popper_cv: Condvar::new(),
+            bounded_cv: Condvar::new(),
+            bound,
         }
     }
 
     pub fn push(&self, item: T) {
         self.state.lock().unwrap().items.push_back(item);
-        self.condvar.notify_one();
+        self.popper_cv.notify_one();
+    }
+
+    /// Pushes an item onto the queue, blocking if the queue is full.
+    pub fn push_bounded(&self, item: T) {
+        let mut state = self.state.lock().unwrap();
+        loop {
+            if state.items.len() >= self.bound {
+                state = self.bounded_cv.wait(state).unwrap();
+            } else {
+                state.items.push_back(item);
+                self.popper_cv.notify_one();
+                break;
+            }
+        }
     }
 
     pub fn pop(&self, timeout: Duration) -> Option<T> {
@@ -39,16 +60,27 @@ impl<T> Queue<T> {
             if elapsed >= timeout {
                 break;
             }
-            let (lock, result) = self.condvar.wait_timeout(state, timeout - elapsed).unwrap();
+            let (lock, result) = self
+                .popper_cv
+                .wait_timeout(state, timeout - elapsed)
+                .unwrap();
             state = lock;
             if result.timed_out() {
                 break;
             }
         }
-        state.items.pop_front()
+        let value = state.items.pop_front()?;
+        if state.items.len() < self.bound {
+            // Assumes threads cannot be canceled.
+            self.bounded_cv.notify_one();
+        }
+        Some(value)
     }
 
-    pub fn try_pop(&self) -> Option<T> {
-        self.state.lock().unwrap().items.pop_front()
+    pub fn try_pop_all(&self) -> Vec<T> {
+        let mut state = self.state.lock().unwrap();
+        let result = state.items.drain(..).collect();
+        self.bounded_cv.notify_all();
+        result
     }
 }