src/cargo/core/compiler/job_queue.rs (cargo.git)
use std::collections::hash_map::HashMap;
use std::collections::HashSet;
use std::fmt;
use std::io::{self, Write};
use std::mem;
use std::process::Output;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;

use crossbeam_utils::thread::Scope;
use jobserver::{Acquired, HelperThread};
use log::{debug, info, trace};

use crate::core::profiles::Profile;
use crate::core::{PackageId, Target, TargetKind};
use crate::handle_error;
use crate::util;
use crate::util::diagnostic_server::{self, DiagnosticPrinter};
use crate::util::{internal, profile, CargoResult, CargoResultExt, ProcessBuilder};
use crate::util::{Config, DependencyQueue, Dirty, Fresh, Freshness};
use crate::util::{Progress, ProgressStyle};

use super::context::OutputFile;
use super::job::Job;
use super::{BuildContext, BuildPlan, CompileMode, Context, Kind, Unit};

/// A management structure of the entire dependency graph to compile.
///
/// This structure is backed by the `DependencyQueue` type and manages the
/// actual compilation step of each package. Packages enqueue units of work and
/// then later on the entire graph is processed and compiled.
pub struct JobQueue<'a> {
    queue: DependencyQueue<Key<'a>, Vec<(Job, Freshness)>>,
    tx: Sender<Message<'a>>,
    rx: Receiver<Message<'a>>,
    active: Vec<Key<'a>>,
    pending: HashMap<Key<'a>, PendingBuild>,
    compiled: HashSet<PackageId>,
    documented: HashSet<PackageId>,
    counts: HashMap<PackageId, usize>,
    is_release: bool,
}

/// A helper structure for metadata about the state of a building package.
struct PendingBuild {
    /// Number of jobs currently active
    amt: usize,
    /// Current freshness state of this package. Any dirty target within a
    /// package will cause the entire package to become dirty.
    fresh: Freshness,
}

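/// Identifies one unit of work in the queue: a single target of a package,
/// built for a particular profile, `Kind`, and `CompileMode`.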
#[derive(Clone, Copy, Eq, PartialEq, Hash)]
struct Key<'a> {
    pkg: PackageId,
    target: &'a Target,
    profile: Profile,
    kind: Kind,
    mode: CompileMode,
}

impl<'a> Key<'a> {
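    /// Returns a short, human-readable name for this unit, used in the
    /// progress bar (e.g. `foo(doc)`, `foo(build.rs)`, or `foo(bin)`).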
    fn name_for_progress(&self) -> String {
        let pkg_name = self.pkg.name();
        match self.mode {
            CompileMode::Doc { .. } => format!("{}(doc)", pkg_name),
            CompileMode::RunCustomBuild => format!("{}(build)", pkg_name),
            _ => {
                let annotation = match self.target.kind() {
                    TargetKind::Lib(_) => return pkg_name.to_string(),
                    TargetKind::CustomBuild => return format!("{}(build.rs)", pkg_name),
                    TargetKind::Bin => "bin",
                    TargetKind::Test => "test",
                    TargetKind::Bench => "bench",
                    TargetKind::ExampleBin | TargetKind::ExampleLib(_) => "example",
                };
                format!("{}({})", self.target.name(), annotation)
            }
        }
    }
}

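/// Handle given to each running job so it can report progress, output, and
/// build-plan information back to the main loop over the message channel.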
pub struct JobState<'a> {
    tx: Sender<Message<'a>>,
}

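/// Messages sent from running jobs and helper threads back to the main loop
/// in `drain_the_queue`.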
enum Message<'a> {
    Run(String),
    BuildPlanMsg(String, ProcessBuilder, Arc<Vec<OutputFile>>),
    Stdout(String),
    Stderr(String),
    FixDiagnostic(diagnostic_server::Message),
    Token(io::Result<Acquired>),
    Finish(Key<'a>, CargoResult<()>),
}

impl<'a> JobState<'a> {
    pub fn running(&self, cmd: &ProcessBuilder) {
        let _ = self.tx.send(Message::Run(cmd.to_string()));
    }

    pub fn build_plan(
        &self,
        module_name: String,
        cmd: ProcessBuilder,
        filenames: Arc<Vec<OutputFile>>,
    ) {
        let _ = self
            .tx
            .send(Message::BuildPlanMsg(module_name, cmd, filenames));
    }

    pub fn capture_output(
        &self,
        cmd: &ProcessBuilder,
        prefix: Option<String>,
        capture_output: bool,
    ) -> CargoResult<Output> {
        let prefix = prefix.unwrap_or_else(String::new);
        cmd.exec_with_streaming(
            &mut |out| {
                let _ = self.tx.send(Message::Stdout(format!("{}{}", prefix, out)));
                Ok(())
            },
            &mut |err| {
                let _ = self.tx.send(Message::Stderr(format!("{}{}", prefix, err)));
                Ok(())
            },
            capture_output,
        )
    }
}

impl<'a> JobQueue<'a> {
    pub fn new<'cfg>(bcx: &BuildContext<'a, 'cfg>) -> JobQueue<'a> {
        let (tx, rx) = channel();
        JobQueue {
            queue: DependencyQueue::new(),
            tx,
            rx,
            active: Vec::new(),
            pending: HashMap::new(),
            compiled: HashSet::new(),
            documented: HashSet::new(),
            counts: HashMap::new(),
            is_release: bcx.build_config.release,
        }
    }

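    /// Adds a unit of work (and its dependency edges) to the queue; the work
    /// itself is executed later by `execute`.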
    pub fn enqueue<'cfg>(
        &mut self,
        cx: &Context<'a, 'cfg>,
        unit: &Unit<'a>,
        job: Job,
        fresh: Freshness,
    ) -> CargoResult<()> {
        let key = Key::new(unit);
        let deps = key.dependencies(cx)?;
        self.queue
            .queue(Fresh, &key, Vec::new(), &deps)
            .push((job, fresh));
        *self.counts.entry(key.pkg).or_insert(0) += 1;
        Ok(())
    }

    /// Execute all jobs necessary to build the dependency graph.
    ///
    /// This function will spawn off `config.jobs()` workers to build all of the
    /// necessary dependencies, in order. Freshness is propagated as far as
    /// possible along each dependency chain.
    pub fn execute(&mut self, cx: &mut Context<'_, '_>, plan: &mut BuildPlan) -> CargoResult<()> {
        let _p = profile::start("executing the job graph");
        self.queue.queue_finished();

        // We need to give a handle to the send half of our message queue to the
        // jobserver and (optionally) diagnostic helper thread. Unfortunately
        // though we need the handle to be `'static` as that's typically what's
        // required when spawning a thread!
        //
        // To work around this we transmute the `Sender` to a static lifetime.
        // We're only sending "longer living" messages and we should also
        // destroy all references to the channel before this function exits as
        // the destructor for the `helper` object will ensure the associated
        // thread is no longer running.
        //
        // As a result, this `transmute` to a longer lifetime should be safe in
        // practice.
        let tx = self.tx.clone();
        let tx = unsafe { mem::transmute::<Sender<Message<'a>>, Sender<Message<'static>>>(tx) };
        let tx2 = tx.clone();
        let helper = cx
            .jobserver
            .clone()
            .into_helper_thread(move |token| {
                drop(tx.send(Message::Token(token)));
            })
            .chain_err(|| "failed to create helper thread for jobserver management")?;
        let _diagnostic_server = cx
            .bcx
            .build_config
            .rustfix_diagnostic_server
            .borrow_mut()
            .take()
            .map(move |srv| srv.start(move |msg| drop(tx2.send(Message::FixDiagnostic(msg)))));

        crossbeam_utils::thread::scope(|scope| self.drain_the_queue(cx, plan, scope, &helper))
            .expect("child threads shouldn't panic")
    }

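    /// The core scheduling loop: repeatedly dequeues ready work, spawns it
    /// onto worker threads as jobserver tokens allow, and handles `Message`s
    /// from those workers until the queue is drained or a job fails.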
    fn drain_the_queue(
        &mut self,
        cx: &mut Context<'_, '_>,
        plan: &mut BuildPlan,
        scope: &Scope<'a>,
        jobserver_helper: &HelperThread,
    ) -> CargoResult<()> {
        let mut tokens = Vec::new();
        let mut queue = Vec::new();
        let build_plan = cx.bcx.build_config.build_plan;
        let mut print = DiagnosticPrinter::new(cx.bcx.config);
        trace!("queue: {:#?}", self.queue);

        // Iteratively execute the entire dependency graph. Each turn of the
        // loop starts out by scheduling as much work as possible (up to the
        // maximum number of parallel jobs we have tokens for). A local queue
        // is maintained separately from the main dependency queue as one
        // dequeue may actually dequeue quite a bit of work (e.g. 10 binaries
        // in one package).
        //
        // After a job has finished we update our internal state if it was
        // successful and otherwise wait for pending work to finish if it failed
        // and then immediately return.
        let mut error = None;
        let mut progress = Progress::with_style("Building", ProgressStyle::Ratio, cx.bcx.config);
        let total = self.queue.len();
        loop {
            // Dequeue as much work as we can, learning about everything
            // possible that can run. Note that this is also the point where we
            // start requesting job tokens. Each job after the first needs to
            // request a token.
            while let Some((fresh, key, jobs)) = self.queue.dequeue() {
                let total_fresh = jobs.iter().fold(fresh, |fresh, &(_, f)| f.combine(fresh));
                self.pending.insert(
                    key,
                    PendingBuild {
                        amt: jobs.len(),
                        fresh: total_fresh,
                    },
                );
                for (job, f) in jobs {
                    queue.push((key, job, f.combine(fresh)));
                    if !self.active.is_empty() || !queue.is_empty() {
                        jobserver_helper.request_token();
                    }
                }
            }

            // Now that we've learned of all possible work that we can execute,
            // try to spawn it so long as we've got a jobserver token, which
            // says we're able to perform some parallel work.
            while error.is_none() && self.active.len() < tokens.len() + 1 && !queue.is_empty() {
                let (key, job, fresh) = queue.remove(0);
                self.run(key, fresh, job, cx.bcx.config, scope, build_plan)?;
            }

            // If after all that we're not actually running anything then we're
            // done!
            if self.active.is_empty() {
                break;
            }

            // And finally, before we block waiting for the next event, drop any
            // excess tokens we may have accidentally acquired. Due to how our
            // jobserver interface is architected we may acquire a token that we
            // don't actually use, and if this happens just relinquish it back
            // to the jobserver itself.
            tokens.truncate(self.active.len() - 1);

            let count = total - self.queue.len();
            let active_names = self
                .active
                .iter()
                .map(Key::name_for_progress)
                .collect::<Vec<_>>();
            drop(progress.tick_now(count, total, &format!(": {}", active_names.join(", "))));
            let event = self.rx.recv().unwrap();
            progress.clear();

            match event {
                Message::Run(cmd) => {
                    cx.bcx
                        .config
                        .shell()
                        .verbose(|c| c.status("Running", &cmd))?;
                }
                Message::BuildPlanMsg(module_name, cmd, filenames) => {
                    plan.update(&module_name, &cmd, &filenames)?;
                }
                Message::Stdout(out) => {
                    println!("{}", out);
                }
                Message::Stderr(err) => {
                    let mut shell = cx.bcx.config.shell();
                    shell.print_ansi(err.as_bytes())?;
                    shell.err().write_all(b"\n")?;
                }
                Message::FixDiagnostic(msg) => {
                    print.print(&msg)?;
                }
                Message::Finish(key, result) => {
                    info!("end: {:?}", key);

                    // self.active.remove_item(&key); // <- switch to this when stabilized.
                    let pos = self
                        .active
                        .iter()
                        .position(|k| *k == key)
                        .expect("an unrecorded package has finished compiling");
                    self.active.remove(pos);
                    if !self.active.is_empty() {
                        assert!(!tokens.is_empty());
                        drop(tokens.pop());
                    }
                    match result {
                        Ok(()) => self.finish(key, cx)?,
                        Err(e) => {
                            let msg = "The following warnings were emitted during compilation:";
                            self.emit_warnings(Some(msg), &key, cx)?;

                            if !self.active.is_empty() {
                                error = Some(format_err!("build failed"));
                                handle_error(&e, &mut *cx.bcx.config.shell());
                                cx.bcx.config.shell().warn(
                                    "build failed, waiting for other \
                                     jobs to finish...",
                                )?;
                            } else {
                                error = Some(e);
                            }
                        }
                    }
                }
                Message::Token(acquired_token) => {
                    tokens.push(acquired_token.chain_err(|| "failed to acquire jobserver token")?);
                }
            }
        }
        drop(progress);

        let build_type = if self.is_release { "release" } else { "dev" };
        // NOTE: This may be a bit inaccurate, since this may not display the
        // profile for what was actually built. Profile overrides can change
        // these settings, and in some cases different targets are built with
        // different profiles. To be accurate, it would need to collect a
        // list of Units built, and maybe display a list of the different
        // profiles used. However, to keep it simple and compatible with old
        // behavior, we just display what the base profile is.
        let profile = cx.bcx.profiles.base_profile(self.is_release);
        let mut opt_type = String::from(if profile.opt_level.as_str() == "0" {
            "unoptimized"
        } else {
            "optimized"
        });
        if profile.debuginfo.is_some() {
            opt_type += " + debuginfo";
        }

        let time_elapsed = util::elapsed(cx.bcx.config.creation_time().elapsed());

        if self.queue.is_empty() {
            let message = format!(
                "{} [{}] target(s) in {}",
                build_type, opt_type, time_elapsed
            );
            if !build_plan {
                cx.bcx.config.shell().status("Finished", message)?;
            }
            Ok(())
        } else if let Some(e) = error {
            Err(e)
        } else {
            debug!("queue: {:#?}", self.queue);
            Err(internal("finished with jobs still left in the queue"))
        }
    }

    /// Executes a job in the `scope` given, spawning a worker thread for any
    /// work that is not fresh.
    fn run(
        &mut self,
        key: Key<'a>,
        fresh: Freshness,
        job: Job,
        config: &Config,
        scope: &Scope<'a>,
        build_plan: bool,
    ) -> CargoResult<()> {
        info!("start: {:?}", key);

        self.active.push(key);
        *self.counts.get_mut(&key.pkg).unwrap() -= 1;

        let my_tx = self.tx.clone();
        let doit = move || {
            let res = job.run(fresh, &JobState { tx: my_tx.clone() });
            my_tx.send(Message::Finish(key, res)).unwrap();
        };

        if !build_plan {
            // Print out some nice progress information
            self.note_working_on(config, &key, fresh)?;
        }

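        // Fresh jobs run inline on the current thread; dirty jobs, which do
        // the real compilation work, are spawned onto the scope so they can
        // run in parallel.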
        match fresh {
            Freshness::Fresh => doit(),
            Freshness::Dirty => {
                scope.spawn(move |_| doit());
            }
        }

        Ok(())
    }

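    /// Prints any warnings recorded in the build-script output for `key`'s
    /// package, optionally preceded by the header line `msg`.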
    fn emit_warnings(
        &self,
        msg: Option<&str>,
        key: &Key<'a>,
        cx: &mut Context<'_, '_>,
    ) -> CargoResult<()> {
        let output = cx.build_state.outputs.lock().unwrap();
        let bcx = &mut cx.bcx;
        if let Some(output) = output.get(&(key.pkg, key.kind)) {
            if let Some(msg) = msg {
                if !output.warnings.is_empty() {
                    writeln!(bcx.config.shell().err(), "{}\n", msg)?;
                }
            }

            for warning in output.warnings.iter() {
                bcx.config.shell().warn(warning)?;
            }

            if !output.warnings.is_empty() && msg.is_some() {
                // Output an empty line.
                writeln!(bcx.config.shell().err())?;
            }
        }

        Ok(())
    }

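    /// Bookkeeping for a successfully finished job: emits any pending
    /// build-script warnings and, once all of the key's jobs are done, marks
    /// the key as finished in the dependency queue so dependents can run.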
    fn finish(&mut self, key: Key<'a>, cx: &mut Context<'_, '_>) -> CargoResult<()> {
        if key.mode.is_run_custom_build() && cx.bcx.show_warnings(key.pkg) {
            self.emit_warnings(None, &key, cx)?;
        }

        let state = self.pending.get_mut(&key).unwrap();
        state.amt -= 1;
        if state.amt == 0 {
            self.queue.finish(&key, state.fresh);
        }
        Ok(())
    }

    // This isn't super trivial because we don't want to print loads and
    // loads of information to the console, but we also want to produce a
    // faithful representation of what's happening. This is somewhat nuanced
    // as a package can start compiling *very* early on because of custom
    // build commands and such.
    //
    // In general, we try to print "Compiling" for the first nontrivial task
    // run for a package, regardless of when that is. We then don't print
    // out any more information for a package after we've printed it once.
    fn note_working_on(
        &mut self,
        config: &Config,
        key: &Key<'a>,
        fresh: Freshness,
    ) -> CargoResult<()> {
        if (self.compiled.contains(&key.pkg) && !key.mode.is_doc())
            || (self.documented.contains(&key.pkg) && key.mode.is_doc())
        {
            return Ok(());
        }

        match fresh {
            // Any dirty stage which runs at least one command gets printed as
            // being a compiled package
            Dirty => {
                if key.mode.is_doc() {
                    // Skip Doctest
                    if !key.mode.is_any_test() {
                        self.documented.insert(key.pkg);
                        config.shell().status("Documenting", key.pkg)?;
                    }
                } else {
                    self.compiled.insert(key.pkg);
                    if key.mode.is_check() {
                        config.shell().status("Checking", key.pkg)?;
                    } else {
                        config.shell().status("Compiling", key.pkg)?;
                    }
                }
            }
            Fresh => {
                // If doctest is last, only print "Fresh" if nothing has been printed.
                if self.counts[&key.pkg] == 0
                    && !(key.mode == CompileMode::Doctest && self.compiled.contains(&key.pkg))
                {
                    self.compiled.insert(key.pkg);
                    config.shell().verbose(|c| c.status("Fresh", key.pkg))?;
                }
            }
        }
        Ok(())
    }
}

impl<'a> Key<'a> {
    fn new(unit: &Unit<'a>) -> Key<'a> {
        Key {
            pkg: unit.pkg.package_id(),
            target: unit.target,
            profile: unit.profile,
            kind: unit.kind,
            mode: unit.mode,
        }
    }

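    /// Computes the `Key`s this unit depends on; these become the edges used
    /// when the unit is enqueued in the dependency queue.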
    fn dependencies<'cfg>(&self, cx: &Context<'a, 'cfg>) -> CargoResult<Vec<Key<'a>>> {
        let unit = Unit {
            pkg: cx.get_package(self.pkg)?,
            target: self.target,
            profile: self.profile,
            kind: self.kind,
            mode: self.mode,
        };
        let targets = cx.dep_targets(&unit);
        Ok(targets
            .iter()
            .filter_map(|unit| {
                // Binaries aren't actually needed to *compile* tests, just to run
                // them, so we don't include this dependency edge in the job graph.
                if self.target.is_test() && unit.target.is_bin() {
                    None
                } else {
                    Some(Key::new(unit))
                }
            })
            .collect())
    }
}

impl<'a> fmt::Debug for Key<'a> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "{} => {}/{} => {:?}",
            self.pkg, self.target, self.profile, self.kind
        )
    }
}