]> git.proxmox.com Git - cargo.git/commitdiff
Communicate jobserver information with each rustc
authorMark Rousskov <mark.simulacrum@gmail.com>
Sat, 21 Dec 2019 01:11:10 +0000 (20:11 -0500)
committerMark Rousskov <mark.simulacrum@gmail.com>
Wed, 22 Jan 2020 19:15:47 +0000 (14:15 -0500)
src/cargo/core/compiler/job_queue.rs
src/cargo/core/compiler/mod.rs

index 311fa2d77312ee41037e249eed483e61ee81fb9e..86833116107b523a5dd4fdc2b335858d4e524351 100644 (file)
@@ -91,6 +91,12 @@ enum Message {
     FixDiagnostic(diagnostic_server::Message),
     Token(io::Result<Acquired>),
     Finish(u32, Artifact, CargoResult<()>),
+
+    // This client should get release_raw called on it with one of our tokens
+    NeedsToken(u32, Client),
+
+    // A token previously passed to a NeedsToken client is being released.
+    ReleaseToken(u32),
 }
 
 impl<'a> JobState<'a> {
@@ -134,15 +140,15 @@ impl<'a> JobState<'a> {
     ///
     /// This should arrange for the passed client to eventually get a token via
     /// `client.release_raw()`.
-    pub fn will_acquire(&self, _client: &Client) {
-        // ...
+    pub fn will_acquire(&self, client: &Client) {
+        let _ = self.tx.send(Message::NeedsToken(self.id, client.clone()));
     }
 
     /// The rustc underlying this Job is informing us that it is done with a jobserver token.
     ///
     /// Note that it does *not* write that token back anywhere.
     pub fn release_token(&self) {
-        // ...
+        let _ = self.tx.send(Message::ReleaseToken(self.id));
     }
 }
 
@@ -285,6 +291,8 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
         jobserver_helper: &HelperThread,
     ) -> CargoResult<()> {
         let mut tokens = Vec::new();
+        let mut rustc_tokens = Vec::new();
+        let mut to_send_clients: Vec<(u32, Client)> = Vec::new();
         let mut queue = Vec::new();
         let mut print = DiagnosticPrinter::new(cx.bcx.config);
         trace!("queue: {:#?}", self.queue);
@@ -322,6 +330,13 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
                 self.run(&unit, job, cx, scope)?;
             }
 
+            info!(
+                "tokens: {}, rustc_tokens: {}, waiting_rustcs: {}",
+                tokens.len(),
+                rustc_tokens.len(),
+                to_send_clients.len()
+            );
+
             // If after all that we're not actually running anything then we're
             // done!
             if self.active.is_empty() {
@@ -333,6 +348,16 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
             // jobserver interface is architected we may acquire a token that we
             // don't actually use, and if this happens just relinquish it back
             // to the jobserver itself.
+            let extra_tokens = tokens.len() - (self.active.len() - 1);
+            for _ in 0..extra_tokens {
+                if let Some((id, client)) = to_send_clients.pop() {
+                    let token = tokens.pop().expect("an extra token");
+                    rustc_tokens.push((id, token));
+                    client
+                        .release_raw()
+                        .chain_err(|| "failed to release jobserver token")?;
+                }
+            }
             tokens.truncate(self.active.len() - 1);
 
             // Record some timing information if `-Ztimings` is enabled, and
@@ -389,6 +414,19 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
                             Artifact::All => {
                                 info!("end: {:?}", id);
                                 finished += 1;
+                                while let Some(pos) =
+                                    rustc_tokens.iter().position(|(i, _)| *i == id)
+                                {
+                                    // push all the leftover tokens back into
+                                    // the token list
+                                    tokens.push(rustc_tokens.remove(pos).1);
+                                }
+                                while let Some(pos) =
+                                    to_send_clients.iter().position(|(i, _)| *i == id)
+                                {
+                                    // drain all the pending clients
+                                    to_send_clients.remove(pos);
+                                }
                                 self.active.remove(&id).unwrap()
                             }
                             // ... otherwise if it hasn't finished we leave it
@@ -419,9 +457,34 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
                         }
                     }
                     Message::Token(acquired_token) => {
-                        tokens.push(
-                            acquired_token.chain_err(|| "failed to acquire jobserver token")?,
-                        );
+                        let token =
+                            acquired_token.chain_err(|| "failed to acquire jobserver token")?;
+                        if let Some((id, client)) = to_send_clients.pop() {
+                            rustc_tokens.push((id, token));
+                            client
+                                .release_raw()
+                                .chain_err(|| "failed to release jobserver token")?;
+                        } else {
+                            tokens.push(token);
+                        }
+                    }
+                    Message::NeedsToken(id, client) => {
+                        log::info!("queue token request");
+                        jobserver_helper.request_token();
+                        to_send_clients.push((id, client));
+                    }
+                    Message::ReleaseToken(id) => {
+                        // Note that this pops off potentially a completely
+                        // different token, but all tokens of the same job are
+                        // conceptually the same so that's fine.
+                        if let Some(pos) = rustc_tokens.iter().position(|(i, _)| *i == id) {
+                            tokens.push(rustc_tokens.remove(pos).1);
+                        } else {
+                            panic!(
+                                "This job (id={}) does not have tokens associated with it",
+                                id
+                            );
+                        }
                     }
                 }
             }
index 66f170a6f20f961ce7a7ca3023f4bf747a45a790..2b9dc64ffd529f346cb1a90177fbd647796167e3 100644 (file)
@@ -1240,7 +1240,7 @@ fn on_stderr_line_inner(
     if let Ok(JobserverNotification { jobserver_event }) =
         serde_json::from_str::<JobserverNotification>(compiler_message.get())
     {
-        log::trace!(
+        log::info!(
             "found jobserver directive from rustc: `{:?}`",
             jobserver_event
         );