]> git.proxmox.com Git - cargo.git/commitdiff
Remove the `Key` type from `JobQueue`
authorAlex Crichton <alex@alexcrichton.com>
Fri, 19 Apr 2019 17:27:58 +0000 (10:27 -0700)
committerAlex Crichton <alex@alexcrichton.com>
Mon, 22 Apr 2019 20:47:31 +0000 (13:47 -0700)
This isn't actually necessary with a bit of refactoring, and in general
it makes management of `JobQueue` simpler since there's one less type to
worry about. The one main usage of `Key` vs `Unit` was that `Key` was
`Send` to be placed in `Message`, but this was replaced by just
assigning each `Unit` an ever-increasing integer, and then `Finished`
contains the integer rather than the `Key` itself which we can map once
we get back to the main thread.

src/cargo/core/compiler/job_queue.rs

index b5175d2b999ac82c1bde9e2ebc5f06bc6f1bab2d..f7a6a6f7e45fcef6705a241f1b656b143ed65b11 100644 (file)
@@ -1,8 +1,6 @@
-use std::collections::hash_map::HashMap;
-use std::collections::HashSet;
-use std::fmt;
+use std::collections::{HashMap, HashSet};
 use std::io;
-use std::mem;
+use std::marker;
 use std::process::Output;
 use std::sync::mpsc::{channel, Receiver, Sender};
 use std::sync::Arc;
@@ -16,9 +14,8 @@ use super::job::{
     Freshness::{self, Dirty, Fresh},
     Job,
 };
-use super::{BuildContext, BuildPlan, CompileMode, Context, Kind, Unit};
-use crate::core::profiles::Profile;
-use crate::core::{PackageId, Target, TargetKind};
+use super::{BuildContext, BuildPlan, CompileMode, Context, Unit};
+use crate::core::{PackageId, TargetKind};
 use crate::handle_error;
 use crate::util;
 use crate::util::diagnostic_server::{self, DiagnosticPrinter};
@@ -32,59 +29,33 @@ use crate::util::{Progress, ProgressStyle};
 /// actual compilation step of each package. Packages enqueue units of work and
 /// then later on the entire graph is processed and compiled.
 pub struct JobQueue<'a, 'cfg> {
-    queue: DependencyQueue<Key<'a>, Job>,
-    tx: Sender<Message<'a>>,
-    rx: Receiver<Message<'a>>,
-    active: Vec<Key<'a>>,
+    queue: DependencyQueue<Unit<'a>, Job>,
+    tx: Sender<Message>,
+    rx: Receiver<Message>,
+    active: HashMap<u32, Unit<'a>>,
     compiled: HashSet<PackageId>,
     documented: HashSet<PackageId>,
     counts: HashMap<PackageId, usize>,
     is_release: bool,
     progress: Progress<'cfg>,
-}
-
-#[derive(Clone, Copy, Eq, PartialEq, Hash)]
-struct Key<'a> {
-    pkg: PackageId,
-    target: &'a Target,
-    profile: Profile,
-    kind: Kind,
-    mode: CompileMode,
-}
-
-impl<'a> Key<'a> {
-    fn name_for_progress(&self) -> String {
-        let pkg_name = self.pkg.name();
-        match self.mode {
-            CompileMode::Doc { .. } => format!("{}(doc)", pkg_name),
-            CompileMode::RunCustomBuild => format!("{}(build)", pkg_name),
-            _ => {
-                let annotation = match self.target.kind() {
-                    TargetKind::Lib(_) => return pkg_name.to_string(),
-                    TargetKind::CustomBuild => return format!("{}(build.rs)", pkg_name),
-                    TargetKind::Bin => "bin",
-                    TargetKind::Test => "test",
-                    TargetKind::Bench => "bench",
-                    TargetKind::ExampleBin | TargetKind::ExampleLib(_) => "example",
-                };
-                format!("{}({})", self.target.name(), annotation)
-            }
-        }
-    }
+    next_id: u32,
 }
 
 pub struct JobState<'a> {
-    tx: Sender<Message<'a>>,
+    tx: Sender<Message>,
+    // Historical versions of Cargo made use of the `'a` argument here, so to
+    // leave the door open to future refactorings keep it here.
+    _marker: marker::PhantomData<&'a ()>,
 }
 
-enum Message<'a> {
+enum Message {
     Run(String),
     BuildPlanMsg(String, ProcessBuilder, Arc<Vec<OutputFile>>),
     Stdout(String),
     Stderr(String),
     FixDiagnostic(diagnostic_server::Message),
     Token(io::Result<Acquired>),
-    Finish(Key<'a>, CargoResult<()>),
+    Finish(u32, CargoResult<()>),
 }
 
 impl<'a> JobState<'a> {
@@ -132,12 +103,13 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
             queue: DependencyQueue::new(),
             tx,
             rx,
-            active: Vec::new(),
+            active: HashMap::new(),
             compiled: HashSet::new(),
             documented: HashSet::new(),
             counts: HashMap::new(),
             is_release: bcx.build_config.release,
             progress,
+            next_id: 0,
         }
     }
 
@@ -147,10 +119,18 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
         unit: &Unit<'a>,
         job: Job,
     ) -> CargoResult<()> {
-        let key = Key::new(unit);
-        let deps = key.dependencies(cx)?;
-        self.queue.queue(&key, job, &deps);
-        *self.counts.entry(key.pkg).or_insert(0) += 1;
+        let dependencies = cx.dep_targets(unit);
+        let dependencies = dependencies
+            .iter()
+            .filter(|unit| {
+                // Binaries aren't actually needed to *compile* tests, just to run
+                // them, so we don't include this dependency edge in the job graph.
+                !unit.target.is_test() || !unit.target.is_bin()
+            })
+            .cloned()
+            .collect::<Vec<_>>();
+        self.queue.queue(unit, job, &dependencies);
+        *self.counts.entry(unit.pkg.package_id()).or_insert(0) += 1;
         Ok(())
     }
 
@@ -163,22 +143,8 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
         let _p = profile::start("executing the job graph");
         self.queue.queue_finished();
 
-        // We need to give a handle to the send half of our message queue to the
-        // jobserver and (optionally) diagnostic helper thread. Unfortunately
-        // though we need the handle to be `'static` as that's typically what's
-        // required when spawning a thread!
-        //
-        // To work around this we transmute the `Sender` to a static lifetime.
-        // we're only sending "longer living" messages and we should also
-        // destroy all references to the channel before this function exits as
-        // the destructor for the `helper` object will ensure the associated
-        // thread is no longer running.
-        //
-        // As a result, this `transmute` to a longer lifetime should be safe in
-        // practice.
+        // Create a helper thread for acquiring jobserver tokens
         let tx = self.tx.clone();
-        let tx = unsafe { mem::transmute::<Sender<Message<'a>>, Sender<Message<'static>>>(tx) };
-        let tx2 = tx.clone();
         let helper = cx
             .jobserver
             .clone()
@@ -186,14 +152,24 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
                 drop(tx.send(Message::Token(token)));
             })
             .chain_err(|| "failed to create helper thread for jobserver management")?;
+
+        // Create a helper thread to manage the diagnostics for rustfix if
+        // necessary.
+        let tx = self.tx.clone();
         let _diagnostic_server = cx
             .bcx
             .build_config
             .rustfix_diagnostic_server
             .borrow_mut()
             .take()
-            .map(move |srv| srv.start(move |msg| drop(tx2.send(Message::FixDiagnostic(msg)))));
-
+            .map(move |srv| srv.start(move |msg| drop(tx.send(Message::FixDiagnostic(msg)))));
+
+        // Use `crossbeam` to create a scope in which we can execute scoped
+        // threads. Note that this isn't currently required by Cargo but it was
+        // historically required. This is left in for now in case we need the
+        // `'a` ability for child threads in the near future, but if this
+        // comment has been sitting here for a long time feel free to refactor
+        // away crossbeam.
         crossbeam_utils::thread::scope(|scope| self.drain_the_queue(cx, plan, scope, &helper))
             .expect("child threads should't panic")
     }
@@ -227,8 +203,8 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
             // possible that can run. Note that this is also the point where we
             // start requesting job tokens. Each job after the first needs to
             // request a token.
-            while let Some((key, job)) = self.queue.dequeue() {
-                queue.push((key, job));
+            while let Some((unit, job)) = self.queue.dequeue() {
+                queue.push((unit, job));
                 if self.active.len() + queue.len() > 1 {
                     jobserver_helper.request_token();
                 }
@@ -238,8 +214,8 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
             // try to spawn it so long as we've got a jobserver token which says
             // we're able to perform some parallel work.
             while error.is_none() && self.active.len() < tokens.len() + 1 && !queue.is_empty() {
-                let (key, job) = queue.remove(0);
-                self.run(key, job, cx, scope)?;
+                let (unit, job) = queue.remove(0);
+                self.run(&unit, job, cx, scope)?;
             }
 
             // If after all that we're not actually running anything then we're
@@ -288,26 +264,19 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
                     Message::FixDiagnostic(msg) => {
                         print.print(&msg)?;
                     }
-                    Message::Finish(key, result) => {
-                        info!("end: {:?}", key);
-
-                        // FIXME: switch to this when stabilized.
-                        // self.active.remove_item(&key);
-                        let pos = self
-                            .active
-                            .iter()
-                            .position(|k| *k == key)
-                            .expect("an unrecorded package has finished compiling");
-                        self.active.remove(pos);
+                    Message::Finish(id, result) => {
+                        let unit = self.active.remove(&id).unwrap();
+                        info!("end: {:?}", unit);
+
                         if !self.active.is_empty() {
                             assert!(!tokens.is_empty());
                             drop(tokens.pop());
                         }
                         match result {
-                            Ok(()) => self.finish(key, cx)?,
+                            Ok(()) => self.finish(&unit, cx)?,
                             Err(e) => {
                                 let msg = "The following warnings were emitted during compilation:";
-                                self.emit_warnings(Some(msg), &key, cx)?;
+                                self.emit_warnings(Some(msg), &unit, cx)?;
 
                                 if !self.active.is_empty() {
                                     error = Some(failure::format_err!("build failed"));
@@ -373,8 +342,8 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
         let count = total - self.queue.len();
         let active_names = self
             .active
-            .iter()
-            .map(Key::name_for_progress)
+            .values()
+            .map(|u| self.name_for_progress(u))
             .collect::<Vec<_>>();
         drop(
             self.progress
@@ -382,30 +351,54 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
         );
     }
 
+    fn name_for_progress(&self, unit: &Unit<'_>) -> String {
+        let pkg_name = unit.pkg.name();
+        match unit.mode {
+            CompileMode::Doc { .. } => format!("{}(doc)", pkg_name),
+            CompileMode::RunCustomBuild => format!("{}(build)", pkg_name),
+            _ => {
+                let annotation = match unit.target.kind() {
+                    TargetKind::Lib(_) => return pkg_name.to_string(),
+                    TargetKind::CustomBuild => return format!("{}(build.rs)", pkg_name),
+                    TargetKind::Bin => "bin",
+                    TargetKind::Test => "test",
+                    TargetKind::Bench => "bench",
+                    TargetKind::ExampleBin | TargetKind::ExampleLib(_) => "example",
+                };
+                format!("{}({})", unit.target.name(), annotation)
+            }
+        }
+    }
+
     /// Executes a job in the `scope` given, pushing the spawned thread's
     /// handled onto `threads`.
     fn run(
         &mut self,
-        key: Key<'a>,
+        unit: &Unit<'a>,
         job: Job,
         cx: &Context<'_, '_>,
         scope: &Scope<'a>,
     ) -> CargoResult<()> {
-        info!("start: {:?}", key);
+        info!("start: {:?}", unit);
 
-        self.active.push(key);
-        *self.counts.get_mut(&key.pkg).unwrap() -= 1;
+        let id = self.next_id;
+        self.next_id = id.checked_add(1).unwrap();
+        assert!(self.active.insert(id, *unit).is_none());
+        *self.counts.get_mut(&unit.pkg.package_id()).unwrap() -= 1;
 
         let my_tx = self.tx.clone();
         let fresh = job.freshness();
         let doit = move || {
-            let res = job.run(&JobState { tx: my_tx.clone() });
-            my_tx.send(Message::Finish(key, res)).unwrap();
+            let res = job.run(&JobState {
+                tx: my_tx.clone(),
+                _marker: marker::PhantomData,
+            });
+            my_tx.send(Message::Finish(id, res)).unwrap();
         };
 
         if !cx.bcx.build_config.build_plan {
             // Print out some nice progress information.
-            self.note_working_on(cx.bcx.config, &key, fresh)?;
+            self.note_working_on(cx.bcx.config, unit, fresh)?;
         }
 
         match fresh {
@@ -421,12 +414,12 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
     fn emit_warnings(
         &mut self,
         msg: Option<&str>,
-        key: &Key<'a>,
+        unit: &Unit<'a>,
         cx: &mut Context<'_, '_>,
     ) -> CargoResult<()> {
         let output = cx.build_state.outputs.lock().unwrap();
         let bcx = &mut cx.bcx;
-        if let Some(output) = output.get(&(key.pkg, key.kind)) {
+        if let Some(output) = output.get(&(unit.pkg.package_id(), unit.kind)) {
             if !output.warnings.is_empty() {
                 if let Some(msg) = msg {
                     writeln!(bcx.config.shell().err(), "{}\n", msg)?;
@@ -446,11 +439,11 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
         Ok(())
     }
 
-    fn finish(&mut self, key: Key<'a>, cx: &mut Context<'_, '_>) -> CargoResult<()> {
-        if key.mode.is_run_custom_build() && cx.bcx.show_warnings(key.pkg) {
-            self.emit_warnings(None, &key, cx)?;
+    fn finish(&mut self, unit: &Unit<'a>, cx: &mut Context<'_, '_>) -> CargoResult<()> {
+        if unit.mode.is_run_custom_build() && cx.bcx.show_warnings(unit.pkg.package_id()) {
+            self.emit_warnings(None, unit, cx)?;
         }
-        self.queue.finish(&key);
+        self.queue.finish(unit);
         Ok(())
     }
 
@@ -466,11 +459,11 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
     fn note_working_on(
         &mut self,
         config: &Config,
-        key: &Key<'a>,
+        unit: &Unit<'a>,
         fresh: Freshness,
     ) -> CargoResult<()> {
-        if (self.compiled.contains(&key.pkg) && !key.mode.is_doc())
-            || (self.documented.contains(&key.pkg) && key.mode.is_doc())
+        if (self.compiled.contains(&unit.pkg.package_id()) && !unit.mode.is_doc())
+            || (self.documented.contains(&unit.pkg.package_id()) && unit.mode.is_doc())
         {
             return Ok(());
         }
@@ -479,76 +472,32 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
             // Any dirty stage which runs at least one command gets printed as
             // being a compiled package.
             Dirty => {
-                if key.mode.is_doc() {
+                if unit.mode.is_doc() {
                     // Skip doc test.
-                    if !key.mode.is_any_test() {
-                        self.documented.insert(key.pkg);
-                        config.shell().status("Documenting", key.pkg)?;
+                    if !unit.mode.is_any_test() {
+                        self.documented.insert(unit.pkg.package_id());
+                        config.shell().status("Documenting", unit.pkg)?;
                     }
                 } else {
-                    self.compiled.insert(key.pkg);
-                    if key.mode.is_check() {
-                        config.shell().status("Checking", key.pkg)?;
+                    self.compiled.insert(unit.pkg.package_id());
+                    if unit.mode.is_check() {
+                        config.shell().status("Checking", unit.pkg)?;
                     } else {
-                        config.shell().status("Compiling", key.pkg)?;
+                        config.shell().status("Compiling", unit.pkg)?;
                     }
                 }
             }
             Fresh => {
                 // If doc test are last, only print "Fresh" if nothing has been printed.
-                if self.counts[&key.pkg] == 0
-                    && !(key.mode == CompileMode::Doctest && self.compiled.contains(&key.pkg))
+                if self.counts[&unit.pkg.package_id()] == 0
+                    && !(unit.mode == CompileMode::Doctest
+                        && self.compiled.contains(&unit.pkg.package_id()))
                 {
-                    self.compiled.insert(key.pkg);
-                    config.shell().verbose(|c| c.status("Fresh", key.pkg))?;
+                    self.compiled.insert(unit.pkg.package_id());
+                    config.shell().verbose(|c| c.status("Fresh", unit.pkg))?;
                 }
             }
         }
         Ok(())
     }
 }
-
-impl<'a> Key<'a> {
-    fn new(unit: &Unit<'a>) -> Key<'a> {
-        Key {
-            pkg: unit.pkg.package_id(),
-            target: unit.target,
-            profile: unit.profile,
-            kind: unit.kind,
-            mode: unit.mode,
-        }
-    }
-
-    fn dependencies<'cfg>(&self, cx: &Context<'a, 'cfg>) -> CargoResult<Vec<Key<'a>>> {
-        let unit = Unit {
-            pkg: cx.get_package(self.pkg)?,
-            target: self.target,
-            profile: self.profile,
-            kind: self.kind,
-            mode: self.mode,
-        };
-        let targets = cx.dep_targets(&unit);
-        Ok(targets
-            .iter()
-            .filter_map(|unit| {
-                // Binaries aren't actually needed to *compile* tests, just to run
-                // them, so we don't include this dependency edge in the job graph.
-                if self.target.is_test() && unit.target.is_bin() {
-                    None
-                } else {
-                    Some(Key::new(unit))
-                }
-            })
-            .collect())
-    }
-}
-
-impl<'a> fmt::Debug for Key<'a> {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        write!(
-            f,
-            "{} => {}/{} => {:?}",
-            self.pkg, self.target, self.profile, self.kind
-        )
-    }
-}