-use std::collections::hash_map::HashMap;
-use std::collections::HashSet;
-use std::fmt;
+use std::collections::{HashMap, HashSet};
use std::io;
-use std::mem;
+use std::marker;
use std::process::Output;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;
Freshness::{self, Dirty, Fresh},
Job,
};
-use super::{BuildContext, BuildPlan, CompileMode, Context, Kind, Unit};
-use crate::core::profiles::Profile;
-use crate::core::{PackageId, Target, TargetKind};
+use super::{BuildContext, BuildPlan, CompileMode, Context, Unit};
+use crate::core::{PackageId, TargetKind};
use crate::handle_error;
use crate::util;
use crate::util::diagnostic_server::{self, DiagnosticPrinter};
/// actual compilation step of each package. Packages enqueue units of work and
/// then later on the entire graph is processed and compiled.
pub struct JobQueue<'a, 'cfg> {
- queue: DependencyQueue<Key<'a>, Job>,
- tx: Sender<Message<'a>>,
- rx: Receiver<Message<'a>>,
- active: Vec<Key<'a>>,
+ queue: DependencyQueue<Unit<'a>, Job>,
+ tx: Sender<Message>,
+ rx: Receiver<Message>,
+ active: HashMap<u32, Unit<'a>>,
compiled: HashSet<PackageId>,
documented: HashSet<PackageId>,
counts: HashMap<PackageId, usize>,
is_release: bool,
progress: Progress<'cfg>,
-}
-
-#[derive(Clone, Copy, Eq, PartialEq, Hash)]
-struct Key<'a> {
- pkg: PackageId,
- target: &'a Target,
- profile: Profile,
- kind: Kind,
- mode: CompileMode,
-}
-
-impl<'a> Key<'a> {
- fn name_for_progress(&self) -> String {
- let pkg_name = self.pkg.name();
- match self.mode {
- CompileMode::Doc { .. } => format!("{}(doc)", pkg_name),
- CompileMode::RunCustomBuild => format!("{}(build)", pkg_name),
- _ => {
- let annotation = match self.target.kind() {
- TargetKind::Lib(_) => return pkg_name.to_string(),
- TargetKind::CustomBuild => return format!("{}(build.rs)", pkg_name),
- TargetKind::Bin => "bin",
- TargetKind::Test => "test",
- TargetKind::Bench => "bench",
- TargetKind::ExampleBin | TargetKind::ExampleLib(_) => "example",
- };
- format!("{}({})", self.target.name(), annotation)
- }
- }
- }
+ next_id: u32,
}
pub struct JobState<'a> {
- tx: Sender<Message<'a>>,
+ tx: Sender<Message>,
+ // Historical versions of Cargo made use of the `'a` argument here, so to
+ // leave the door open to future refactorings keep it here.
+ _marker: marker::PhantomData<&'a ()>,
}
-enum Message<'a> {
+enum Message {
Run(String),
BuildPlanMsg(String, ProcessBuilder, Arc<Vec<OutputFile>>),
Stdout(String),
Stderr(String),
FixDiagnostic(diagnostic_server::Message),
Token(io::Result<Acquired>),
- Finish(Key<'a>, CargoResult<()>),
+ Finish(u32, CargoResult<()>),
}
impl<'a> JobState<'a> {
queue: DependencyQueue::new(),
tx,
rx,
- active: Vec::new(),
+ active: HashMap::new(),
compiled: HashSet::new(),
documented: HashSet::new(),
counts: HashMap::new(),
is_release: bcx.build_config.release,
progress,
+ next_id: 0,
}
}
unit: &Unit<'a>,
job: Job,
) -> CargoResult<()> {
- let key = Key::new(unit);
- let deps = key.dependencies(cx)?;
- self.queue.queue(&key, job, &deps);
- *self.counts.entry(key.pkg).or_insert(0) += 1;
+ let dependencies = cx.dep_targets(unit);
+ let dependencies = dependencies
+ .iter()
+ .filter(|unit| {
+ // Binaries aren't actually needed to *compile* tests, just to run
+ // them, so we don't include this dependency edge in the job graph.
+ !unit.target.is_test() || !unit.target.is_bin()
+ })
+ .cloned()
+ .collect::<Vec<_>>();
+ self.queue.queue(unit, job, &dependencies);
+ *self.counts.entry(unit.pkg.package_id()).or_insert(0) += 1;
Ok(())
}
let _p = profile::start("executing the job graph");
self.queue.queue_finished();
- // We need to give a handle to the send half of our message queue to the
- // jobserver and (optionally) diagnostic helper thread. Unfortunately
- // though we need the handle to be `'static` as that's typically what's
- // required when spawning a thread!
- //
- // To work around this we transmute the `Sender` to a static lifetime.
- // we're only sending "longer living" messages and we should also
- // destroy all references to the channel before this function exits as
- // the destructor for the `helper` object will ensure the associated
- // thread is no longer running.
- //
- // As a result, this `transmute` to a longer lifetime should be safe in
- // practice.
+ // Create a helper thread for acquiring jobserver tokens
let tx = self.tx.clone();
- let tx = unsafe { mem::transmute::<Sender<Message<'a>>, Sender<Message<'static>>>(tx) };
- let tx2 = tx.clone();
let helper = cx
.jobserver
.clone()
drop(tx.send(Message::Token(token)));
})
.chain_err(|| "failed to create helper thread for jobserver management")?;
+
+ // Create a helper thread to manage the diagnostics for rustfix if
+ // necessary.
+ let tx = self.tx.clone();
let _diagnostic_server = cx
.bcx
.build_config
.rustfix_diagnostic_server
.borrow_mut()
.take()
- .map(move |srv| srv.start(move |msg| drop(tx2.send(Message::FixDiagnostic(msg)))));
-
+ .map(move |srv| srv.start(move |msg| drop(tx.send(Message::FixDiagnostic(msg)))));
+
+ // Use `crossbeam` to create a scope in which we can execute scoped
+ // threads. Note that this isn't currently required by Cargo but it was
+ // historically required. This is left in for now in case we need the
+ // `'a` ability for child threads in the near future, but if this
+ // comment has been sitting here for a long time feel free to refactor
+ // away crossbeam.
crossbeam_utils::thread::scope(|scope| self.drain_the_queue(cx, plan, scope, &helper))
.expect("child threads should't panic")
}
// possible that can run. Note that this is also the point where we
// start requesting job tokens. Each job after the first needs to
// request a token.
- while let Some((key, job)) = self.queue.dequeue() {
- queue.push((key, job));
+ while let Some((unit, job)) = self.queue.dequeue() {
+ queue.push((unit, job));
if self.active.len() + queue.len() > 1 {
jobserver_helper.request_token();
}
// try to spawn it so long as we've got a jobserver token which says
// we're able to perform some parallel work.
while error.is_none() && self.active.len() < tokens.len() + 1 && !queue.is_empty() {
- let (key, job) = queue.remove(0);
- self.run(key, job, cx, scope)?;
+ let (unit, job) = queue.remove(0);
+ self.run(&unit, job, cx, scope)?;
}
// If after all that we're not actually running anything then we're
Message::FixDiagnostic(msg) => {
print.print(&msg)?;
}
- Message::Finish(key, result) => {
- info!("end: {:?}", key);
-
- // FIXME: switch to this when stabilized.
- // self.active.remove_item(&key);
- let pos = self
- .active
- .iter()
- .position(|k| *k == key)
- .expect("an unrecorded package has finished compiling");
- self.active.remove(pos);
+ Message::Finish(id, result) => {
+ let unit = self.active.remove(&id).unwrap();
+ info!("end: {:?}", unit);
+
if !self.active.is_empty() {
assert!(!tokens.is_empty());
drop(tokens.pop());
}
match result {
- Ok(()) => self.finish(key, cx)?,
+ Ok(()) => self.finish(&unit, cx)?,
Err(e) => {
let msg = "The following warnings were emitted during compilation:";
- self.emit_warnings(Some(msg), &key, cx)?;
+ self.emit_warnings(Some(msg), &unit, cx)?;
if !self.active.is_empty() {
error = Some(failure::format_err!("build failed"));
let count = total - self.queue.len();
let active_names = self
.active
- .iter()
- .map(Key::name_for_progress)
+ .values()
+ .map(|u| self.name_for_progress(u))
.collect::<Vec<_>>();
drop(
self.progress
);
}
+ fn name_for_progress(&self, unit: &Unit<'_>) -> String {
+ let pkg_name = unit.pkg.name();
+ match unit.mode {
+ CompileMode::Doc { .. } => format!("{}(doc)", pkg_name),
+ CompileMode::RunCustomBuild => format!("{}(build)", pkg_name),
+ _ => {
+ let annotation = match unit.target.kind() {
+ TargetKind::Lib(_) => return pkg_name.to_string(),
+ TargetKind::CustomBuild => return format!("{}(build.rs)", pkg_name),
+ TargetKind::Bin => "bin",
+ TargetKind::Test => "test",
+ TargetKind::Bench => "bench",
+ TargetKind::ExampleBin | TargetKind::ExampleLib(_) => "example",
+ };
+ format!("{}({})", unit.target.name(), annotation)
+ }
+ }
+ }
+
/// Executes a job in the `scope` given, pushing the spawned thread's
/// handled onto `threads`.
fn run(
&mut self,
- key: Key<'a>,
+ unit: &Unit<'a>,
job: Job,
cx: &Context<'_, '_>,
scope: &Scope<'a>,
) -> CargoResult<()> {
- info!("start: {:?}", key);
+ info!("start: {:?}", unit);
- self.active.push(key);
- *self.counts.get_mut(&key.pkg).unwrap() -= 1;
+ let id = self.next_id;
+ self.next_id = id.checked_add(1).unwrap();
+ assert!(self.active.insert(id, *unit).is_none());
+ *self.counts.get_mut(&unit.pkg.package_id()).unwrap() -= 1;
let my_tx = self.tx.clone();
let fresh = job.freshness();
let doit = move || {
- let res = job.run(&JobState { tx: my_tx.clone() });
- my_tx.send(Message::Finish(key, res)).unwrap();
+ let res = job.run(&JobState {
+ tx: my_tx.clone(),
+ _marker: marker::PhantomData,
+ });
+ my_tx.send(Message::Finish(id, res)).unwrap();
};
if !cx.bcx.build_config.build_plan {
// Print out some nice progress information.
- self.note_working_on(cx.bcx.config, &key, fresh)?;
+ self.note_working_on(cx.bcx.config, unit, fresh)?;
}
match fresh {
fn emit_warnings(
&mut self,
msg: Option<&str>,
- key: &Key<'a>,
+ unit: &Unit<'a>,
cx: &mut Context<'_, '_>,
) -> CargoResult<()> {
let output = cx.build_state.outputs.lock().unwrap();
let bcx = &mut cx.bcx;
- if let Some(output) = output.get(&(key.pkg, key.kind)) {
+ if let Some(output) = output.get(&(unit.pkg.package_id(), unit.kind)) {
if !output.warnings.is_empty() {
if let Some(msg) = msg {
writeln!(bcx.config.shell().err(), "{}\n", msg)?;
Ok(())
}
- fn finish(&mut self, key: Key<'a>, cx: &mut Context<'_, '_>) -> CargoResult<()> {
- if key.mode.is_run_custom_build() && cx.bcx.show_warnings(key.pkg) {
- self.emit_warnings(None, &key, cx)?;
+ fn finish(&mut self, unit: &Unit<'a>, cx: &mut Context<'_, '_>) -> CargoResult<()> {
+ if unit.mode.is_run_custom_build() && cx.bcx.show_warnings(unit.pkg.package_id()) {
+ self.emit_warnings(None, unit, cx)?;
}
- self.queue.finish(&key);
+ self.queue.finish(unit);
Ok(())
}
fn note_working_on(
&mut self,
config: &Config,
- key: &Key<'a>,
+ unit: &Unit<'a>,
fresh: Freshness,
) -> CargoResult<()> {
- if (self.compiled.contains(&key.pkg) && !key.mode.is_doc())
- || (self.documented.contains(&key.pkg) && key.mode.is_doc())
+ if (self.compiled.contains(&unit.pkg.package_id()) && !unit.mode.is_doc())
+ || (self.documented.contains(&unit.pkg.package_id()) && unit.mode.is_doc())
{
return Ok(());
}
// Any dirty stage which runs at least one command gets printed as
// being a compiled package.
Dirty => {
- if key.mode.is_doc() {
+ if unit.mode.is_doc() {
// Skip doc test.
- if !key.mode.is_any_test() {
- self.documented.insert(key.pkg);
- config.shell().status("Documenting", key.pkg)?;
+ if !unit.mode.is_any_test() {
+ self.documented.insert(unit.pkg.package_id());
+ config.shell().status("Documenting", unit.pkg)?;
}
} else {
- self.compiled.insert(key.pkg);
- if key.mode.is_check() {
- config.shell().status("Checking", key.pkg)?;
+ self.compiled.insert(unit.pkg.package_id());
+ if unit.mode.is_check() {
+ config.shell().status("Checking", unit.pkg)?;
} else {
- config.shell().status("Compiling", key.pkg)?;
+ config.shell().status("Compiling", unit.pkg)?;
}
}
}
Fresh => {
// If doc test are last, only print "Fresh" if nothing has been printed.
- if self.counts[&key.pkg] == 0
- && !(key.mode == CompileMode::Doctest && self.compiled.contains(&key.pkg))
+ if self.counts[&unit.pkg.package_id()] == 0
+ && !(unit.mode == CompileMode::Doctest
+ && self.compiled.contains(&unit.pkg.package_id()))
{
- self.compiled.insert(key.pkg);
- config.shell().verbose(|c| c.status("Fresh", key.pkg))?;
+ self.compiled.insert(unit.pkg.package_id());
+ config.shell().verbose(|c| c.status("Fresh", unit.pkg))?;
}
}
}
Ok(())
}
}
-
-impl<'a> Key<'a> {
- fn new(unit: &Unit<'a>) -> Key<'a> {
- Key {
- pkg: unit.pkg.package_id(),
- target: unit.target,
- profile: unit.profile,
- kind: unit.kind,
- mode: unit.mode,
- }
- }
-
- fn dependencies<'cfg>(&self, cx: &Context<'a, 'cfg>) -> CargoResult<Vec<Key<'a>>> {
- let unit = Unit {
- pkg: cx.get_package(self.pkg)?,
- target: self.target,
- profile: self.profile,
- kind: self.kind,
- mode: self.mode,
- };
- let targets = cx.dep_targets(&unit);
- Ok(targets
- .iter()
- .filter_map(|unit| {
- // Binaries aren't actually needed to *compile* tests, just to run
- // them, so we don't include this dependency edge in the job graph.
- if self.target.is_test() && unit.target.is_bin() {
- None
- } else {
- Some(Key::new(unit))
- }
- })
- .collect())
- }
-}
-
-impl<'a> fmt::Debug for Key<'a> {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(
- f,
- "{} => {}/{} => {:?}",
- self.pkg, self.target, self.profile, self.kind
- )
- }
-}