1 //! This module implements the job queue which determines the ordering in which
2 //! rustc is spawned off. It also manages the allocation of jobserver tokens to
3 //! rustc beyond the implicit token each rustc owns (i.e., the ones used for
4 //! parallel LLVM work and parallel rustc threads).
5 //!
6 //! Cargo and rustc have a somewhat non-trivial jobserver relationship with each
7 //! other, which is due to scaling issues with sharing a single jobserver
8 //! amongst what is potentially hundreds of threads of work on many-cored
9 //! systems on (at least) Linux, and likely other platforms as well.
10 //!
11 //! The details of this algorithm are (also) written out in
12 //! src/librustc_jobserver/lib.rs. What follows is a description focusing on the
13 //! Cargo side of things.
14 //!
15 //! Cargo wants to complete the build as quickly as possible, fully saturating
16 //! all cores (as constrained by the `-j=N` parameter). Cargo also must not spawn
17 //! more than N threads of work: the total amount of tokens we have floating
18 //! around must always be limited to N.
19 //!
20 //! It is not really possible to optimally choose which crate should build first
21 //! or last; nor is it possible to decide whether to give an additional token to
22 //! rustc first or rather spawn a new crate of work. For now, the algorithm we
23 //! implement prioritizes spawning as many crates (i.e., rustc processes) as
24 //! possible, and then filling each rustc with tokens on demand.
25 //!
26 //! The primary loop is in `drain_the_queue` below.
27 //!
28 //! We integrate with the jobserver, originating from GNU make, to make sure
29 //! that build scripts which use make to build C code can cooperate with us on
30 //! the number of used tokens and avoid overfilling the system we're on.
31 //!
32 //! The jobserver is unfortunately a very simple protocol, so we enhance it a
33 //! little when we know that there is a rustc on the other end. Via the stderr
34 //! pipe we have to rustc, we get messages such as "NeedsToken" and
35 //! "ReleaseToken" from rustc.
36 //!
37 //! "NeedsToken" indicates that a rustc is interested in acquiring a token, but
38 //! never that it would be impossible to make progress without one (i.e., it
39 //! would be incorrect for rustc to not terminate due to an unfulfilled
40 //! NeedsToken request); we do not usually fulfill all NeedsToken requests for a
41 //! given rustc.
42 //!
43 //! "ReleaseToken" indicates that a rustc is done with one of its tokens and is
44 //! ready for us to re-acquire ownership -- we will either release that token
45 //! back into the general pool or reuse it ourselves. Note that rustc will
46 //! inform us that it is releasing a token even if it itself is also requesting
47 //! tokens; it is up to us whether to return the token to that same rustc.
48 //!
49 //! The current scheduling algorithm is relatively primitive and could likely be
50 //! improved.
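//!
//! As a rough illustration (not an exact trace), the token handshake for a
//! single rustc process looks like this:
//!
//! ```text
//! Cargo                                      rustc (via its jobserver Client)
//!   |-- spawn rustc (owns an implicit token) -->|
//!   |<------ "NeedsToken" (stderr pipe) --------|  rustc wants to go wider
//!   |   ask the jobserver helper for a token    |
//!   |-- release_raw() on rustc's client ------->|  if/when a token is acquired
//!   |<------ "ReleaseToken" --------------------|  rustc is done with a token
//!   |   token returns to Cargo's pool           |
//! ```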
51
52 use std::cell::{Cell, RefCell};
53 use std::collections::{BTreeMap, HashMap, HashSet};
54 use std::fmt::Write as _;
55 use std::io;
56 use std::marker;
57 use std::sync::Arc;
58 use std::thread::{self, Scope};
59 use std::time::Duration;
60
61 use anyhow::{format_err, Context as _};
62 use cargo_util::ProcessBuilder;
63 use jobserver::{Acquired, Client, HelperThread};
64 use log::{debug, trace};
65 use semver::Version;
66
67 use super::context::OutputFile;
68 use super::job::{
69 Freshness::{self, Dirty, Fresh},
70 Job,
71 };
72 use super::timings::Timings;
73 use super::{BuildContext, BuildPlan, CompileMode, Context, Unit};
74 use crate::core::compiler::future_incompat::{
75 self, FutureBreakageItem, FutureIncompatReportPackage,
76 };
77 use crate::core::resolver::ResolveBehavior;
78 use crate::core::{PackageId, Shell, TargetKind};
79 use crate::util::diagnostic_server::{self, DiagnosticPrinter};
80 use crate::util::errors::AlreadyPrintedError;
81 use crate::util::machine_message::{self, Message as _};
82 use crate::util::CargoResult;
83 use crate::util::{self, internal, profile};
84 use crate::util::{Config, DependencyQueue, Progress, ProgressStyle, Queue};
85
86 /// This structure is backed by the `DependencyQueue` type and manages the
87 /// queueing of compilation steps for each package. Packages enqueue units of
88 /// work and then later on the entire graph is converted to DrainState and
89 /// executed.
90 pub struct JobQueue<'cfg> {
91 queue: DependencyQueue<Unit, Artifact, Job>,
92 counts: HashMap<PackageId, usize>,
93 timings: Timings<'cfg>,
94 }
95
96 /// This structure is backed by the `DependencyQueue` type and manages the
97 /// actual compilation step of each package. Packages enqueue units of work and
98 /// then later on the entire graph is processed and compiled.
99 ///
100 /// It is created from JobQueue when we have fully assembled the crate graph
101 /// (i.e., all package dependencies are known).
102 ///
103 /// # Message queue
104 ///
105 /// Each thread running a process uses the message queue to send messages back
106 /// to the main thread. The main thread coordinates everything, and handles
107 /// printing output.
108 ///
109 /// It is important to be careful which messages use `push` vs `push_bounded`.
110 /// `push` is for priority messages (like tokens, or "finished") where the
111 /// sender shouldn't block. We want to handle those so real work can proceed
112 /// ASAP.
113 ///
114 /// `push_bounded` is only for messages being printed to stdout/stderr. Being
115 /// bounded prevents a flood of messages causing a large amount of memory
116 /// being used.
117 ///
118 /// `push` also avoids blocking which helps avoid deadlocks. For example, when
119 /// the diagnostic server thread is dropped, it waits for the thread to exit.
120 /// But if the thread is blocked on a full queue, and there is a critical
121 /// error, the drop will deadlock. This should be fixed at some point in the
122 /// future. The jobserver thread has a similar problem, though it will time
123 /// out after 1 second.
124 struct DrainState<'cfg> {
125 // This is the length of the DependencyQueue when starting out
126 total_units: usize,
127
128 queue: DependencyQueue<Unit, Artifact, Job>,
129 messages: Arc<Queue<Message>>,
130 /// Diagnostic deduplication support.
131 diag_dedupe: DiagDedupe<'cfg>,
132 /// Count of warnings, used to print a summary after the job succeeds.
133 ///
134 /// First value is the total number of warnings, and the second value is
135 /// the number that were suppressed because they were duplicates of a
136 /// previous warning.
137 warning_count: HashMap<JobId, (usize, usize)>,
138 active: HashMap<JobId, Unit>,
139 compiled: HashSet<PackageId>,
140 documented: HashSet<PackageId>,
141 counts: HashMap<PackageId, usize>,
142 progress: Progress<'cfg>,
143 next_id: u32,
144 timings: Timings<'cfg>,
145
146 /// Tokens that are currently owned by this Cargo, and may be "associated"
147 /// with a rustc process. They may also be unused, though if so will be
148 /// dropped on the next loop iteration.
149 ///
150 /// Note that the length of this may be zero, but we will still spawn work,
151 /// as we share the implicit token given to this Cargo process with a
152 /// single rustc process.
153 tokens: Vec<Acquired>,
154
155 /// rustc per-thread tokens, when in jobserver-per-rustc mode.
156 rustc_tokens: HashMap<JobId, Vec<Acquired>>,
157
158 /// This represents the list of rustc jobs (processes) and associated
159 /// clients that are interested in receiving a token.
160 to_send_clients: BTreeMap<JobId, Vec<Client>>,
161
162 /// The list of jobs that we have not yet started executing, but have
163 /// retrieved from the `queue`. We eagerly pull jobs off the main queue to
164 /// allow us to request jobserver tokens pretty early.
165 pending_queue: Vec<(Unit, Job)>,
166 print: DiagnosticPrinter<'cfg>,
167
168 /// How many jobs we've finished
169 finished: usize,
170 per_package_future_incompat_reports: Vec<FutureIncompatReportPackage>,
171 }
172
173 pub struct ErrorsDuringDrain {
174 pub count: usize,
175 }
176
177 struct ErrorToHandle {
178 error: anyhow::Error,
179
180 /// This field is true for "interesting" errors and false for "mundane"
181 /// errors. If false, we print the above error only if it's the first one
182 /// encountered so far while draining the job queue.
183 ///
184 /// At most places that an error is propagated, we set this to false to
185 /// avoid scenarios where Cargo might end up spewing tons of redundant error
186 /// messages. For example if an i/o stream got closed somewhere, we don't
187 /// care about individually reporting every thread that it broke; just the
188 /// first is enough.
189 ///
190 /// The exception where print_always is true is that we do report every
191 /// instance of a rustc invocation that failed with diagnostics. This
192 /// corresponds to errors from Message::Finish.
193 print_always: bool,
194 }
195
196 impl<E> From<E> for ErrorToHandle
197 where
198 anyhow::Error: From<E>,
199 {
200 fn from(error: E) -> Self {
201 ErrorToHandle {
202 error: anyhow::Error::from(error),
203 print_always: false,
204 }
205 }
206 }
207
208 #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
209 pub struct JobId(pub u32);
210
211 impl std::fmt::Display for JobId {
212 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
213 write!(f, "{}", self.0)
214 }
215 }
216
217 /// A `JobState` is constructed by `JobQueue::run` and passed to `Job::run`. It includes everything
218 /// necessary to communicate between the main thread and the execution of the job.
219 ///
220 /// The job may execute on either a dedicated thread or the main thread. If the job executes on the
221 /// main thread, the `output` field must be set to prevent a deadlock.
222 pub struct JobState<'a, 'cfg> {
223 /// Channel back to the main thread to coordinate messages and such.
224 ///
225 /// When the `output` field is `Some`, care must be taken to avoid calling `push_bounded` on
226 /// the message queue to prevent a deadlock.
227 messages: Arc<Queue<Message>>,
228
229 /// Normally output is sent to the job queue with backpressure. When the job is fresh
230 /// however we need to immediately display the output to prevent a deadlock as the
231 /// output messages are processed on the same thread as they are sent from. `output`
232 /// defines where to output in this case.
233 ///
234 /// Currently the `Shell` inside `Config` is wrapped in a `RefCell` and thus can't be passed
235 /// between threads. This means that it isn't possible for multiple output messages to be
236 /// interleaved. In the future, it may be wrapped in a `Mutex` instead. In this case
237 /// interleaving is still prevented as the lock would be held for the whole printing of an
238 /// output message.
239 output: Option<&'a DiagDedupe<'cfg>>,
240
241 /// The job id that this state is associated with, used when sending
242 /// messages back to the main thread.
243 id: JobId,
244
245 /// Whether or not we're expected to have a call to `rmeta_produced`. Once
246 /// that method is called this is dynamically set to `false` to prevent
247 /// sending a double message later on.
248 rmeta_required: Cell<bool>,
249
250 // Historical versions of Cargo made use of the `'a` argument here, so to
251 // leave the door open to future refactorings keep it here.
252 _marker: marker::PhantomData<&'a ()>,
253 }
254
255 /// Handler for deduplicating diagnostics.
256 struct DiagDedupe<'cfg> {
257 seen: RefCell<HashSet<u64>>,
258 config: &'cfg Config,
259 }
260
261 impl<'cfg> DiagDedupe<'cfg> {
262 fn new(config: &'cfg Config) -> Self {
263 DiagDedupe {
264 seen: RefCell::new(HashSet::new()),
265 config,
266 }
267 }
268
269 /// Emits a diagnostic message.
270 ///
271 /// Returns `true` if the message was emitted, or `false` if it was
272 /// suppressed for being a duplicate.
273 fn emit_diag(&self, diag: &str) -> CargoResult<bool> {
274 let h = util::hash_u64(diag);
275 if !self.seen.borrow_mut().insert(h) {
276 return Ok(false);
277 }
278 let mut shell = self.config.shell();
279 shell.print_ansi_stderr(diag.as_bytes())?;
280 shell.err().write_all(b"\n")?;
281 Ok(true)
282 }
283 }
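
// A minimal, self-contained sketch of the idea behind `DiagDedupe::emit_diag`
// (illustration only; `sketch_dedupe` is not part of this module): hash each
// rendered diagnostic and emit it only the first time that hash is seen.
#[allow(dead_code)]
fn sketch_dedupe() {
    use std::collections::hash_map::DefaultHasher;
    use std::collections::HashSet;
    use std::hash::{Hash, Hasher};

    let mut seen = HashSet::new();
    let mut emit = |diag: &str| -> bool {
        let mut hasher = DefaultHasher::new();
        diag.hash(&mut hasher);
        // `insert` returns true only for hashes we haven't seen before.
        seen.insert(hasher.finish())
    };
    assert!(emit("warning: unused variable `x`"));
    assert!(!emit("warning: unused variable `x`")); // duplicate is suppressed
}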
284
285 /// Possible artifacts that can be produced by compilations, used as edge values
286 /// in the dependency graph.
287 ///
288 /// As edge values we can have multiple kinds of edges depending on one node,
289 /// for example some units may only depend on the metadata for an rlib while
290 /// others depend on the full rlib. This `Artifact` enum is used to distinguish
291 /// this case and track the progress of compilations as they proceed.
292 #[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
293 enum Artifact {
294 /// A generic placeholder for "depends on everything run by a step" and
295 /// means that we can't start the next compilation until the previous has
296 /// finished entirely.
297 All,
298
299 /// A node indicating that we only depend on the metadata of a compilation,
300 /// but the compilation is typically also producing an rlib. We can start
301 /// our step, however, before the full rlib is available.
302 Metadata,
303 }
304
305 enum Message {
306 Run(JobId, String),
307 BuildPlanMsg(String, ProcessBuilder, Arc<Vec<OutputFile>>),
308 Stdout(String),
309 Stderr(String),
310 Diagnostic {
311 id: JobId,
312 level: String,
313 diag: String,
314 },
315 WarningCount {
316 id: JobId,
317 emitted: bool,
318 },
319 FixDiagnostic(diagnostic_server::Message),
320 Token(io::Result<Acquired>),
321 Finish(JobId, Artifact, CargoResult<()>),
322 FutureIncompatReport(JobId, Vec<FutureBreakageItem>),
323
324 // This client should get release_raw called on it with one of our tokens
325 NeedsToken(JobId),
326
327 // A token previously passed to a NeedsToken client is being released.
328 ReleaseToken(JobId),
329 }
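
// Of these, only `Stdout`, `Stderr`, and `Diagnostic` travel through the
// bounded `push_bounded` path (see `JobState::stdout`/`stderr`/`emit_diag`
// below); the rest are priority messages sent with the non-blocking `push`.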
330
331 impl<'a, 'cfg> JobState<'a, 'cfg> {
332 pub fn running(&self, cmd: &ProcessBuilder) {
333 self.messages.push(Message::Run(self.id, cmd.to_string()));
334 }
335
336 pub fn build_plan(
337 &self,
338 module_name: String,
339 cmd: ProcessBuilder,
340 filenames: Arc<Vec<OutputFile>>,
341 ) {
342 self.messages
343 .push(Message::BuildPlanMsg(module_name, cmd, filenames));
344 }
345
346 pub fn stdout(&self, stdout: String) -> CargoResult<()> {
347 if let Some(dedupe) = self.output {
348 writeln!(dedupe.config.shell().out(), "{}", stdout)?;
349 } else {
350 self.messages.push_bounded(Message::Stdout(stdout));
351 }
352 Ok(())
353 }
354
355 pub fn stderr(&self, stderr: String) -> CargoResult<()> {
356 if let Some(dedupe) = self.output {
357 let mut shell = dedupe.config.shell();
358 shell.print_ansi_stderr(stderr.as_bytes())?;
359 shell.err().write_all(b"\n")?;
360 } else {
361 self.messages.push_bounded(Message::Stderr(stderr));
362 }
363 Ok(())
364 }
365
366 pub fn emit_diag(&self, level: String, diag: String) -> CargoResult<()> {
367 if let Some(dedupe) = self.output {
368 let emitted = dedupe.emit_diag(&diag)?;
369 if level == "warning" {
370 self.messages.push(Message::WarningCount {
371 id: self.id,
372 emitted,
373 });
374 }
375 } else {
376 self.messages.push_bounded(Message::Diagnostic {
377 id: self.id,
378 level,
379 diag,
380 });
381 }
382 Ok(())
383 }
384
385 /// A method used to signal to the coordinator thread that the rmeta file
386 /// for an rlib has been produced. This is only called for some rmeta
387 /// builds when required, and can be called at any time before a job ends.
388 /// This should only be called once because a metadata file can only be
389 /// produced once!
390 pub fn rmeta_produced(&self) {
391 self.rmeta_required.set(false);
392 self.messages
393 .push(Message::Finish(self.id, Artifact::Metadata, Ok(())));
394 }
395
396 pub fn future_incompat_report(&self, report: Vec<FutureBreakageItem>) {
397 self.messages
398 .push(Message::FutureIncompatReport(self.id, report));
399 }
400
401 /// The rustc underlying this Job is about to acquire a jobserver token (i.e., block)
402 /// on the passed client.
403 ///
404 /// This should arrange for the associated client to eventually get a token via
405 /// `client.release_raw()`.
406 pub fn will_acquire(&self) {
407 self.messages.push(Message::NeedsToken(self.id));
408 }
409
410 /// The rustc underlying this Job is informing us that it is done with a jobserver token.
411 ///
412 /// Note that it does *not* write that token back anywhere.
413 pub fn release_token(&self) {
414 self.messages.push(Message::ReleaseToken(self.id));
415 }
416 }
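
// For orientation, a hypothetical job body (illustration only, not code from
// this module) drives a `JobState` roughly like this:
//
//     |state: &JobState<'_, '_>| -> CargoResult<()> {
//         state.running(&cmd);       // report the command being run
//         state.stdout(line)?;       // forwarded with backpressure
//         state.rmeta_produced();    // only for pipelined rmeta builds
//         Ok(())
//     }
//
// where `cmd` and `line` stand in for whatever the job has produced.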
417
418 impl<'cfg> JobQueue<'cfg> {
419 pub fn new(bcx: &BuildContext<'_, 'cfg>) -> JobQueue<'cfg> {
420 JobQueue {
421 queue: DependencyQueue::new(),
422 counts: HashMap::new(),
423 timings: Timings::new(bcx, &bcx.roots),
424 }
425 }
426
427 pub fn enqueue(&mut self, cx: &Context<'_, 'cfg>, unit: &Unit, job: Job) -> CargoResult<()> {
428 let dependencies = cx.unit_deps(unit);
429 let mut queue_deps = dependencies
430 .iter()
431 .filter(|dep| {
432 // Binaries aren't actually needed to *compile* tests, just to run
433 // them, so we don't include this dependency edge in the job graph.
434 (!dep.unit.target.is_test() && !dep.unit.target.is_bin())
435 || dep.unit.artifact.is_true()
436 })
437 .map(|dep| {
438 // Handle the case here where our `unit -> dep` dependency may
439 // only require the metadata, not the full compilation to
440 // finish. Use the tables in `cx` to figure out what kind
441 // of artifact is associated with this dependency.
442 let artifact = if cx.only_requires_rmeta(unit, &dep.unit) {
443 Artifact::Metadata
444 } else {
445 Artifact::All
446 };
447 (dep.unit.clone(), artifact)
448 })
449 .collect::<HashMap<_, _>>();
450
451 // This is somewhat tricky, but we may need to synthesize some
452 // dependencies for this target if it requires full upstream
453 // compilations to have completed. Because of pipelining, some
454 // dependency edges may be `Metadata` due to the above clause (as
455 // opposed to everything being `All`). For example consider:
456 //
457 //   a (binary)
458 //   └ b (lib)
459 //       └ c (lib)
460 //
461 // Here the dependency edge from B to C will be `Metadata`, and the
462 // dependency edge from A to B will be `All`. For A to be compiled,
463 // however, it currently actually needs the full rlib of C. This means
464 // that we need to synthesize a dependency edge for the dependency graph
465 // from A to C. That's done here.
466 //
467 // This will walk all dependencies of the current target, and if any of
468 // *their* dependencies are `Metadata` then we depend on the `All` of
469 // the target as well. This should ensure that edges changed to
470 // `Metadata` propagate upwards `All` dependencies to anything that
471 // transitively contains the `Metadata` edge.
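        //
        // In the tree above, for example, this pass leaves `a`'s `queue_deps`
        // holding both `(b, All)` and a synthesized `(c, All)` edge.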
472 if unit.requires_upstream_objects() {
473 for dep in dependencies {
474 depend_on_deps_of_deps(cx, &mut queue_deps, dep.unit.clone());
475 }
476
477 fn depend_on_deps_of_deps(
478 cx: &Context<'_, '_>,
479 deps: &mut HashMap<Unit, Artifact>,
480 unit: Unit,
481 ) {
482 for dep in cx.unit_deps(&unit) {
483 if deps.insert(dep.unit.clone(), Artifact::All).is_none() {
484 depend_on_deps_of_deps(cx, deps, dep.unit.clone());
485 }
486 }
487 }
488 }
489
490 // For now we use a fixed placeholder value for the cost of each unit, but
491 // in the future this could be used to allow users to provide hints about
492 // relative expected costs of units, or this could be automatically set in
493 // a smarter way using timing data from a previous compilation.
494 self.queue.queue(unit.clone(), job, queue_deps, 100);
495 *self.counts.entry(unit.pkg.package_id()).or_insert(0) += 1;
496 Ok(())
497 }
498
499 /// Executes all jobs necessary to build the dependency graph.
500 ///
501 /// This function will spawn off `config.jobs()` workers to build all of the
502 /// necessary dependencies, in order. Freshness is propagated as far as
503 /// possible along each dependency chain.
504 pub fn execute(mut self, cx: &mut Context<'_, '_>, plan: &mut BuildPlan) -> CargoResult<()> {
505 let _p = profile::start("executing the job graph");
506 self.queue.queue_finished();
507
508 let progress = Progress::with_style("Building", ProgressStyle::Ratio, cx.bcx.config);
509 let state = DrainState {
510 total_units: self.queue.len(),
511 queue: self.queue,
512 // 100 here is somewhat arbitrary. It is a few screenfuls of
513 // output, and hopefully at most a few megabytes of memory for
514 // typical messages. If you change this, please update the test
515 // caching_large_output, too.
516 messages: Arc::new(Queue::new(100)),
517 diag_dedupe: DiagDedupe::new(cx.bcx.config),
518 warning_count: HashMap::new(),
519 active: HashMap::new(),
520 compiled: HashSet::new(),
521 documented: HashSet::new(),
522 counts: self.counts,
523 progress,
524 next_id: 0,
525 timings: self.timings,
526 tokens: Vec::new(),
527 rustc_tokens: HashMap::new(),
528 to_send_clients: BTreeMap::new(),
529 pending_queue: Vec::new(),
530 print: DiagnosticPrinter::new(cx.bcx.config),
531 finished: 0,
532 per_package_future_incompat_reports: Vec::new(),
533 };
534
535 // Create a helper thread for acquiring jobserver tokens
536 let messages = state.messages.clone();
537 let helper = cx
538 .jobserver
539 .clone()
540 .into_helper_thread(move |token| {
541 messages.push(Message::Token(token));
542 })
543 .with_context(|| "failed to create helper thread for jobserver management")?;
544
545 // Create a helper thread to manage the diagnostics for rustfix if
546 // necessary.
547 let messages = state.messages.clone();
548 // It is important that this uses `push` instead of `push_bounded` for
549 // now. If someone wants to fix this to be bounded, the `drop`
550 // implementation needs to be changed to avoid possible deadlocks.
551 let _diagnostic_server = cx
552 .bcx
553 .build_config
554 .rustfix_diagnostic_server
555 .borrow_mut()
556 .take()
557 .map(move |srv| srv.start(move |msg| messages.push(Message::FixDiagnostic(msg))));
558
559 thread::scope(
560 move |scope| match state.drain_the_queue(cx, plan, scope, &helper) {
561 Some(err) => Err(err),
562 None => Ok(()),
563 },
564 )
565 }
566 }
567
568 impl<'cfg> DrainState<'cfg> {
569 fn spawn_work_if_possible<'s>(
570 &mut self,
571 cx: &mut Context<'_, '_>,
572 jobserver_helper: &HelperThread,
573 scope: &'s Scope<'s, '_>,
574 ) -> CargoResult<()> {
575 // Dequeue as much work as we can, learning about everything
576 // possible that can run. Note that this is also the point where we
577 // start requesting job tokens. Each job after the first needs to
578 // request a token.
579 while let Some((unit, job)) = self.queue.dequeue() {
580 self.pending_queue.push((unit, job));
581 if self.active.len() + self.pending_queue.len() > 1 {
582 jobserver_helper.request_token();
583 }
584 }
585
586 // If multiple pieces of work are waiting in the pending queue, we sort
587 // it by priority: higher-priority units should be scheduled
588 // sooner.
589 self.pending_queue
590 .sort_by_cached_key(|(unit, _)| self.queue.priority(unit));
591
592 // Now that we've learned of all possible work that we can execute,
593 // try to spawn it so long as we've got a jobserver token which says
594 // we're able to perform some parallel work.
595 // The `pending_queue` is sorted in ascending priority order, and we're
596 // removing the highest priority items from its end.
597 while self.has_extra_tokens() && !self.pending_queue.is_empty() {
598 let (unit, job) = self.pending_queue.pop().unwrap();
599 *self.counts.get_mut(&unit.pkg.package_id()).unwrap() -= 1;
600 if !cx.bcx.build_config.build_plan {
601 // Print out some nice progress information.
602 // NOTE: An error here will drop the job without starting it.
603 // That should be OK, since we want to exit as soon as
604 // possible during an error.
605 self.note_working_on(cx.bcx.config, &unit, job.freshness())?;
606 }
607 self.run(&unit, job, cx, scope);
608 }
609
610 Ok(())
611 }
612
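    /// Whether another unit of work can be started right now: each acquired
    /// jobserver token allows one extra job on top of the implicit token this
    /// Cargo process already owns (hence the `+ 1`). For example, with zero
    /// acquired tokens and nothing active, one job may still be spawned.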
613 fn has_extra_tokens(&self) -> bool {
614 self.active.len() < self.tokens.len() + 1
615 }
616
617 // The oldest job (i.e., least job ID) is the one we grant tokens to first.
618 fn pop_waiting_client(&mut self) -> (JobId, Client) {
619 // FIXME: replace this with BTreeMap::first_entry when that stabilizes.
620 let key = *self
621 .to_send_clients
622 .keys()
623 .next()
624 .expect("at least one waiter");
625 let clients = self.to_send_clients.get_mut(&key).unwrap();
626 let client = clients.pop().unwrap();
627 if clients.is_empty() {
628 self.to_send_clients.remove(&key);
629 }
630 (key, client)
631 }
632
633 // If we managed to acquire some extra tokens, send them off to a waiting rustc.
634 fn grant_rustc_token_requests(&mut self) -> CargoResult<()> {
635 while !self.to_send_clients.is_empty() && self.has_extra_tokens() {
636 let (id, client) = self.pop_waiting_client();
637 // This unwrap is guaranteed to succeed. `active` must be at least
638 // length 1, as otherwise there can't be a client waiting to be sent
639 // on, so tokens.len() must also be at least one.
640 let token = self.tokens.pop().unwrap();
641 self.rustc_tokens
642 .entry(id)
643 .or_insert_with(Vec::new)
644 .push(token);
645 client
646 .release_raw()
647 .with_context(|| "failed to release jobserver token")?;
648 }
649
650 Ok(())
651 }
652
653 fn handle_event(
654 &mut self,
655 cx: &mut Context<'_, '_>,
656 jobserver_helper: &HelperThread,
657 plan: &mut BuildPlan,
658 event: Message,
659 ) -> Result<(), ErrorToHandle> {
660 match event {
661 Message::Run(id, cmd) => {
662 cx.bcx
663 .config
664 .shell()
665 .verbose(|c| c.status("Running", &cmd))?;
666 self.timings.unit_start(id, self.active[&id].clone());
667 }
668 Message::BuildPlanMsg(module_name, cmd, filenames) => {
669 plan.update(&module_name, &cmd, &filenames)?;
670 }
671 Message::Stdout(out) => {
672 writeln!(cx.bcx.config.shell().out(), "{}", out)?;
673 }
674 Message::Stderr(err) => {
675 let mut shell = cx.bcx.config.shell();
676 shell.print_ansi_stderr(err.as_bytes())?;
677 shell.err().write_all(b"\n")?;
678 }
679 Message::Diagnostic { id, level, diag } => {
680 let emitted = self.diag_dedupe.emit_diag(&diag)?;
681 if level == "warning" {
682 self.bump_warning_count(id, emitted);
683 }
684 }
685 Message::WarningCount { id, emitted } => {
686 self.bump_warning_count(id, emitted);
687 }
688 Message::FixDiagnostic(msg) => {
689 self.print.print(&msg)?;
690 }
691 Message::Finish(id, artifact, result) => {
692 let unit = match artifact {
693 // If `id` has completely finished we remove it
694 // from the `active` map ...
695 Artifact::All => {
696 trace!("end: {:?}", id);
697 self.finished += 1;
698 if let Some(rustc_tokens) = self.rustc_tokens.remove(&id) {
699 // This puts back the tokens that this rustc
700 // acquired into our primary token list.
701 //
702 // This represents a rustc bug: it did not
703 // release all of its thread tokens but finished
704 // completely. But we want to make Cargo resilient
705 // to such rustc bugs, as they're generally not
706 // fatal in nature (i.e., Cargo can make progress
707 // still, and the build might not even fail).
708 self.tokens.extend(rustc_tokens);
709 }
710 self.to_send_clients.remove(&id);
711 self.report_warning_count(cx.bcx.config, id);
712 self.active.remove(&id).unwrap()
713 }
714 // ... otherwise if it hasn't finished we leave it
715 // in there as we'll get another `Finish` later on.
716 Artifact::Metadata => {
717 trace!("end (meta): {:?}", id);
718 self.active[&id].clone()
719 }
720 };
721 debug!("end ({:?}): {:?}", unit, result);
722 match result {
723 Ok(()) => self.finish(id, &unit, artifact, cx)?,
724 Err(error) => {
725 let msg = "The following warnings were emitted during compilation:";
726 self.emit_warnings(Some(msg), &unit, cx)?;
727 self.back_compat_notice(cx, &unit)?;
728 return Err(ErrorToHandle {
729 error,
730 print_always: true,
731 });
732 }
733 }
734 }
735 Message::FutureIncompatReport(id, items) => {
736 let package_id = self.active[&id].pkg.package_id();
737 self.per_package_future_incompat_reports
738 .push(FutureIncompatReportPackage { package_id, items });
739 }
740 Message::Token(acquired_token) => {
741 let token = acquired_token.with_context(|| "failed to acquire jobserver token")?;
742 self.tokens.push(token);
743 }
744 Message::NeedsToken(id) => {
745 trace!("queue token request");
746 jobserver_helper.request_token();
747 let client = cx.rustc_clients[&self.active[&id]].clone();
748 self.to_send_clients
749 .entry(id)
750 .or_insert_with(Vec::new)
751 .push(client);
752 }
753 Message::ReleaseToken(id) => {
754 // Note that this pops off potentially a completely
755 // different token, but all tokens of the same job are
756 // conceptually the same so that's fine.
757 //
758 // self.tokens is a "pool" -- the order doesn't matter -- and
759 // this transfers ownership of the token into that pool. If we
760 // end up using it on the next go around, then this token will
761 // be truncated, same as tokens obtained through Message::Token.
762 let rustc_tokens = self
763 .rustc_tokens
764 .get_mut(&id)
765 .expect("no tokens associated");
766 self.tokens
767 .push(rustc_tokens.pop().expect("rustc releases token it has"));
768 }
769 }
770
771 Ok(())
772 }
773
774 // This will also tick the progress bar as appropriate
775 fn wait_for_events(&mut self) -> Vec<Message> {
776 // Drain all events at once to avoid displaying the progress bar
777 // unnecessarily. If there's no events we actually block waiting for
778 // an event, but we keep a "heartbeat" going to allow `record_cpu`
779 // to run above to calculate CPU usage over time. To do this we
780 // listen for a message with a timeout, and on timeout we run the
781 // previous parts of the loop again.
782 let mut events = self.messages.try_pop_all();
783 trace!(
784 "tokens in use: {}, rustc_tokens: {:?}, waiting_rustcs: {:?} (events this tick: {})",
785 self.tokens.len(),
786 self.rustc_tokens
787 .iter()
788 .map(|(k, j)| (k, j.len()))
789 .collect::<Vec<_>>(),
790 self.to_send_clients
791 .iter()
792 .map(|(k, j)| (k, j.len()))
793 .collect::<Vec<_>>(),
794 events.len(),
795 );
796 if events.is_empty() {
797 loop {
798 self.tick_progress();
799 self.tokens.truncate(self.active.len() - 1);
800 match self.messages.pop(Duration::from_millis(500)) {
801 Some(message) => {
802 events.push(message);
803 break;
804 }
805 None => continue,
806 }
807 }
808 }
809 events
810 }
811
812 /// This is the "main" loop, where Cargo does all work to run the
813 /// compiler.
814 ///
815 /// This returns an Option to prevent the use of `?` on `Result` types
816 /// because it is important for the loop to carefully handle errors.
817 fn drain_the_queue<'s>(
818 mut self,
819 cx: &mut Context<'_, '_>,
820 plan: &mut BuildPlan,
821 scope: &'s Scope<'s, '_>,
822 jobserver_helper: &HelperThread,
823 ) -> Option<anyhow::Error> {
824 trace!("queue: {:#?}", self.queue);
825
826 // Iteratively execute the entire dependency graph. Each turn of the
827 // loop starts out by scheduling as much work as possible (up to the
828 // maximum number of parallel jobs we have tokens for). A local queue
829 // is maintained separately from the main dependency queue as one
830 // dequeue may actually dequeue quite a bit of work (e.g., 10 binaries
831 // in one package).
832 //
833 // After a job has finished we update our internal state if it was
834 // successful and otherwise wait for pending work to finish if it failed
835 // and then immediately return (or keep going, if requested by the build
836 // config).
837 let mut errors = ErrorsDuringDrain { count: 0 };
838 // CAUTION! Do not use `?` or break out of the loop early. Every error
839 // must be handled in such a way that the loop is still allowed to
840 // drain event messages.
841 loop {
842 if errors.count == 0 || cx.bcx.build_config.keep_going {
843 if let Err(e) = self.spawn_work_if_possible(cx, jobserver_helper, scope) {
844 self.handle_error(&mut cx.bcx.config.shell(), &mut errors, e);
845 }
846 }
847
848 // If after all that we're not actually running anything then we're
849 // done!
850 if self.active.is_empty() {
851 break;
852 }
853
854 if let Err(e) = self.grant_rustc_token_requests() {
855 self.handle_error(&mut cx.bcx.config.shell(), &mut errors, e);
856 }
857
858 // And finally, before we block waiting for the next event, drop any
859 // excess tokens we may have accidentally acquired. Due to how our
860 // jobserver interface is architected we may acquire a token that we
861 // don't actually use, and if this happens just relinquish it back
862 // to the jobserver itself.
863 for event in self.wait_for_events() {
864 if let Err(event_err) = self.handle_event(cx, jobserver_helper, plan, event) {
865 self.handle_error(&mut cx.bcx.config.shell(), &mut errors, event_err);
866 }
867 }
868 }
869 self.progress.clear();
870
871 let profile_name = cx.bcx.build_config.requested_profile;
872 // NOTE: this may be a bit inaccurate, since this may not display the
873 // profile for what was actually built. Profile overrides can change
874 // these settings, and in some cases different targets are built with
875 // different profiles. To be accurate, it would need to collect a
876 // list of Units built, and maybe display a list of the different
877 // profiles used. However, to keep it simple and compatible with old
878 // behavior, we just display what the base profile is.
879 let profile = cx.bcx.profiles.base_profile();
880 let mut opt_type = String::from(if profile.opt_level.as_str() == "0" {
881 "unoptimized"
882 } else {
883 "optimized"
884 });
885 if profile.debuginfo.unwrap_or(0) != 0 {
886 opt_type += " + debuginfo";
887 }
888
889 let time_elapsed = util::elapsed(cx.bcx.config.creation_time().elapsed());
890 if let Err(e) = self.timings.finished(cx, &errors.to_error()) {
891 self.handle_error(&mut cx.bcx.config.shell(), &mut errors, e);
892 }
893 if cx.bcx.build_config.emit_json() {
894 let mut shell = cx.bcx.config.shell();
895 let msg = machine_message::BuildFinished {
896 success: errors.count == 0,
897 }
898 .to_json_string();
899 if let Err(e) = writeln!(shell.out(), "{}", msg) {
900 self.handle_error(&mut shell, &mut errors, e);
901 }
902 }
903
904 if let Some(error) = errors.to_error() {
905 // Any errors up to this point have already been printed via the
906 // `display_error` inside `handle_error`.
907 Some(anyhow::Error::new(AlreadyPrintedError::new(error)))
908 } else if self.queue.is_empty() && self.pending_queue.is_empty() {
909 let message = format!(
910 "{} [{}] target(s) in {}",
911 profile_name, opt_type, time_elapsed
912 );
913 if !cx.bcx.build_config.build_plan {
914 // It doesn't really matter if this fails.
915 drop(cx.bcx.config.shell().status("Finished", message));
916 future_incompat::save_and_display_report(
917 cx.bcx,
918 &self.per_package_future_incompat_reports,
919 );
920 }
921
922 None
923 } else {
924 debug!("queue: {:#?}", self.queue);
925 Some(internal("finished with jobs still left in the queue"))
926 }
927 }
928
929 fn handle_error(
930 &self,
931 shell: &mut Shell,
932 err_state: &mut ErrorsDuringDrain,
933 new_err: impl Into<ErrorToHandle>,
934 ) {
935 let new_err = new_err.into();
936 if new_err.print_always || err_state.count == 0 {
937 crate::display_error(&new_err.error, shell);
938 if err_state.count == 0 && !self.active.is_empty() {
939 drop(shell.warn("build failed, waiting for other jobs to finish..."));
940 }
941 err_state.count += 1;
942 } else {
943 log::warn!("{:?}", new_err.error);
944 }
945 }
946
947 // This also records CPU usage and marks concurrency; we roughly want to do
948 // this as often as we spin on the events receiver (at least every 500ms or
949 // so).
950 fn tick_progress(&mut self) {
951 // Record some timing information if `--timings` is enabled, and
952 // this'll end up being a noop if we're not recording this
953 // information.
954 self.timings.mark_concurrency(
955 self.active.len(),
956 self.pending_queue.len(),
957 self.queue.len(),
958 self.rustc_tokens.len(),
959 );
960 self.timings.record_cpu();
961
962 let active_names = self
963 .active
964 .values()
965 .map(|u| self.name_for_progress(u))
966 .collect::<Vec<_>>();
967 drop(self.progress.tick_now(
968 self.finished,
969 self.total_units,
970 &format!(": {}", active_names.join(", ")),
971 ));
972 }
973
974 fn name_for_progress(&self, unit: &Unit) -> String {
975 let pkg_name = unit.pkg.name();
976 let target_name = unit.target.name();
977 match unit.mode {
978 CompileMode::Doc { .. } => format!("{}(doc)", pkg_name),
979 CompileMode::RunCustomBuild => format!("{}(build)", pkg_name),
980 CompileMode::Test | CompileMode::Check { test: true } => match unit.target.kind() {
981 TargetKind::Lib(_) => format!("{}(test)", target_name),
982 TargetKind::CustomBuild => panic!("cannot test build script"),
983 TargetKind::Bin => format!("{}(bin test)", target_name),
984 TargetKind::Test => format!("{}(test)", target_name),
985 TargetKind::Bench => format!("{}(bench)", target_name),
986 TargetKind::ExampleBin | TargetKind::ExampleLib(_) => {
987 format!("{}(example test)", target_name)
988 }
989 },
990 _ => match unit.target.kind() {
991 TargetKind::Lib(_) => pkg_name.to_string(),
992 TargetKind::CustomBuild => format!("{}(build.rs)", pkg_name),
993 TargetKind::Bin => format!("{}(bin)", target_name),
994 TargetKind::Test => format!("{}(test)", target_name),
995 TargetKind::Bench => format!("{}(bench)", target_name),
996 TargetKind::ExampleBin | TargetKind::ExampleLib(_) => {
997 format!("{}(example)", target_name)
998 }
999 },
1000 }
1001 }
1002
1003 /// Executes a job.
1004 ///
1005 /// Fresh jobs block until finished (which should be very fast!); dirty
1006 /// jobs will spawn a thread in the background and return immediately.
1007 fn run<'s>(&mut self, unit: &Unit, job: Job, cx: &Context<'_, '_>, scope: &'s Scope<'s, '_>) {
1008 let id = JobId(self.next_id);
1009 self.next_id = self.next_id.checked_add(1).unwrap();
1010
1011 debug!("start {}: {:?}", id, unit);
1012
1013 assert!(self.active.insert(id, unit.clone()).is_none());
1014
1015 let messages = self.messages.clone();
1016 let fresh = job.freshness();
1017 let rmeta_required = cx.rmeta_required(unit);
1018
1019 let doit = move |state: JobState<'_, '_>| {
1020 let mut sender = FinishOnDrop {
1021 messages: &state.messages,
1022 id,
1023 result: None,
1024 };
1025 sender.result = Some(job.run(&state));
1026
1027 // If the `rmeta_required` wasn't consumed but it was set
1028 // previously, then we either have:
1029 //
1030 // 1. The `job` didn't do anything because it was "fresh".
1031 // 2. The `job` returned an error and didn't reach the point where
1032 // it called `rmeta_produced`.
1033 // 3. We forgot to call `rmeta_produced` and there's a bug in Cargo.
1034 //
1035 // Ruling out the third, the other two are pretty common. For 2
1036 // we'll just naturally abort the compilation operation, but for 1
1037 // we need to make sure that the metadata is flagged as produced, so
1038 // send a synthetic message here.
1039 if state.rmeta_required.get() && sender.result.as_ref().unwrap().is_ok() {
1040 state
1041 .messages
1042 .push(Message::Finish(state.id, Artifact::Metadata, Ok(())));
1043 }
1044
1045 // Use a helper struct with a `Drop` implementation to guarantee
1046 // that a `Finish` message is sent even if our job panics. We
1047 // shouldn't panic unless there's a bug in Cargo, so we just need
1048 // to make sure nothing hangs by accident.
1049 struct FinishOnDrop<'a> {
1050 messages: &'a Queue<Message>,
1051 id: JobId,
1052 result: Option<CargoResult<()>>,
1053 }
1054
1055 impl Drop for FinishOnDrop<'_> {
1056 fn drop(&mut self) {
1057 let result = self
1058 .result
1059 .take()
1060 .unwrap_or_else(|| Err(format_err!("worker panicked")));
1061 self.messages
1062 .push(Message::Finish(self.id, Artifact::All, result));
1063 }
1064 }
1065 };
1066
1067 match fresh {
1068 Freshness::Fresh => {
1069 self.timings.add_fresh();
1070 // Running a fresh job on the same thread is often much faster than spawning a new
1071 // thread to run the job.
1072 doit(JobState {
1073 id,
1074 messages,
1075 output: Some(&self.diag_dedupe),
1076 rmeta_required: Cell::new(rmeta_required),
1077 _marker: marker::PhantomData,
1078 });
1079 }
1080 Freshness::Dirty => {
1081 self.timings.add_dirty();
1082 scope.spawn(move || {
1083 doit(JobState {
1084 id,
1085 messages: messages.clone(),
1086 output: None,
1087 rmeta_required: Cell::new(rmeta_required),
1088 _marker: marker::PhantomData,
1089 })
1090 });
1091 }
1092 }
1093 }
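
    // A minimal standalone sketch of the `FinishOnDrop` idea used above
    // (illustration only; `_drop_guard_sketch` is not part of this module):
    // a guard's `Drop` impl sends a completion message even if the work in
    // between panics, so the coordinator never waits forever.
    #[allow(dead_code)]
    fn _drop_guard_sketch() {
        use std::sync::mpsc;

        struct Guard {
            tx: mpsc::Sender<&'static str>,
            finished: bool,
        }
        impl Drop for Guard {
            fn drop(&mut self) {
                // Runs on normal exit *and* while unwinding from a panic.
                let msg = if self.finished { "finished" } else { "panicked" };
                let _ = self.tx.send(msg);
            }
        }

        let (tx, rx) = mpsc::channel();
        {
            let mut guard = Guard { tx, finished: false };
            // ... the actual work would happen here ...
            guard.finished = true;
        }
        assert_eq!(rx.recv().unwrap(), "finished");
    }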
1094
1095 fn emit_warnings(
1096 &mut self,
1097 msg: Option<&str>,
1098 unit: &Unit,
1099 cx: &mut Context<'_, '_>,
1100 ) -> CargoResult<()> {
1101 let outputs = cx.build_script_outputs.lock().unwrap();
1102 let metadata = match cx.find_build_script_metadata(unit) {
1103 Some(metadata) => metadata,
1104 None => return Ok(()),
1105 };
1106 let bcx = &mut cx.bcx;
1107 if let Some(output) = outputs.get(metadata) {
1108 if !output.warnings.is_empty() {
1109 if let Some(msg) = msg {
1110 writeln!(bcx.config.shell().err(), "{}\n", msg)?;
1111 }
1112
1113 for warning in output.warnings.iter() {
1114 bcx.config.shell().warn(warning)?;
1115 }
1116
1117 if msg.is_some() {
1118 // Output an empty line.
1119 writeln!(bcx.config.shell().err())?;
1120 }
1121 }
1122 }
1123
1124 Ok(())
1125 }
1126
1127 fn bump_warning_count(&mut self, id: JobId, emitted: bool) {
1128 let cnts = self.warning_count.entry(id).or_default();
1129 cnts.0 += 1;
1130 if !emitted {
1131 cnts.1 += 1;
1132 }
1133 }
1134
1135 /// Displays a final report of the warnings emitted by a particular job.
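    ///
    /// For example, for a library target with three warnings of which one was
    /// a duplicate, this ends up printing roughly
    /// "warning: `foo` (lib) generated 3 warnings (1 duplicate)", where `foo`
    /// stands in for the package name.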
1136 fn report_warning_count(&mut self, config: &Config, id: JobId) {
1137 let count = match self.warning_count.remove(&id) {
1138 Some(count) => count,
1139 None => return,
1140 };
1141 let unit = &self.active[&id];
1142 let mut message = format!("`{}` ({}", unit.pkg.name(), unit.target.description_named());
1143 if unit.mode.is_rustc_test() && !(unit.target.is_test() || unit.target.is_bench()) {
1144 message.push_str(" test");
1145 } else if unit.mode.is_doc_test() {
1146 message.push_str(" doctest");
1147 } else if unit.mode.is_doc() {
1148 message.push_str(" doc");
1149 }
1150 message.push_str(") generated ");
1151 match count.0 {
1152 1 => message.push_str("1 warning"),
1153 n => drop(write!(message, "{} warnings", n)),
1154 };
1155 match count.1 {
1156 0 => {}
1157 1 => message.push_str(" (1 duplicate)"),
1158 n => drop(write!(message, " ({} duplicates)", n)),
1159 }
1160 // Errors are ignored here because it is tricky to handle them
1161 // correctly, and they aren't important.
1162 drop(config.shell().warn(message));
1163 }
1164
1165 fn finish(
1166 &mut self,
1167 id: JobId,
1168 unit: &Unit,
1169 artifact: Artifact,
1170 cx: &mut Context<'_, '_>,
1171 ) -> CargoResult<()> {
1172 if unit.mode.is_run_custom_build() && unit.show_warnings(cx.bcx.config) {
1173 self.emit_warnings(None, unit, cx)?;
1174 }
1175 let unlocked = self.queue.finish(unit, &artifact);
1176 match artifact {
1177 Artifact::All => self.timings.unit_finished(id, unlocked),
1178 Artifact::Metadata => self.timings.unit_rmeta_finished(id, unlocked),
1179 }
1180 Ok(())
1181 }
1182
1183 // This isn't super trivial because we don't want to print loads and
1184 // loads of information to the console, but we also want to produce a
1185 // faithful representation of what's happening. This is somewhat nuanced
1186 // as a package can start compiling *very* early on because of custom
1187 // build commands and such.
1188 //
1189 // In general, we try to print "Compiling" for the first nontrivial task
1190 // run for a package, regardless of when that is. We then don't print
1191 // out any more information for a package after we've printed it once.
1192 fn note_working_on(
1193 &mut self,
1194 config: &Config,
1195 unit: &Unit,
1196 fresh: Freshness,
1197 ) -> CargoResult<()> {
1198 if (self.compiled.contains(&unit.pkg.package_id()) && !unit.mode.is_doc())
1199 || (self.documented.contains(&unit.pkg.package_id()) && unit.mode.is_doc())
1200 {
1201 return Ok(());
1202 }
1203
1204 match fresh {
1205 // Any dirty stage which runs at least one command gets printed as
1206 // being a compiled package.
1207 Dirty => {
1208 if unit.mode.is_doc() {
1209 self.documented.insert(unit.pkg.package_id());
1210 config.shell().status("Documenting", &unit.pkg)?;
1211 } else if unit.mode.is_doc_test() {
1212 // Skip doc test.
1213 } else {
1214 self.compiled.insert(unit.pkg.package_id());
1215 if unit.mode.is_check() {
1216 config.shell().status("Checking", &unit.pkg)?;
1217 } else {
1218 config.shell().status("Compiling", &unit.pkg)?;
1219 }
1220 }
1221 }
1222 Fresh => {
1223 // If doc tests are last, only print "Fresh" if nothing has been printed.
1224 if self.counts[&unit.pkg.package_id()] == 0
1225 && !(unit.mode.is_doc_test() && self.compiled.contains(&unit.pkg.package_id()))
1226 {
1227 self.compiled.insert(unit.pkg.package_id());
1228 config.shell().verbose(|c| c.status("Fresh", &unit.pkg))?;
1229 }
1230 }
1231 }
1232 Ok(())
1233 }
1234
1235 fn back_compat_notice(&self, cx: &Context<'_, '_>, unit: &Unit) -> CargoResult<()> {
1236 if unit.pkg.name() != "diesel"
1237 || unit.pkg.version() >= &Version::new(1, 4, 8)
1238 || cx.bcx.ws.resolve_behavior() == ResolveBehavior::V1
1239 || !unit.pkg.package_id().source_id().is_registry()
1240 || !unit.features.is_empty()
1241 {
1242 return Ok(());
1243 }
1244 if !cx
1245 .bcx
1246 .unit_graph
1247 .keys()
1248 .any(|unit| unit.pkg.name() == "diesel" && !unit.features.is_empty())
1249 {
1250 return Ok(());
1251 }
1252 cx.bcx.config.shell().note(
1253 "\
1254 This error may be due to an interaction between diesel and Cargo's new
1255 feature resolver. Try updating to diesel 1.4.8 to fix this error.
1256 ",
1257 )?;
1258 Ok(())
1259 }
1260 }
1261
1262 impl ErrorsDuringDrain {
1263 fn to_error(&self) -> Option<anyhow::Error> {
1264 match self.count {
1265 0 => None,
1266 1 => Some(format_err!("1 job failed")),
1267 n => Some(format_err!("{} jobs failed", n)),
1268 }
1269 }
1270 }