FixDiagnostic(diagnostic_server::Message),
Token(io::Result<Acquired>),
Finish(u32, Artifact, CargoResult<()>),
+
+ // This client should get release_raw called on it with one of our tokens
+ NeedsToken(u32, Client),
+
+ // A token previously passed to a NeedsToken client is being released.
+ ReleaseToken(u32),
}
impl<'a> JobState<'a> {
///
/// This should arrange for the passed client to eventually get a token via
/// `client.release_raw()`.
- pub fn will_acquire(&self, _client: &Client) {
- // ...
+ pub fn will_acquire(&self, client: &Client) {
+ let _ = self.tx.send(Message::NeedsToken(self.id, client.clone()));
}
/// The rustc underlying this Job is informing us that it is done with a jobserver token.
///
/// Note that it does *not* write that token back anywhere.
pub fn release_token(&self) {
- // ...
+ let _ = self.tx.send(Message::ReleaseToken(self.id));
}
}
jobserver_helper: &HelperThread,
) -> CargoResult<()> {
let mut tokens = Vec::new();
+ let mut rustc_tokens = Vec::new();
+ let mut to_send_clients: Vec<(u32, Client)> = Vec::new();
let mut queue = Vec::new();
let mut print = DiagnosticPrinter::new(cx.bcx.config);
trace!("queue: {:#?}", self.queue);
self.run(&unit, job, cx, scope)?;
}
+ info!(
+ "tokens: {}, rustc_tokens: {}, waiting_rustcs: {}",
+ tokens.len(),
+ rustc_tokens.len(),
+ to_send_clients.len()
+ );
+
// If after all that we're not actually running anything then we're
// done!
if self.active.is_empty() {
// jobserver interface is architected we may acquire a token that we
// don't actually use, and if this happens just relinquish it back
// to the jobserver itself.
+ let extra_tokens = tokens.len() - (self.active.len() - 1);
+ for _ in 0..extra_tokens {
+ if let Some((id, client)) = to_send_clients.pop() {
+ let token = tokens.pop().expect("an extra token");
+ rustc_tokens.push((id, token));
+ client
+ .release_raw()
+ .chain_err(|| "failed to release jobserver token")?;
+ }
+ }
tokens.truncate(self.active.len() - 1);
// Record some timing information if `-Ztimings` is enabled, and
Artifact::All => {
info!("end: {:?}", id);
finished += 1;
+ while let Some(pos) =
+ rustc_tokens.iter().position(|(i, _)| *i == id)
+ {
+ // push all the leftover tokens back into
+ // the token list
+ tokens.push(rustc_tokens.remove(pos).1);
+ }
+ while let Some(pos) =
+ to_send_clients.iter().position(|(i, _)| *i == id)
+ {
+ // drain all the pending clients
+ to_send_clients.remove(pos);
+ }
self.active.remove(&id).unwrap()
}
// ... otherwise if it hasn't finished we leave it
}
}
Message::Token(acquired_token) => {
- tokens.push(
- acquired_token.chain_err(|| "failed to acquire jobserver token")?,
- );
+ let token =
+ acquired_token.chain_err(|| "failed to acquire jobserver token")?;
+ if let Some((id, client)) = to_send_clients.pop() {
+ rustc_tokens.push((id, token));
+ client
+ .release_raw()
+ .chain_err(|| "failed to release jobserver token")?;
+ } else {
+ tokens.push(token);
+ }
+ }
+ Message::NeedsToken(id, client) => {
+ log::info!("queue token request");
+ jobserver_helper.request_token();
+ to_send_clients.push((id, client));
+ }
+ Message::ReleaseToken(id) => {
+ // Note that this pops off potentially a completely
+ // different token, but all tokens of the same job are
+ // conceptually the same so that's fine.
+ if let Some(pos) = rustc_tokens.iter().position(|(i, _)| *i == id) {
+ tokens.push(rustc_tokens.remove(pos).1);
+ } else {
+ panic!(
+ "This job (id={}) does not have tokens associated with it",
+ id
+ );
+ }
}
}
}