]> git.proxmox.com Git - cargo.git/blame - src/cargo/core/compiler/job_queue.rs
Upgrade to Rust 2018
[cargo.git] / src / cargo / core / compiler / job_queue.rs
CommitLineData
d058d24a 1use std::collections::hash_map::HashMap;
a4947c2b 2use std::collections::HashSet;
659f8244 3use std::fmt;
6f45cb61 4use std::io;
cbf25a9b 5use std::mem;
92485f8c 6use std::process::Output;
1e682848 7use std::sync::mpsc::{channel, Receiver, Sender};
72e6b9d3 8use std::sync::Arc;
a6dad622 9
8ea90e96 10use crossbeam_utils;
810a0a1e 11use crossbeam_utils::thread::Scope;
cbf25a9b 12use jobserver::{Acquired, HelperThread};
7c8ea19b 13
04ddd4d0
DW
14use crate::core::profiles::Profile;
15use crate::core::{PackageId, Target, TargetKind};
16use crate::handle_error;
17use crate::util;
18use crate::util::diagnostic_server::{self, DiagnosticPrinter};
19use crate::util::{internal, profile, CargoResult, CargoResultExt, ProcessBuilder};
20use crate::util::{Config, DependencyQueue, Dirty, Fresh, Freshness};
21use crate::util::{Progress, ProgressStyle};
7c8ea19b 22
92485f8c 23use super::context::OutputFile;
7c8ea19b 24use super::job::Job;
72e6b9d3 25use super::{BuildContext, BuildPlan, CompileMode, Context, Kind, Unit};
7c8ea19b 26
79768eb0
AC
27/// A management structure of the entire dependency graph to compile.
28///
29/// This structure is backed by the `DependencyQueue` type and manages the
30/// actual compilation step of each package. Packages enqueue units of work and
31/// then later on the entire graph is processed and compiled.
973f6712 32pub struct JobQueue<'a> {
659f8244 33 queue: DependencyQueue<Key<'a>, Vec<(Job, Freshness)>>,
cbf25a9b
AC
34 tx: Sender<Message<'a>>,
35 rx: Receiver<Message<'a>>,
a753b500 36 active: Vec<Key<'a>>,
659f8244 37 pending: HashMap<Key<'a>, PendingBuild>,
dae87a26
E
38 compiled: HashSet<PackageId>,
39 documented: HashSet<PackageId>,
40 counts: HashMap<PackageId, usize>,
cd955f12 41 is_release: bool,
7c8ea19b
AC
42}
43
79768eb0
AC
44/// A helper structure for metadata about the state of a building package.
45struct PendingBuild {
46 /// Number of jobs currently active
659f8244 47 amt: usize,
79768eb0
AC
48 /// Current freshness state of this package. Any dirty target within a
49 /// package will cause the entire package to become dirty.
50 fresh: Freshness,
51}
52
659f8244
AC
53#[derive(Clone, Copy, Eq, PartialEq, Hash)]
54struct Key<'a> {
dae87a26 55 pkg: PackageId,
659f8244 56 target: &'a Target,
73660740 57 profile: Profile,
659f8244 58 kind: Kind,
575d6e81 59 mode: CompileMode,
79768eb0
AC
60}
61
a40a5c70 62impl<'a> Key<'a> {
63 fn name_for_progress(&self) -> String {
64 let pkg_name = self.pkg.name();
65 match self.mode {
66 CompileMode::Doc { .. } => format!("{}(doc)", pkg_name),
67 CompileMode::RunCustomBuild => format!("{}(build)", pkg_name),
68 _ => {
69 let annotation = match self.target.kind() {
70 TargetKind::Lib(_) => return pkg_name.to_string(),
71 TargetKind::CustomBuild => return format!("{}(build.rs)", pkg_name),
72 TargetKind::Bin => "bin",
73 TargetKind::Test => "test",
74 TargetKind::Bench => "bench",
75 TargetKind::ExampleBin | TargetKind::ExampleLib(_) => "example",
76 };
77 format!("{}({})", self.target.name(), annotation)
78 }
79 }
80 }
81}
82
26690d33 83pub struct JobState<'a> {
cbf25a9b 84 tx: Sender<Message<'a>>,
26690d33
AC
85}
86
cbf25a9b 87enum Message<'a> {
26690d33 88 Run(String),
72e6b9d3 89 BuildPlanMsg(String, ProcessBuilder, Arc<Vec<OutputFile>>),
26690d33
AC
90 Stdout(String),
91 Stderr(String),
b02ba377 92 FixDiagnostic(diagnostic_server::Message),
cbf25a9b
AC
93 Token(io::Result<Acquired>),
94 Finish(Key<'a>, CargoResult<()>),
26690d33
AC
95}
96
97impl<'a> JobState<'a> {
5aae6f09 98 pub fn running(&self, cmd: &ProcessBuilder) {
cbf25a9b 99 let _ = self.tx.send(Message::Run(cmd.to_string()));
26690d33
AC
100 }
101
72e6b9d3
MS
102 pub fn build_plan(
103 &self,
104 module_name: String,
105 cmd: ProcessBuilder,
106 filenames: Arc<Vec<OutputFile>>,
107 ) {
92485f8c
E
108 let _ = self
109 .tx
72e6b9d3
MS
110 .send(Message::BuildPlanMsg(module_name, cmd, filenames));
111 }
112
d079551e 113 pub fn capture_output(
114 &self,
8798bf0d 115 cmd: &ProcessBuilder,
20920c01 116 prefix: Option<String>,
17b6df9c 117 capture_output: bool,
d079551e 118 ) -> CargoResult<Output> {
92485f8c 119 let prefix = prefix.unwrap_or_else(String::new);
d079551e 120 cmd.exec_with_streaming(
121 &mut |out| {
20920c01 122 let _ = self.tx.send(Message::Stdout(format!("{}{}", prefix, out)));
d079551e 123 Ok(())
124 },
125 &mut |err| {
20920c01 126 let _ = self.tx.send(Message::Stderr(format!("{}{}", prefix, err)));
d079551e 127 Ok(())
128 },
17b6df9c 129 capture_output,
d079551e 130 )
26690d33 131 }
659f8244 132}
7c8ea19b 133
973f6712 134impl<'a> JobQueue<'a> {
c32e395c 135 pub fn new<'cfg>(bcx: &BuildContext<'a, 'cfg>) -> JobQueue<'a> {
7c8ea19b 136 let (tx, rx) = channel();
7c8ea19b 137 JobQueue {
79768eb0 138 queue: DependencyQueue::new(),
0247dc42
E
139 tx,
140 rx,
a753b500 141 active: Vec::new(),
79768eb0 142 pending: HashMap::new(),
925fb508
EY
143 compiled: HashSet::new(),
144 documented: HashSet::new(),
659f8244 145 counts: HashMap::new(),
c32e395c 146 is_release: bcx.build_config.release,
7c8ea19b
AC
147 }
148 }
149
1e682848
AC
150 pub fn enqueue<'cfg>(
151 &mut self,
152 cx: &Context<'a, 'cfg>,
153 unit: &Unit<'a>,
154 job: Job,
155 fresh: Freshness,
156 ) -> CargoResult<()> {
659f8244 157 let key = Key::new(unit);
82655b46 158 let deps = key.dependencies(cx)?;
1e682848 159 self.queue
385b54b3 160 .queue(Fresh, &key, Vec::new(), &deps)
1e682848 161 .push((job, fresh));
659f8244 162 *self.counts.entry(key.pkg).or_insert(0) += 1;
078b6707 163 Ok(())
79768eb0
AC
164 }
165
7c8ea19b
AC
166 /// Execute all jobs necessary to build the dependency graph.
167 ///
168 /// This function will spawn off `config.jobs()` workers to build all of the
169 /// necessary dependencies, in order. Freshness is propagated as far as
170 /// possible along each dependency chain.
72e6b9d3 171 pub fn execute(&mut self, cx: &mut Context, plan: &mut BuildPlan) -> CargoResult<()> {
4f12a2b5 172 let _p = profile::start("executing the job graph");
e54b5f87 173 self.queue.queue_finished();
4f12a2b5 174
cbf25a9b 175 // We need to give a handle to the send half of our message queue to the
b02ba377
AC
176 // jobserver and (optionally) diagnostic helper thread. Unfortunately
177 // though we need the handle to be `'static` as that's typically what's
178 // required when spawning a thread!
cbf25a9b
AC
179 //
180 // To work around this we transmute the `Sender` to a static lifetime.
181 // we're only sending "longer living" messages and we should also
182 // destroy all references to the channel before this function exits as
183 // the destructor for the `helper` object will ensure the associated
43833213 184 // thread is no longer running.
cbf25a9b
AC
185 //
186 // As a result, this `transmute` to a longer lifetime should be safe in
187 // practice.
188 let tx = self.tx.clone();
1e682848 189 let tx = unsafe { mem::transmute::<Sender<Message<'a>>, Sender<Message<'static>>>(tx) };
b02ba377 190 let tx2 = tx.clone();
92485f8c
E
191 let helper = cx
192 .jobserver
1e682848
AC
193 .clone()
194 .into_helper_thread(move |token| {
195 drop(tx.send(Message::Token(token)));
196 })
197 .chain_err(|| "failed to create helper thread for jobserver management")?;
92485f8c
E
198 let _diagnostic_server = cx
199 .bcx
200 .build_config
b02ba377
AC
201 .rustfix_diagnostic_server
202 .borrow_mut()
203 .take()
92485f8c 204 .map(move |srv| srv.start(move |msg| drop(tx2.send(Message::FixDiagnostic(msg)))));
1e682848 205
92485f8c
E
206 crossbeam_utils::thread::scope(|scope| self.drain_the_queue(cx, plan, scope, &helper))
207 .expect("child threads should't panic")
659f8244
AC
208 }
209
1e682848
AC
210 fn drain_the_queue(
211 &mut self,
212 cx: &mut Context,
72e6b9d3 213 plan: &mut BuildPlan,
1e682848
AC
214 scope: &Scope<'a>,
215 jobserver_helper: &HelperThread,
216 ) -> CargoResult<()> {
cbf25a9b 217 let mut tokens = Vec::new();
659f8244 218 let mut queue = Vec::new();
72e6b9d3 219 let build_plan = cx.bcx.build_config.build_plan;
fa7a3877 220 let mut print = DiagnosticPrinter::new(cx.bcx.config);
659f8244
AC
221 trace!("queue: {:#?}", self.queue);
222
223 // Iteratively execute the entire dependency graph. Each turn of the
224 // loop starts out by scheduling as much work as possible (up to the
cbf25a9b
AC
225 // maximum number of parallel jobs we have tokens for). A local queue
226 // is maintained separately from the main dependency queue as one
227 // dequeue may actually dequeue quite a bit of work (e.g. 10 binaries
3492a390 228 // in one package).
659f8244
AC
229 //
230 // After a job has finished we update our internal state if it was
231 // successful and otherwise wait for pending work to finish if it failed
232 // and then immediately return.
26690d33 233 let mut error = None;
a753b500 234 let mut progress = Progress::with_style("Building", ProgressStyle::Ratio, cx.bcx.config);
a8081a00 235 let total = self.queue.len();
659f8244 236 loop {
cbf25a9b
AC
237 // Dequeue as much work as we can, learning about everything
238 // possible that can run. Note that this is also the point where we
239 // start requesting job tokens. Each job after the first needs to
240 // request a token.
241 while let Some((fresh, key, jobs)) = self.queue.dequeue() {
1e682848
AC
242 let total_fresh = jobs.iter().fold(fresh, |fresh, &(_, f)| f.combine(fresh));
243 self.pending.insert(
244 key,
245 PendingBuild {
246 amt: jobs.len(),
247 fresh: total_fresh,
248 },
249 );
cbf25a9b
AC
250 for (job, f) in jobs {
251 queue.push((key, job, f.combine(fresh)));
3dbe701f 252 if !self.active.is_empty() || !queue.is_empty() {
cbf25a9b
AC
253 jobserver_helper.request_token();
254 }
7c8ea19b
AC
255 }
256 }
cbf25a9b
AC
257
258 // Now that we've learned of all possible work that we can execute
259 // try to spawn it so long as we've got a jobserver token which says
260 // we're able to perform some parallel work.
3dbe701f 261 while error.is_none() && self.active.len() < tokens.len() + 1 && !queue.is_empty() {
cbf25a9b 262 let (key, job, fresh) = queue.remove(0);
72e6b9d3 263 self.run(key, fresh, job, cx.bcx.config, scope, build_plan)?;
cbf25a9b
AC
264 }
265
266 // If after all that we're not actually running anything then we're
267 // done!
3dbe701f 268 if self.active.is_empty() {
1e682848 269 break;
659f8244 270 }
7c8ea19b 271
cbf25a9b
AC
272 // And finally, before we block waiting for the next event, drop any
273 // excess tokens we may have accidentally acquired. Due to how our
274 // jobserver interface is architected we may acquire a token that we
275 // don't actually use, and if this happens just relinquish it back
276 // to the jobserver itself.
3dbe701f 277 tokens.truncate(self.active.len() - 1);
278
d079551e 279 let count = total - self.queue.len();
92485f8c
E
280 let active_names = self
281 .active
282 .iter()
d079551e 283 .map(Key::name_for_progress)
284 .collect::<Vec<_>>();
285 drop(progress.tick_now(count, total, &format!(": {}", active_names.join(", "))));
3dbe701f 286 let event = self.rx.recv().unwrap();
d079551e 287 progress.clear();
3dbe701f 288
289 match event {
26690d33 290 Message::Run(cmd) => {
c32e395c
DO
291 cx.bcx
292 .config
293 .shell()
294 .verbose(|c| c.status("Running", &cmd))?;
26690d33 295 }
72e6b9d3 296 Message::BuildPlanMsg(module_name, cmd, filenames) => {
385b54b3 297 plan.update(&module_name, &cmd, &filenames)?;
72e6b9d3 298 }
26690d33 299 Message::Stdout(out) => {
d079551e 300 println!("{}", out);
7c8ea19b 301 }
26690d33 302 Message::Stderr(err) => {
641f7ff2 303 let mut shell = cx.bcx.config.shell();
304 shell.print_ansi(err.as_bytes())?;
92485f8c 305 shell.err().write_all(b"\n")?;
26690d33 306 }
b02ba377 307 Message::FixDiagnostic(msg) => {
fa7a3877 308 print.print(&msg)?;
b02ba377 309 }
cbf25a9b 310 Message::Finish(key, result) => {
26690d33 311 info!("end: {:?}", key);
3dbe701f 312
a753b500 313 // self.active.remove_item(&key); // <- switch to this when stabilized.
a8081a00 314 let pos = self
315 .active
316 .iter()
317 .position(|k| *k == key)
318 .expect("an unrecorded package has finished compiling");
319 self.active.remove(pos);
3dbe701f 320 if !self.active.is_empty() {
23591fe5 321 assert!(!tokens.is_empty());
cbf25a9b
AC
322 drop(tokens.pop());
323 }
26690d33 324 match result {
82655b46 325 Ok(()) => self.finish(key, cx)?,
26690d33 326 Err(e) => {
4adae9eb 327 let msg = "The following warnings were emitted during compilation:";
575d6e81 328 self.emit_warnings(Some(msg), &key, cx)?;
4adae9eb 329
3dbe701f 330 if !self.active.is_empty() {
37cffbe0 331 error = Some(format_err!("build failed"));
385b54b3 332 handle_error(&e, &mut *cx.bcx.config.shell());
c32e395c 333 cx.bcx.config.shell().warn(
1e682848
AC
334 "build failed, waiting for other \
335 jobs to finish...",
336 )?;
37cffbe0 337 } else {
a8a944c4 338 error = Some(e);
26690d33
AC
339 }
340 }
7c8ea19b 341 }
7c8ea19b 342 }
cbf25a9b 343 Message::Token(acquired_token) => {
1e682848 344 tokens.push(acquired_token.chain_err(|| "failed to acquire jobserver token")?);
cbf25a9b 345 }
7c8ea19b
AC
346 }
347 }
3dbe701f 348 drop(progress);
7c8ea19b 349
34628b65 350 let build_type = if self.is_release { "release" } else { "dev" };
accc9b28
EH
351 // NOTE: This may be a bit inaccurate, since this may not display the
352 // profile for what was actually built. Profile overrides can change
353 // these settings, and in some cases different targets are built with
354 // different profiles. To be accurate, it would need to collect a
355 // list of Units built, and maybe display a list of the different
356 // profiles used. However, to keep it simple and compatible with old
357 // behavior, we just display what the base profile is.
c32e395c 358 let profile = cx.bcx.profiles.base_profile(self.is_release);
575d6e81 359 let mut opt_type = String::from(if profile.opt_level.as_str() == "0" {
1e682848
AC
360 "unoptimized"
361 } else {
362 "optimized"
363 });
6d864a81 364 if profile.debuginfo.is_some() {
23591fe5 365 opt_type += " + debuginfo";
cd955f12 366 }
88d2886f 367
a46df8fe 368 let time_elapsed = util::elapsed(cx.bcx.config.creation_time().elapsed());
88d2886f 369
8628dbeb 370 if self.queue.is_empty() {
1e682848
AC
371 let message = format!(
372 "{} [{}] target(s) in {}",
373 build_type, opt_type, time_elapsed
374 );
72e6b9d3
MS
375 if !build_plan {
376 cx.bcx.config.shell().status("Finished", message)?;
377 }
659f8244 378 Ok(())
26690d33
AC
379 } else if let Some(e) = error {
380 Err(e)
659f8244
AC
381 } else {
382 debug!("queue: {:#?}", self.queue);
383 Err(internal("finished with jobs still left in the queue"))
384 }
7c8ea19b 385 }
79768eb0 386
659f8244
AC
387 /// Executes a job in the `scope` given, pushing the spawned thread's
388 /// handled onto `threads`.
1e682848
AC
389 fn run(
390 &mut self,
391 key: Key<'a>,
392 fresh: Freshness,
393 job: Job,
394 config: &Config,
395 scope: &Scope<'a>,
72e6b9d3 396 build_plan: bool,
1e682848 397 ) -> CargoResult<()> {
659f8244 398 info!("start: {:?}", key);
79768eb0 399
a753b500 400 self.active.push(key);
dae87a26 401 *self.counts.get_mut(&key.pkg).unwrap() -= 1;
79768eb0 402
659f8244 403 let my_tx = self.tx.clone();
0af2914b 404 let doit = move || {
1e682848 405 let res = job.run(fresh, &JobState { tx: my_tx.clone() });
cbf25a9b 406 my_tx.send(Message::Finish(key, res)).unwrap();
0af2914b 407 };
52f45b53
EH
408
409 if !build_plan {
410 // Print out some nice progress information
411 self.note_working_on(config, &key, fresh)?;
412 }
413
0af2914b
AC
414 match fresh {
415 Freshness::Fresh => doit(),
1e682848 416 Freshness::Dirty => {
5e71ad6c 417 scope.spawn(move |_| doit());
1e682848 418 }
0af2914b 419 }
a0d499e0 420
79768eb0
AC
421 Ok(())
422 }
23d3defd 423
575d6e81 424 fn emit_warnings(&self, msg: Option<&str>, key: &Key<'a>, cx: &mut Context) -> CargoResult<()> {
4adae9eb 425 let output = cx.build_state.outputs.lock().unwrap();
c32e395c 426 let bcx = &mut cx.bcx;
dae87a26 427 if let Some(output) = output.get(&(key.pkg, key.kind)) {
4adae9eb
SB
428 if let Some(msg) = msg {
429 if !output.warnings.is_empty() {
c32e395c 430 writeln!(bcx.config.shell().err(), "{}\n", msg)?;
a30f612e
AC
431 }
432 }
4adae9eb
SB
433
434 for warning in output.warnings.iter() {
c32e395c 435 bcx.config.shell().warn(warning)?;
4adae9eb
SB
436 }
437
438 if !output.warnings.is_empty() && msg.is_some() {
439 // Output an empty line.
385b54b3 440 writeln!(bcx.config.shell().err())?;
4adae9eb 441 }
a30f612e 442 }
4adae9eb
SB
443
444 Ok(())
445 }
446
447 fn finish(&mut self, key: Key<'a>, cx: &mut Context) -> CargoResult<()> {
c32e395c 448 if key.mode.is_run_custom_build() && cx.bcx.show_warnings(key.pkg) {
575d6e81 449 self.emit_warnings(None, &key, cx)?;
4adae9eb
SB
450 }
451
a30f612e
AC
452 let state = self.pending.get_mut(&key).unwrap();
453 state.amt -= 1;
454 if state.amt == 0 {
455 self.queue.finish(&key, state.fresh);
456 }
457 Ok(())
458 }
459
74076d2a 460 // This isn't super trivial because we don't want to print loads and
23d3defd
AC
461 // loads of information to the console, but we also want to produce a
462 // faithful representation of what's happening. This is somewhat nuanced
463 // as a package can start compiling *very* early on because of custom
464 // build commands and such.
465 //
466 // In general, we try to print "Compiling" for the first nontrivial task
467 // run for a package, regardless of when that is. We then don't print
468 // out any more information for a package after we've printed it once.
1e682848
AC
469 fn note_working_on(
470 &mut self,
471 config: &Config,
472 key: &Key<'a>,
473 fresh: Freshness,
474 ) -> CargoResult<()> {
dae87a26
E
475 if (self.compiled.contains(&key.pkg) && !key.mode.is_doc())
476 || (self.documented.contains(&key.pkg) && key.mode.is_doc())
1e682848
AC
477 {
478 return Ok(());
659f8244 479 }
23d3defd
AC
480
481 match fresh {
482 // Any dirty stage which runs at least one command gets printed as
483 // being a compiled package
23d3defd 484 Dirty => {
575d6e81
EH
485 if key.mode.is_doc() {
486 // Skip Doctest
487 if !key.mode.is_any_test() {
fb1736ef
AC
488 self.documented.insert(key.pkg);
489 config.shell().status("Documenting", key.pkg)?;
490 }
925fb508
EY
491 } else {
492 self.compiled.insert(key.pkg);
575d6e81 493 if key.mode.is_check() {
8dd1341f 494 config.shell().status("Checking", key.pkg)?;
495 } else {
496 config.shell().status("Compiling", key.pkg)?;
497 }
925fb508 498 }
23d3defd 499 }
478d1ce2
EH
500 Fresh => {
501 // If doctest is last, only print "Fresh" if nothing has been printed.
dae87a26
E
502 if self.counts[&key.pkg] == 0
503 && !(key.mode == CompileMode::Doctest && self.compiled.contains(&key.pkg))
478d1ce2
EH
504 {
505 self.compiled.insert(key.pkg);
506 config.shell().verbose(|c| c.status("Fresh", key.pkg))?;
507 }
23d3defd 508 }
23d3defd
AC
509 }
510 Ok(())
511 }
7c8ea19b 512}
2874b68e 513
b56b61c5
AC
514impl<'a> Key<'a> {
515 fn new(unit: &Unit<'a>) -> Key<'a> {
516 Key {
517 pkg: unit.pkg.package_id(),
518 target: unit.target,
73660740 519 profile: unit.profile,
b56b61c5 520 kind: unit.kind,
575d6e81 521 mode: unit.mode,
b56b61c5
AC
522 }
523 }
a0d499e0 524
1e682848 525 fn dependencies<'cfg>(&self, cx: &Context<'a, 'cfg>) -> CargoResult<Vec<Key<'a>>> {
659f8244 526 let unit = Unit {
44a7ee7d 527 pkg: cx.get_package(self.pkg)?,
659f8244 528 target: self.target,
73660740 529 profile: self.profile,
659f8244 530 kind: self.kind,
575d6e81 531 mode: self.mode,
659f8244 532 };
1b63fcf7 533 let targets = cx.dep_targets(&unit);
1e682848
AC
534 Ok(targets
535 .iter()
536 .filter_map(|unit| {
537 // Binaries aren't actually needed to *compile* tests, just to run
538 // them, so we don't include this dependency edge in the job graph.
539 if self.target.is_test() && unit.target.is_bin() {
540 None
541 } else {
542 Some(Key::new(unit))
543 }
544 })
545 .collect())
659f8244
AC
546 }
547}
23d3defd 548
659f8244
AC
549impl<'a> fmt::Debug for Key<'a> {
550 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1e682848
AC
551 write!(
552 f,
73660740
EH
553 "{} => {}/{} => {:?}",
554 self.pkg, self.target, self.profile, self.kind
1e682848 555 )
659f8244
AC
556 }
557}