1 //! This module implements the job queue which determines the ordering in which
2 //! rustc is spawned off. It also manages the allocation of jobserver tokens to
3 //! rustc beyond the implicit token each rustc owns (i.e., the ones used for
4 //! parallel LLVM work and parallel rustc threads).
5 //!
6 //! Cargo and rustc have a somewhat non-trivial jobserver relationship with each
7 //! other, which is due to scaling issues with sharing a single jobserver
8 //! amongst what is potentially hundreds of threads of work on many-cored
9 //! systems on (at least) Linux, and likely other platforms as well.
10 //!
11 //! The details of this algorithm are (also) written out in
12 //! src/librustc_jobserver/lib.rs. What follows is a description focusing on the
13 //! Cargo side of things.
14 //!
15 //! Cargo wants to complete the build as quickly as possible, fully saturating
16 //! all cores (as constrained by the -j=N parameter). Cargo also must not spawn
17 //! more than N threads of work: the total amount of tokens we have floating
18 //! around must always be limited to N.
19 //!
20 //! It is not really possible to optimally choose which crate should build first
21 //! or last; nor is it possible to decide whether to give an additional token to
22 //! rustc first or rather spawn a new crate of work. For now, the algorithm we
23 //! implement prioritizes spawning as many crates (i.e., rustc processes) as
24 //! possible, and then filling each rustc with tokens on demand.
25 //!
26 //! The primary loop is in `drain_the_queue` below.
27 //!
28 //! We integrate with the jobserver, originating from GNU make, to make sure
29 //! that build scripts which use make to build C code can cooperate with us on
30 //! the number of used tokens and avoid overfilling the system we're on.
31 //!
32 //! The jobserver is unfortunately a very simple protocol, so we enhance it a
33 //! little when we know that there is a rustc on the other end. Via the stderr
34 //! pipe we have to rustc, we get messages such as "NeedsToken" and
35 //! "ReleaseToken" from rustc.
36 //!
37 //! "NeedsToken" indicates that a rustc is interested in acquiring a token, but
38 //! never that it would be impossible to make progress without one (i.e., it
39 //! would be incorrect for rustc to not terminate due to an unfulfilled
40 //! NeedsToken request); we do not usually fulfill all NeedsToken requests for a
41 //! given rustc.
42 //!
43 //! "ReleaseToken" indicates that a rustc is done with one of its tokens and is
44 //! ready for us to re-acquire ownership -- we will either release that token
45 //! back into the general pool or reuse it ourselves. Note that rustc will
46 //! inform us that it is releasing a token even if it itself is also requesting
47 //! tokens; it is up to us whether to return the token to that same rustc.
48 //!
49 //! The current scheduling algorithm is relatively primitive and could likely be
50 //! improved.
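//!
//! As an illustrative sketch (not an exact trace), the per-rustc token
//! protocol described above flows through the `Message` enum defined later
//! in this file roughly as follows:
//!
//!     rustc side (via JobState)        coordinator (drain_the_queue)
//!     -------------------------        -----------------------------
//!     will_acquire()          ----->   Message::NeedsToken(id)
//!                                      request a token from the jobserver
//!                                      and remember this job's Client
//!     (token granted)         <-----   Client::release_raw() called in
//!                                      grant_rustc_token_requests()
//!     release_token()         ----->   Message::ReleaseToken(id)
//!                                      move one token back into the shared
//!                                      `tokens` pool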
51
52 use std::cell::Cell;
53 use std::collections::{BTreeMap, HashMap, HashSet};
54 use std::io;
55 use std::marker;
56 use std::mem;
57 use std::sync::Arc;
58 use std::time::Duration;
59
60 use anyhow::format_err;
61 use crossbeam_channel::{unbounded, Receiver, Sender};
62 use crossbeam_utils::thread::Scope;
63 use jobserver::{Acquired, Client, HelperThread};
64 use log::{debug, info, trace};
65
66 use super::context::OutputFile;
67 use super::job::{
68 Freshness::{self, Dirty, Fresh},
69 Job,
70 };
71 use super::timings::Timings;
72 use super::{BuildContext, BuildPlan, CompileMode, Context, Unit};
73 use crate::core::{PackageId, TargetKind};
74 use crate::handle_error;
75 use crate::util;
76 use crate::util::diagnostic_server::{self, DiagnosticPrinter};
77 use crate::util::{internal, profile, CargoResult, CargoResultExt, ProcessBuilder};
78 use crate::util::{Config, DependencyQueue};
79 use crate::util::{Progress, ProgressStyle};
80
81 /// This structure is backed by the `DependencyQueue` type and manages the
82 /// queueing of compilation steps for each package. Packages enqueue units of
83 /// work and then later on the entire graph is converted to DrainState and
84 /// executed.
85 pub struct JobQueue<'a, 'cfg> {
86 queue: DependencyQueue<Unit<'a>, Artifact, Job>,
87 counts: HashMap<PackageId, usize>,
88 timings: Timings<'a, 'cfg>,
89 }
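// A minimal sketch of how this type is driven (the real call site lives
// elsewhere in the compiler; `units` and `job_for` below are hypothetical
// placeholders):
//
//     let mut queue = JobQueue::new(cx.bcx, &units);
//     for unit in &units {
//         queue.enqueue(cx, unit, job_for(unit)?)?;
//     }
//     queue.execute(cx, &mut build_plan)?;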
90
91 /// This structure is backed by the `DependencyQueue` type and manages the
92 /// actual compilation step of each package. Packages enqueue units of work and
93 /// then later on the entire graph is processed and compiled.
94 ///
95 /// It is created from JobQueue when we have fully assembled the crate graph
96 /// (i.e., all package dependencies are known).
97 struct DrainState<'a, 'cfg> {
98 // This is the length of the DependencyQueue when starting out
99 total_units: usize,
100
101 queue: DependencyQueue<Unit<'a>, Artifact, Job>,
102 tx: Sender<Message>,
103 rx: Receiver<Message>,
104 active: HashMap<JobId, Unit<'a>>,
105 compiled: HashSet<PackageId>,
106 documented: HashSet<PackageId>,
107 counts: HashMap<PackageId, usize>,
108 progress: Progress<'cfg>,
109 next_id: u32,
110 timings: Timings<'a, 'cfg>,
111
112 /// Tokens that are currently owned by this Cargo, and may be "associated"
113 /// with a rustc process. They may also be unused, though if so will be
114 /// dropped on the next loop iteration.
115 ///
116 /// Note that the length of this may be zero, but we will still spawn work,
117 /// as we share the implicit token given to this Cargo process with a
118 /// single rustc process.
119 tokens: Vec<Acquired>,
120
121 /// rustc per-thread tokens, when in jobserver-per-rustc mode.
122 rustc_tokens: HashMap<JobId, Vec<Acquired>>,
123
124 /// This represents the list of rustc jobs (processes) and associated
125 /// clients that are interested in receiving a token.
126 to_send_clients: BTreeMap<JobId, Vec<Client>>,
127
128 /// The list of jobs that we have not yet started executing, but have
129 /// retrieved from the `queue`. We eagerly pull jobs off the main queue to
130 /// allow us to request jobserver tokens pretty early.
131 pending_queue: Vec<(Unit<'a>, Job)>,
132 print: DiagnosticPrinter<'cfg>,
133
134 // How many jobs we've finished
135 finished: usize,
136 }
137
138 #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
139 pub struct JobId(pub u32);
140
141 impl std::fmt::Display for JobId {
142 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
143 write!(f, "{}", self.0)
144 }
145 }
146
147 pub struct JobState<'a> {
148 /// Channel back to the main thread to coordinate messages and such.
149 tx: Sender<Message>,
150
151 /// The job id that this state is associated with, used when sending
152 /// messages back to the main thread.
153 id: JobId,
154
155 /// Whether or not we're expected to have a call to `rmeta_produced`. Once
156 /// that method is called this is dynamically set to `false` to prevent
157 /// sending a double message later on.
158 rmeta_required: Cell<bool>,
159
160 // Historical versions of Cargo made use of the `'a` argument here, so to
161 // leave the door open to future refactorings keep it here.
162 _marker: marker::PhantomData<&'a ()>,
163 }
164
165 /// Possible artifacts that can be produced by compilations, used as edge values
166 /// in the dependency graph.
167 ///
168 /// As edge values we can have multiple kinds of edges depending on one node,
169 /// for example some units may only depend on the metadata for an rlib while
170 /// others depend on the full rlib. This `Artifact` enum is used to distinguish
171 /// this case and track the progress of compilations as they proceed.
172 #[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
173 enum Artifact {
174 /// A generic placeholder for "depends on everything run by a step" and
175 /// means that we can't start the next compilation until the previous has
176 /// finished entirely.
177 All,
178
179 /// A node indicating that we only depend on the metadata of a compilation,
180 /// but the compilation is typically also producing an rlib. We can start
181 /// our step, however, before the full rlib is available.
182 Metadata,
183 }
184
185 enum Message {
186 Run(JobId, String),
187 BuildPlanMsg(String, ProcessBuilder, Arc<Vec<OutputFile>>),
188 Stdout(String),
189 Stderr(String),
190 FixDiagnostic(diagnostic_server::Message),
191 Token(io::Result<Acquired>),
192 Finish(JobId, Artifact, CargoResult<()>),
193
194 // This client should get release_raw called on it with one of our tokens
195 NeedsToken(JobId),
196
197 // A token previously passed to a NeedsToken client is being released.
198 ReleaseToken(JobId),
199 }
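// Illustrative (simplified) sequence of messages the coordinator might see
// for a single pipelined rustc invocation; the exact interleaving of token
// messages varies from run to run:
//
//     Message::Run(id, "rustc --crate-name foo ...".to_string())
//     Message::NeedsToken(id)                          // rustc wants a thread
//     Message::Finish(id, Artifact::Metadata, Ok(()))  // .rmeta is ready
//     Message::ReleaseToken(id)                        // a thread went idle
//     Message::Finish(id, Artifact::All, Ok(()))       // .rlib is ready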
200
201 impl<'a> JobState<'a> {
202 pub fn running(&self, cmd: &ProcessBuilder) {
203 let _ = self.tx.send(Message::Run(self.id, cmd.to_string()));
204 }
205
206 pub fn build_plan(
207 &self,
208 module_name: String,
209 cmd: ProcessBuilder,
210 filenames: Arc<Vec<OutputFile>>,
211 ) {
212 let _ = self
213 .tx
214 .send(Message::BuildPlanMsg(module_name, cmd, filenames));
215 }
216
217 pub fn stdout(&self, stdout: String) {
218 drop(self.tx.send(Message::Stdout(stdout)));
219 }
220
221 pub fn stderr(&self, stderr: String) {
222 drop(self.tx.send(Message::Stderr(stderr)));
223 }
224
225 /// A method used to signal to the coordinator thread that the rmeta file
226 /// for an rlib has been produced. This is only called for some rmeta
227 /// builds when required, and can be called at any time before a job ends.
228 /// This should only be called once because a metadata file can only be
229 /// produced once!
230 pub fn rmeta_produced(&self) {
231 self.rmeta_required.set(false);
232 let _ = self
233 .tx
234 .send(Message::Finish(self.id, Artifact::Metadata, Ok(())));
235 }
236
237 /// The rustc underlying this Job is about to acquire a jobserver token (i.e., block)
238 /// on the passed client.
239 ///
240 /// This should arrange for the associated client to eventually get a token via
241 /// `client.release_raw()`.
242 pub fn will_acquire(&self) {
243 let _ = self.tx.send(Message::NeedsToken(self.id));
244 }
245
246 /// The rustc underlying this Job is informing us that it is done with a jobserver token.
247 ///
248 /// Note that it does *not* write that token back anywhere.
249 pub fn release_token(&self) {
250 let _ = self.tx.send(Message::ReleaseToken(self.id));
251 }
252 }
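// A hedged sketch of how a `Job`'s work closure typically drives this state;
// the real implementations are constructed elsewhere in this crate, and
// `work`/`cmd` below are hypothetical placeholders:
//
//     fn work(state: &JobState<'_>, cmd: ProcessBuilder) -> CargoResult<()> {
//         state.running(&cmd);
//         // ... spawn the compiler, forwarding its captured output ...
//         state.stdout("a line of compiler stdout".to_string());
//         state.stderr("a line of compiler stderr".to_string());
//         // for pipelined builds only, once the .rmeta file exists:
//         state.rmeta_produced();
//         Ok(())
//     }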
253
254 impl<'a, 'cfg> JobQueue<'a, 'cfg> {
255 pub fn new(bcx: &BuildContext<'a, 'cfg>, root_units: &[Unit<'a>]) -> JobQueue<'a, 'cfg> {
256 JobQueue {
257 queue: DependencyQueue::new(),
258 counts: HashMap::new(),
259 timings: Timings::new(bcx, root_units),
260 }
261 }
262
263 pub fn enqueue(
264 &mut self,
265 cx: &Context<'a, 'cfg>,
266 unit: &Unit<'a>,
267 job: Job,
268 ) -> CargoResult<()> {
269 let dependencies = cx.unit_deps(unit);
270 let mut queue_deps = dependencies
271 .iter()
272 .filter(|dep| {
273 // Binaries aren't actually needed to *compile* tests, just to run
274 // them, so we don't include this dependency edge in the job graph.
275 !dep.unit.target.is_test() && !dep.unit.target.is_bin()
276 })
277 .map(|dep| {
278 // Handle the case here where our `unit -> dep` dependency may
279 // only require the metadata, not the full compilation to
280 // finish. Use the tables in `cx` to figure out what kind
281 // of artifact is associated with this dependency.
282 let artifact = if cx.only_requires_rmeta(unit, &dep.unit) {
283 Artifact::Metadata
284 } else {
285 Artifact::All
286 };
287 (dep.unit, artifact)
288 })
289 .collect::<HashMap<_, _>>();
290
291 // This is somewhat tricky, but we may need to synthesize some
292 // dependencies for this target if it requires full upstream
293 // compilations to have completed. If we're in pipelining mode then some
294 // dependency edges may be `Metadata` due to the above clause (as
295 // opposed to everything being `All`). For example consider:
296 //
297 // a (binary)
298 // â”” b (lib)
299 // â”” c (lib)
300 //
301 // Here the dependency edge from `b` to `c` will be `Metadata`, and the
302 // dependency edge from `a` to `b` will be `All`. For `a` to be compiled,
303 // however, it actually needs the full rlib of `c`. This means that we
304 // need to synthesize a dependency edge for the dependency graph from `a`
305 // to `c`. That's done here.
306 //
307 // This will walk all dependencies of the current target, and if any of
308 // *their* dependencies are `Metadata` then we depend on the `All` of
309 // the target as well. This should ensure that edges changed to
310 // `Metadata` propagate upwards `All` dependencies to anything that
311 // transitively contains the `Metadata` edge.
312 if unit.requires_upstream_objects() {
313 for dep in dependencies {
314 depend_on_deps_of_deps(cx, &mut queue_deps, dep.unit);
315 }
316
317 fn depend_on_deps_of_deps<'a>(
318 cx: &Context<'a, '_>,
319 deps: &mut HashMap<Unit<'a>, Artifact>,
320 unit: Unit<'a>,
321 ) {
322 for dep in cx.unit_deps(&unit) {
323 if deps.insert(dep.unit, Artifact::All).is_none() {
324 depend_on_deps_of_deps(cx, deps, dep.unit);
325 }
326 }
327 }
328 }
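// For the `a -> b -> c` example above, the queue ends up with dependency
// edges roughly like these (the last one is the edge synthesized here):
//
//     b -> c: Artifact::Metadata  (pipelined: b can start once c's .rmeta exists)
//     a -> b: Artifact::All       (a links b, so it needs the full rlib)
//     a -> c: Artifact::All       (synthesized: a also links c transitively)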
329
330 self.queue.queue(*unit, job, queue_deps);
331 *self.counts.entry(unit.pkg.package_id()).or_insert(0) += 1;
332 Ok(())
333 }
334
335 /// Executes all jobs necessary to build the dependency graph.
336 ///
337 /// This function will spawn off `config.jobs()` workers to build all of the
338 /// necessary dependencies, in order. Freshness is propagated as far as
339 /// possible along each dependency chain.
340 pub fn execute(mut self, cx: &mut Context<'a, '_>, plan: &mut BuildPlan) -> CargoResult<()> {
341 let _p = profile::start("executing the job graph");
342 self.queue.queue_finished();
343
344 let (tx, rx) = unbounded();
345 let progress = Progress::with_style("Building", ProgressStyle::Ratio, cx.bcx.config);
346 let state = DrainState {
347 total_units: self.queue.len(),
348 queue: self.queue,
349 tx,
350 rx,
351 active: HashMap::new(),
352 compiled: HashSet::new(),
353 documented: HashSet::new(),
354 counts: self.counts,
355 progress,
356 next_id: 0,
357 timings: self.timings,
358
359 tokens: Vec::new(),
360 rustc_tokens: HashMap::new(),
361 to_send_clients: BTreeMap::new(),
362 pending_queue: Vec::new(),
363 print: DiagnosticPrinter::new(cx.bcx.config),
364 finished: 0,
365 };
366
367 // Create a helper thread for acquiring jobserver tokens
368 let tx = state.tx.clone();
369 let helper = cx
370 .jobserver
371 .clone()
372 .into_helper_thread(move |token| {
373 drop(tx.send(Message::Token(token)));
374 })
375 .chain_err(|| "failed to create helper thread for jobserver management")?;
376
377 // Create a helper thread to manage the diagnostics for rustfix if
378 // necessary.
379 let tx = state.tx.clone();
380 let _diagnostic_server = cx
381 .bcx
382 .build_config
383 .rustfix_diagnostic_server
384 .borrow_mut()
385 .take()
386 .map(move |srv| srv.start(move |msg| drop(tx.send(Message::FixDiagnostic(msg)))));
387
388 crossbeam_utils::thread::scope(move |scope| state.drain_the_queue(cx, plan, scope, &helper))
389 .expect("child threads shouldn't panic")
390 }
391 }
392
393 impl<'a, 'cfg> DrainState<'a, 'cfg> {
394 fn spawn_work_if_possible(
395 &mut self,
396 cx: &mut Context<'a, '_>,
397 jobserver_helper: &HelperThread,
398 scope: &Scope<'_>,
399 has_errored: bool,
400 ) -> CargoResult<()> {
401 // Dequeue as much work as we can, learning about everything
402 // possible that can run. Note that this is also the point where we
403 // start requesting job tokens. Each job after the first needs to
404 // request a token.
405 while let Some((unit, job)) = self.queue.dequeue() {
406 self.pending_queue.push((unit, job));
407 if self.active.len() + self.pending_queue.len() > 1 {
408 jobserver_helper.request_token();
409 }
410 }
411
412 // Do not actually spawn the new work if we've errored out
413 if has_errored {
414 return Ok(());
415 }
416
417 // Now that we've learned of all possible work that we can execute,
418 // try to spawn it so long as we've got a jobserver token which says
419 // we're able to perform some parallel work.
420 while self.has_extra_tokens() && !self.pending_queue.is_empty() {
421 let (unit, job) = self.pending_queue.remove(0);
422 self.run(&unit, job, cx, scope)?;
423 }
424
425 Ok(())
426 }
427
428 fn has_extra_tokens(&self) -> bool {
429 self.active.len() < self.tokens.len() + 1
430 }
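// Worked example: with 3 `Acquired` tokens in `self.tokens` plus the
// implicit token owned by this Cargo process, up to 4 jobs may be active;
// once `active.len()` reaches 4 this returns false and no further work is
// spawned until a token is freed or a job finishes.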
431
432 // The oldest job (i.e., least job ID) is the one we grant tokens to first.
433 fn pop_waiting_client(&mut self) -> (JobId, Client) {
434 // FIXME: replace this with BTreeMap::first_entry when that stabilizes.
435 let key = *self
436 .to_send_clients
437 .keys()
438 .next()
439 .expect("at least one waiter");
440 let clients = self.to_send_clients.get_mut(&key).unwrap();
441 let client = clients.pop().unwrap();
442 if clients.is_empty() {
443 self.to_send_clients.remove(&key);
444 }
445 (key, client)
446 }
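// Example: with `to_send_clients == {JobId(3): [c1], JobId(7): [c2, c3]}`,
// the next grant goes to job 3 (the smallest id); within a single job the
// most recently queued client is handed a token first (LIFO via `Vec::pop`).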
447
448 // If we managed to acquire some extra tokens, send them off to a waiting rustc.
449 fn grant_rustc_token_requests(&mut self) -> CargoResult<()> {
450 while !self.to_send_clients.is_empty() && self.has_extra_tokens() {
451 let (id, client) = self.pop_waiting_client();
452 // This unwrap is guaranteed to succeed. `active` must be at least
453 // length 1, as otherwise there can't be a client waiting for a token,
454 // and `has_extra_tokens()` then implies `tokens.len()` is at least one.
455 let token = self.tokens.pop().unwrap();
456 self.rustc_tokens
457 .entry(id)
458 .or_insert_with(Vec::new)
459 .push(token);
460 client
461 .release_raw()
462 .chain_err(|| "failed to release jobserver token")?;
463 }
464
465 Ok(())
466 }
467
468 fn handle_event(
469 &mut self,
470 cx: &mut Context<'a, '_>,
471 jobserver_helper: &HelperThread,
472 plan: &mut BuildPlan,
473 event: Message,
474 ) -> CargoResult<Option<anyhow::Error>> {
475 match event {
476 Message::Run(id, cmd) => {
477 cx.bcx
478 .config
479 .shell()
480 .verbose(|c| c.status("Running", &cmd))?;
481 self.timings.unit_start(id, self.active[&id]);
482 }
483 Message::BuildPlanMsg(module_name, cmd, filenames) => {
484 plan.update(&module_name, &cmd, &filenames)?;
485 }
486 Message::Stdout(out) => {
487 cx.bcx.config.shell().stdout_println(out);
488 }
489 Message::Stderr(err) => {
490 let mut shell = cx.bcx.config.shell();
491 shell.print_ansi(err.as_bytes())?;
492 shell.err().write_all(b"\n")?;
493 }
494 Message::FixDiagnostic(msg) => {
495 self.print.print(&msg)?;
496 }
497 Message::Finish(id, artifact, result) => {
498 let unit = match artifact {
499 // If `id` has completely finished we remove it
500 // from the `active` map ...
501 Artifact::All => {
502 info!("end: {:?}", id);
503 self.finished += 1;
504 if let Some(rustc_tokens) = self.rustc_tokens.remove(&id) {
505 // This puts back the tokens that this rustc
506 // acquired into our primary token list.
507 //
508 // This represents a rustc bug: it did not
509 // release all of its thread tokens but finished
510 // completely. But we want to make Cargo resilient
511 // to such rustc bugs, as they're generally not
512 // fatal in nature (i.e., Cargo can make progress
513 // still, and the build might not even fail).
514 self.tokens.extend(rustc_tokens);
515 }
516 self.to_send_clients.remove(&id);
517 self.active.remove(&id).unwrap()
518 }
519 // ... otherwise if it hasn't finished we leave it
520 // in there as we'll get another `Finish` later on.
521 Artifact::Metadata => {
522 info!("end (meta): {:?}", id);
523 self.active[&id]
524 }
525 };
526 info!("end ({:?}): {:?}", unit, result);
527 match result {
528 Ok(()) => self.finish(id, &unit, artifact, cx)?,
529 Err(e) => {
530 let msg = "The following warnings were emitted during compilation:";
531 self.emit_warnings(Some(msg), &unit, cx)?;
532
533 if !self.active.is_empty() {
534 handle_error(&e, &mut *cx.bcx.config.shell());
535 cx.bcx.config.shell().warn(
536 "build failed, waiting for other \
537 jobs to finish...",
538 )?;
539 return Ok(Some(anyhow::format_err!("build failed")));
540 } else {
541 return Ok(Some(e));
542 }
543 }
544 }
545 }
546 Message::Token(acquired_token) => {
547 let token = acquired_token.chain_err(|| "failed to acquire jobserver token")?;
548 self.tokens.push(token);
549 }
550 Message::NeedsToken(id) => {
551 log::info!("queue token request");
552 jobserver_helper.request_token();
553 let client = cx.rustc_clients[&self.active[&id]].clone();
554 self.to_send_clients
555 .entry(id)
556 .or_insert_with(Vec::new)
557 .push(client);
558 }
559 Message::ReleaseToken(id) => {
560 // Note that this pops off potentially a completely
561 // different token, but all tokens of the same job are
562 // conceptually the same so that's fine.
563 //
564 // self.tokens is a "pool" -- the order doesn't matter -- and
565 // this transfers ownership of the token into that pool. If we
566 // end up using it on the next go around, then this token will
567 // be truncated, same as tokens obtained through Message::Token.
568 let rustc_tokens = self
569 .rustc_tokens
570 .get_mut(&id)
571 .expect("no tokens associated");
572 self.tokens
573 .push(rustc_tokens.pop().expect("rustc releases token it has"));
574 }
575 }
576
577 Ok(None)
578 }
579
580 // This will also tick the progress bar as appropriate
581 fn wait_for_events(&mut self) -> Vec<Message> {
582 // Drain all events at once to avoid displaying the progress bar
583 // unnecessarily. If there are no events we block waiting for one, but we
584 // keep a "heartbeat" going so that `record_cpu` (via `tick_progress`)
585 // still gets called regularly to calculate CPU usage over time. To do
586 // this we listen for a message with a timeout, and on timeout we run the
587 // earlier parts of the loop again.
588 let events: Vec<_> = self.rx.try_iter().collect();
589 info!(
590 "tokens in use: {}, rustc_tokens: {:?}, waiting_rustcs: {:?} (events this tick: {})",
591 self.tokens.len(),
592 self.rustc_tokens
593 .iter()
594 .map(|(k, j)| (k, j.len()))
595 .collect::<Vec<_>>(),
596 self.to_send_clients
597 .iter()
598 .map(|(k, j)| (k, j.len()))
599 .collect::<Vec<_>>(),
600 events.len(),
601 );
602 if events.is_empty() {
603 loop {
604 self.tick_progress();
605 self.tokens.truncate(self.active.len() - 1);
606 match self.rx.recv_timeout(Duration::from_millis(500)) {
607 Ok(message) => break vec![message],
608 Err(_) => continue,
609 }
610 }
611 } else {
612 events
613 }
614 }
615
616 fn drain_the_queue(
617 mut self,
618 cx: &mut Context<'a, '_>,
619 plan: &mut BuildPlan,
620 scope: &Scope<'a>,
621 jobserver_helper: &HelperThread,
622 ) -> CargoResult<()> {
623 trace!("queue: {:#?}", self.queue);
624
625 // Iteratively execute the entire dependency graph. Each turn of the
626 // loop starts out by scheduling as much work as possible (up to the
627 // maximum number of parallel jobs we have tokens for). A local queue
628 // is maintained separately from the main dependency queue as one
629 // dequeue may actually dequeue quite a bit of work (e.g., 10 binaries
630 // in one package).
631 //
632 // After a job has finished we update our internal state if it was
633 // successful and otherwise wait for pending work to finish if it failed
634 // and then immediately return.
635 let mut error = None;
636 loop {
637 self.spawn_work_if_possible(cx, jobserver_helper, scope, error.is_some())?;
638
639 // If after all that we're not actually running anything then we're
640 // done!
641 if self.active.is_empty() {
642 break;
643 }
644
645 self.grant_rustc_token_requests()?;
646
647 // And finally, before we block waiting for the next event, drop any
648 // excess tokens we may have accidentally acquired. Due to how our
649 // jobserver interface is architected we may acquire a token that we
650 // don't actually use, and if this happens just relinquish it back
651 // to the jobserver itself.
652 for event in self.wait_for_events() {
653 if let Some(err) = self.handle_event(cx, jobserver_helper, plan, event)? {
654 error = Some(err);
655 }
656 }
657 }
658 self.progress.clear();
659
660 let profile_name = cx.bcx.build_config.requested_profile;
661 // NOTE: this may be a bit inaccurate, since this may not display the
662 // profile for what was actually built. Profile overrides can change
663 // these settings, and in some cases different targets are built with
664 // different profiles. To be accurate, it would need to collect a
665 // list of Units built, and maybe display a list of the different
666 // profiles used. However, to keep it simple and compatible with old
667 // behavior, we just display what the base profile is.
668 let profile = cx.bcx.profiles.base_profile();
669 let mut opt_type = String::from(if profile.opt_level.as_str() == "0" {
670 "unoptimized"
671 } else {
672 "optimized"
673 });
674 if profile.debuginfo.unwrap_or(0) != 0 {
675 opt_type += " + debuginfo";
676 }
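// With the default dev profile, the status message assembled below ends up
// being printed as something like:
//
//     Finished dev [unoptimized + debuginfo] target(s) in 9.75s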
677
678 let time_elapsed = util::elapsed(cx.bcx.config.creation_time().elapsed());
679
680 if let Some(e) = error {
681 Err(e)
682 } else if self.queue.is_empty() && self.pending_queue.is_empty() {
683 let message = format!(
684 "{} [{}] target(s) in {}",
685 profile_name, opt_type, time_elapsed
686 );
687 if !cx.bcx.build_config.build_plan {
688 cx.bcx.config.shell().status("Finished", message)?;
689 }
690 self.timings.finished(cx.bcx)?;
691 Ok(())
692 } else {
693 debug!("queue: {:#?}", self.queue);
694 Err(internal("finished with jobs still left in the queue"))
695 }
696 }
697
698 // This also records CPU usage and marks concurrency; we roughly want to do
699 // this as often as we spin on the events receiver (at least every 500ms or
700 // so).
701 fn tick_progress(&mut self) {
702 // Record some timing information if `-Ztimings` is enabled, and
703 // this'll end up being a noop if we're not recording this
704 // information.
705 self.timings.mark_concurrency(
706 self.active.len(),
707 self.pending_queue.len(),
708 self.queue.len(),
709 self.rustc_tokens.len(),
710 );
711 self.timings.record_cpu();
712
713 let active_names = self
714 .active
715 .values()
716 .map(|u| self.name_for_progress(u))
717 .collect::<Vec<_>>();
718 drop(self.progress.tick_now(
719 self.finished,
720 self.total_units,
721 &format!(": {}", active_names.join(", ")),
722 ));
723 }
724
725 fn name_for_progress(&self, unit: &Unit<'_>) -> String {
726 let pkg_name = unit.pkg.name();
727 match unit.mode {
728 CompileMode::Doc { .. } => format!("{}(doc)", pkg_name),
729 CompileMode::RunCustomBuild => format!("{}(build)", pkg_name),
730 _ => {
731 let annotation = match unit.target.kind() {
732 TargetKind::Lib(_) => return pkg_name.to_string(),
733 TargetKind::CustomBuild => return format!("{}(build.rs)", pkg_name),
734 TargetKind::Bin => "bin",
735 TargetKind::Test => "test",
736 TargetKind::Bench => "bench",
737 TargetKind::ExampleBin | TargetKind::ExampleLib(_) => "example",
738 };
739 format!("{}({})", unit.target.name(), annotation)
740 }
741 }
742 }
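// Example names produced here: a lib unit of package `serde` shows as
// "serde", running its build script as "serde(build)", compiling the build
// script itself as "serde(build.rs)", and a test target named `integration`
// as "integration(test)".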
743
744 /// Executes a job: fresh jobs run inline on this thread, dirty jobs are spawned onto `scope`.
745 fn run(
746 &mut self,
747 unit: &Unit<'a>,
748 job: Job,
749 cx: &Context<'a, '_>,
750 scope: &Scope<'_>,
751 ) -> CargoResult<()> {
752 let id = JobId(self.next_id);
753 self.next_id = self.next_id.checked_add(1).unwrap();
754
755 info!("start {}: {:?}", id, unit);
756
757 assert!(self.active.insert(id, *unit).is_none());
758 *self.counts.get_mut(&unit.pkg.package_id()).unwrap() -= 1;
759
760 let my_tx = self.tx.clone();
761 let fresh = job.freshness();
762 let rmeta_required = cx.rmeta_required(unit);
763
764 if !cx.bcx.build_config.build_plan {
765 // Print out some nice progress information.
766 self.note_working_on(cx.bcx.config, unit, fresh)?;
767 }
768
769 let doit = move || {
770 let state = JobState {
771 id,
772 tx: my_tx.clone(),
773 rmeta_required: Cell::new(rmeta_required),
774 _marker: marker::PhantomData,
775 };
776
777 let mut sender = FinishOnDrop {
778 tx: &my_tx,
779 id,
780 result: Err(format_err!("worker panicked")),
781 };
782 sender.result = job.run(&state);
783
784 // If the `rmeta_required` wasn't consumed but it was set
785 // previously, then we either have:
786 //
787 // 1. The `job` didn't do anything because it was "fresh".
788 // 2. The `job` returned an error and didn't reach the point where
789 // it called `rmeta_produced`.
790 // 3. We forgot to call `rmeta_produced` and there's a bug in Cargo.
791 //
792 // Ruling out the third, the other two are pretty common. For 2 we'll
793 // just naturally abort the compilation operation, but for 1 we need to
794 // make sure that the metadata is flagged as produced, so we send a
795 // synthetic message here.
796 if state.rmeta_required.get() && sender.result.is_ok() {
797 my_tx
798 .send(Message::Finish(id, Artifact::Metadata, Ok(())))
799 .unwrap();
800 }
801
802 // Use a helper struct with a `Drop` implementation to guarantee
803 // that a `Finish` message is sent even if our job panics. We
804 // shouldn't panic unless there's a bug in Cargo, so we just need
805 // to make sure nothing hangs by accident.
806 struct FinishOnDrop<'a> {
807 tx: &'a Sender<Message>,
808 id: JobId,
809 result: CargoResult<()>,
810 }
811
812 impl Drop for FinishOnDrop<'_> {
813 fn drop(&mut self) {
814 let msg = mem::replace(&mut self.result, Ok(()));
815 drop(self.tx.send(Message::Finish(self.id, Artifact::All, msg)));
816 }
817 }
818 };
819
820 match fresh {
821 Freshness::Fresh => {
822 self.timings.add_fresh();
823 doit();
824 }
825 Freshness::Dirty => {
826 self.timings.add_dirty();
827 scope.spawn(move |_| doit());
828 }
829 }
830
831 Ok(())
832 }
833
834 fn emit_warnings(
835 &mut self,
836 msg: Option<&str>,
837 unit: &Unit<'a>,
838 cx: &mut Context<'a, '_>,
839 ) -> CargoResult<()> {
840 let outputs = cx.build_script_outputs.lock().unwrap();
841 let metadata = match cx.find_build_script_metadata(*unit) {
842 Some(metadata) => metadata,
843 None => return Ok(()),
844 };
845 let bcx = &mut cx.bcx;
846 if let Some(output) = outputs.get(unit.pkg.package_id(), metadata) {
847 if !output.warnings.is_empty() {
848 if let Some(msg) = msg {
849 writeln!(bcx.config.shell().err(), "{}\n", msg)?;
850 }
851
852 for warning in output.warnings.iter() {
853 bcx.config.shell().warn(warning)?;
854 }
855
856 if msg.is_some() {
857 // Output an empty line.
858 writeln!(bcx.config.shell().err())?;
859 }
860 }
861 }
862
863 Ok(())
864 }
865
866 fn finish(
867 &mut self,
868 id: JobId,
869 unit: &Unit<'a>,
870 artifact: Artifact,
871 cx: &mut Context<'a, '_>,
872 ) -> CargoResult<()> {
873 if unit.mode.is_run_custom_build() && cx.bcx.show_warnings(unit.pkg.package_id()) {
874 self.emit_warnings(None, unit, cx)?;
875 }
876 let unlocked = self.queue.finish(unit, &artifact);
877 match artifact {
878 Artifact::All => self.timings.unit_finished(id, unlocked),
879 Artifact::Metadata => self.timings.unit_rmeta_finished(id, unlocked),
880 }
881 Ok(())
882 }
883
884 // This isn't super trivial because we don't want to print loads and
885 // loads of information to the console, but we also want to produce a
886 // faithful representation of what's happening. This is somewhat nuanced
887 // as a package can start compiling *very* early on because of custom
888 // build commands and such.
889 //
890 // In general, we try to print "Compiling" for the first nontrivial task
891 // run for a package, regardless of when that is. We then don't print
892 // out any more information for a package after we've printed it once.
893 fn note_working_on(
894 &mut self,
895 config: &Config,
896 unit: &Unit<'a>,
897 fresh: Freshness,
898 ) -> CargoResult<()> {
899 if (self.compiled.contains(&unit.pkg.package_id()) && !unit.mode.is_doc())
900 || (self.documented.contains(&unit.pkg.package_id()) && unit.mode.is_doc())
901 {
902 return Ok(());
903 }
904
905 match fresh {
906 // Any dirty stage which runs at least one command gets printed as
907 // being a compiled package.
908 Dirty => {
909 if unit.mode.is_doc() {
910 self.documented.insert(unit.pkg.package_id());
911 config.shell().status("Documenting", unit.pkg)?;
912 } else if unit.mode.is_doc_test() {
913 // Skip doc test.
914 } else {
915 self.compiled.insert(unit.pkg.package_id());
916 if unit.mode.is_check() {
917 config.shell().status("Checking", unit.pkg)?;
918 } else {
919 config.shell().status("Compiling", unit.pkg)?;
920 }
921 }
922 }
923 Fresh => {
924 // If doc tests are last, only print "Fresh" if nothing has been printed.
925 if self.counts[&unit.pkg.package_id()] == 0
926 && !(unit.mode.is_doc_test() && self.compiled.contains(&unit.pkg.package_id()))
927 {
928 self.compiled.insert(unit.pkg.package_id());
929 config.shell().verbose(|c| c.status("Fresh", unit.pkg))?;
930 }
931 }
932 }
933 Ok(())
934 }
935 }