1 //! This module implements the job queue which determines the ordering in which
2 //! rustc is spawned off. It also manages the allocation of jobserver tokens to
3 //! rustc beyond the implicit token each rustc owns (i.e., the ones used for
4 //! parallel LLVM work and parallel rustc threads).
5 //!
6 //! Cargo and rustc have a somewhat non-trivial jobserver relationship with each
7 //! other, which is due to scaling issues with sharing a single jobserver
8 //! amongst what is potentially hundreds of threads of work on many-cored
9 //! systems on (at least) Linux, and likely other platforms as well.
10 //!
11 //! The details of this algorithm are (also) written out in
12 //! src/librustc_jobserver/lib.rs. What follows is a description focusing on the
13 //! Cargo side of things.
14 //!
15 //! Cargo wants to complete the build as quickly as possible, fully saturating
16 //! all cores (as constrained by the -j=N parameter). Cargo also must not spawn
17 //! more than N threads of work: the total amount of tokens we have floating
18 //! around must always be limited to N.
19 //!
20 //! It is not really possible to optimally choose which crate should build first
21 //! or last; nor is it possible to decide whether to give an additional token to
22 //! rustc first or rather spawn a new crate of work. For now, the algorithm we
23 //! implement prioritizes spawning as many crates (i.e., rustc processes) as
24 //! possible, and then filling each rustc with tokens on demand.
25 //!
26 //! The primary loop is in `drain_the_queue` below.
27 //!
28 //! We integrate with the jobserver, originating from GNU make, to make sure
29 //! that build scripts which use make to build C code can cooperate with us on
30 //! the number of used tokens and avoid overfilling the system we're on.
31 //!
32 //! The jobserver is unfortunately a very simple protocol, so we enhance it a
33 //! little when we know that there is a rustc on the other end. Via the stderr
34 //! pipe we have to rustc, we get messages such as "NeedsToken" and
35 //! "ReleaseToken" from rustc.
36 //!
37 //! "NeedsToken" indicates that a rustc is interested in acquiring a token, but
38 //! never that it would be impossible to make progress without one (i.e., it
39 //! would be incorrect for rustc to not terminate due to an unfulfilled
40 //! NeedsToken request); we do not usually fulfill all NeedsToken requests for a
41 //! given rustc.
42 //!
43 //! "ReleaseToken" indicates that a rustc is done with one of its tokens and is
44 //! ready for us to re-acquire ownership -- we will either release that token
45 //! back into the general pool or reuse it ourselves. Note that rustc will
46 //! inform us that it is releasing a token even if it itself is also requesting
47 //! tokens; it is up to us whether to return the token to that same rustc.
48 //!
49 //! The current scheduling algorithm is relatively primitive and could likely be
50 //! improved.
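//!
//! As a rough, illustrative sketch (not an exact trace) of the enhanced
//! protocol described above, one rustc process and the Cargo coordinator
//! might exchange messages roughly like this:
//!
//! ```text
//! rustc (job id 3)            Cargo coordinator                jobserver
//!   NeedsToken(3)   ------->  request a token          ------->
//!                             Token(acquired)          <-------
//!                   <-------  release_raw() on the client
//!                             registered for job 3
//!   ...extra parallel work inside rustc...
//!   ReleaseToken(3) ------->  token returns to Cargo's pool, to be
//!                             reused or dropped on the next turn of
//!                             the drain loop
//! ```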
51
52 use std::cell::{Cell, RefCell};
53 use std::collections::{BTreeMap, HashMap, HashSet};
54 use std::fmt::Write as _;
55 use std::io;
56 use std::marker;
57 use std::sync::Arc;
58 use std::thread::{self, Scope};
59 use std::time::Duration;
60
61 use anyhow::{format_err, Context as _};
62 use cargo_util::ProcessBuilder;
63 use jobserver::{Acquired, Client, HelperThread};
64 use log::{debug, trace};
65 use semver::Version;
66
67 use super::context::OutputFile;
68 use super::job::{
69 Freshness::{self, Dirty, Fresh},
70 Job,
71 };
72 use super::timings::Timings;
73 use super::{BuildContext, BuildPlan, CompileMode, Context, Unit};
74 use crate::core::compiler::future_incompat::{
75 self, FutureBreakageItem, FutureIncompatReportPackage,
76 };
77 use crate::core::resolver::ResolveBehavior;
78 use crate::core::{PackageId, Shell, TargetKind};
79 use crate::util::diagnostic_server::{self, DiagnosticPrinter};
80 use crate::util::errors::AlreadyPrintedError;
81 use crate::util::machine_message::{self, Message as _};
82 use crate::util::CargoResult;
83 use crate::util::{self, internal, profile};
84 use crate::util::{Config, DependencyQueue, Progress, ProgressStyle, Queue};
85
86 /// This structure is backed by the `DependencyQueue` type and manages the
87 /// queueing of compilation steps for each package. Packages enqueue units of
88 /// work and then later on the entire graph is converted to DrainState and
89 /// executed.
90 pub struct JobQueue<'cfg> {
91 queue: DependencyQueue<Unit, Artifact, Job>,
92 counts: HashMap<PackageId, usize>,
93 timings: Timings<'cfg>,
94 }
95
96 /// This structure is backed by the `DependencyQueue` type and manages the
97 /// actual compilation step of each package. Packages enqueue units of work and
98 /// then later on the entire graph is processed and compiled.
99 ///
100 /// It is created from JobQueue when we have fully assembled the crate graph
101 /// (i.e., all package dependencies are known).
102 ///
103 /// # Message queue
104 ///
105 /// Each thread running a process uses the message queue to send messages back
106 /// to the main thread. The main thread coordinates everything, and handles
107 /// printing output.
108 ///
109 /// It is important to be careful which messages use `push` vs `push_bounded`.
110 /// `push` is for priority messages (like tokens, or "finished") where the
111 /// sender shouldn't block. We want to handle those so real work can proceed
112 /// ASAP.
113 ///
114 //! `push_bounded` is only for messages being printed to stdout/stderr. Being
115 //! bounded prevents a flood of messages from consuming a large amount of
116 //! memory.
117 ///
118 /// `push` also avoids blocking which helps avoid deadlocks. For example, when
119 /// the diagnostic server thread is dropped, it waits for the thread to exit.
120 /// But if the thread is blocked on a full queue, and there is a critical
121 /// error, the drop will deadlock. This should be fixed at some point in the
122 /// future. The jobserver thread has a similar problem, though it will time
123 /// out after 1 second.
124 struct DrainState<'cfg> {
125 // This is the length of the DependencyQueue when starting out
126 total_units: usize,
127
128 queue: DependencyQueue<Unit, Artifact, Job>,
129 messages: Arc<Queue<Message>>,
130 /// Diagnostic deduplication support.
131 diag_dedupe: DiagDedupe<'cfg>,
132 /// Count of warnings, used to print a summary after the job succeeds.
133 ///
134 /// First value is the total number of warnings, and the second value is
135 /// the number that were suppressed because they were duplicates of a
136 /// previous warning.
137 warning_count: HashMap<JobId, (usize, usize)>,
138 active: HashMap<JobId, Unit>,
139 compiled: HashSet<PackageId>,
140 documented: HashSet<PackageId>,
141 counts: HashMap<PackageId, usize>,
142 progress: Progress<'cfg>,
143 next_id: u32,
144 timings: Timings<'cfg>,
145
146 /// Tokens that are currently owned by this Cargo, and may be "associated"
147 /// with a rustc process. They may also be unused, though if so will be
148 /// dropped on the next loop iteration.
149 ///
150 /// Note that the length of this may be zero, but we will still spawn work,
151 /// as we share the implicit token given to this Cargo process with a
152 /// single rustc process.
153 tokens: Vec<Acquired>,
154
155 /// rustc per-thread tokens, when in jobserver-per-rustc mode.
156 rustc_tokens: HashMap<JobId, Vec<Acquired>>,
157
158 /// This represents the list of rustc jobs (processes) and associated
159 /// clients that are interested in receiving a token.
160 to_send_clients: BTreeMap<JobId, Vec<Client>>,
161
162 /// The list of jobs that we have not yet started executing, but have
163 /// retrieved from the `queue`. We eagerly pull jobs off the main queue to
164 /// allow us to request jobserver tokens pretty early.
165 pending_queue: Vec<(Unit, Job, usize)>,
166 print: DiagnosticPrinter<'cfg>,
167
168 /// How many jobs we've finished
169 finished: usize,
170 per_package_future_incompat_reports: Vec<FutureIncompatReportPackage>,
171 }
172
173 pub struct ErrorsDuringDrain {
174 pub count: usize,
175 }
176
177 struct ErrorToHandle {
178 error: anyhow::Error,
179
180 /// This field is true for "interesting" errors and false for "mundane"
181 /// errors. If false, we print the above error only if it's the first one
182 /// encountered so far while draining the job queue.
183 ///
184 /// At most places where an error is propagated, we set this to false to
185 /// avoid scenarios where Cargo might end up spewing tons of redundant error
186 /// messages. For example if an i/o stream got closed somewhere, we don't
187 /// care about individually reporting every thread that it broke; just the
188 /// first is enough.
189 ///
190 /// The exception where print_always is true is that we do report every
191 /// instance of a rustc invocation that failed with diagnostics. This
192 /// corresponds to errors from Message::Finish.
193 print_always: bool,
194 }
195
196 impl<E> From<E> for ErrorToHandle
197 where
198 anyhow::Error: From<E>,
199 {
200 fn from(error: E) -> Self {
201 ErrorToHandle {
202 error: anyhow::Error::from(error),
203 print_always: false,
204 }
205 }
206 }
207
208 #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
209 pub struct JobId(pub u32);
210
211 impl std::fmt::Display for JobId {
212 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
213 write!(f, "{}", self.0)
214 }
215 }
216
217 /// A `JobState` is constructed by `JobQueue::run` and passed to `Job::run`. It includes everything
218 /// necessary to communicate between the main thread and the execution of the job.
219 ///
220 /// The job may execute on either a dedicated thread or the main thread. If the job executes on the
221 /// main thread, the `output` field must be set to prevent a deadlock.
222 pub struct JobState<'a, 'cfg> {
223 /// Channel back to the main thread to coordinate messages and such.
224 ///
225 /// When the `output` field is `Some`, care must be taken to avoid calling `push_bounded` on
226 /// the message queue to prevent a deadlock.
227 messages: Arc<Queue<Message>>,
228
229 /// Normally output is sent to the job queue with backpressure. When the job is fresh
230 /// however we need to immediately display the output to prevent a deadlock as the
231 /// output messages are processed on the same thread as they are sent from. `output`
232 /// defines where to output in this case.
233 ///
234 /// Currently the `Shell` inside `Config` is wrapped in a `RefCell` and thus can't be passed
235 /// between threads. This means that it isn't possible for multiple output messages to be
236 /// interleaved. In the future, it may be wrapped in a `Mutex` instead. In this case
237 /// interleaving is still prevented as the lock would be held for the whole printing of an
238 /// output message.
239 output: Option<&'a DiagDedupe<'cfg>>,
240
241 /// The job id that this state is associated with, used when sending
242 /// messages back to the main thread.
243 id: JobId,
244
245 /// Whether or not we're expected to have a call to `rmeta_produced`. Once
246 /// that method is called this is dynamically set to `false` to prevent
247 /// sending a double message later on.
248 rmeta_required: Cell<bool>,
249
250 // Historical versions of Cargo made use of the `'a` argument here, so to
251 // leave the door open to future refactorings keep it here.
252 _marker: marker::PhantomData<&'a ()>,
253 }
254
255 /// Handler for deduplicating diagnostics.
256 struct DiagDedupe<'cfg> {
257 seen: RefCell<HashSet<u64>>,
258 config: &'cfg Config,
259 }
260
261 impl<'cfg> DiagDedupe<'cfg> {
262 fn new(config: &'cfg Config) -> Self {
263 DiagDedupe {
264 seen: RefCell::new(HashSet::new()),
265 config,
266 }
267 }
268
269 /// Emits a diagnostic message.
270 ///
271 /// Returns `true` if the message was emitted, or `false` if it was
272 /// suppressed for being a duplicate.
273 fn emit_diag(&self, diag: &str) -> CargoResult<bool> {
274 let h = util::hash_u64(diag);
275 if !self.seen.borrow_mut().insert(h) {
276 return Ok(false);
277 }
278 let mut shell = self.config.shell();
279 shell.print_ansi_stderr(diag.as_bytes())?;
280 shell.err().write_all(b"\n")?;
281 Ok(true)
282 }
283 }
284
285 /// Possible artifacts that can be produced by compilations, used as edge values
286 /// in the dependency graph.
287 ///
288 /// As edge values we can have multiple kinds of edges depending on one node,
289 /// for example some units may only depend on the metadata for an rlib while
290 /// others depend on the full rlib. This `Artifact` enum is used to distinguish
291 /// this case and track the progress of compilations as they proceed.
292 #[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
293 enum Artifact {
294 /// A generic placeholder for "depends on everything run by a step" and
295 /// means that we can't start the next compilation until the previous has
296 /// finished entirely.
297 All,
298
299 /// A node indicating that we only depend on the metadata of a compilation,
300 /// but the compilation is typically also producing an rlib. We can start
301 /// our step, however, before the full rlib is available.
302 Metadata,
303 }
304
305 enum Message {
306 Run(JobId, String),
307 BuildPlanMsg(String, ProcessBuilder, Arc<Vec<OutputFile>>),
308 Stdout(String),
309 Stderr(String),
310 Diagnostic {
311 id: JobId,
312 level: String,
313 diag: String,
314 },
315 WarningCount {
316 id: JobId,
317 emitted: bool,
318 },
319 FixDiagnostic(diagnostic_server::Message),
320 Token(io::Result<Acquired>),
321 Finish(JobId, Artifact, CargoResult<()>),
322 FutureIncompatReport(JobId, Vec<FutureBreakageItem>),
323
324 // This client should get release_raw called on it with one of our tokens
325 NeedsToken(JobId),
326
327 // A token previously passed to a NeedsToken client is being released.
328 ReleaseToken(JobId),
329 }
330
331 impl<'a, 'cfg> JobState<'a, 'cfg> {
332 pub fn running(&self, cmd: &ProcessBuilder) {
333 self.messages.push(Message::Run(self.id, cmd.to_string()));
334 }
335
336 pub fn build_plan(
337 &self,
338 module_name: String,
339 cmd: ProcessBuilder,
340 filenames: Arc<Vec<OutputFile>>,
341 ) {
342 self.messages
343 .push(Message::BuildPlanMsg(module_name, cmd, filenames));
344 }
345
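/// Sends a line of stdout from the job: written directly to the shell when
/// the job runs on the main thread (`output` is set), otherwise through the
/// bounded message queue.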
346 pub fn stdout(&self, stdout: String) -> CargoResult<()> {
347 if let Some(dedupe) = self.output {
348 writeln!(dedupe.config.shell().out(), "{}", stdout)?;
349 } else {
350 self.messages.push_bounded(Message::Stdout(stdout));
351 }
352 Ok(())
353 }
354
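/// Same as `stdout`, but for a line of stderr, preserving ANSI escape codes.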
355 pub fn stderr(&self, stderr: String) -> CargoResult<()> {
356 if let Some(dedupe) = self.output {
357 let mut shell = dedupe.config.shell();
358 shell.print_ansi_stderr(stderr.as_bytes())?;
359 shell.err().write_all(b"\n")?;
360 } else {
361 self.messages.push_bounded(Message::Stderr(stderr));
362 }
363 Ok(())
364 }
365
366 pub fn emit_diag(&self, level: String, diag: String) -> CargoResult<()> {
367 if let Some(dedupe) = self.output {
368 let emitted = dedupe.emit_diag(&diag)?;
369 if level == "warning" {
370 self.messages.push(Message::WarningCount {
371 id: self.id,
372 emitted,
373 });
374 }
375 } else {
376 self.messages.push_bounded(Message::Diagnostic {
377 id: self.id,
378 level,
379 diag,
380 });
381 }
382 Ok(())
383 }
384
385 /// A method used to signal to the coordinator thread that the rmeta file
386 /// for an rlib has been produced. This is only called for some rmeta
387 /// builds when required, and can be called at any time before a job ends.
388 /// This should only be called once because a metadata file can only be
389 /// produced once!
390 pub fn rmeta_produced(&self) {
391 self.rmeta_required.set(false);
392 self.messages
393 .push(Message::Finish(self.id, Artifact::Metadata, Ok(())));
394 }
395
396 pub fn future_incompat_report(&self, report: Vec<FutureBreakageItem>) {
397 self.messages
398 .push(Message::FutureIncompatReport(self.id, report));
399 }
400
401 /// The rustc underlying this Job is about to acquire a jobserver token (i.e., block)
402 /// on the passed client.
403 ///
404 /// This should arrange for the associated client to eventually get a token via
405 /// `client.release_raw()`.
406 pub fn will_acquire(&self) {
407 self.messages.push(Message::NeedsToken(self.id));
408 }
409
410 /// The rustc underlying this Job is informing us that it is done with a jobserver token.
411 ///
412 /// Note that it does *not* write that token back anywhere.
413 pub fn release_token(&self) {
414 self.messages.push(Message::ReleaseToken(self.id));
415 }
416 }
417
418 impl<'cfg> JobQueue<'cfg> {
419 pub fn new(bcx: &BuildContext<'_, 'cfg>) -> JobQueue<'cfg> {
420 JobQueue {
421 queue: DependencyQueue::new(),
422 counts: HashMap::new(),
423 timings: Timings::new(bcx, &bcx.roots),
424 }
425 }
426
427 pub fn enqueue(&mut self, cx: &Context<'_, 'cfg>, unit: &Unit, job: Job) -> CargoResult<()> {
428 let dependencies = cx.unit_deps(unit);
429 let mut queue_deps = dependencies
430 .iter()
431 .filter(|dep| {
432 // Binaries aren't actually needed to *compile* tests, just to run
433 // them, so we don't include this dependency edge in the job graph.
434 (!dep.unit.target.is_test() && !dep.unit.target.is_bin())
435 || dep.unit.artifact.is_true()
436 })
437 .map(|dep| {
438 // Handle the case here where our `unit -> dep` dependency may
439 // only require the metadata, not the full compilation to
440 // finish. Use the tables in `cx` to figure out what kind
441 // of artifact is associated with this dependency.
442 let artifact = if cx.only_requires_rmeta(unit, &dep.unit) {
443 Artifact::Metadata
444 } else {
445 Artifact::All
446 };
447 (dep.unit.clone(), artifact)
448 })
449 .collect::<HashMap<_, _>>();
450
451 // This is somewhat tricky, but we may need to synthesize some
452 // dependencies for this target if it requires full upstream
453 // compilations to have completed. Because of pipelining, some
454 // dependency edges may be `Metadata` due to the above clause (as
455 // opposed to everything being `All`). For example consider:
456 //
457 // a (binary)
458 // └ b (lib)
459 //   └ c (lib)
460 //
461 // Here the dependency edge from B to C will be `Metadata`, and the
462 // dependency edge from A to B will be `All`. For A to be compiled,
463 // however, it currently actually needs the full rlib of C. This means
464 // that we need to synthesize a dependency edge for the dependency graph
465 // from A to C. That's done here.
466 //
467 // This will walk all dependencies of the current target, and if any of
468 // *their* dependencies are `Metadata` then we depend on the `All` of
469 // the target as well. This should ensure that edges changed to
470 // `Metadata` propagate upwards `All` dependencies to anything that
471 // transitively contains the `Metadata` edge.
472 if unit.requires_upstream_objects() {
473 for dep in dependencies {
474 depend_on_deps_of_deps(cx, &mut queue_deps, dep.unit.clone());
475 }
476
477 fn depend_on_deps_of_deps(
478 cx: &Context<'_, '_>,
479 deps: &mut HashMap<Unit, Artifact>,
480 unit: Unit,
481 ) {
482 for dep in cx.unit_deps(&unit) {
483 if deps.insert(dep.unit.clone(), Artifact::All).is_none() {
484 depend_on_deps_of_deps(cx, deps, dep.unit.clone());
485 }
486 }
487 }
488 }
489
490 // For now we use a fixed placeholder value for the cost of each unit, but
491 // in the future this could be used to allow users to provide hints about
492 // relative expected costs of units, or this could be automatically set in
493 // a smarter way using timing data from a previous compilation.
494 self.queue.queue(unit.clone(), job, queue_deps, 100);
495 *self.counts.entry(unit.pkg.package_id()).or_insert(0) += 1;
496 Ok(())
497 }
498
499 /// Executes all jobs necessary to build the dependency graph.
500 ///
501 /// This function will spawn off `config.jobs()` workers to build all of the
502 /// necessary dependencies, in order. Freshness is propagated as far as
503 /// possible along each dependency chain.
504 pub fn execute(mut self, cx: &mut Context<'_, '_>, plan: &mut BuildPlan) -> CargoResult<()> {
505 let _p = profile::start("executing the job graph");
506 self.queue.queue_finished();
507
508 let progress = Progress::with_style("Building", ProgressStyle::Ratio, cx.bcx.config);
509 let state = DrainState {
510 total_units: self.queue.len(),
511 queue: self.queue,
512 // 100 here is somewhat arbitrary. It is a few screenfuls of
513 // output, and hopefully at most a few megabytes of memory for
514 // typical messages. If you change this, please update the test
515 // caching_large_output, too.
516 messages: Arc::new(Queue::new(100)),
517 diag_dedupe: DiagDedupe::new(cx.bcx.config),
518 warning_count: HashMap::new(),
519 active: HashMap::new(),
520 compiled: HashSet::new(),
521 documented: HashSet::new(),
522 counts: self.counts,
523 progress,
524 next_id: 0,
525 timings: self.timings,
526 tokens: Vec::new(),
527 rustc_tokens: HashMap::new(),
528 to_send_clients: BTreeMap::new(),
529 pending_queue: Vec::new(),
530 print: DiagnosticPrinter::new(cx.bcx.config),
531 finished: 0,
532 per_package_future_incompat_reports: Vec::new(),
533 };
534
535 // Create a helper thread for acquiring jobserver tokens
536 let messages = state.messages.clone();
537 let helper = cx
538 .jobserver
539 .clone()
540 .into_helper_thread(move |token| {
541 messages.push(Message::Token(token));
542 })
543 .with_context(|| "failed to create helper thread for jobserver management")?;
544
545 // Create a helper thread to manage the diagnostics for rustfix if
546 // necessary.
547 let messages = state.messages.clone();
548 // It is important that this uses `push` instead of `push_bounded` for
549 // now. If someone wants to fix this to be bounded, the `drop`
550 // implementation needs to be changed to avoid possible deadlocks.
551 let _diagnostic_server = cx
552 .bcx
553 .build_config
554 .rustfix_diagnostic_server
555 .borrow_mut()
556 .take()
557 .map(move |srv| srv.start(move |msg| messages.push(Message::FixDiagnostic(msg))));
558
559 thread::scope(
560 move |scope| match state.drain_the_queue(cx, plan, scope, &helper) {
561 Some(err) => Err(err),
562 None => Ok(()),
563 },
564 )
565 }
566 }
567
568 impl<'cfg> DrainState<'cfg> {
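/// Drains ready units from the dependency queue into `pending_queue` (kept
/// sorted by priority), requests jobserver tokens for the waiting work, and
/// starts as many of the highest-priority jobs as our tokens allow.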
569 fn spawn_work_if_possible<'s>(
570 &mut self,
571 cx: &mut Context<'_, '_>,
572 jobserver_helper: &HelperThread,
573 scope: &'s Scope<'s, '_>,
574 ) -> CargoResult<()> {
575 // Dequeue as much work as we can, learning about everything
576 // possible that can run. Note that this is also the point where we
577 // start requesting job tokens. Each job after the first needs to
578 // request a token.
579 while let Some((unit, job, priority)) = self.queue.dequeue() {
580 // We want to keep the pieces of work in the `pending_queue` sorted
581 // by priority, and insert the current job at its correct position:
582 // after the jobs with lower priority and after the ones with the same
583 // priority (since they were dequeued before the current one, we keep
584 // that relative order).
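// For example (illustrative values): with pending priorities [1, 3, 3, 5]
// and a new job of priority 3, `partition_point` returns 3, so the job is
// inserted after the existing 3s and before the 5.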
585 let idx = self
586 .pending_queue
587 .partition_point(|&(_, _, p)| p <= priority);
588 self.pending_queue.insert(idx, (unit, job, priority));
589 if self.active.len() + self.pending_queue.len() > 1 {
590 jobserver_helper.request_token();
591 }
592 }
593
594 // Now that we've learned of all possible work that we can execute
595 // try to spawn it so long as we've got a jobserver token which says
596 // we're able to perform some parallel work.
597 // The `pending_queue` is sorted in ascending priority order, and we
598 // remove items from its end to schedule the highest priority items
599 // sooner.
600 while self.has_extra_tokens() && !self.pending_queue.is_empty() {
601 let (unit, job, _) = self.pending_queue.pop().unwrap();
602 *self.counts.get_mut(&unit.pkg.package_id()).unwrap() -= 1;
603 if !cx.bcx.build_config.build_plan {
604 // Print out some nice progress information.
605 // NOTE: An error here will drop the job without starting it.
606 // That should be OK, since we want to exit as soon as
607 // possible during an error.
608 self.note_working_on(cx.bcx.config, &unit, job.freshness())?;
609 }
610 self.run(&unit, job, cx, scope);
611 }
612
613 Ok(())
614 }
615
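/// Returns `true` if we hold more tokens (counting the implicit token given
/// to the Cargo process itself) than there are active jobs, i.e. we can
/// start another job or hand a token to a running rustc.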
616 fn has_extra_tokens(&self) -> bool {
617 self.active.len() < self.tokens.len() + 1
618 }
619
620 // The oldest job (i.e., least job ID) is the one we grant tokens to first.
621 fn pop_waiting_client(&mut self) -> (JobId, Client) {
622 // FIXME: replace this with BTreeMap::first_entry when that stabilizes.
623 let key = *self
624 .to_send_clients
625 .keys()
626 .next()
627 .expect("at least one waiter");
628 let clients = self.to_send_clients.get_mut(&key).unwrap();
629 let client = clients.pop().unwrap();
630 if clients.is_empty() {
631 self.to_send_clients.remove(&key);
632 }
633 (key, client)
634 }
635
636 // If we managed to acquire some extra tokens, send them off to a waiting rustc.
637 fn grant_rustc_token_requests(&mut self) -> CargoResult<()> {
638 while !self.to_send_clients.is_empty() && self.has_extra_tokens() {
639 let (id, client) = self.pop_waiting_client();
640 // This unwrap is guaranteed to succeed. `active` must be at least
641 // length 1, as otherwise there can't be a client waiting for a token,
642 // and `has_extra_tokens()` then implies tokens.len() is at least one.
643 let token = self.tokens.pop().unwrap();
644 self.rustc_tokens
645 .entry(id)
646 .or_insert_with(Vec::new)
647 .push(token);
648 client
649 .release_raw()
650 .with_context(|| "failed to release jobserver token")?;
651 }
652
653 Ok(())
654 }
655
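/// Handles a single message delivered by a job thread, the jobserver helper,
/// or the diagnostic server, updating the drain state and forwarding any
/// output to the shell.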
656 fn handle_event(
657 &mut self,
658 cx: &mut Context<'_, '_>,
659 jobserver_helper: &HelperThread,
660 plan: &mut BuildPlan,
661 event: Message,
662 ) -> Result<(), ErrorToHandle> {
663 match event {
664 Message::Run(id, cmd) => {
665 cx.bcx
666 .config
667 .shell()
668 .verbose(|c| c.status("Running", &cmd))?;
669 self.timings.unit_start(id, self.active[&id].clone());
670 }
671 Message::BuildPlanMsg(module_name, cmd, filenames) => {
672 plan.update(&module_name, &cmd, &filenames)?;
673 }
674 Message::Stdout(out) => {
675 writeln!(cx.bcx.config.shell().out(), "{}", out)?;
676 }
677 Message::Stderr(err) => {
678 let mut shell = cx.bcx.config.shell();
679 shell.print_ansi_stderr(err.as_bytes())?;
680 shell.err().write_all(b"\n")?;
681 }
682 Message::Diagnostic { id, level, diag } => {
683 let emitted = self.diag_dedupe.emit_diag(&diag)?;
684 if level == "warning" {
685 self.bump_warning_count(id, emitted);
686 }
687 }
688 Message::WarningCount { id, emitted } => {
689 self.bump_warning_count(id, emitted);
690 }
691 Message::FixDiagnostic(msg) => {
692 self.print.print(&msg)?;
693 }
694 Message::Finish(id, artifact, result) => {
695 let unit = match artifact {
696 // If `id` has completely finished we remove it
697 // from the `active` map ...
698 Artifact::All => {
699 trace!("end: {:?}", id);
700 self.finished += 1;
701 if let Some(rustc_tokens) = self.rustc_tokens.remove(&id) {
702 // This puts back the tokens that this rustc
703 // acquired into our primary token list.
704 //
705 // This represents a rustc bug: it did not
706 // release all of its thread tokens but finished
707 // completely. But we want to make Cargo resilient
708 // to such rustc bugs, as they're generally not
709 // fatal in nature (i.e., Cargo can make progress
710 // still, and the build might not even fail).
711 self.tokens.extend(rustc_tokens);
712 }
713 self.to_send_clients.remove(&id);
714 self.report_warning_count(cx.bcx.config, id);
715 self.active.remove(&id).unwrap()
716 }
717 // ... otherwise if it hasn't finished we leave it
718 // in there as we'll get another `Finish` later on.
719 Artifact::Metadata => {
720 trace!("end (meta): {:?}", id);
721 self.active[&id].clone()
722 }
723 };
724 debug!("end ({:?}): {:?}", unit, result);
725 match result {
726 Ok(()) => self.finish(id, &unit, artifact, cx)?,
727 Err(error) => {
728 let msg = "The following warnings were emitted during compilation:";
729 self.emit_warnings(Some(msg), &unit, cx)?;
730 self.back_compat_notice(cx, &unit)?;
731 return Err(ErrorToHandle {
732 error,
733 print_always: true,
734 });
735 }
736 }
737 }
738 Message::FutureIncompatReport(id, items) => {
739 let package_id = self.active[&id].pkg.package_id();
740 self.per_package_future_incompat_reports
741 .push(FutureIncompatReportPackage { package_id, items });
742 }
743 Message::Token(acquired_token) => {
744 let token = acquired_token.with_context(|| "failed to acquire jobserver token")?;
745 self.tokens.push(token);
746 }
747 Message::NeedsToken(id) => {
748 trace!("queue token request");
749 jobserver_helper.request_token();
750 let client = cx.rustc_clients[&self.active[&id]].clone();
751 self.to_send_clients
752 .entry(id)
753 .or_insert_with(Vec::new)
754 .push(client);
755 }
756 Message::ReleaseToken(id) => {
757 // Note that this pops off potentially a completely
758 // different token, but all tokens of the same job are
759 // conceptually the same so that's fine.
760 //
761 // self.tokens is a "pool" -- the order doesn't matter -- and
762 // this transfers ownership of the token into that pool. If we
763 // don't end up using it on the next go around, it will be dropped by the
764 // truncation in `wait_for_events`, same as tokens obtained through Message::Token.
765 let rustc_tokens = self
766 .rustc_tokens
767 .get_mut(&id)
768 .expect("no tokens associated");
769 self.tokens
770 .push(rustc_tokens.pop().expect("rustc releases token it has"));
771 }
772 }
773
774 Ok(())
775 }
776
777 // This will also tick the progress bar as appropriate
778 fn wait_for_events(&mut self) -> Vec<Message> {
779 // Drain all events at once to avoid displaying the progress bar
780 // unnecessarily. If there are no events we block waiting for one, but we
781 // keep a "heartbeat" going so that `record_cpu` (via `tick_progress`
782 // below) still runs periodically to calculate CPU usage over time. To do this we
783 // listen for a message with a timeout, and on timeout we run the
784 // previous parts of the loop again.
785 let mut events = self.messages.try_pop_all();
786 trace!(
787 "tokens in use: {}, rustc_tokens: {:?}, waiting_rustcs: {:?} (events this tick: {})",
788 self.tokens.len(),
789 self.rustc_tokens
790 .iter()
791 .map(|(k, j)| (k, j.len()))
792 .collect::<Vec<_>>(),
793 self.to_send_clients
794 .iter()
795 .map(|(k, j)| (k, j.len()))
796 .collect::<Vec<_>>(),
797 events.len(),
798 );
799 if events.is_empty() {
800 loop {
801 self.tick_progress();
802 self.tokens.truncate(self.active.len() - 1);
803 match self.messages.pop(Duration::from_millis(500)) {
804 Some(message) => {
805 events.push(message);
806 break;
807 }
808 None => continue,
809 }
810 }
811 }
812 events
813 }
814
815 /// This is the "main" loop, where Cargo does all work to run the
816 /// compiler.
817 ///
818 /// This returns an Option to prevent the use of `?` on `Result` types
819 /// because it is important for the loop to carefully handle errors.
820 fn drain_the_queue<'s>(
821 mut self,
822 cx: &mut Context<'_, '_>,
823 plan: &mut BuildPlan,
824 scope: &'s Scope<'s, '_>,
825 jobserver_helper: &HelperThread,
826 ) -> Option<anyhow::Error> {
827 trace!("queue: {:#?}", self.queue);
828
829 // Iteratively execute the entire dependency graph. Each turn of the
830 // loop starts out by scheduling as much work as possible (up to the
831 // maximum number of parallel jobs we have tokens for). A local queue
832 // is maintained separately from the main dependency queue as one
833 // dequeue may actually dequeue quite a bit of work (e.g., 10 binaries
834 // in one package).
835 //
836 // After a job has finished we update our internal state if it was
837 // successful and otherwise wait for pending work to finish if it failed
838 // and then immediately return (or keep going, if requested by the build
839 // config).
840 let mut errors = ErrorsDuringDrain { count: 0 };
841 // CAUTION! Do not use `?` or break out of the loop early. Every error
842 // must be handled in such a way that the loop is still allowed to
843 // drain event messages.
844 loop {
845 if errors.count == 0 || cx.bcx.build_config.keep_going {
846 if let Err(e) = self.spawn_work_if_possible(cx, jobserver_helper, scope) {
847 self.handle_error(&mut cx.bcx.config.shell(), &mut errors, e);
848 }
849 }
850
851 // If after all that we're not actually running anything then we're
852 // done!
853 if self.active.is_empty() {
854 break;
855 }
856
857 if let Err(e) = self.grant_rustc_token_requests() {
858 self.handle_error(&mut cx.bcx.config.shell(), &mut errors, e);
859 }
860
861 // And finally, before we block waiting for the next event, drop any
862 // excess tokens we may have accidentally acquired. Due to how our
863 // jobserver interface is architected we may acquire a token that we
864 // don't actually use, and if this happens just relinquish it back
865 // to the jobserver itself.
866 for event in self.wait_for_events() {
867 if let Err(event_err) = self.handle_event(cx, jobserver_helper, plan, event) {
868 self.handle_error(&mut cx.bcx.config.shell(), &mut errors, event_err);
869 }
870 }
871 }
872 self.progress.clear();
873
874 let profile_name = cx.bcx.build_config.requested_profile;
875 // NOTE: this may be a bit inaccurate, since this may not display the
876 // profile for what was actually built. Profile overrides can change
877 // these settings, and in some cases different targets are built with
878 // different profiles. To be accurate, it would need to collect a
879 // list of Units built, and maybe display a list of the different
880 // profiles used. However, to keep it simple and compatible with old
881 // behavior, we just display what the base profile is.
882 let profile = cx.bcx.profiles.base_profile();
883 let mut opt_type = String::from(if profile.opt_level.as_str() == "0" {
884 "unoptimized"
885 } else {
886 "optimized"
887 });
888 if profile.debuginfo.unwrap_or(0) != 0 {
889 opt_type += " + debuginfo";
890 }
891
892 let time_elapsed = util::elapsed(cx.bcx.config.creation_time().elapsed());
893 if let Err(e) = self.timings.finished(cx, &errors.to_error()) {
894 self.handle_error(&mut cx.bcx.config.shell(), &mut errors, e);
895 }
896 if cx.bcx.build_config.emit_json() {
897 let mut shell = cx.bcx.config.shell();
898 let msg = machine_message::BuildFinished {
899 success: errors.count == 0,
900 }
901 .to_json_string();
902 if let Err(e) = writeln!(shell.out(), "{}", msg) {
903 self.handle_error(&mut shell, &mut errors, e);
904 }
905 }
906
907 if let Some(error) = errors.to_error() {
908 // Any errors up to this point have already been printed via the
909 // `display_error` inside `handle_error`.
910 Some(anyhow::Error::new(AlreadyPrintedError::new(error)))
911 } else if self.queue.is_empty() && self.pending_queue.is_empty() {
912 let message = format!(
913 "{} [{}] target(s) in {}",
914 profile_name, opt_type, time_elapsed
915 );
916 if !cx.bcx.build_config.build_plan {
917 // It doesn't really matter if this fails.
918 drop(cx.bcx.config.shell().status("Finished", message));
919 future_incompat::save_and_display_report(
920 cx.bcx,
921 &self.per_package_future_incompat_reports,
922 );
923 }
924
925 None
926 } else {
927 debug!("queue: {:#?}", self.queue);
928 Some(internal("finished with jobs still left in the queue"))
929 }
930 }
931
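/// Records a failure while draining the queue. The error is displayed if it
/// is marked `print_always` or is the first one encountered; later "mundane"
/// errors are only logged.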
932 fn handle_error(
933 &self,
934 shell: &mut Shell,
935 err_state: &mut ErrorsDuringDrain,
936 new_err: impl Into<ErrorToHandle>,
937 ) {
938 let new_err = new_err.into();
939 if new_err.print_always || err_state.count == 0 {
940 crate::display_error(&new_err.error, shell);
941 if err_state.count == 0 && !self.active.is_empty() {
942 drop(shell.warn("build failed, waiting for other jobs to finish..."));
943 }
944 err_state.count += 1;
945 } else {
946 log::warn!("{:?}", new_err.error);
947 }
948 }
949
950 // This also records CPU usage and marks concurrency; we roughly want to do
951 // this as often as we spin on the events receiver (at least every 500ms or
952 // so).
953 fn tick_progress(&mut self) {
954 // Record some timing information if `--timings` is enabled, and
955 // this'll end up being a noop if we're not recording this
956 // information.
957 self.timings.mark_concurrency(
958 self.active.len(),
959 self.pending_queue.len(),
960 self.queue.len(),
961 self.rustc_tokens.len(),
962 );
963 self.timings.record_cpu();
964
965 let active_names = self
966 .active
967 .values()
968 .map(|u| self.name_for_progress(u))
969 .collect::<Vec<_>>();
970 drop(self.progress.tick_now(
971 self.finished,
972 self.total_units,
973 &format!(": {}", active_names.join(", ")),
974 ));
975 }
976
977 fn name_for_progress(&self, unit: &Unit) -> String {
978 let pkg_name = unit.pkg.name();
979 let target_name = unit.target.name();
980 match unit.mode {
981 CompileMode::Doc { .. } => format!("{}(doc)", pkg_name),
982 CompileMode::RunCustomBuild => format!("{}(build)", pkg_name),
983 CompileMode::Test | CompileMode::Check { test: true } => match unit.target.kind() {
984 TargetKind::Lib(_) => format!("{}(test)", target_name),
985 TargetKind::CustomBuild => panic!("cannot test build script"),
986 TargetKind::Bin => format!("{}(bin test)", target_name),
987 TargetKind::Test => format!("{}(test)", target_name),
988 TargetKind::Bench => format!("{}(bench)", target_name),
989 TargetKind::ExampleBin | TargetKind::ExampleLib(_) => {
990 format!("{}(example test)", target_name)
991 }
992 },
993 _ => match unit.target.kind() {
994 TargetKind::Lib(_) => pkg_name.to_string(),
995 TargetKind::CustomBuild => format!("{}(build.rs)", pkg_name),
996 TargetKind::Bin => format!("{}(bin)", target_name),
997 TargetKind::Test => format!("{}(test)", target_name),
998 TargetKind::Bench => format!("{}(bench)", target_name),
999 TargetKind::ExampleBin | TargetKind::ExampleLib(_) => {
1000 format!("{}(example)", target_name)
1001 }
1002 },
1003 }
1004 }
1005
1006 /// Executes a job.
1007 ///
1008 /// Fresh jobs block until finished (which should be very fast!), Dirty
1009 /// jobs will spawn a thread in the background and return immediately.
1010 fn run<'s>(&mut self, unit: &Unit, job: Job, cx: &Context<'_, '_>, scope: &'s Scope<'s, '_>) {
1011 let id = JobId(self.next_id);
1012 self.next_id = self.next_id.checked_add(1).unwrap();
1013
1014 debug!("start {}: {:?}", id, unit);
1015
1016 assert!(self.active.insert(id, unit.clone()).is_none());
1017
1018 let messages = self.messages.clone();
1019 let fresh = job.freshness();
1020 let rmeta_required = cx.rmeta_required(unit);
1021
1022 let doit = move |state: JobState<'_, '_>| {
1023 let mut sender = FinishOnDrop {
1024 messages: &state.messages,
1025 id,
1026 result: None,
1027 };
1028 sender.result = Some(job.run(&state));
1029
1030 // If the `rmeta_required` wasn't consumed but it was set
1031 // previously, then we either have:
1032 //
1033 // 1. The `job` didn't do anything because it was "fresh".
1034 // 2. The `job` returned an error and didn't reach the point where
1035 // it called `rmeta_produced`.
1036 // 3. We forgot to call `rmeta_produced` and there's a bug in Cargo.
1037 //
1038 // Ruling out the third, the other two are pretty common. For 2
1039 // we'll just naturally abort the compilation operation, but for 1
1040 // we need to make sure that the metadata is flagged as produced, so
1041 // we send a synthetic message here.
1042 if state.rmeta_required.get() && sender.result.as_ref().unwrap().is_ok() {
1043 state
1044 .messages
1045 .push(Message::Finish(state.id, Artifact::Metadata, Ok(())));
1046 }
1047
1048 // Use a helper struct with a `Drop` implementation to guarantee
1049 // that a `Finish` message is sent even if our job panics. We
1050 // shouldn't panic unless there's a bug in Cargo, so we just need
1051 // to make sure nothing hangs by accident.
1052 struct FinishOnDrop<'a> {
1053 messages: &'a Queue<Message>,
1054 id: JobId,
1055 result: Option<CargoResult<()>>,
1056 }
1057
1058 impl Drop for FinishOnDrop<'_> {
1059 fn drop(&mut self) {
1060 let result = self
1061 .result
1062 .take()
1063 .unwrap_or_else(|| Err(format_err!("worker panicked")));
1064 self.messages
1065 .push(Message::Finish(self.id, Artifact::All, result));
1066 }
1067 }
1068 };
1069
1070 match fresh {
1071 Freshness::Fresh => {
1072 self.timings.add_fresh();
1073 // Running a fresh job on the same thread is often much faster than spawning a new
1074 // thread to run the job.
1075 doit(JobState {
1076 id,
1077 messages,
1078 output: Some(&self.diag_dedupe),
1079 rmeta_required: Cell::new(rmeta_required),
1080 _marker: marker::PhantomData,
1081 });
1082 }
1083 Freshness::Dirty => {
1084 self.timings.add_dirty();
1085 scope.spawn(move || {
1086 doit(JobState {
1087 id,
1088 messages: messages.clone(),
1089 output: None,
1090 rmeta_required: Cell::new(rmeta_required),
1091 _marker: marker::PhantomData,
1092 })
1093 });
1094 }
1095 }
1096 }
1097
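/// Prints any warnings collected from this unit's build script output,
/// optionally preceded by the `msg` header.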
1098 fn emit_warnings(
1099 &mut self,
1100 msg: Option<&str>,
1101 unit: &Unit,
1102 cx: &mut Context<'_, '_>,
1103 ) -> CargoResult<()> {
1104 let outputs = cx.build_script_outputs.lock().unwrap();
1105 let metadata = match cx.find_build_script_metadata(unit) {
1106 Some(metadata) => metadata,
1107 None => return Ok(()),
1108 };
1109 let bcx = &mut cx.bcx;
1110 if let Some(output) = outputs.get(metadata) {
1111 if !output.warnings.is_empty() {
1112 if let Some(msg) = msg {
1113 writeln!(bcx.config.shell().err(), "{}\n", msg)?;
1114 }
1115
1116 for warning in output.warnings.iter() {
1117 bcx.config.shell().warn(warning)?;
1118 }
1119
1120 if msg.is_some() {
1121 // Output an empty line.
1122 writeln!(bcx.config.shell().err())?;
1123 }
1124 }
1125 }
1126
1127 Ok(())
1128 }
1129
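/// Records one warning for the given job; `emitted` is `false` when the
/// warning was suppressed as a duplicate.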
1130 fn bump_warning_count(&mut self, id: JobId, emitted: bool) {
1131 let cnts = self.warning_count.entry(id).or_default();
1132 cnts.0 += 1;
1133 if !emitted {
1134 cnts.1 += 1;
1135 }
1136 }
1137
1138 /// Displays a final report of the warnings emitted by a particular job.
1139 fn report_warning_count(&mut self, config: &Config, id: JobId) {
1140 let count = match self.warning_count.remove(&id) {
1141 Some(count) => count,
1142 None => return,
1143 };
1144 let unit = &self.active[&id];
1145 let mut message = format!("`{}` ({}", unit.pkg.name(), unit.target.description_named());
1146 if unit.mode.is_rustc_test() && !(unit.target.is_test() || unit.target.is_bench()) {
1147 message.push_str(" test");
1148 } else if unit.mode.is_doc_test() {
1149 message.push_str(" doctest");
1150 } else if unit.mode.is_doc() {
1151 message.push_str(" doc");
1152 }
1153 message.push_str(") generated ");
1154 match count.0 {
1155 1 => message.push_str("1 warning"),
1156 n => drop(write!(message, "{} warnings", n)),
1157 };
1158 match count.1 {
1159 0 => {}
1160 1 => message.push_str(" (1 duplicate)"),
1161 n => drop(write!(message, " ({} duplicates)", n)),
1162 }
1163 // Errors are ignored here because it is tricky to handle them
1164 // correctly, and they aren't important.
1165 drop(config.shell().warn(message));
1166 }
1167
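/// Bookkeeping for a job that has produced `artifact`: prints any pending
/// build script warnings, marks the artifact finished in the dependency
/// queue, and records timing information.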
1168 fn finish(
1169 &mut self,
1170 id: JobId,
1171 unit: &Unit,
1172 artifact: Artifact,
1173 cx: &mut Context<'_, '_>,
1174 ) -> CargoResult<()> {
1175 if unit.mode.is_run_custom_build() && unit.show_warnings(cx.bcx.config) {
1176 self.emit_warnings(None, unit, cx)?;
1177 }
1178 let unlocked = self.queue.finish(unit, &artifact);
1179 match artifact {
1180 Artifact::All => self.timings.unit_finished(id, unlocked),
1181 Artifact::Metadata => self.timings.unit_rmeta_finished(id, unlocked),
1182 }
1183 Ok(())
1184 }
1185
1186 // This isn't super trivial because we don't want to print loads and
1187 // loads of information to the console, but we also want to produce a
1188 // faithful representation of what's happening. This is somewhat nuanced
1189 // as a package can start compiling *very* early on because of custom
1190 // build commands and such.
1191 //
1192 // In general, we try to print "Compiling" for the first nontrivial task
1193 // run for a package, regardless of when that is. We then don't print
1194 // out any more information for a package after we've printed it once.
1195 fn note_working_on(
1196 &mut self,
1197 config: &Config,
1198 unit: &Unit,
1199 fresh: Freshness,
1200 ) -> CargoResult<()> {
1201 if (self.compiled.contains(&unit.pkg.package_id()) && !unit.mode.is_doc())
1202 || (self.documented.contains(&unit.pkg.package_id()) && unit.mode.is_doc())
1203 {
1204 return Ok(());
1205 }
1206
1207 match fresh {
1208 // Any dirty stage which runs at least one command gets printed as
1209 // being a compiled package.
1210 Dirty => {
1211 if unit.mode.is_doc() {
1212 self.documented.insert(unit.pkg.package_id());
1213 config.shell().status("Documenting", &unit.pkg)?;
1214 } else if unit.mode.is_doc_test() {
1215 // Skip doc test.
1216 } else {
1217 self.compiled.insert(unit.pkg.package_id());
1218 if unit.mode.is_check() {
1219 config.shell().status("Checking", &unit.pkg)?;
1220 } else {
1221 config.shell().status("Compiling", &unit.pkg)?;
1222 }
1223 }
1224 }
1225 Fresh => {
1226 // If doc tests are last, only print "Fresh" if nothing has been printed.
1227 if self.counts[&unit.pkg.package_id()] == 0
1228 && !(unit.mode.is_doc_test() && self.compiled.contains(&unit.pkg.package_id()))
1229 {
1230 self.compiled.insert(unit.pkg.package_id());
1231 config.shell().verbose(|c| c.status("Fresh", &unit.pkg))?;
1232 }
1233 }
1234 }
1235 Ok(())
1236 }
1237
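/// If a failed `diesel` build looks like the known interaction with the new
/// feature resolver, prints a note suggesting an upgrade to diesel 1.4.8.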
1238 fn back_compat_notice(&self, cx: &Context<'_, '_>, unit: &Unit) -> CargoResult<()> {
1239 if unit.pkg.name() != "diesel"
1240 || unit.pkg.version() >= &Version::new(1, 4, 8)
1241 || cx.bcx.ws.resolve_behavior() == ResolveBehavior::V1
1242 || !unit.pkg.package_id().source_id().is_registry()
1243 || !unit.features.is_empty()
1244 {
1245 return Ok(());
1246 }
1247 if !cx
1248 .bcx
1249 .unit_graph
1250 .keys()
1251 .any(|unit| unit.pkg.name() == "diesel" && !unit.features.is_empty())
1252 {
1253 return Ok(());
1254 }
1255 cx.bcx.config.shell().note(
1256 "\
1257 This error may be due to an interaction between diesel and Cargo's new
1258 feature resolver. Try updating to diesel 1.4.8 to fix this error.
1259 ",
1260 )?;
1261 Ok(())
1262 }
1263 }
1264
1265 impl ErrorsDuringDrain {
1266 fn to_error(&self) -> Option<anyhow::Error> {
1267 match self.count {
1268 0 => None,
1269 1 => Some(format_err!("1 job failed")),
1270 n => Some(format_err!("{} jobs failed", n)),
1271 }
1272 }
1273 }