1 //! This module implements the job queue which determines the ordering in which
2 //! rustc is spawned off. It also manages the allocation of jobserver tokens to
3 //! rustc beyond the implicit token each rustc owns (i.e., the ones used for
4 //! parallel LLVM work and parallel rustc threads).
6 //! Cargo and rustc have a somewhat non-trivial jobserver relationship with each
7 //! other, which is due to scaling issues with sharing a single jobserver
8 //! amongst what is potentially hundreds of threads of work on many-cored
9 //! systems on (at least) linux, and likely other platforms as well.
11 //! The details of this algorithm are (also) written out in
12 //! src/librustc_jobserver/lib.rs. What follows is a description focusing on the
13 //! Cargo side of things.
15 //! Cargo wants to complete the build as quickly as possible, fully saturating
16 //! all cores (as constrained by the -j=N) parameter. Cargo also must not spawn
17 //! more than N threads of work: the total amount of tokens we have floating
18 //! around must always be limited to N.
20 //! It is not really possible to optimally choose which crate should build first
21 //! or last; nor is it possible to decide whether to give an additional token to
22 //! rustc first or rather spawn a new crate of work. For now, the algorithm we
23 //! implement prioritizes spawning as many crates (i.e., rustc processes) as
24 //! possible, and then filling each rustc with tokens on demand.
26 //! The primary loop is in `drain_the_queue` below.
28 //! We integrate with the jobserver, originating from GNU make, to make sure
29 //! that build scripts which use make to build C code can cooperate with us on
30 //! the number of used tokens and avoid overfilling the system we're on.
32 //! The jobserver is unfortunately a very simple protocol, so we enhance it a
33 //! little when we know that there is a rustc on the other end. Via the stderr
34 //! pipe we have to rustc, we get messages such as "NeedsToken" and
35 //! "ReleaseToken" from rustc.
37 //! "NeedsToken" indicates that a rustc is interested in acquiring a token, but
38 //! never that it would be impossible to make progress without one (i.e., it
39 //! would be incorrect for rustc to not terminate due to an unfulfilled
40 //! NeedsToken request); we do not usually fulfill all NeedsToken requests for a
43 //! "ReleaseToken" indicates that a rustc is done with one of its tokens and is
44 //! ready for us to re-acquire ownership -- we will either release that token
45 //! back into the general pool or reuse it ourselves. Note that rustc will
46 //! inform us that it is releasing a token even if it itself is also requesting
47 //! tokens; it is up to us whether to return the token to that same rustc.
49 //! The current scheduling algorithm is relatively primitive and could likely be
52 use std
::cell
::{Cell, RefCell}
;
53 use std
::collections
::{BTreeMap, HashMap, HashSet}
;
54 use std
::fmt
::Write
as _
;
58 use std
::thread
::{self, Scope}
;
59 use std
::time
::Duration
;
61 use anyhow
::{format_err, Context as _}
;
62 use cargo_util
::ProcessBuilder
;
63 use jobserver
::{Acquired, Client, HelperThread}
;
64 use log
::{debug, trace}
;
67 use super::context
::OutputFile
;
69 Freshness
::{self, Dirty, Fresh}
,
72 use super::timings
::Timings
;
73 use super::{BuildContext, BuildPlan, CompileMode, Context, Unit}
;
74 use crate::core
::compiler
::future_incompat
::{
75 self, FutureBreakageItem
, FutureIncompatReportPackage
,
77 use crate::core
::resolver
::ResolveBehavior
;
78 use crate::core
::{PackageId, Shell, TargetKind}
;
79 use crate::util
::diagnostic_server
::{self, DiagnosticPrinter}
;
80 use crate::util
::errors
::AlreadyPrintedError
;
81 use crate::util
::machine_message
::{self, Message as _}
;
82 use crate::util
::CargoResult
;
83 use crate::util
::{self, internal, profile}
;
84 use crate::util
::{Config, DependencyQueue, Progress, ProgressStyle, Queue}
;
86 /// This structure is backed by the `DependencyQueue` type and manages the
87 /// queueing of compilation steps for each package. Packages enqueue units of
88 /// work and then later on the entire graph is converted to DrainState and
90 pub struct JobQueue
<'cfg
> {
91 queue
: DependencyQueue
<Unit
, Artifact
, Job
>,
92 counts
: HashMap
<PackageId
, usize>,
93 timings
: Timings
<'cfg
>,
96 /// This structure is backed by the `DependencyQueue` type and manages the
97 /// actual compilation step of each package. Packages enqueue units of work and
98 /// then later on the entire graph is processed and compiled.
100 /// It is created from JobQueue when we have fully assembled the crate graph
101 /// (i.e., all package dependencies are known).
105 /// Each thread running a process uses the message queue to send messages back
106 /// to the main thread. The main thread coordinates everything, and handles
109 /// It is important to be careful which messages use `push` vs `push_bounded`.
110 /// `push` is for priority messages (like tokens, or "finished") where the
111 /// sender shouldn't block. We want to handle those so real work can proceed
114 /// `push_bounded` is only for messages being printed to stdout/stderr. Being
115 /// bounded prevents a flood of messages causing a large amount of memory
118 /// `push` also avoids blocking which helps avoid deadlocks. For example, when
119 /// the diagnostic server thread is dropped, it waits for the thread to exit.
120 /// But if the thread is blocked on a full queue, and there is a critical
121 /// error, the drop will deadlock. This should be fixed at some point in the
122 /// future. The jobserver thread has a similar problem, though it will time
123 /// out after 1 second.
124 struct DrainState
<'cfg
> {
125 // This is the length of the DependencyQueue when starting out
128 queue
: DependencyQueue
<Unit
, Artifact
, Job
>,
129 messages
: Arc
<Queue
<Message
>>,
130 /// Diagnostic deduplication support.
131 diag_dedupe
: DiagDedupe
<'cfg
>,
132 /// Count of warnings, used to print a summary after the job succeeds.
134 /// First value is the total number of warnings, and the second value is
135 /// the number that were suppressed because they were duplicates of a
136 /// previous warning.
137 warning_count
: HashMap
<JobId
, (usize, usize)>,
138 active
: HashMap
<JobId
, Unit
>,
139 compiled
: HashSet
<PackageId
>,
140 documented
: HashSet
<PackageId
>,
141 counts
: HashMap
<PackageId
, usize>,
142 progress
: Progress
<'cfg
>,
144 timings
: Timings
<'cfg
>,
146 /// Tokens that are currently owned by this Cargo, and may be "associated"
147 /// with a rustc process. They may also be unused, though if so will be
148 /// dropped on the next loop iteration.
150 /// Note that the length of this may be zero, but we will still spawn work,
151 /// as we share the implicit token given to this Cargo process with a
152 /// single rustc process.
153 tokens
: Vec
<Acquired
>,
155 /// rustc per-thread tokens, when in jobserver-per-rustc mode.
156 rustc_tokens
: HashMap
<JobId
, Vec
<Acquired
>>,
158 /// This represents the list of rustc jobs (processes) and associated
159 /// clients that are interested in receiving a token.
160 to_send_clients
: BTreeMap
<JobId
, Vec
<Client
>>,
162 /// The list of jobs that we have not yet started executing, but have
163 /// retrieved from the `queue`. We eagerly pull jobs off the main queue to
164 /// allow us to request jobserver tokens pretty early.
165 pending_queue
: Vec
<(Unit
, Job
)>,
166 print
: DiagnosticPrinter
<'cfg
>,
168 /// How many jobs we've finished
170 per_package_future_incompat_reports
: Vec
<FutureIncompatReportPackage
>,
173 pub struct ErrorsDuringDrain
{
177 struct ErrorToHandle
{
178 error
: anyhow
::Error
,
180 /// This field is true for "interesting" errors and false for "mundane"
181 /// errors. If false, we print the above error only if it's the first one
182 /// encountered so far while draining the job queue.
184 /// At most places that an error is propagated, we set this to false to
185 /// avoid scenarios where Cargo might end up spewing tons of redundant error
186 /// messages. For example if an i/o stream got closed somewhere, we don't
187 /// care about individually reporting every thread that it broke; just the
190 /// The exception where print_always is true is that we do report every
191 /// instance of a rustc invocation that failed with diagnostics. This
192 /// corresponds to errors from Message::Finish.
196 impl<E
> From
<E
> for ErrorToHandle
198 anyhow
::Error
: From
<E
>,
200 fn from(error
: E
) -> Self {
202 error
: anyhow
::Error
::from(error
),
/// Unique identifier for a single job (one unit of work) in the queue.
/// Ids are ordered (`PartialOrd`/`Ord`): lower ids belong to older jobs,
/// which is relied on when granting tokens to the oldest waiter first.
/// Used as the key for per-job state maps such as `active`, `rustc_tokens`,
/// `to_send_clients`, and `warning_count`.
208 #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
209 pub struct JobId(pub u32);
211 impl std
::fmt
::Display
for JobId
{
212 fn fmt(&self, f
: &mut std
::fmt
::Formatter
<'_
>) -> std
::fmt
::Result
{
213 write
!(f
, "{}", self.0)
217 /// A `JobState` is constructed by `JobQueue::run` and passed to `Job::run`. It includes everything
218 /// necessary to communicate between the main thread and the execution of the job.
220 /// The job may execute on either a dedicated thread or the main thread. If the job executes on the
221 /// main thread, the `output` field must be set to prevent a deadlock.
222 pub struct JobState
<'a
, 'cfg
> {
223 /// Channel back to the main thread to coordinate messages and such.
225 /// When the `output` field is `Some`, care must be taken to avoid calling `push_bounded` on
226 /// the message queue to prevent a deadlock.
227 messages
: Arc
<Queue
<Message
>>,
229 /// Normally output is sent to the job queue with backpressure. When the job is fresh
230 /// however we need to immediately display the output to prevent a deadlock as the
231 /// output messages are processed on the same thread as they are sent from. `output`
232 /// defines where to output in this case.
234 /// Currently the `Shell` inside `Config` is wrapped in a `RefCell` and thus can't be passed
235 /// between threads. This means that it isn't possible for multiple output messages to be
236 /// interleaved. In the future, it may be wrapped in a `Mutex` instead. In this case
237 /// interleaving is still prevented as the lock would be held for the whole printing of an
239 output
: Option
<&'a DiagDedupe
<'cfg
>>,
241 /// The job id that this state is associated with, used when sending
242 /// messages back to the main thread.
245 /// Whether or not we're expected to have a call to `rmeta_produced`. Once
246 /// that method is called this is dynamically set to `false` to prevent
247 /// sending a double message later on.
248 rmeta_required
: Cell
<bool
>,
250 // Historical versions of Cargo made use of the `'a` argument here, so to
251 // leave the door open to future refactorings keep it here.
252 _marker
: marker
::PhantomData
<&'
a ()>,
255 /// Handler for deduplicating diagnostics.
256 struct DiagDedupe
<'cfg
> {
257 seen
: RefCell
<HashSet
<u64>>,
258 config
: &'cfg Config
,
261 impl<'cfg
> DiagDedupe
<'cfg
> {
262 fn new(config
: &'cfg Config
) -> Self {
264 seen
: RefCell
::new(HashSet
::new()),
269 /// Emits a diagnostic message.
271 /// Returns `true` if the message was emitted, or `false` if it was
272 /// suppressed for being a duplicate.
273 fn emit_diag(&self, diag
: &str) -> CargoResult
<bool
> {
274 let h
= util
::hash_u64(diag
);
275 if !self.seen
.borrow_mut().insert(h
) {
278 let mut shell
= self.config
.shell();
279 shell
.print_ansi_stderr(diag
.as_bytes())?
;
280 shell
.err().write_all(b
"\n")?
;
285 /// Possible artifacts that can be produced by compilations, used as edge values
286 /// in the dependency graph.
288 /// As edge values we can have multiple kinds of edges depending on one node,
289 /// for example some units may only depend on the metadata for an rlib while
290 /// others depend on the full rlib. This `Artifact` enum is used to distinguish
291 /// this case and track the progress of compilations as they proceed.
292 #[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
294 /// A generic placeholder for "depends on everything run by a step" and
295 /// means that we can't start the next compilation until the previous has
296 /// finished entirely.
299 /// A node indicating that we only depend on the metadata of a compilation,
300 /// but the compilation is typically also producing an rlib. We can start
301 /// our step, however, before the full rlib is available.
307 BuildPlanMsg(String
, ProcessBuilder
, Arc
<Vec
<OutputFile
>>),
319 FixDiagnostic(diagnostic_server
::Message
),
320 Token(io
::Result
<Acquired
>),
321 Finish(JobId
, Artifact
, CargoResult
<()>),
322 FutureIncompatReport(JobId
, Vec
<FutureBreakageItem
>),
324 // This client should get release_raw called on it with one of our tokens
327 // A token previously passed to a NeedsToken client is being released.
331 impl<'a
, 'cfg
> JobState
<'a
, 'cfg
> {
332 pub fn running(&self, cmd
: &ProcessBuilder
) {
333 self.messages
.push(Message
::Run(self.id
, cmd
.to_string()));
340 filenames
: Arc
<Vec
<OutputFile
>>,
343 .push(Message
::BuildPlanMsg(module_name
, cmd
, filenames
));
346 pub fn stdout(&self, stdout
: String
) -> CargoResult
<()> {
347 if let Some(dedupe
) = self.output
{
348 writeln
!(dedupe
.config
.shell().out(), "{}", stdout
)?
;
350 self.messages
.push_bounded(Message
::Stdout(stdout
));
355 pub fn stderr(&self, stderr
: String
) -> CargoResult
<()> {
356 if let Some(dedupe
) = self.output
{
357 let mut shell
= dedupe
.config
.shell();
358 shell
.print_ansi_stderr(stderr
.as_bytes())?
;
359 shell
.err().write_all(b
"\n")?
;
361 self.messages
.push_bounded(Message
::Stderr(stderr
));
366 pub fn emit_diag(&self, level
: String
, diag
: String
) -> CargoResult
<()> {
367 if let Some(dedupe
) = self.output
{
368 let emitted
= dedupe
.emit_diag(&diag
)?
;
369 if level
== "warning" {
370 self.messages
.push(Message
::WarningCount
{
376 self.messages
.push_bounded(Message
::Diagnostic
{
385 /// A method used to signal to the coordinator thread that the rmeta file
386 /// for an rlib has been produced. This is only called for some rmeta
387 /// builds when required, and can be called at any time before a job ends.
388 /// This should only be called once because a metadata file can only be
390 pub fn rmeta_produced(&self) {
391 self.rmeta_required
.set(false);
393 .push(Message
::Finish(self.id
, Artifact
::Metadata
, Ok(())));
396 pub fn future_incompat_report(&self, report
: Vec
<FutureBreakageItem
>) {
398 .push(Message
::FutureIncompatReport(self.id
, report
));
401 /// The rustc underlying this Job is about to acquire a jobserver token (i.e., block)
402 /// on the passed client.
404 /// This should arrange for the associated client to eventually get a token via
405 /// `client.release_raw()`.
406 pub fn will_acquire(&self) {
407 self.messages
.push(Message
::NeedsToken(self.id
));
410 /// The rustc underlying this Job is informing us that it is done with a jobserver token.
412 /// Note that it does *not* write that token back anywhere.
413 pub fn release_token(&self) {
414 self.messages
.push(Message
::ReleaseToken(self.id
));
418 impl<'cfg
> JobQueue
<'cfg
> {
419 pub fn new(bcx
: &BuildContext
<'_
, 'cfg
>) -> JobQueue
<'cfg
> {
421 queue
: DependencyQueue
::new(),
422 counts
: HashMap
::new(),
423 timings
: Timings
::new(bcx
, &bcx
.roots
),
427 pub fn enqueue(&mut self, cx
: &Context
<'_
, 'cfg
>, unit
: &Unit
, job
: Job
) -> CargoResult
<()> {
428 let dependencies
= cx
.unit_deps(unit
);
429 let mut queue_deps
= dependencies
432 // Binaries aren't actually needed to *compile* tests, just to run
433 // them, so we don't include this dependency edge in the job graph.
434 (!dep
.unit
.target
.is_test() && !dep
.unit
.target
.is_bin())
435 || dep
.unit
.artifact
.is_true()
438 // Handle the case here where our `unit -> dep` dependency may
439 // only require the metadata, not the full compilation to
440 // finish. Use the tables in `cx` to figure out what kind
441 // of artifact is associated with this dependency.
442 let artifact
= if cx
.only_requires_rmeta(unit
, &dep
.unit
) {
447 (dep
.unit
.clone(), artifact
)
449 .collect
::<HashMap
<_
, _
>>();
451 // This is somewhat tricky, but we may need to synthesize some
452 // dependencies for this target if it requires full upstream
453 // compilations to have completed. Because of pipelining, some
454 // dependency edges may be `Metadata` due to the above clause (as
455 // opposed to everything being `All`). For example consider:
461 // Here the dependency edge from B to C will be `Metadata`, and the
462 // dependency edge from A to B will be `All`. For A to be compiled,
463 // however, it currently actually needs the full rlib of C. This means
464 // that we need to synthesize a dependency edge for the dependency graph
465 // from A to C. That's done here.
467 // This will walk all dependencies of the current target, and if any of
468 // *their* dependencies are `Metadata` then we depend on the `All` of
469 // the target as well. This should ensure that edges changed to
470 // `Metadata` propagate upwards `All` dependencies to anything that
471 // transitively contains the `Metadata` edge.
472 if unit
.requires_upstream_objects() {
473 for dep
in dependencies
{
474 depend_on_deps_of_deps(cx
, &mut queue_deps
, dep
.unit
.clone());
477 fn depend_on_deps_of_deps(
478 cx
: &Context
<'_
, '_
>,
479 deps
: &mut HashMap
<Unit
, Artifact
>,
482 for dep
in cx
.unit_deps(&unit
) {
483 if deps
.insert(dep
.unit
.clone(), Artifact
::All
).is_none() {
484 depend_on_deps_of_deps(cx
, deps
, dep
.unit
.clone());
490 // For now we use a fixed placeholder value for the cost of each unit, but
491 // in the future this could be used to allow users to provide hints about
492 // relative expected costs of units, or this could be automatically set in
493 // a smarter way using timing data from a previous compilation.
494 self.queue
.queue(unit
.clone(), job
, queue_deps
, 100);
495 *self.counts
.entry(unit
.pkg
.package_id()).or_insert(0) += 1;
499 /// Executes all jobs necessary to build the dependency graph.
501 /// This function will spawn off `config.jobs()` workers to build all of the
502 /// necessary dependencies, in order. Freshness is propagated as far as
503 /// possible along each dependency chain.
504 pub fn execute(mut self, cx
: &mut Context
<'_
, '_
>, plan
: &mut BuildPlan
) -> CargoResult
<()> {
505 let _p
= profile
::start("executing the job graph");
506 self.queue
.queue_finished();
508 let progress
= Progress
::with_style("Building", ProgressStyle
::Ratio
, cx
.bcx
.config
);
509 let state
= DrainState
{
510 total_units
: self.queue
.len(),
512 // 100 here is somewhat arbitrary. It is a few screenfulls of
513 // output, and hopefully at most a few megabytes of memory for
514 // typical messages. If you change this, please update the test
515 // caching_large_output, too.
516 messages
: Arc
::new(Queue
::new(100)),
517 diag_dedupe
: DiagDedupe
::new(cx
.bcx
.config
),
518 warning_count
: HashMap
::new(),
519 active
: HashMap
::new(),
520 compiled
: HashSet
::new(),
521 documented
: HashSet
::new(),
525 timings
: self.timings
,
527 rustc_tokens
: HashMap
::new(),
528 to_send_clients
: BTreeMap
::new(),
529 pending_queue
: Vec
::new(),
530 print
: DiagnosticPrinter
::new(cx
.bcx
.config
),
532 per_package_future_incompat_reports
: Vec
::new(),
535 // Create a helper thread for acquiring jobserver tokens
536 let messages
= state
.messages
.clone();
540 .into_helper_thread(move |token
| {
541 messages
.push(Message
::Token(token
));
543 .with_context(|| "failed to create helper thread for jobserver management")?
;
545 // Create a helper thread to manage the diagnostics for rustfix if
547 let messages
= state
.messages
.clone();
548 // It is important that this uses `push` instead of `push_bounded` for
549 // now. If someone wants to fix this to be bounded, the `drop`
550 // implementation needs to be changed to avoid possible deadlocks.
551 let _diagnostic_server
= cx
554 .rustfix_diagnostic_server
557 .map(move |srv
| srv
.start(move |msg
| messages
.push(Message
::FixDiagnostic(msg
))));
560 move |scope
| match state
.drain_the_queue(cx
, plan
, scope
, &helper
) {
561 Some(err
) => Err(err
),
568 impl<'cfg
> DrainState
<'cfg
> {
569 fn spawn_work_if_possible
<'s
>(
571 cx
: &mut Context
<'_
, '_
>,
572 jobserver_helper
: &HelperThread
,
573 scope
: &'s Scope
<'s
, '_
>,
574 ) -> CargoResult
<()> {
575 // Dequeue as much work as we can, learning about everything
576 // possible that can run. Note that this is also the point where we
577 // start requesting job tokens. Each job after the first needs to
579 while let Some((unit
, job
)) = self.queue
.dequeue() {
580 self.pending_queue
.push((unit
, job
));
581 if self.active
.len() + self.pending_queue
.len() > 1 {
582 jobserver_helper
.request_token();
586 // If multiple pieces of work are waiting in the pending queue, we can
587 // sort it according to their priorities: higher priorities should be
590 .sort_by_cached_key(|(unit
, _
)| self.queue
.priority(unit
));
592 // Now that we've learned of all possible work that we can execute
593 // try to spawn it so long as we've got a jobserver token which says
594 // we're able to perform some parallel work.
595 // The `pending_queue` is sorted in ascending priority order, and we're
596 // removing the highest priority items from its end.
597 while self.has_extra_tokens() && !self.pending_queue
.is_empty() {
598 let (unit
, job
) = self.pending_queue
.pop().unwrap();
599 *self.counts
.get_mut(&unit
.pkg
.package_id()).unwrap() -= 1;
600 if !cx
.bcx
.build_config
.build_plan
{
601 // Print out some nice progress information.
602 // NOTE: An error here will drop the job without starting it.
603 // That should be OK, since we want to exit as soon as
604 // possible during an error.
605 self.note_working_on(cx
.bcx
.config
, &unit
, job
.freshness())?
;
607 self.run(&unit
, job
, cx
, scope
);
// Returns true when we have capacity to start another job: each token in
// `self.tokens` supports one active job, plus one extra for the implicit
// token this Cargo process itself holds (per the `tokens` field note, work
// is spawned even when the token list is empty).
613 fn has_extra_tokens(&self) -> bool
{
614 self.active
.len() < self.tokens
.len() + 1
617 // The oldest job (i.e., least job ID) is the one we grant tokens to first.
618 fn pop_waiting_client(&mut self) -> (JobId
, Client
) {
619 // FIXME: replace this with BTreeMap::first_entry when that stabilizes.
624 .expect("at least one waiter");
625 let clients
= self.to_send_clients
.get_mut(&key
).unwrap();
626 let client
= clients
.pop().unwrap();
627 if clients
.is_empty() {
628 self.to_send_clients
.remove(&key
);
633 // If we managed to acquire some extra tokens, send them off to a waiting rustc.
634 fn grant_rustc_token_requests(&mut self) -> CargoResult
<()> {
635 while !self.to_send_clients
.is_empty() && self.has_extra_tokens() {
636 let (id
, client
) = self.pop_waiting_client();
637 // This unwrap is guaranteed to succeed. `active` must be at least
638 // length 1, as otherwise there can't be a client waiting to be sent
639 // on, so tokens.len() must also be at least one.
640 let token
= self.tokens
.pop().unwrap();
643 .or_insert_with(Vec
::new
)
647 .with_context(|| "failed to release jobserver token")?
;
655 cx
: &mut Context
<'_
, '_
>,
656 jobserver_helper
: &HelperThread
,
657 plan
: &mut BuildPlan
,
659 ) -> Result
<(), ErrorToHandle
> {
661 Message
::Run(id
, cmd
) => {
665 .verbose(|c
| c
.status("Running", &cmd
))?
;
666 self.timings
.unit_start(id
, self.active
[&id
].clone());
668 Message
::BuildPlanMsg(module_name
, cmd
, filenames
) => {
669 plan
.update(&module_name
, &cmd
, &filenames
)?
;
671 Message
::Stdout(out
) => {
672 writeln
!(cx
.bcx
.config
.shell().out(), "{}", out
)?
;
674 Message
::Stderr(err
) => {
675 let mut shell
= cx
.bcx
.config
.shell();
676 shell
.print_ansi_stderr(err
.as_bytes())?
;
677 shell
.err().write_all(b
"\n")?
;
679 Message
::Diagnostic { id, level, diag }
=> {
680 let emitted
= self.diag_dedupe
.emit_diag(&diag
)?
;
681 if level
== "warning" {
682 self.bump_warning_count(id
, emitted
);
685 Message
::WarningCount { id, emitted }
=> {
686 self.bump_warning_count(id
, emitted
);
688 Message
::FixDiagnostic(msg
) => {
689 self.print
.print(&msg
)?
;
691 Message
::Finish(id
, artifact
, result
) => {
692 let unit
= match artifact
{
693 // If `id` has completely finished we remove it
694 // from the `active` map ...
696 trace
!("end: {:?}", id
);
698 if let Some(rustc_tokens
) = self.rustc_tokens
.remove(&id
) {
699 // This puts back the tokens that this rustc
700 // acquired into our primary token list.
702 // This represents a rustc bug: it did not
703 // release all of its thread tokens but finished
704 // completely. But we want to make Cargo resilient
705 // to such rustc bugs, as they're generally not
706 // fatal in nature (i.e., Cargo can make progress
707 // still, and the build might not even fail).
708 self.tokens
.extend(rustc_tokens
);
710 self.to_send_clients
.remove(&id
);
711 self.report_warning_count(cx
.bcx
.config
, id
);
712 self.active
.remove(&id
).unwrap()
714 // ... otherwise if it hasn't finished we leave it
715 // in there as we'll get another `Finish` later on.
716 Artifact
::Metadata
=> {
717 trace
!("end (meta): {:?}", id
);
718 self.active
[&id
].clone()
721 debug
!("end ({:?}): {:?}", unit
, result
);
723 Ok(()) => self.finish(id
, &unit
, artifact
, cx
)?
,
725 let msg
= "The following warnings were emitted during compilation:";
726 self.emit_warnings(Some(msg
), &unit
, cx
)?
;
727 self.back_compat_notice(cx
, &unit
)?
;
728 return Err(ErrorToHandle
{
735 Message
::FutureIncompatReport(id
, items
) => {
736 let package_id
= self.active
[&id
].pkg
.package_id();
737 self.per_package_future_incompat_reports
738 .push(FutureIncompatReportPackage { package_id, items }
);
740 Message
::Token(acquired_token
) => {
741 let token
= acquired_token
.with_context(|| "failed to acquire jobserver token")?
;
742 self.tokens
.push(token
);
744 Message
::NeedsToken(id
) => {
745 trace
!("queue token request");
746 jobserver_helper
.request_token();
747 let client
= cx
.rustc_clients
[&self.active
[&id
]].clone();
750 .or_insert_with(Vec
::new
)
753 Message
::ReleaseToken(id
) => {
754 // Note that this pops off potentially a completely
755 // different token, but all tokens of the same job are
756 // conceptually the same so that's fine.
758 // self.tokens is a "pool" -- the order doesn't matter -- and
759 // this transfers ownership of the token into that pool. If we
760 // end up using it on the next go around, then this token will
761 // be truncated, same as tokens obtained through Message::Token.
762 let rustc_tokens
= self
765 .expect("no tokens associated");
767 .push(rustc_tokens
.pop().expect("rustc releases token it has"));
774 // This will also tick the progress bar as appropriate
775 fn wait_for_events(&mut self) -> Vec
<Message
> {
776 // Drain all events at once to avoid displaying the progress bar
777 // unnecessarily. If there's no events we actually block waiting for
778 // an event, but we keep a "heartbeat" going to allow `record_cpu`
779 // to run above to calculate CPU usage over time. To do this we
780 // listen for a message with a timeout, and on timeout we run the
781 // previous parts of the loop again.
782 let mut events
= self.messages
.try_pop_all();
784 "tokens in use: {}, rustc_tokens: {:?}, waiting_rustcs: {:?} (events this tick: {})",
788 .map(|(k
, j
)| (k
, j
.len()))
789 .collect
::<Vec
<_
>>(),
792 .map(|(k
, j
)| (k
, j
.len()))
793 .collect
::<Vec
<_
>>(),
796 if events
.is_empty() {
798 self.tick_progress();
799 self.tokens
.truncate(self.active
.len() - 1);
800 match self.messages
.pop(Duration
::from_millis(500)) {
802 events
.push(message
);
812 /// This is the "main" loop, where Cargo does all work to run the
815 /// This returns an Option to prevent the use of `?` on `Result` types
816 /// because it is important for the loop to carefully handle errors.
817 fn drain_the_queue
<'s
>(
819 cx
: &mut Context
<'_
, '_
>,
820 plan
: &mut BuildPlan
,
821 scope
: &'s Scope
<'s
, '_
>,
822 jobserver_helper
: &HelperThread
,
823 ) -> Option
<anyhow
::Error
> {
824 trace
!("queue: {:#?}", self.queue
);
826 // Iteratively execute the entire dependency graph. Each turn of the
827 // loop starts out by scheduling as much work as possible (up to the
828 // maximum number of parallel jobs we have tokens for). A local queue
829 // is maintained separately from the main dependency queue as one
830 // dequeue may actually dequeue quite a bit of work (e.g., 10 binaries
833 // After a job has finished we update our internal state if it was
834 // successful and otherwise wait for pending work to finish if it failed
835 // and then immediately return (or keep going, if requested by the build
837 let mut errors
= ErrorsDuringDrain { count: 0 }
;
838 // CAUTION! Do not use `?` or break out of the loop early. Every error
839 // must be handled in such a way that the loop is still allowed to
840 // drain event messages.
842 if errors
.count
== 0 || cx
.bcx
.build_config
.keep_going
{
843 if let Err(e
) = self.spawn_work_if_possible(cx
, jobserver_helper
, scope
) {
844 self.handle_error(&mut cx
.bcx
.config
.shell(), &mut errors
, e
);
848 // If after all that we're not actually running anything then we're
850 if self.active
.is_empty() {
854 if let Err(e
) = self.grant_rustc_token_requests() {
855 self.handle_error(&mut cx
.bcx
.config
.shell(), &mut errors
, e
);
858 // And finally, before we block waiting for the next event, drop any
859 // excess tokens we may have accidentally acquired. Due to how our
860 // jobserver interface is architected we may acquire a token that we
861 // don't actually use, and if this happens just relinquish it back
862 // to the jobserver itself.
863 for event
in self.wait_for_events() {
864 if let Err(event_err
) = self.handle_event(cx
, jobserver_helper
, plan
, event
) {
865 self.handle_error(&mut cx
.bcx
.config
.shell(), &mut errors
, event_err
);
869 self.progress
.clear();
871 let profile_name
= cx
.bcx
.build_config
.requested_profile
;
872 // NOTE: this may be a bit inaccurate, since this may not display the
873 // profile for what was actually built. Profile overrides can change
874 // these settings, and in some cases different targets are built with
875 // different profiles. To be accurate, it would need to collect a
876 // list of Units built, and maybe display a list of the different
877 // profiles used. However, to keep it simple and compatible with old
878 // behavior, we just display what the base profile is.
879 let profile
= cx
.bcx
.profiles
.base_profile();
880 let mut opt_type
= String
::from(if profile
.opt_level
.as_str() == "0" {
885 if profile
.debuginfo
.unwrap_or(0) != 0 {
886 opt_type
+= " + debuginfo";
889 let time_elapsed
= util
::elapsed(cx
.bcx
.config
.creation_time().elapsed());
890 if let Err(e
) = self.timings
.finished(cx
, &errors
.to_error()) {
891 self.handle_error(&mut cx
.bcx
.config
.shell(), &mut errors
, e
);
893 if cx
.bcx
.build_config
.emit_json() {
894 let mut shell
= cx
.bcx
.config
.shell();
895 let msg
= machine_message
::BuildFinished
{
896 success
: errors
.count
== 0,
899 if let Err(e
) = writeln
!(shell
.out(), "{}", msg
) {
900 self.handle_error(&mut shell
, &mut errors
, e
);
904 if let Some(error
) = errors
.to_error() {
905 // Any errors up to this point have already been printed via the
906 // `display_error` inside `handle_error`.
907 Some(anyhow
::Error
::new(AlreadyPrintedError
::new(error
)))
908 } else if self.queue
.is_empty() && self.pending_queue
.is_empty() {
909 let message
= format
!(
910 "{} [{}] target(s) in {}",
911 profile_name
, opt_type
, time_elapsed
913 if !cx
.bcx
.build_config
.build_plan
{
914 // It doesn't really matter if this fails.
915 drop(cx
.bcx
.config
.shell().status("Finished", message
));
916 future_incompat
::save_and_display_report(
918 &self.per_package_future_incompat_reports
,
924 debug
!("queue: {:#?}", self.queue
);
925 Some(internal("finished with jobs still left in the queue"))
932 err_state
: &mut ErrorsDuringDrain
,
933 new_err
: impl Into
<ErrorToHandle
>,
935 let new_err
= new_err
.into();
936 if new_err
.print_always
|| err_state
.count
== 0 {
937 crate::display_error(&new_err
.error
, shell
);
938 if err_state
.count
== 0 && !self.active
.is_empty() {
939 drop(shell
.warn("build failed, waiting for other jobs to finish..."));
941 err_state
.count
+= 1;
943 log
::warn
!("{:?}", new_err
.error
);
947 // This also records CPU usage and marks concurrency; we roughly want to do
948 // this as often as we spin on the events receiver (at least every 500ms or
950 fn tick_progress(&mut self) {
951 // Record some timing information if `--timings` is enabled, and
952 // this'll end up being a noop if we're not recording this
954 self.timings
.mark_concurrency(
956 self.pending_queue
.len(),
958 self.rustc_tokens
.len(),
960 self.timings
.record_cpu();
962 let active_names
= self
965 .map(|u
| self.name_for_progress(u
))
966 .collect
::<Vec
<_
>>();
967 drop(self.progress
.tick_now(
970 &format
!(": {}", active_names
.join(", ")),
974 fn name_for_progress(&self, unit
: &Unit
) -> String
{
975 let pkg_name
= unit
.pkg
.name();
976 let target_name
= unit
.target
.name();
978 CompileMode
::Doc { .. }
=> format
!("{}(doc)", pkg_name
),
979 CompileMode
::RunCustomBuild
=> format
!("{}(build)", pkg_name
),
980 CompileMode
::Test
| CompileMode
::Check { test: true }
=> match unit
.target
.kind() {
981 TargetKind
::Lib(_
) => format
!("{}(test)", target_name
),
982 TargetKind
::CustomBuild
=> panic
!("cannot test build script"),
983 TargetKind
::Bin
=> format
!("{}(bin test)", target_name
),
984 TargetKind
::Test
=> format
!("{}(test)", target_name
),
985 TargetKind
::Bench
=> format
!("{}(bench)", target_name
),
986 TargetKind
::ExampleBin
| TargetKind
::ExampleLib(_
) => {
987 format
!("{}(example test)", target_name
)
990 _
=> match unit
.target
.kind() {
991 TargetKind
::Lib(_
) => pkg_name
.to_string(),
992 TargetKind
::CustomBuild
=> format
!("{}(build.rs)", pkg_name
),
993 TargetKind
::Bin
=> format
!("{}(bin)", target_name
),
994 TargetKind
::Test
=> format
!("{}(test)", target_name
),
995 TargetKind
::Bench
=> format
!("{}(bench)", target_name
),
996 TargetKind
::ExampleBin
| TargetKind
::ExampleLib(_
) => {
997 format
!("{}(example)", target_name
)
1005 /// Fresh jobs block until finished (which should be very fast!), Dirty
1006 /// jobs will spawn a thread in the background and return immediately.
1007 fn run
<'s
>(&mut self, unit
: &Unit
, job
: Job
, cx
: &Context
<'_
, '_
>, scope
: &'s Scope
<'s
, '_
>) {
1008 let id
= JobId(self.next_id
);
1009 self.next_id
= self.next_id
.checked_add(1).unwrap();
1011 debug
!("start {}: {:?}", id
, unit
);
1013 assert
!(self.active
.insert(id
, unit
.clone()).is_none());
1015 let messages
= self.messages
.clone();
1016 let fresh
= job
.freshness();
1017 let rmeta_required
= cx
.rmeta_required(unit
);
1019 let doit
= move |state
: JobState
<'_
, '_
>| {
1020 let mut sender
= FinishOnDrop
{
1021 messages
: &state
.messages
,
1025 sender
.result
= Some(job
.run(&state
));
1027 // If the `rmeta_required` wasn't consumed but it was set
1028 // previously, then we either have:
1030 // 1. The `job` didn't do anything because it was "fresh".
1031 // 2. The `job` returned an error and didn't reach the point where
1032 // it called `rmeta_produced`.
1033 // 3. We forgot to call `rmeta_produced` and there's a bug in Cargo.
1035 // Ruling out the third, the other two are pretty common for 2
1036 // we'll just naturally abort the compilation operation but for 1
1037 // we need to make sure that the metadata is flagged as produced so
1038 // send a synthetic message here.
1039 if state
.rmeta_required
.get() && sender
.result
.as_ref().unwrap().is_ok() {
1042 .push(Message
::Finish(state
.id
, Artifact
::Metadata
, Ok(())));
1045 // Use a helper struct with a `Drop` implementation to guarantee
1046 // that a `Finish` message is sent even if our job panics. We
1047 // shouldn't panic unless there's a bug in Cargo, so we just need
1048 // to make sure nothing hangs by accident.
1049 struct FinishOnDrop
<'a
> {
1050 messages
: &'a Queue
<Message
>,
1052 result
: Option
<CargoResult
<()>>,
1055 impl Drop
for FinishOnDrop
<'_
> {
1056 fn drop(&mut self) {
1060 .unwrap_or_else(|| Err(format_err
!("worker panicked")));
1062 .push(Message
::Finish(self.id
, Artifact
::All
, result
));
1068 Freshness
::Fresh
=> {
1069 self.timings
.add_fresh();
1070 // Running a fresh job on the same thread is often much faster than spawning a new
1071 // thread to run the job.
1075 output
: Some(&self.diag_dedupe
),
1076 rmeta_required
: Cell
::new(rmeta_required
),
1077 _marker
: marker
::PhantomData
,
1080 Freshness
::Dirty
=> {
1081 self.timings
.add_dirty();
1082 scope
.spawn(move || {
1085 messages
: messages
.clone(),
1087 rmeta_required
: Cell
::new(rmeta_required
),
1088 _marker
: marker
::PhantomData
,
1099 cx
: &mut Context
<'_
, '_
>,
1100 ) -> CargoResult
<()> {
1101 let outputs
= cx
.build_script_outputs
.lock().unwrap();
1102 let metadata
= match cx
.find_build_script_metadata(unit
) {
1103 Some(metadata
) => metadata
,
1104 None
=> return Ok(()),
1106 let bcx
= &mut cx
.bcx
;
1107 if let Some(output
) = outputs
.get(metadata
) {
1108 if !output
.warnings
.is_empty() {
1109 if let Some(msg
) = msg
{
1110 writeln
!(bcx
.config
.shell().err(), "{}\n", msg
)?
;
1113 for warning
in output
.warnings
.iter() {
1114 bcx
.config
.shell().warn(warning
)?
;
1118 // Output an empty line.
1119 writeln
!(bcx
.config
.shell().err())?
;
1127 fn bump_warning_count(&mut self, id
: JobId
, emitted
: bool
) {
1128 let cnts
= self.warning_count
.entry(id
).or_default();
1135 /// Displays a final report of the warnings emitted by a particular job.
1136 fn report_warning_count(&mut self, config
: &Config
, id
: JobId
) {
1137 let count
= match self.warning_count
.remove(&id
) {
1138 Some(count
) => count
,
1141 let unit
= &self.active
[&id
];
1142 let mut message
= format
!("`{}` ({}", unit
.pkg
.name(), unit
.target
.description_named());
1143 if unit
.mode
.is_rustc_test() && !(unit
.target
.is_test() || unit
.target
.is_bench()) {
1144 message
.push_str(" test");
1145 } else if unit
.mode
.is_doc_test() {
1146 message
.push_str(" doctest");
1147 } else if unit
.mode
.is_doc() {
1148 message
.push_str(" doc");
1150 message
.push_str(") generated ");
1152 1 => message
.push_str("1 warning"),
1153 n
=> drop(write
!(message
, "{} warnings", n
)),
1157 1 => message
.push_str(" (1 duplicate)"),
1158 n
=> drop(write
!(message
, " ({} duplicates)", n
)),
1160 // Errors are ignored here because it is tricky to handle them
1161 // correctly, and they aren't important.
1162 drop(config
.shell().warn(message
));
1170 cx
: &mut Context
<'_
, '_
>,
1171 ) -> CargoResult
<()> {
1172 if unit
.mode
.is_run_custom_build() && unit
.show_warnings(cx
.bcx
.config
) {
1173 self.emit_warnings(None
, unit
, cx
)?
;
1175 let unlocked
= self.queue
.finish(unit
, &artifact
);
1177 Artifact
::All
=> self.timings
.unit_finished(id
, unlocked
),
1178 Artifact
::Metadata
=> self.timings
.unit_rmeta_finished(id
, unlocked
),
1183 // This isn't super trivial because we don't want to print loads and
1184 // loads of information to the console, but we also want to produce a
1185 // faithful representation of what's happening. This is somewhat nuanced
1186 // as a package can start compiling *very* early on because of custom
1187 // build commands and such.
1189 // In general, we try to print "Compiling" for the first nontrivial task
1190 // run for a package, regardless of when that is. We then don't print
1191 // out any more information for a package after we've printed it once.
1197 ) -> CargoResult
<()> {
1198 if (self.compiled
.contains(&unit
.pkg
.package_id()) && !unit
.mode
.is_doc())
1199 || (self.documented
.contains(&unit
.pkg
.package_id()) && unit
.mode
.is_doc())
1205 // Any dirty stage which runs at least one command gets printed as
1206 // being a compiled package.
1208 if unit
.mode
.is_doc() {
1209 self.documented
.insert(unit
.pkg
.package_id());
1210 config
.shell().status("Documenting", &unit
.pkg
)?
;
1211 } else if unit
.mode
.is_doc_test() {
1214 self.compiled
.insert(unit
.pkg
.package_id());
1215 if unit
.mode
.is_check() {
1216 config
.shell().status("Checking", &unit
.pkg
)?
;
1218 config
.shell().status("Compiling", &unit
.pkg
)?
;
1223 // If doc test are last, only print "Fresh" if nothing has been printed.
1224 if self.counts
[&unit
.pkg
.package_id()] == 0
1225 && !(unit
.mode
.is_doc_test() && self.compiled
.contains(&unit
.pkg
.package_id()))
1227 self.compiled
.insert(unit
.pkg
.package_id());
1228 config
.shell().verbose(|c
| c
.status("Fresh", &unit
.pkg
))?
;
1235 fn back_compat_notice(&self, cx
: &Context
<'_
, '_
>, unit
: &Unit
) -> CargoResult
<()> {
1236 if unit
.pkg
.name() != "diesel"
1237 || unit
.pkg
.version() >= &Version
::new(1, 4, 8)
1238 || cx
.bcx
.ws
.resolve_behavior() == ResolveBehavior
::V1
1239 || !unit
.pkg
.package_id().source_id().is_registry()
1240 || !unit
.features
.is_empty()
1248 .any(|unit
| unit
.pkg
.name() == "diesel" && !unit
.features
.is_empty())
1252 cx
.bcx
.config
.shell().note(
1254 This error may be due to an interaction between diesel and Cargo's new
1255 feature resolver. Try updating to diesel 1.4.8 to fix this error.
1262 impl ErrorsDuringDrain
{
1263 fn to_error(&self) -> Option
<anyhow
::Error
> {
1266 1 => Some(format_err
!("1 job failed")),
1267 n
=> Some(format_err
!("{} jobs failed", n
)),