1 use std
::collections
::hash_map
::HashMap
;
2 use std
::collections
::HashSet
;
6 use std
::process
::Output
;
7 use std
::sync
::mpsc
::{channel, Receiver, Sender}
;
10 use crossbeam_utils
::thread
::Scope
;
11 use jobserver
::{Acquired, HelperThread}
;
12 use log
::{debug, info, trace}
;
14 use crate::core
::profiles
::Profile
;
15 use crate::core
::{PackageId, Target, TargetKind}
;
16 use crate::handle_error
;
18 use crate::util
::diagnostic_server
::{self, DiagnosticPrinter}
;
19 use crate::util
::{internal, profile, CargoResult, CargoResultExt, ProcessBuilder}
;
20 use crate::util
::{Config, DependencyQueue, Dirty, Fresh, Freshness}
;
21 use crate::util
::{Progress, ProgressStyle}
;
23 use super::context
::OutputFile
;
25 use super::{BuildContext, BuildPlan, CompileMode, Context, Kind, Unit}
;
27 /// A management structure of the entire dependency graph to compile.
29 /// This structure is backed by the `DependencyQueue` type and manages the
30 /// actual compilation step of each package. Packages enqueue units of work and
31 /// then later on the entire graph is processed and compiled.
32 pub struct JobQueue
<'a
> {
33 queue
: DependencyQueue
<Key
<'a
>, Vec
<(Job
, Freshness
)>>,
34 tx
: Sender
<Message
<'a
>>,
35 rx
: Receiver
<Message
<'a
>>,
37 pending
: HashMap
<Key
<'a
>, PendingBuild
>,
38 compiled
: HashSet
<PackageId
>,
39 documented
: HashSet
<PackageId
>,
40 counts
: HashMap
<PackageId
, usize>,
44 /// A helper structure for metadata about the state of a building package.
46 /// Number of jobs currently active
48 /// Current freshness state of this package. Any dirty target within a
49 /// package will cause the entire package to become dirty.
53 #[derive(Clone, Copy, Eq, PartialEq, Hash)]
63 fn name_for_progress(&self) -> String
{
64 let pkg_name
= self.pkg
.name();
66 CompileMode
::Doc { .. }
=> format
!("{}(doc)", pkg_name
),
67 CompileMode
::RunCustomBuild
=> format
!("{}(build)", pkg_name
),
69 let annotation
= match self.target
.kind() {
70 TargetKind
::Lib(_
) => return pkg_name
.to_string(),
71 TargetKind
::CustomBuild
=> return format
!("{}(build.rs)", pkg_name
),
72 TargetKind
::Bin
=> "bin",
73 TargetKind
::Test
=> "test",
74 TargetKind
::Bench
=> "bench",
75 TargetKind
::ExampleBin
| TargetKind
::ExampleLib(_
) => "example",
77 format
!("{}({})", self.target
.name(), annotation
)
83 pub struct JobState
<'a
> {
84 tx
: Sender
<Message
<'a
>>,
89 BuildPlanMsg(String
, ProcessBuilder
, Arc
<Vec
<OutputFile
>>),
92 FixDiagnostic(diagnostic_server
::Message
),
93 Token(io
::Result
<Acquired
>),
94 Finish(Key
<'a
>, CargoResult
<()>),
97 impl<'a
> JobState
<'a
> {
98 pub fn running(&self, cmd
: &ProcessBuilder
) {
99 let _
= self.tx
.send(Message
::Run(cmd
.to_string()));
106 filenames
: Arc
<Vec
<OutputFile
>>,
110 .send(Message
::BuildPlanMsg(module_name
, cmd
, filenames
));
113 pub fn capture_output(
115 cmd
: &ProcessBuilder
,
116 prefix
: Option
<String
>,
117 capture_output
: bool
,
118 ) -> CargoResult
<Output
> {
119 let prefix
= prefix
.unwrap_or_else(String
::new
);
120 cmd
.exec_with_streaming(
122 let _
= self.tx
.send(Message
::Stdout(format
!("{}{}", prefix
, out
)));
126 let _
= self.tx
.send(Message
::Stderr(format
!("{}{}", prefix
, err
)));
134 impl<'a
> JobQueue
<'a
> {
135 pub fn new
<'cfg
>(bcx
: &BuildContext
<'a
, 'cfg
>) -> JobQueue
<'a
> {
136 let (tx
, rx
) = channel();
138 queue
: DependencyQueue
::new(),
142 pending
: HashMap
::new(),
143 compiled
: HashSet
::new(),
144 documented
: HashSet
::new(),
145 counts
: HashMap
::new(),
146 is_release
: bcx
.build_config
.release
,
150 pub fn enqueue
<'cfg
>(
152 cx
: &Context
<'a
, 'cfg
>,
156 ) -> CargoResult
<()> {
157 let key
= Key
::new(unit
);
158 let deps
= key
.dependencies(cx
)?
;
160 .queue(Fresh
, &key
, Vec
::new(), &deps
)
162 *self.counts
.entry(key
.pkg
).or_insert(0) += 1;
166 /// Execute all jobs necessary to build the dependency graph.
168 /// This function will spawn off `config.jobs()` workers to build all of the
169 /// necessary dependencies, in order. Freshness is propagated as far as
170 /// possible along each dependency chain.
171 pub fn execute(&mut self, cx
: &mut Context
<'_
, '_
>, plan
: &mut BuildPlan
) -> CargoResult
<()> {
172 let _p
= profile
::start("executing the job graph");
173 self.queue
.queue_finished();
175 // We need to give a handle to the send half of our message queue to the
176 // jobserver and (optionally) diagnostic helper thread. Unfortunately
177 // though we need the handle to be `'static` as that's typically what's
178 // required when spawning a thread!
180 // To work around this we transmute the `Sender` to a static lifetime.
181 // we're only sending "longer living" messages and we should also
182 // destroy all references to the channel before this function exits as
183 // the destructor for the `helper` object will ensure the associated
184 // thread is no longer running.
186 // As a result, this `transmute` to a longer lifetime should be safe in
188 let tx
= self.tx
.clone();
189 let tx
= unsafe { mem::transmute::<Sender<Message<'a>>, Sender<Message<'static>>>(tx) }
;
190 let tx2
= tx
.clone();
194 .into_helper_thread(move |token
| {
195 drop(tx
.send(Message
::Token(token
)));
197 .chain_err(|| "failed to create helper thread for jobserver management")?
;
198 let _diagnostic_server
= cx
201 .rustfix_diagnostic_server
204 .map(move |srv
| srv
.start(move |msg
| drop(tx2
.send(Message
::FixDiagnostic(msg
)))));
206 crossbeam_utils
::thread
::scope(|scope
| self.drain_the_queue(cx
, plan
, scope
, &helper
))
207 .expect("child threads should't panic")
212 cx
: &mut Context
<'_
, '_
>,
213 plan
: &mut BuildPlan
,
215 jobserver_helper
: &HelperThread
,
216 ) -> CargoResult
<()> {
217 let mut tokens
= Vec
::new();
218 let mut queue
= Vec
::new();
219 let build_plan
= cx
.bcx
.build_config
.build_plan
;
220 let mut print
= DiagnosticPrinter
::new(cx
.bcx
.config
);
221 trace
!("queue: {:#?}", self.queue
);
223 // Iteratively execute the entire dependency graph. Each turn of the
224 // loop starts out by scheduling as much work as possible (up to the
225 // maximum number of parallel jobs we have tokens for). A local queue
226 // is maintained separately from the main dependency queue as one
227 // dequeue may actually dequeue quite a bit of work (e.g. 10 binaries
230 // After a job has finished we update our internal state if it was
231 // successful and otherwise wait for pending work to finish if it failed
232 // and then immediately return.
233 let mut error
= None
;
234 let mut progress
= Progress
::with_style("Building", ProgressStyle
::Ratio
, cx
.bcx
.config
);
235 let total
= self.queue
.len();
237 // Dequeue as much work as we can, learning about everything
238 // possible that can run. Note that this is also the point where we
239 // start requesting job tokens. Each job after the first needs to
241 while let Some((fresh
, key
, jobs
)) = self.queue
.dequeue() {
242 let total_fresh
= jobs
.iter().fold(fresh
, |fresh
, &(_
, f
)| f
.combine(fresh
));
250 for (job
, f
) in jobs
{
251 queue
.push((key
, job
, f
.combine(fresh
)));
252 if !self.active
.is_empty() || !queue
.is_empty() {
253 jobserver_helper
.request_token();
258 // Now that we've learned of all possible work that we can execute
259 // try to spawn it so long as we've got a jobserver token which says
260 // we're able to perform some parallel work.
261 while error
.is_none() && self.active
.len() < tokens
.len() + 1 && !queue
.is_empty() {
262 let (key
, job
, fresh
) = queue
.remove(0);
263 self.run(key
, fresh
, job
, cx
.bcx
.config
, scope
, build_plan
)?
;
266 // If after all that we're not actually running anything then we're
268 if self.active
.is_empty() {
272 // And finally, before we block waiting for the next event, drop any
273 // excess tokens we may have accidentally acquired. Due to how our
274 // jobserver interface is architected we may acquire a token that we
275 // don't actually use, and if this happens just relinquish it back
276 // to the jobserver itself.
277 tokens
.truncate(self.active
.len() - 1);
279 let count
= total
- self.queue
.len();
280 let active_names
= self
283 .map(Key
::name_for_progress
)
284 .collect
::<Vec
<_
>>();
285 drop(progress
.tick_now(count
, total
, &format
!(": {}", active_names
.join(", "))));
286 let event
= self.rx
.recv().unwrap();
290 Message
::Run(cmd
) => {
294 .verbose(|c
| c
.status("Running", &cmd
))?
;
296 Message
::BuildPlanMsg(module_name
, cmd
, filenames
) => {
297 plan
.update(&module_name
, &cmd
, &filenames
)?
;
299 Message
::Stdout(out
) => {
302 Message
::Stderr(err
) => {
303 let mut shell
= cx
.bcx
.config
.shell();
304 shell
.print_ansi(err
.as_bytes())?
;
305 shell
.err().write_all(b
"\n")?
;
307 Message
::FixDiagnostic(msg
) => {
310 Message
::Finish(key
, result
) => {
311 info
!("end: {:?}", key
);
313 // self.active.remove_item(&key); // <- switch to this when stabilized.
317 .position(|k
| *k
== key
)
318 .expect("an unrecorded package has finished compiling");
319 self.active
.remove(pos
);
320 if !self.active
.is_empty() {
321 assert
!(!tokens
.is_empty());
325 Ok(()) => self.finish(key
, cx
)?
,
327 let msg
= "The following warnings were emitted during compilation:";
328 self.emit_warnings(Some(msg
), &key
, cx
)?
;
330 if !self.active
.is_empty() {
331 error
= Some(format_err
!("build failed"));
332 handle_error(&e
, &mut *cx
.bcx
.config
.shell());
333 cx
.bcx
.config
.shell().warn(
334 "build failed, waiting for other \
343 Message
::Token(acquired_token
) => {
344 tokens
.push(acquired_token
.chain_err(|| "failed to acquire jobserver token")?
);
350 let build_type
= if self.is_release { "release" }
else { "dev" }
;
351 // NOTE: This may be a bit inaccurate, since this may not display the
352 // profile for what was actually built. Profile overrides can change
353 // these settings, and in some cases different targets are built with
354 // different profiles. To be accurate, it would need to collect a
355 // list of Units built, and maybe display a list of the different
356 // profiles used. However, to keep it simple and compatible with old
357 // behavior, we just display what the base profile is.
358 let profile
= cx
.bcx
.profiles
.base_profile(self.is_release
);
359 let mut opt_type
= String
::from(if profile
.opt_level
.as_str() == "0" {
364 if profile
.debuginfo
.is_some() {
365 opt_type
+= " + debuginfo";
368 let time_elapsed
= util
::elapsed(cx
.bcx
.config
.creation_time().elapsed());
370 if self.queue
.is_empty() {
371 let message
= format
!(
372 "{} [{}] target(s) in {}",
373 build_type
, opt_type
, time_elapsed
376 cx
.bcx
.config
.shell().status("Finished", message
)?
;
379 } else if let Some(e
) = error
{
382 debug
!("queue: {:#?}", self.queue
);
383 Err(internal("finished with jobs still left in the queue"))
387 /// Executes a job in the `scope` given, pushing the spawned thread's
388 /// handled onto `threads`.
397 ) -> CargoResult
<()> {
398 info
!("start: {:?}", key
);
400 self.active
.push(key
);
401 *self.counts
.get_mut(&key
.pkg
).unwrap() -= 1;
403 let my_tx
= self.tx
.clone();
405 let res
= job
.run(fresh
, &JobState { tx: my_tx.clone() }
);
406 my_tx
.send(Message
::Finish(key
, res
)).unwrap();
410 // Print out some nice progress information
411 self.note_working_on(config
, &key
, fresh
)?
;
415 Freshness
::Fresh
=> doit(),
416 Freshness
::Dirty
=> {
417 scope
.spawn(move |_
| doit());
428 cx
: &mut Context
<'_
, '_
>,
429 ) -> CargoResult
<()> {
430 let output
= cx
.build_state
.outputs
.lock().unwrap();
431 let bcx
= &mut cx
.bcx
;
432 if let Some(output
) = output
.get(&(key
.pkg
, key
.kind
)) {
433 if let Some(msg
) = msg
{
434 if !output
.warnings
.is_empty() {
435 writeln
!(bcx
.config
.shell().err(), "{}\n", msg
)?
;
439 for warning
in output
.warnings
.iter() {
440 bcx
.config
.shell().warn(warning
)?
;
443 if !output
.warnings
.is_empty() && msg
.is_some() {
444 // Output an empty line.
445 writeln
!(bcx
.config
.shell().err())?
;
452 fn finish(&mut self, key
: Key
<'a
>, cx
: &mut Context
<'_
, '_
>) -> CargoResult
<()> {
453 if key
.mode
.is_run_custom_build() && cx
.bcx
.show_warnings(key
.pkg
) {
454 self.emit_warnings(None
, &key
, cx
)?
;
457 let state
= self.pending
.get_mut(&key
).unwrap();
460 self.queue
.finish(&key
, state
.fresh
);
465 // This isn't super trivial because we don't want to print loads and
466 // loads of information to the console, but we also want to produce a
467 // faithful representation of what's happening. This is somewhat nuanced
468 // as a package can start compiling *very* early on because of custom
469 // build commands and such.
471 // In general, we try to print "Compiling" for the first nontrivial task
472 // run for a package, regardless of when that is. We then don't print
473 // out any more information for a package after we've printed it once.
479 ) -> CargoResult
<()> {
480 if (self.compiled
.contains(&key
.pkg
) && !key
.mode
.is_doc())
481 || (self.documented
.contains(&key
.pkg
) && key
.mode
.is_doc())
487 // Any dirty stage which runs at least one command gets printed as
488 // being a compiled package
490 if key
.mode
.is_doc() {
492 if !key
.mode
.is_any_test() {
493 self.documented
.insert(key
.pkg
);
494 config
.shell().status("Documenting", key
.pkg
)?
;
497 self.compiled
.insert(key
.pkg
);
498 if key
.mode
.is_check() {
499 config
.shell().status("Checking", key
.pkg
)?
;
501 config
.shell().status("Compiling", key
.pkg
)?
;
506 // If doctest is last, only print "Fresh" if nothing has been printed.
507 if self.counts
[&key
.pkg
] == 0
508 && !(key
.mode
== CompileMode
::Doctest
&& self.compiled
.contains(&key
.pkg
))
510 self.compiled
.insert(key
.pkg
);
511 config
.shell().verbose(|c
| c
.status("Fresh", key
.pkg
))?
;
520 fn new(unit
: &Unit
<'a
>) -> Key
<'a
> {
522 pkg
: unit
.pkg
.package_id(),
524 profile
: unit
.profile
,
530 fn dependencies
<'cfg
>(&self, cx
: &Context
<'a
, 'cfg
>) -> CargoResult
<Vec
<Key
<'a
>>> {
532 pkg
: cx
.get_package(self.pkg
)?
,
534 profile
: self.profile
,
538 let targets
= cx
.dep_targets(&unit
);
542 // Binaries aren't actually needed to *compile* tests, just to run
543 // them, so we don't include this dependency edge in the job graph.
544 if self.target
.is_test() && unit
.target
.is_bin() {
554 impl<'a
> fmt
::Debug
for Key
<'a
> {
555 fn fmt(&self, f
: &mut fmt
::Formatter
<'_
>) -> fmt
::Result
{
558 "{} => {}/{} => {:?}",
559 self.pkg
, self.target
, self.profile
, self.kind