]> git.proxmox.com Git - rustc.git/blob - vendor/jobserver/src/lib.rs
New upstream version 1.38.0+dfsg1
[rustc.git] / vendor / jobserver / src / lib.rs
1 //! An implementation of the GNU make jobserver.
2 //!
3 //! This crate is an implementation, in Rust, of the GNU `make` jobserver for
4 //! CLI tools that are interoperating with make or otherwise require some form
5 //! of parallelism limiting across process boundaries. This was originally
6 //! written for usage in Cargo to both (a) work when `cargo` is invoked from
7 //! `make` (using `make`'s jobserver) and (b) work when `cargo` invokes build
8 //! scripts, exporting a jobserver implementation for `make` processes to
9 //! transitively use.
10 //!
11 //! The jobserver implementation can be found in [detail online][docs] but
12 //! basically boils down to a cross-process semaphore. On Unix this is
13 //! implemented with the `pipe` syscall and read/write ends of a pipe and on
14 //! Windows this is implemented literally with IPC semaphores.
15 //!
16 //! The jobserver protocol in `make` also dictates when tokens are acquired to
17 //! run child work, and clients using this crate should take care to implement
18 //! such details to ensure correct interoperation with `make` itself.
19 //!
20 //! ## Examples
21 //!
22 //! Connect to a jobserver that was set up by `make` or a different process:
23 //!
24 //! ```no_run
25 //! use jobserver::Client;
26 //!
27 //! // See API documentation for why this is `unsafe`
28 //! let client = match unsafe { Client::from_env() } {
29 //! Some(client) => client,
30 //! None => panic!("client not configured"),
31 //! };
32 //! ```
33 //!
34 //! Acquire and release token from a jobserver:
35 //!
36 //! ```no_run
37 //! use jobserver::Client;
38 //!
39 //! let client = unsafe { Client::from_env().unwrap() };
40 //! let token = client.acquire().unwrap(); // blocks until it is available
41 //! drop(token); // releases the token when the work is done
42 //! ```
43 //!
44 //! Create a new jobserver and configure a child process to have access:
45 //!
46 //! ```
47 //! use std::process::Command;
48 //! use jobserver::Client;
49 //!
50 //! let client = Client::new(4).expect("failed to create jobserver");
51 //! let mut cmd = Command::new("make");
52 //! client.configure(&mut cmd);
53 //! ```
54 //!
55 //! ## Caveats
56 //!
57 //! This crate makes no attempt to release tokens back to a jobserver on
58 //! abnormal exit of a process. If a process which acquires a token is killed
59 //! with ctrl-c or some similar signal then tokens will not be released and the
60 //! jobserver may be in a corrupt state.
61 //!
62 //! Note that this is typically ok as ctrl-c means that an entire build process
63 //! is being torn down, but it's worth being aware of at least!
64 //!
65 //! ## Windows caveats
66 //!
67 //! There appear to be two implementations of `make` on Windows. On MSYS2 one
68 //! typically comes as `mingw32-make` and the other as `make` itself. I'm not
69 //! personally too familiar with what's going on here, but for jobserver-related
70 //! information the `mingw32-make` implementation uses Windows semaphores
71 //! whereas the `make` program does not. The `make` program appears to use file
72 //! descriptors and I'm not really sure how it works, so this crate is not
73 //! compatible with `make` on Windows. It is, however, compatible with
74 //! `mingw32-make`.
75 //!
76 //! [docs]: http://make.mad-scientist.net/papers/jobserver-implementation/
77
78 #![deny(missing_docs, missing_debug_implementations)]
79 #![doc(html_root_url = "https://docs.rs/jobserver/0.1")]
80
81 #[macro_use]
82 extern crate log;
83
84 use std::env;
85 use std::io;
86 use std::process::Command;
87 use std::sync::mpsc::{self, Sender};
88 use std::sync::Arc;
89
90 /// A client of a jobserver
91 ///
92 /// This structure is the main type exposed by this library, and is where
93 /// interaction to a jobserver is configured through. Clients are either created
94 /// from scratch in which case the internal semphore is initialied on the spot,
95 /// or a client is created from the environment to connect to a jobserver
96 /// already created.
97 ///
98 /// Some usage examples can be found in the crate documentation for using a
99 /// client.
100 ///
101 /// Note that a `Client` implements the `Clone` trait, and all instances of a
102 /// `Client` refer to the same jobserver instance.
103 #[derive(Clone, Debug)]
104 pub struct Client {
105 inner: Arc<imp::Client>,
106 }
107
108 /// An acquired token from a jobserver.
109 ///
110 /// This token will be released back to the jobserver when it is dropped and
111 /// otherwise represents the ability to spawn off another thread of work.
112 #[derive(Debug)]
113 pub struct Acquired {
114 client: Arc<imp::Client>,
115 data: imp::Acquired,
116 }
117
118 impl Client {
119 /// Creates a new jobserver initialized with the given parallelism limit.
120 ///
121 /// A client to the jobserver created will be returned. This client will
122 /// allow at most `limit` tokens to be acquired from it in parallel. More
123 /// calls to `acquire` will cause the calling thread to block.
124 ///
125 /// Note that the created `Client` is not automatically inherited into
126 /// spawned child processes from this program. Manual usage of the
127 /// `configure` function is required for a child process to have access to a
128 /// job server.
129 ///
130 /// # Examples
131 ///
132 /// ```
133 /// use jobserver::Client;
134 ///
135 /// let client = Client::new(4).expect("failed to create jobserver");
136 /// ```
137 ///
138 /// # Errors
139 ///
140 /// Returns an error if any I/O error happens when attempting to create the
141 /// jobserver client.
142 pub fn new(limit: usize) -> io::Result<Client> {
143 Ok(Client {
144 inner: Arc::new(imp::Client::new(limit)?),
145 })
146 }
147
148 /// Attempts to connect to the jobserver specified in this process's
149 /// environment.
150 ///
151 /// When the a `make` executable calls a child process it will configure the
152 /// environment of the child to ensure that it has handles to the jobserver
153 /// it's passing down. This function will attempt to look for these details
154 /// and connect to the jobserver.
155 ///
156 /// Note that the created `Client` is not automatically inherited into
157 /// spawned child processes from this program. Manual usage of the
158 /// `configure` function is required for a child process to have access to a
159 /// job server.
160 ///
161 /// # Return value
162 ///
163 /// If a jobserver was found in the environment and it looks correct then
164 /// `Some` of the connected client will be returned. If no jobserver was
165 /// found then `None` will be returned.
166 ///
167 /// Note that on Unix the `Client` returned **takes ownership of the file
168 /// descriptors specified in the environment**. Jobservers on Unix are
169 /// implemented with `pipe` file descriptors, and they're inherited from
170 /// parent processes. This `Client` returned takes ownership of the file
171 /// descriptors for this process and will close the file descriptors after
172 /// this value is dropped.
173 ///
174 /// Additionally on Unix this function will configure the file descriptors
175 /// with `CLOEXEC` so they're not automatically inherited by spawned
176 /// children.
177 ///
178 /// # Unsafety
179 ///
180 /// This function is `unsafe` to call on Unix specifically as it
181 /// transitively requires usage of the `from_raw_fd` function, which is
182 /// itself unsafe in some circumstances.
183 ///
184 /// It's recommended to call this function very early in the lifetime of a
185 /// program before any other file descriptors are opened. That way you can
186 /// make sure to take ownership properly of the file descriptors passed
187 /// down, if any.
188 ///
189 /// It's generally unsafe to call this function twice in a program if the
190 /// previous invocation returned `Some`.
191 ///
192 /// Note, though, that on Windows it should be safe to call this function
193 /// any number of times.
194 pub unsafe fn from_env() -> Option<Client> {
195 let var = match env::var("CARGO_MAKEFLAGS")
196 .or(env::var("MAKEFLAGS"))
197 .or(env::var("MFLAGS"))
198 {
199 Ok(s) => s,
200 Err(_) => return None,
201 };
202 let mut arg = "--jobserver-fds=";
203 let pos = match var.find(arg) {
204 Some(i) => i,
205 None => {
206 arg = "--jobserver-auth=";
207 match var.find(arg) {
208 Some(i) => i,
209 None => return None,
210 }
211 }
212 };
213
214 let s = var[pos + arg.len()..].split(' ').next().unwrap();
215 imp::Client::open(s).map(|c| Client { inner: Arc::new(c) })
216 }
217
218 /// Acquires a token from this jobserver client.
219 ///
220 /// This function will block the calling thread until a new token can be
221 /// acquired from the jobserver.
222 ///
223 /// # Return value
224 ///
225 /// On successful acquisition of a token an instance of `Acquired` is
226 /// returned. This structure, when dropped, will release the token back to
227 /// the jobserver. It's recommended to avoid leaking this value.
228 ///
229 /// # Errors
230 ///
231 /// If an I/O error happens while acquiring a token then this function will
232 /// return immediately with the error. If an error is returned then a token
233 /// was not acquired.
234 pub fn acquire(&self) -> io::Result<Acquired> {
235 let data = try!(self.inner.acquire());
236 Ok(Acquired {
237 client: self.inner.clone(),
238 data: data,
239 })
240 }
241
242 /// Configures a child process to have access to this client's jobserver as
243 /// well.
244 ///
245 /// This function is required to be called to ensure that a jobserver is
246 /// properly inherited to a child process. If this function is *not* called
247 /// then this `Client` will not be accessible in the child process. In other
248 /// words, if not called, then `Client::from_env` will return `None` in the
249 /// child process (or the equivalent of `Child::from_env` that `make` uses).
250 ///
251 /// ## Platform-specific behavior
252 ///
253 /// On Unix and Windows this will clobber the `CARGO_MAKEFLAGS` environment
254 /// variables for the child process, and on Unix this will also allow the
255 /// two file descriptors for this client to be inherited to the child.
256 ///
257 /// On platforms other than Unix and Windows this panics.
258 pub fn configure(&self, cmd: &mut Command) {
259 let arg = self.inner.string_arg();
260 // Older implementations of make use `--jobserver-fds` and newer
261 // implementations use `--jobserver-auth`, pass both to try to catch
262 // both implementations.
263 let value = format!("--jobserver-fds={0} --jobserver-auth={0}", arg);
264 cmd.env("CARGO_MAKEFLAGS", &value);
265 self.inner.configure(cmd);
266 }
267
268 /// Converts this `Client` into a helper thread to deal with a blocking
269 /// `acquire` function a little more easily.
270 ///
271 /// The fact that the `acquire` function on `Client` blocks isn't always
272 /// the easiest to work with. Typically you're using a jobserver to
273 /// manage running other events in parallel! This means that you need to
274 /// either (a) wait for an existing job to finish or (b) wait for a
275 /// new token to become available.
276 ///
277 /// Unfortunately the blocking in `acquire` happens at the implementation
278 /// layer of jobservers. On Unix this requires a blocking call to `read`
279 /// and on Windows this requires one of the `WaitFor*` functions. Both
280 /// of these situations aren't the easiest to deal with:
281 ///
282 /// * On Unix there's basically only one way to wake up a `read` early, and
283 /// that's through a signal. This is what the `make` implementation
284 /// itself uses, relying on `SIGCHLD` to wake up a blocking acquisition
285 /// of a new job token. Unfortunately nonblocking I/O is not an option
286 /// here, so it means that "waiting for one of two events" means that
287 /// the latter event must generate a signal! This is not always the case
288 /// on unix for all jobservers.
289 ///
290 /// * On Windows you'd have to basically use the `WaitForMultipleObjects`
291 /// which means that you've got to canonicalize all your event sources
292 /// into a `HANDLE` which also isn't the easiest thing to do
293 /// unfortunately.
294 ///
295 /// This function essentially attempts to ease these limitations by
296 /// converting this `Client` into a helper thread spawned into this
297 /// process. The application can then request that the helper thread
298 /// acquires tokens and the provided closure will be invoked for each token
299 /// acquired.
300 ///
301 /// The intention is that this function can be used to translate the event
302 /// of a token acquisition into an arbitrary user-defined event.
303 ///
304 /// # Arguments
305 ///
306 /// This function will consume the `Client` provided to be transferred to
307 /// the helper thread that is spawned. Additionally a closure `f` is
308 /// provided to be invoked whenever a token is acquired.
309 ///
310 /// This closure is only invoked after calls to
311 /// `HelperThread::request_token` have been made and a token itself has
312 /// been acquired. If an error happens while acquiring the token then
313 /// an error will be yielded to the closure as well.
314 ///
315 /// # Return Value
316 ///
317 /// This function will return an instance of the `HelperThread` structure
318 /// which is used to manage the helper thread associated with this client.
319 /// Through the `HelperThread` you'll request that tokens are acquired.
320 /// When acquired, the closure provided here is invoked.
321 ///
322 /// When the `HelperThread` structure is returned it will be gracefully
323 /// torn down, and the calling thread will be blocked until the thread is
324 /// torn down (which should be prompt).
325 ///
326 /// # Errors
327 ///
328 /// This function may fail due to creation of the helper thread or
329 /// auxiliary I/O objects to manage the helper thread. In any of these
330 /// situations the error is propagated upwards.
331 ///
332 /// # Platform-specific behavior
333 ///
334 /// On Windows this function behaves pretty normally as expected, but on
335 /// Unix the implementation is... a little heinous. As mentioned above
336 /// we're forced into blocking I/O for token acquisition, namely a blocking
337 /// call to `read`. We must be able to unblock this, however, to tear down
338 /// the helper thread gracefully!
339 ///
340 /// Essentially what happens is that we'll send a signal to the helper
341 /// thread spawned and rely on `EINTR` being returned to wake up the helper
342 /// thread. This involves installing a global `SIGUSR1` handler that does
343 /// nothing along with sending signals to that thread. This may cause
344 /// odd behavior in some applications, so it's recommended to review and
345 /// test thoroughly before using this.
346 pub fn into_helper_thread<F>(self, f: F) -> io::Result<HelperThread>
347 where
348 F: FnMut(io::Result<Acquired>) + Send + 'static,
349 {
350 let (tx, rx) = mpsc::channel();
351 Ok(HelperThread {
352 inner: Some(imp::spawn_helper(self, rx, Box::new(f))?),
353 tx: Some(tx),
354 })
355 }
356
357 /// Blocks the current thread until a token is acquired.
358 ///
359 /// This is the same as `acquire`, except that it doesn't return an RAII
360 /// helper. If successful the process will need to guarantee that
361 /// `release_raw` is called in the future.
362 pub fn acquire_raw(&self) -> io::Result<()> {
363 self.inner.acquire()?;
364 Ok(())
365 }
366
367 /// Releases a jobserver token back to the original jobserver.
368 ///
369 /// This is intended to be paired with `acquire_raw` if it was called, but
370 /// in some situations it could also be called to relinquish a process's
371 /// implicit token temporarily which is then re-acquired later.
372 pub fn release_raw(&self) -> io::Result<()> {
373 self.inner.release(None)?;
374 Ok(())
375 }
376 }
377
378 impl Drop for Acquired {
379 fn drop(&mut self) {
380 drop(self.client.release(Some(&self.data)));
381 }
382 }
383
384 /// Structure returned from `Client::into_helper_thread` to manage the lifetime
385 /// of the helper thread returned, see those associated docs for more info.
386 #[derive(Debug)]
387 pub struct HelperThread {
388 inner: Option<imp::Helper>,
389 tx: Option<Sender<()>>,
390 }
391
392 impl HelperThread {
393 /// Request that the helper thread acquires a token, eventually calling the
394 /// original closure with a token when it's available.
395 ///
396 /// For more information, see the docs on that function.
397 pub fn request_token(&self) {
398 self.tx.as_ref().unwrap().send(()).unwrap();
399 }
400 }
401
402 impl Drop for HelperThread {
403 fn drop(&mut self) {
404 drop(self.tx.take());
405 self.inner.take().unwrap().join();
406 }
407 }
408
409 #[cfg(unix)]
410 mod imp {
411 extern crate libc;
412
413 use std::fs::File;
414 use std::io::{self, Read, Write};
415 use std::mem;
416 use std::os::unix::prelude::*;
417 use std::process::Command;
418 use std::ptr;
419 use std::sync::atomic::{AtomicBool, Ordering};
420 use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
421 use std::sync::{Arc, Once, ONCE_INIT};
422 use std::thread::{self, Builder, JoinHandle};
423 use std::time::Duration;
424
425 use self::libc::c_int;
426
/// Unix jobserver client backed by the two ends of a pipe.
#[derive(Debug)]
pub struct Client {
    // Tokens are acquired by reading single bytes from this end.
    read: File,
    // Tokens are released by writing single bytes to this end.
    write: File,
}
432
/// Token payload on Unix.
#[derive(Debug)]
pub struct Acquired {
    // The byte that was read from the pipe; it is written back on release.
    byte: u8,
}
437
438 impl Client {
439 pub fn new(limit: usize) -> io::Result<Client> {
440 let client = unsafe { Client::mk()? };
441 // I don't think the character written here matters, but I could be
442 // wrong!
443 for _ in 0..limit {
444 (&client.write).write(&[b'|'])?;
445 }
446 info!("created a jobserver: {:?}", client);
447 Ok(client)
448 }
449
450 unsafe fn mk() -> io::Result<Client> {
451 let mut pipes = [0; 2];
452
453 // Attempt atomically-create-with-cloexec if we can on Linux,
454 // detected by using the `syscall` function in `libc` to try to work
455 // with as many kernels/glibc implementations as possible.
456 #[cfg(target_os = "linux")]
457 {
458 static PIPE2_AVAILABLE: AtomicBool = AtomicBool::new(true);
459 if PIPE2_AVAILABLE.load(Ordering::SeqCst) {
460 match libc::syscall(libc::SYS_pipe2, pipes.as_mut_ptr(), libc::O_CLOEXEC) {
461 -1 => {
462 let err = io::Error::last_os_error();
463 if err.raw_os_error() == Some(libc::ENOSYS) {
464 PIPE2_AVAILABLE.store(false, Ordering::SeqCst);
465 } else {
466 return Err(err);
467 }
468 }
469 _ => return Ok(Client::from_fds(pipes[0], pipes[1])),
470 }
471 }
472 }
473
474 cvt(libc::pipe(pipes.as_mut_ptr()))?;
475 drop(set_cloexec(pipes[0], true));
476 drop(set_cloexec(pipes[1], true));
477 Ok(Client::from_fds(pipes[0], pipes[1]))
478 }
479
480 pub unsafe fn open(s: &str) -> Option<Client> {
481 let mut parts = s.splitn(2, ',');
482 let read = parts.next().unwrap();
483 let write = match parts.next() {
484 Some(s) => s,
485 None => return None,
486 };
487
488 let read = match read.parse() {
489 Ok(n) => n,
490 Err(_) => return None,
491 };
492 let write = match write.parse() {
493 Ok(n) => n,
494 Err(_) => return None,
495 };
496
497 // Ok so we've got two integers that look like file descriptors, but
498 // for extra sanity checking let's see if they actually look like
499 // instances of a pipe before we return the client.
500 //
501 // If we're called from `make` *without* the leading + on our rule
502 // then we'll have `MAKEFLAGS` env vars but won't actually have
503 // access to the file descriptors.
504 if is_valid_fd(read) && is_valid_fd(write) {
505 info!("using env fds {} and {}", read, write);
506 drop(set_cloexec(read, true));
507 drop(set_cloexec(write, true));
508 Some(Client::from_fds(read, write))
509 } else {
510 info!("one of {} or {} isn't a pipe", read, write);
511 None
512 }
513 }
514
515 unsafe fn from_fds(read: c_int, write: c_int) -> Client {
516 Client {
517 read: File::from_raw_fd(read),
518 write: File::from_raw_fd(write),
519 }
520 }
521
522 pub fn acquire(&self) -> io::Result<Acquired> {
523 // We don't actually know if the file descriptor here is set in
524 // blocking or nonblocking mode. AFAIK all released versions of
525 // `make` use blocking fds for the jobserver, but the unreleased
526 // version of `make` doesn't. In the unreleased version jobserver
527 // fds are set to nonblocking and combined with `pselect`
528 // internally.
529 //
530 // Here we try to be compatible with both strategies. We
531 // unconditionally expect the file descriptor to be in nonblocking
532 // mode and if it happens to be in blocking mode then most of this
533 // won't end up actually being necessary!
534 //
535 // We use `poll` here to block this thread waiting for read
536 // readiness, and then afterwards we perform the `read` itself. If
537 // the `read` returns that it would block then we start over and try
538 // again.
539 //
540 // Also note that we explicitly don't handle EINTR here. That's used
541 // to shut us down, so we otherwise punt all errors upwards.
542 unsafe {
543 let mut fd: libc::pollfd = mem::zeroed();
544 fd.fd = self.read.as_raw_fd();
545 fd.events = libc::POLLIN;
546 loop {
547 fd.revents = 0;
548 if libc::poll(&mut fd, 1, -1) == -1 {
549 return Err(io::Error::last_os_error());
550 }
551 if fd.revents == 0 {
552 continue;
553 }
554 let mut buf = [0];
555 match (&self.read).read(&mut buf) {
556 Ok(1) => return Ok(Acquired { byte: buf[0] }),
557 Ok(_) => {
558 return Err(io::Error::new(
559 io::ErrorKind::Other,
560 "early EOF on jobserver pipe",
561 ))
562 }
563 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
564 Err(e) => return Err(e),
565 }
566 }
567 }
568 }
569
570 pub fn release(&self, data: Option<&Acquired>) -> io::Result<()> {
571 // Note that the fd may be nonblocking but we're going to go ahead
572 // and assume that the writes here are always nonblocking (we can
573 // always quickly release a token). If that turns out to not be the
574 // case we'll get an error anyway!
575 let byte = data.map(|d| d.byte).unwrap_or(b'+');
576 match (&self.write).write(&[byte])? {
577 1 => Ok(()),
578 _ => Err(io::Error::new(
579 io::ErrorKind::Other,
580 "failed to write token back to jobserver",
581 )),
582 }
583 }
584
585 pub fn string_arg(&self) -> String {
586 format!("{},{} -j", self.read.as_raw_fd(), self.write.as_raw_fd())
587 }
588
589 pub fn configure(&self, cmd: &mut Command) {
590 // Here we basically just want to say that in the child process
591 // we'll configure the read/write file descriptors to *not* be
592 // cloexec, so they're inherited across the exec and specified as
593 // integers through `string_arg` above.
594 let read = self.read.as_raw_fd();
595 let write = self.write.as_raw_fd();
596 cmd.before_exec(move || {
597 set_cloexec(read, false)?;
598 set_cloexec(write, false)?;
599 Ok(())
600 });
601 }
602 }
603
/// State needed to stop the Unix helper thread.
#[derive(Debug)]
pub struct Helper {
    // Handle of the spawned helper thread.
    thread: JoinHandle<()>,
    // Set to `true` by `join` before the thread is signalled.
    quitting: Arc<AtomicBool>,
    // Receives a message once the helper thread has left its loop.
    rx_done: Receiver<()>,
}
610
611 pub fn spawn_helper(
612 client: ::Client,
613 rx: Receiver<()>,
614 mut f: Box<FnMut(io::Result<::Acquired>) + Send>,
615 ) -> io::Result<Helper> {
616 static USR1_INIT: Once = ONCE_INIT;
617 let mut err = None;
618 USR1_INIT.call_once(|| unsafe {
619 let mut new: libc::sigaction = mem::zeroed();
620 new.sa_sigaction = sigusr1_handler as usize;
621 new.sa_flags = libc::SA_SIGINFO as _;
622 if libc::sigaction(libc::SIGUSR1, &new, ptr::null_mut()) != 0 {
623 err = Some(io::Error::last_os_error());
624 }
625 });
626
627 if let Some(e) = err.take() {
628 return Err(e);
629 }
630
631 let quitting = Arc::new(AtomicBool::new(false));
632 let quitting2 = quitting.clone();
633 let (tx_done, rx_done) = mpsc::channel();
634 let thread = Builder::new().spawn(move || {
635 'outer: for () in rx {
636 loop {
637 let res = client.acquire();
638 if let Err(ref e) = res {
639 if e.kind() == io::ErrorKind::Interrupted {
640 if quitting2.load(Ordering::SeqCst) {
641 break 'outer;
642 } else {
643 continue;
644 }
645 }
646 }
647 f(res);
648 break;
649 }
650 }
651 tx_done.send(()).unwrap();
652 })?;
653
654 Ok(Helper {
655 thread: thread,
656 quitting: quitting,
657 rx_done: rx_done,
658 })
659 }
660
661 impl Helper {
662 pub fn join(self) {
663 self.quitting.store(true, Ordering::SeqCst);
664 let dur = Duration::from_millis(10);
665 let mut done = false;
666 for _ in 0..100 {
667 unsafe {
668 // Ignore the return value here of `pthread_kill`,
669 // apparently on OSX if you kill a dead thread it will
670 // return an error, but on other platforms it may not. In
671 // that sense we don't actually know if this will succeed or
672 // not!
673 libc::pthread_kill(self.thread.as_pthread_t() as _, libc::SIGUSR1);
674 match self.rx_done.recv_timeout(dur) {
675 Ok(()) | Err(RecvTimeoutError::Disconnected) => {
676 done = true;
677 break;
678 }
679 Err(RecvTimeoutError::Timeout) => {}
680 }
681 }
682 thread::yield_now();
683 }
684 if done {
685 drop(self.thread.join());
686 }
687 }
688 }
689
690 fn is_valid_fd(fd: c_int) -> bool {
691 unsafe {
692 return libc::fcntl(fd, libc::F_GETFD) != -1;
693 }
694 }
695
696 fn set_cloexec(fd: c_int, set: bool) -> io::Result<()> {
697 unsafe {
698 let previous = cvt(libc::fcntl(fd, libc::F_GETFD))?;
699 let new = if set {
700 previous | libc::FD_CLOEXEC
701 } else {
702 previous & !libc::FD_CLOEXEC
703 };
704 if new != previous {
705 cvt(libc::fcntl(fd, libc::F_SETFD, new))?;
706 }
707 Ok(())
708 }
709 }
710
/// Converts a C-style return value into an `io::Result`.
///
/// A value of `-1` becomes the last OS error; every other value is passed
/// through unchanged.
fn cvt(t: c_int) -> io::Result<c_int> {
    match t {
        -1 => Err(io::Error::last_os_error()),
        value => Ok(value),
    }
}
718
719 extern "C" fn sigusr1_handler(
720 _signum: c_int,
721 _info: *mut libc::siginfo_t,
722 _ptr: *mut libc::c_void,
723 ) {
724 // nothing to do
725 }
726 }
727
728 #[cfg(windows)]
729 mod imp {
730 extern crate rand;
731
732 use std::ffi::CString;
733 use std::io;
734 use std::process::Command;
735 use std::ptr;
736 use std::sync::mpsc::Receiver;
737 use std::sync::Arc;
738 use std::thread::{Builder, JoinHandle};
739
740 #[derive(Debug)]
741 pub struct Client {
742 sem: Handle,
743 name: String,
744 }
745
/// Token payload on Windows; carries no data.
#[derive(Debug)]
pub struct Acquired;
748
// Hand-written aliases for the Windows API types used by this module.
type BOOL = i32;
type DWORD = u32;
type HANDLE = *mut u8;
type LONG = i32;

// Boolean values.
const FALSE: BOOL = 0;
const TRUE: BOOL = 1;

// Error code compared against the last OS error after creating a semaphore.
const ERROR_ALREADY_EXISTS: DWORD = 183;

// Timeout and return value used with the wait functions.
const INFINITE: DWORD = 0xffffffff;
const WAIT_OBJECT_0: DWORD = 0;

// Access rights requested when opening an existing semaphore.
const SEMAPHORE_MODIFY_STATE: DWORD = 0x2;
const SYNCHRONIZE: DWORD = 0x00100000;

extern "system" {
    // Handles.
    fn CloseHandle(handle: HANDLE) -> BOOL;

    // Semaphores.
    fn CreateSemaphoreA(
        lpEventAttributes: *mut u8,
        lInitialCount: LONG,
        lMaximumCount: LONG,
        lpName: *const i8,
    ) -> HANDLE;
    fn OpenSemaphoreA(
        dwDesiredAccess: DWORD,
        bInheritHandle: BOOL,
        lpName: *const i8,
    ) -> HANDLE;
    fn ReleaseSemaphore(
        hSemaphore: HANDLE,
        lReleaseCount: LONG,
        lpPreviousCount: *mut LONG,
    ) -> BOOL;

    // Events.
    fn CreateEventA(
        lpEventAttributes: *mut u8,
        bManualReset: BOOL,
        bInitialState: BOOL,
        lpName: *const i8,
    ) -> HANDLE;
    fn SetEvent(hEvent: HANDLE) -> BOOL;

    // Waiting.
    fn WaitForSingleObject(hHandle: HANDLE, dwMilliseconds: DWORD) -> DWORD;
    fn WaitForMultipleObjects(
        ncount: DWORD,
        lpHandles: *const HANDLE,
        bWaitAll: BOOL,
        dwMilliseconds: DWORD,
    ) -> DWORD;
}
795
796 impl Client {
797 pub fn new(limit: usize) -> io::Result<Client> {
798 // Try a bunch of random semaphore names until we get a unique one,
799 // but don't try for too long.
800 //
801 // Note that `limit == 0` is a valid argument above but Windows
802 // won't let us create a semaphore with 0 slots available to it. Get
803 // `limit == 0` working by creating a semaphore instead with one
804 // slot and then immediately acquire it (without ever releaseing it
805 // back).
806 for _ in 0..100 {
807 let mut name = format!("__rust_jobserver_semaphore_{}\0", rand::random::<u32>());
808 unsafe {
809 let create_limit = if limit == 0 { 1 } else { limit };
810 let r = CreateSemaphoreA(
811 ptr::null_mut(),
812 create_limit as LONG,
813 create_limit as LONG,
814 name.as_ptr() as *const _,
815 );
816 if r.is_null() {
817 return Err(io::Error::last_os_error());
818 }
819 let handle = Handle(r);
820
821 let err = io::Error::last_os_error();
822 if err.raw_os_error() == Some(ERROR_ALREADY_EXISTS as i32) {
823 continue;
824 }
825 name.pop(); // chop off the trailing nul
826 let client = Client {
827 sem: handle,
828 name: name,
829 };
830 if create_limit != limit {
831 client.acquire()?;
832 }
833 info!("created jobserver {:?}", client);
834 return Ok(client);
835 }
836 }
837
838 Err(io::Error::new(
839 io::ErrorKind::Other,
840 "failed to find a unique name for a semaphore",
841 ))
842 }
843
844 pub unsafe fn open(s: &str) -> Option<Client> {
845 let name = match CString::new(s) {
846 Ok(s) => s,
847 Err(_) => return None,
848 };
849
850 let sem = OpenSemaphoreA(SYNCHRONIZE | SEMAPHORE_MODIFY_STATE, FALSE, name.as_ptr());
851 if sem.is_null() {
852 info!("failed to open environment semaphore {}", s);
853 None
854 } else {
855 info!("opened environment semaphore {}", s);
856 Some(Client {
857 sem: Handle(sem),
858 name: s.to_string(),
859 })
860 }
861 }
862
863 pub fn acquire(&self) -> io::Result<Acquired> {
864 unsafe {
865 let r = WaitForSingleObject(self.sem.0, INFINITE);
866 if r == WAIT_OBJECT_0 {
867 Ok(Acquired)
868 } else {
869 Err(io::Error::last_os_error())
870 }
871 }
872 }
873
874 pub fn release(&self, _data: Option<&Acquired>) -> io::Result<()> {
875 unsafe {
876 let r = ReleaseSemaphore(self.sem.0, 1, ptr::null_mut());
877 if r != 0 {
878 Ok(())
879 } else {
880 Err(io::Error::last_os_error())
881 }
882 }
883 }
884
885 pub fn string_arg(&self) -> String {
886 self.name.clone()
887 }
888
889 pub fn configure(&self, _cmd: &mut Command) {
890 // nothing to do here, we gave the name of our semaphore to the
891 // child above
892 }
893 }
894
895 #[derive(Debug)]
896 struct Handle(HANDLE);
897 // HANDLE is a raw ptr, but we're send/sync
898 unsafe impl Sync for Handle {}
899 unsafe impl Send for Handle {}
900
901 impl Drop for Handle {
902 fn drop(&mut self) {
903 unsafe {
904 CloseHandle(self.0);
905 }
906 }
907 }
908
909 #[derive(Debug)]
910 pub struct Helper {
911 event: Arc<Handle>,
912 thread: JoinHandle<()>,
913 }
914
915 pub fn spawn_helper(
916 client: ::Client,
917 rx: Receiver<()>,
918 mut f: Box<FnMut(io::Result<::Acquired>) + Send>,
919 ) -> io::Result<Helper> {
920 let event = unsafe {
921 let r = CreateEventA(ptr::null_mut(), TRUE, FALSE, ptr::null());
922 if r.is_null() {
923 return Err(io::Error::last_os_error());
924 } else {
925 Handle(r)
926 }
927 };
928 let event = Arc::new(event);
929 let event2 = event.clone();
930 let thread = Builder::new().spawn(move || {
931 let objects = [event2.0, client.inner.sem.0];
932 for () in rx {
933 let r = unsafe { WaitForMultipleObjects(2, objects.as_ptr(), FALSE, INFINITE) };
934 if r == WAIT_OBJECT_0 {
935 break;
936 }
937 if r == WAIT_OBJECT_0 + 1 {
938 f(Ok(::Acquired {
939 client: client.inner.clone(),
940 data: Acquired,
941 }))
942 } else {
943 f(Err(io::Error::last_os_error()))
944 }
945 }
946 })?;
947 Ok(Helper {
948 thread: thread,
949 event: event,
950 })
951 }
952
953 impl Helper {
954 pub fn join(self) {
955 let r = unsafe { SetEvent(self.event.0) };
956 if r == 0 {
957 panic!("failed to set event: {}", io::Error::last_os_error());
958 }
959 drop(self.thread.join());
960 }
961 }
962 }
963
964 #[cfg(not(any(unix, windows)))]
965 mod imp {
966 use std::io;
967 use std::process::Command;
968 use std::sync::mpsc::{self, Receiver, SyncSender};
969 use std::sync::Mutex;
970 use std::thread::{Builder, JoinHandle};
971
/// In-process jobserver used on platforms that are neither Unix nor Windows,
/// implemented on top of a bounded channel.
#[derive(Debug)]
pub struct Client {
    // Releasing a token sends a message on this end.
    tx: SyncSender<()>,
    // Acquiring a token receives a message from this end.
    rx: Mutex<Receiver<()>>,
}
977
/// Token payload on this platform; carries no data.
#[derive(Debug)]
pub struct Acquired(());
980
981 impl Client {
982 pub fn new(limit: usize) -> io::Result<Client> {
983 let (tx, rx) = mpsc::sync_channel(limit);
984 for _ in 0..limit {
985 tx.send(()).unwrap();
986 }
987 Ok(Client {
988 tx,
989 rx: Mutex::new(rx),
990 })
991 }
992
993 pub unsafe fn open(_s: &str) -> Option<Client> {
994 None
995 }
996
997 pub fn acquire(&self) -> io::Result<Acquired> {
998 self.rx.lock().unwrap().recv().unwrap();
999 Ok(Acquired(()))
1000 }
1001
1002 pub fn release(&self, _data: Option<&Acquired>) -> io::Result<()> {
1003 self.tx.send(()).unwrap();
1004 Ok(())
1005 }
1006
1007 pub fn string_arg(&self) -> String {
1008 panic!(
1009 "On this platform there is no cross process jobserver support,
1010 so Client::configure is not supported."
1011 );
1012 }
1013
1014 pub fn configure(&self, _cmd: &mut Command) {
1015 unreachable!();
1016 }
1017 }
1018
/// Handle to the helper thread on this platform.
#[derive(Debug)]
pub struct Helper {
    // Handle of the spawned helper thread.
    thread: JoinHandle<()>,
}
1023
1024 pub fn spawn_helper(
1025 client: ::Client,
1026 rx: Receiver<()>,
1027 mut f: Box<FnMut(io::Result<::Acquired>) + Send>,
1028 ) -> io::Result<Helper> {
1029 let thread = Builder::new().spawn(move || {
1030 for () in rx {
1031 let res = client.acquire();
1032 f(res);
1033 }
1034 })?;
1035
1036 Ok(Helper { thread: thread })
1037 }
1038
1039 impl Helper {
1040 pub fn join(self) {
1041 drop(self.thread.join());
1042 }
1043 }
1044 }
1045
/// Dropping a helper thread must return even while another clone of the
/// client is still alive.
#[test]
fn no_helper_deadlock() {
    let client = crate::Client::new(32).unwrap();
    let _other = client.clone();
    let helper = client.into_helper_thread(|_| {}).unwrap();
    std::mem::drop(helper);
}