]> git.proxmox.com Git - rustc.git/blame - src/vendor/jobserver/src/lib.rs
New upstream version 1.23.0+dfsg1
[rustc.git] / src / vendor / jobserver / src / lib.rs
CommitLineData
041b39d2
XL
1//! An implementation of the GNU make jobserver.
2//!
3//! This crate is an implementation, in Rust, of the GNU `make` jobserver for
4//! CLI tools that are interoperating with make or otherwise require some form
5//! of parallelism limiting across process boundaries. This was originally
6//! written for usage in Cargo to both (a) work when `cargo` is invoked from
7//! `make` (using `make`'s jobserver) and (b) work when `cargo` invokes build
8//! scripts, exporting a jobserver implementation for `make` processes to
9//! transitively use.
10//!
11//! The jobserver implementation can be found in [detail online][docs] but
12//! basically boils down to a cross-process semaphore. On Unix this is
13//! implemented with the `pipe` syscall and read/write ends of a pipe and on
14//! Windows this is implemented literally with IPC semaphores.
15//!
//! The jobserver protocol in `make` also dictates when tokens are acquired to
//! run child work, and clients using this crate should take care to implement
//! such details to ensure correct interoperation with `make` itself.
19//!
20//! ## Examples
21//!
22//! Connect to a jobserver that was set up by `make` or a different process:
23//!
24//! ```no_run
25//! use jobserver::Client;
26//!
27//! // See API documentation for why this is `unsafe`
28//! let client = match unsafe { Client::from_env() } {
29//! Some(client) => client,
30//! None => panic!("client not configured"),
31//! };
32//! ```
33//!
34//! Acquire and release token from a jobserver:
35//!
36//! ```no_run
37//! use jobserver::Client;
38//!
39//! let client = unsafe { Client::from_env().unwrap() };
40//! let token = client.acquire().unwrap(); // blocks until it is available
41//! drop(token); // releases the token when the work is done
42//! ```
43//!
44//! Create a new jobserver and configure a child process to have access:
45//!
46//! ```
47//! use std::process::Command;
48//! use jobserver::Client;
49//!
50//! let client = Client::new(4).expect("failed to create jobserver");
51//! let mut cmd = Command::new("make");
52//! client.configure(&mut cmd);
53//! ```
54//!
55//! ## Caveats
56//!
57//! This crate makes no attempt to release tokens back to a jobserver on
58//! abnormal exit of a process. If a process which acquires a token is killed
59//! with ctrl-c or some similar signal then tokens will not be released and the
60//! jobserver may be in a corrupt state.
61//!
62//! Note that this is typically ok as ctrl-c means that an entire build process
63//! is being torn down, but it's worth being aware of at least!
64//!
65//! ## Windows caveats
66//!
67//! There appear to be two implementations of `make` on Windows. On MSYS2 one
68//! typically comes as `mingw32-make` and the other as `make` itself. I'm not
69//! personally too familiar with what's going on here, but for jobserver-related
70//! information the `mingw32-make` implementation uses Windows semaphores
71//! whereas the `make` program does not. The `make` program appears to use file
72//! descriptors and I'm not really sure how it works, so this crate is not
73//! compatible with `make` on Windows. It is, however, compatible with
74//! `mingw32-make`.
75//!
76//! [docs]: http://make.mad-scientist.net/papers/jobserver-implementation/
77
78#![deny(missing_docs, missing_debug_implementations)]
79#![doc(html_root_url = "https://docs.rs/jobserver/0.1")]
80
81use std::env;
82use std::io;
83use std::process::Command;
84use std::sync::Arc;
85use std::sync::mpsc::{self, Sender};
86
87/// A client of a jobserver
88///
89/// This structure is the main type exposed by this library, and is where
90/// interaction to a jobserver is configured through. Clients are either created
91/// from scratch in which case the internal semphore is initialied on the spot,
92/// or a client is created from the environment to connect to a jobserver
93/// already created.
94///
95/// Some usage examples can be found in the crate documentation for using a
96/// client.
97///
98/// Note that a `Client` implements the `Clone` trait, and all instances of a
99/// `Client` refer to the same jobserver instance.
100#[derive(Clone, Debug)]
101pub struct Client {
102 inner: Arc<imp::Client>,
103}
104
105/// An acquired token from a jobserver.
106///
107/// This token will be released back to the jobserver when it is dropped and
108/// otherwise represents the ability to spawn off another thread of work.
109#[derive(Debug)]
110pub struct Acquired {
111 client: Arc<imp::Client>,
112 data: imp::Acquired,
113}
114
115impl Client {
116 /// Creates a new jobserver initialized with the given parallelism limit.
117 ///
118 /// A client to the jobserver created will be returned. This client will
119 /// allow at most `limit` tokens to be acquired from it in parallel. More
120 /// calls to `acquire` will cause the calling thread to block.
121 ///
122 /// Note that the created `Client` is not automatically inherited into
123 /// spawned child processes from this program. Manual usage of the
124 /// `configure` function is required for a child process to have access to a
125 /// job server.
126 ///
127 /// # Examples
128 ///
129 /// ```
130 /// use jobserver::Client;
131 ///
132 /// let client = Client::new(4).expect("failed to create jobserver");
133 /// ```
134 ///
135 /// # Errors
136 ///
137 /// Returns an error if any I/O error happens when attempting to create the
138 /// jobserver client.
139 pub fn new(limit: usize) -> io::Result<Client> {
140 Ok(Client {
141 inner: Arc::new(imp::Client::new(limit)?),
142 })
143 }
144
145 /// Attempts to connect to the jobserver specified in this process's
146 /// environment.
147 ///
/// When a `make` executable calls a child process it will configure the
/// environment of the child to ensure that it has handles to the jobserver
/// it's passing down. This function will attempt to look for these details
/// and connect to the jobserver.
152 ///
153 /// Note that the created `Client` is not automatically inherited into
154 /// spawned child processes from this program. Manual usage of the
155 /// `configure` function is required for a child process to have access to a
156 /// job server.
157 ///
158 /// # Return value
159 ///
160 /// If a jobserver was found in the environment and it looks correct then
161 /// `Some` of the connected client will be returned. If no jobserver was
162 /// found then `None` will be returned.
163 ///
164 /// Note that on Unix the `Client` returned **takes ownership of the file
165 /// descriptors specified in the environment**. Jobservers on Unix are
166 /// implemented with `pipe` file descriptors, and they're inherited from
167 /// parent processes. This `Client` returned takes ownership of the file
168 /// descriptors for this process and will close the file descriptors after
169 /// this value is dropped.
170 ///
171 /// Additionally on Unix this function will configure the file descriptors
172 /// with `CLOEXEC` so they're not automatically inherited by spawned
173 /// children.
174 ///
175 /// # Unsafety
176 ///
177 /// This function is `unsafe` to call on Unix specifically as it
178 /// transitively requires usage of the `from_raw_fd` function, which is
179 /// itself unsafe in some circumstances.
180 ///
181 /// It's recommended to call this function very early in the lifetime of a
182 /// program before any other file descriptors are opened. That way you can
183 /// make sure to take ownership properly of the file descriptors passed
184 /// down, if any.
185 ///
186 /// It's generally unsafe to call this function twice in a program if the
187 /// previous invocation returned `Some`.
188 ///
189 /// Note, though, that on Windows it should be safe to call this function
190 /// any number of times.
191 pub unsafe fn from_env() -> Option<Client> {
192 let var = match env::var("CARGO_MAKEFLAGS")
193 .or(env::var("MAKEFLAGS"))
194 .or(env::var("MFLAGS")) {
195 Ok(s) => s,
196 Err(_) => return None,
197 };
198 let mut arg = "--jobserver-fds=";
199 let pos = match var.find(arg) {
200 Some(i) => i,
201 None => {
202 arg = "--jobserver-auth=";
203 match var.find(arg) {
204 Some(i) => i,
205 None => return None,
206 }
207 }
208 };
209
210 let s = var[pos + arg.len()..].split(' ').next().unwrap();
211 imp::Client::open(s).map(|c| {
212 Client { inner: Arc::new(c) }
213 })
214 }
215
216 /// Acquires a token from this jobserver client.
217 ///
218 /// This function will block the calling thread until a new token can be
219 /// acquired from the jobserver.
220 ///
221 /// # Return value
222 ///
223 /// On successful acquisition of a token an instance of `Acquired` is
224 /// returned. This structure, when dropped, will release the token back to
225 /// the jobserver. It's recommended to avoid leaking this value.
226 ///
227 /// # Errors
228 ///
229 /// If an I/O error happens while acquiring a token then this function will
230 /// return immediately with the error. If an error is returned then a token
231 /// was not acquired.
232 pub fn acquire(&self) -> io::Result<Acquired> {
233 let data = try!(self.inner.acquire());
234 Ok(Acquired {
235 client: self.inner.clone(),
236 data: data,
237 })
238 }
239
240 /// Configures a child process to have access to this client's jobserver as
241 /// well.
242 ///
/// This function is required to be called to ensure that a jobserver is
/// properly inherited to a child process. If this function is *not* called
/// then this `Client` will not be accessible in the child process. In other
/// words, if not called, then `Client::from_env` will return `None` in the
/// child process (or the equivalent of `Client::from_env` that `make` uses).
248 ///
249 /// ## Platform-specific behavior
250 ///
/// On Unix and Windows this will clobber the `CARGO_MAKEFLAGS` environment
/// variable for the child process, and on Unix this will also allow the
/// two file descriptors for this client to be inherited to the child.
254 pub fn configure(&self, cmd: &mut Command) {
255 let arg = self.inner.string_arg();
256 // Older implementations of make use `--jobserver-fds` and newer
257 // implementations use `--jobserver-auth`, pass both to try to catch
258 // both implementations.
259 let value = format!("--jobserver-fds={0} --jobserver-auth={0}", arg);
260 cmd.env("CARGO_MAKEFLAGS", &value);
261 self.inner.configure(cmd);
262 }
263
264 /// Converts this `Client` into a helper thread to deal with a blocking
265 /// `acquire` function a little more easily.
266 ///
267 /// The fact that the `acquire` function on `Client` blocks isn't always
268 /// the easiest to work with. Typically you're using a jobserver to
269 /// manage running other events in parallel! This means that you need to
270 /// either (a) wait for an existing job to finish or (b) wait for a
271 /// new token to become available.
272 ///
273 /// Unfortunately the blocking in `acquire` happens at the implementation
274 /// layer of jobservers. On Unix this requires a blocking call to `read`
275 /// and on Windows this requires one of the `WaitFor*` functions. Both
276 /// of these situations aren't the easiest to deal with:
277 ///
278 /// * On Unix there's basically only one way to wake up a `read` early, and
279 /// that's through a signal. This is what the `make` implementation
280 /// itself uses, relying on `SIGCHLD` to wake up a blocking acquisition
281 /// of a new job token. Unfortunately nonblocking I/O is not an option
282 /// here, so it means that "waiting for one of two events" means that
283 /// the latter event must generate a signal! This is not always the case
284 /// on unix for all jobservers.
285 ///
286 /// * On Windows you'd have to basically use the `WaitForMultipleObjects`
287 /// which means that you've got to canonicalize all your event sources
288 /// into a `HANDLE` which also isn't the easiest thing to do
289 /// unfortunately.
290 ///
291 /// This function essentially attempts to ease these limitations by
292 /// converting this `Client` into a helper thread spawned into this
293 /// process. The application can then request that the helper thread
294 /// acquires tokens and the provided closure will be invoked for each token
295 /// acquired.
296 ///
297 /// The intention is that this function can be used to translate the event
298 /// of a token acquisition into an arbitrary user-defined event.
299 ///
300 /// # Arguments
301 ///
302 /// This function will consume the `Client` provided to be transferred to
303 /// the helper thread that is spawned. Additionally a closure `f` is
304 /// provided to be invoked whenever a token is acquired.
305 ///
306 /// This closure is only invoked after calls to
307 /// `HelperThread::request_token` have been made and a token itself has
308 /// been acquired. If an error happens while acquiring the token then
309 /// an error will be yielded to the closure as well.
310 ///
311 /// # Return Value
312 ///
313 /// This function will return an instance of the `HelperThread` structure
314 /// which is used to manage the helper thread associated with this client.
315 /// Through the `HelperThread` you'll request that tokens are acquired.
316 /// When acquired, the closure provided here is invoked.
317 ///
/// When the `HelperThread` structure is dropped it will be gracefully
/// torn down, and the calling thread will be blocked until the thread is
/// torn down (which should be prompt).
321 ///
322 /// # Errors
323 ///
324 /// This function may fail due to creation of the helper thread or
325 /// auxiliary I/O objects to manage the helper thread. In any of these
326 /// situations the error is propagated upwards.
327 ///
328 /// # Platform-specific behavior
329 ///
330 /// On Windows this function behaves pretty normally as expected, but on
331 /// Unix the implementation is... a little heinous. As mentioned above
332 /// we're forced into blocking I/O for token acquisition, namely a blocking
333 /// call to `read`. We must be able to unblock this, however, to tear down
334 /// the helper thread gracefully!
335 ///
336 /// Essentially what happens is that we'll send a signal to the helper
337 /// thread spawned and rely on `EINTR` being returned to wake up the helper
338 /// thread. This involves installing a global `SIGUSR1` handler that does
339 /// nothing along with sending signals to that thread. This may cause
340 /// odd behavior in some applications, so it's recommended to review and
341 /// test thoroughly before using this.
342 pub fn into_helper_thread<F>(self, f: F) -> io::Result<HelperThread>
343 where F: FnMut(io::Result<Acquired>) + Send + 'static,
344 {
345 let (tx, rx) = mpsc::channel();
346 Ok(HelperThread {
347 inner: Some(imp::spawn_helper(self, rx, Box::new(f))?),
348 tx: Some(tx),
349 })
350 }
351}
352
353impl Drop for Acquired {
354 fn drop(&mut self) {
355 drop(self.client.release(&self.data));
356 }
357}
358
359/// Structure returned from `Client::into_helper_thread` to manage the lifetime
360/// of the helper thread returned, see those associated docs for more info.
361#[derive(Debug)]
362pub struct HelperThread {
363 inner: Option<imp::Helper>,
364 tx: Option<Sender<()>>,
365}
366
367impl HelperThread {
368 /// Request that the helper thread acquires a token, eventually calling the
369 /// original closure with a token when it's available.
370 ///
371 /// For more information, see the docs on that function.
372 pub fn request_token(&self) {
373 self.tx.as_ref().unwrap().send(()).unwrap();
374 }
375}
376
377impl Drop for HelperThread {
378 fn drop(&mut self) {
379 drop(self.tx.take());
380 self.inner.take().unwrap().join();
381 }
382}
383
384#[cfg(unix)]
385mod imp {
386 extern crate libc;
387
388 use std::fs::File;
389 use std::io::{self, Read, Write};
390 use std::mem;
391 use std::os::unix::prelude::*;
392 use std::process::Command;
393 use std::ptr;
394 use std::sync::atomic::{AtomicBool, AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
395 use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
396 use std::sync::{Arc, Once, ONCE_INIT};
397 use std::thread::{JoinHandle, Builder};
398 use std::time::Duration;
399
400 use self::libc::c_int;
401
/// Unix jobserver client: the two ends of the jobserver pipe.
#[derive(Debug)]
pub struct Client {
    // End of the pipe tokens are read (acquired) from.
    read: File,
    // End of the pipe tokens are written (released) to.
    write: File,
}
407
/// A token read from the jobserver pipe.
#[derive(Debug)]
pub struct Acquired {
    // The byte that was read; the same byte is written back on release.
    byte: u8,
}
412
413 impl Client {
414 pub fn new(limit: usize) -> io::Result<Client> {
415 let client = unsafe { Client::mk()? };
416 // I don't think the character written here matters, but I could be
417 // wrong!
418 for _ in 0..limit {
419 (&client.write).write(&[b'|'])?;
420 }
421 Ok(client)
422 }
423
424 unsafe fn mk() -> io::Result<Client> {
425 let mut pipes = [0; 2];
426
427 // Attempt atomically-create-with-cloexec if we can
428 if cfg!(target_os = "linux") {
429 if let Some(pipe2) = pipe2() {
430 cvt(pipe2(pipes.as_mut_ptr(), libc::O_CLOEXEC))?;
431 return Ok(Client::from_fds(pipes[0], pipes[1]))
432 }
433 }
434
435 cvt(libc::pipe(pipes.as_mut_ptr()))?;
436 drop(set_cloexec(pipes[0], true));
437 drop(set_cloexec(pipes[1], true));
438 Ok(Client::from_fds(pipes[0], pipes[1]))
439 }
440
441 pub unsafe fn open(s: &str) -> Option<Client> {
442 let mut parts = s.splitn(2, ',');
443 let read = parts.next().unwrap();
444 let write = match parts.next() {
445 Some(s) => s,
446 None => return None,
447 };
448
449 let read = match read.parse() {
450 Ok(n) => n,
451 Err(_) => return None,
452 };
453 let write = match write.parse() {
454 Ok(n) => n,
455 Err(_) => return None,
456 };
457
458 // Ok so we've got two integers that look like file descriptors, but
459 // for extra sanity checking let's see if they actually look like
460 // instances of a pipe before we return the client.
461 //
462 // If we're called from `make` *without* the leading + on our rule
463 // then we'll have `MAKEFLAGS` env vars but won't actually have
464 // access to the file descriptors.
465 if is_pipe(read) && is_pipe(write) {
466 drop(set_cloexec(read, true));
467 drop(set_cloexec(write, true));
468 Some(Client::from_fds(read, write))
469 } else {
470 None
471 }
472 }
473
474 unsafe fn from_fds(read: c_int, write: c_int) -> Client {
475 Client {
476 read: File::from_raw_fd(read),
477 write: File::from_raw_fd(write),
478 }
479 }
480
481 pub fn acquire(&self) -> io::Result<Acquired> {
abe05a73
XL
482 // We don't actually know if the file descriptor here is set in
483 // blocking or nonblocking mode. AFAIK all released versions of
484 // `make` use blocking fds for the jobserver, but the unreleased
485 // version of `make` doesn't. In the unreleased version jobserver
486 // fds are set to nonblocking and combined with `pselect`
487 // internally.
488 //
489 // Here we try to be compatible with both strategies. We
490 // unconditionally expect the file descriptor to be in nonblocking
491 // mode and if it happens to be in blocking mode then most of this
492 // won't end up actually being necessary!
493 //
494 // We use `poll` here to block this thread waiting for read
495 // readiness, and then afterwards we perform the `read` itself. If
496 // the `read` returns that it would block then we start over and try
497 // again.
498 //
499 // Also note that we explicitly don't handle EINTR here. That's used
500 // to shut us down, so we otherwise punt all errors upwards.
501 unsafe {
502 let mut fd: libc::pollfd = mem::zeroed();
503 fd.fd = self.read.as_raw_fd();
504 fd.events = libc::POLLIN;
505 loop {
506 fd.revents = 0;
507 if libc::poll(&mut fd, 1, -1) == -1 {
508 return Err(io::Error::last_os_error())
509 }
510 if fd.revents == 0 {
511 continue
512 }
513 let mut buf = [0];
514 match (&self.read).read(&mut buf) {
515 Ok(1) => return Ok(Acquired { byte: buf[0] }),
516 Ok(_) => {
517 return Err(io::Error::new(io::ErrorKind::Other,
518 "early EOF on jobserver pipe"))
519 }
520 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
521 Err(e) => return Err(e),
522 }
523 }
041b39d2
XL
524 }
525 }
526
527 pub fn release(&self, data: &Acquired) -> io::Result<()> {
abe05a73
XL
528 // Note that the fd may be nonblocking but we're going to go ahead
529 // and assume that the writes here are always nonblocking (we can
530 // always quickly release a token). If that turns out to not be the
531 // case we'll get an error anyway!
041b39d2
XL
532 match (&self.write).write(&[data.byte])? {
533 1 => Ok(()),
534 _ => Err(io::Error::new(io::ErrorKind::Other,
535 "failed to write token back to jobserver")),
536 }
537 }
538
539 pub fn string_arg(&self) -> String {
540 format!("{},{} -j", self.read.as_raw_fd(), self.write.as_raw_fd())
541 }
542
543 pub fn configure(&self, cmd: &mut Command) {
544 // Here we basically just want to say that in the child process
545 // we'll configure the read/write file descriptors to *not* be
546 // cloexec, so they're inherited across the exec and specified as
547 // integers through `string_arg` above.
548 let read = self.read.as_raw_fd();
549 let write = self.write.as_raw_fd();
550 cmd.before_exec(move || {
551 set_cloexec(read, false)?;
552 set_cloexec(write, false)?;
553 Ok(())
554 });
555 }
556 }
557
/// Bookkeeping needed to shut down and join the unix helper thread.
#[derive(Debug)]
pub struct Helper {
    // Handle used to signal and join the helper thread.
    thread: JoinHandle<()>,
    // Set to `true` once shutdown has been requested.
    quitting: Arc<AtomicBool>,
    // Receives a message when the helper thread has left its loop.
    rx_done: Receiver<()>,
}
564
565 pub fn spawn_helper(client: ::Client,
566 rx: Receiver<()>,
567 mut f: Box<FnMut(io::Result<::Acquired>) + Send>)
568 -> io::Result<Helper>
569 {
570 static USR1_INIT: Once = ONCE_INIT;
571 let mut err = None;
572 USR1_INIT.call_once(|| unsafe {
573 let mut new: libc::sigaction = mem::zeroed();
574 new.sa_sigaction = sigusr1_handler as usize;
575 new.sa_flags = libc::SA_SIGINFO as _;
576 if libc::sigaction(libc::SIGUSR1, &new, ptr::null_mut()) != 0 {
577 err = Some(io::Error::last_os_error());
578 }
579 });
580
581 if let Some(e) = err.take() {
582 return Err(e)
583 }
584
585 let quitting = Arc::new(AtomicBool::new(false));
586 let quitting2 = quitting.clone();
587 let (tx_done, rx_done) = mpsc::channel();
588 let thread = Builder::new().spawn(move || {
589 'outer:
590 for () in rx {
591 loop {
592 let res = client.acquire();
593 if let Err(ref e) = res {
594 if e.kind() == io::ErrorKind::Interrupted {
595 if quitting2.load(Ordering::SeqCst) {
596 break 'outer
597 } else {
598 continue
599 }
600 }
601 }
602 f(res);
603 break
604 }
605 }
606 tx_done.send(()).unwrap();
607 })?;
608
609 Ok(Helper {
610 thread: thread,
611 quitting: quitting,
612 rx_done: rx_done,
613 })
614 }
615
616 impl Helper {
617 pub fn join(self) {
618 self.quitting.store(true, Ordering::SeqCst);
619 let dur = Duration::from_millis(10);
620 let mut done = false;
621 for _ in 0..100 {
622 unsafe {
623 // Ignore the return value here of `pthread_kill`,
624 // apparently on OSX if you kill a dead thread it will
625 // return an error, but on other platforms it may not. In
626 // that sense we don't actually know if this will succeed or
627 // not!
628 libc::pthread_kill(self.thread.as_pthread_t(), libc::SIGUSR1);
629 match self.rx_done.recv_timeout(dur) {
630 Ok(()) |
631 Err(RecvTimeoutError::Disconnected) => {
632 done = true;
633 break
634 }
635 Err(RecvTimeoutError::Timeout) => {}
636 }
637 }
638 }
639 if !done {
640 panic!("failed to shut down worker thread");
641 }
642 drop(self.thread.join());
643 }
644 }
645
646 #[allow(unused_assignments)]
647 fn is_pipe(fd: c_int) -> bool {
648 unsafe {
649 let mut stat = mem::zeroed();
650 if libc::fstat(fd, &mut stat) == 0 {
651 // On android arm and i686 mode_t is u16 and st_mode is u32,
652 // this generates a type mismatch when S_IFIFO (declared as mode_t)
653 // is used in operations with st_mode, so we use this workaround
654 // to get the value of S_IFIFO with the same type of st_mode.
655 let mut s_ififo = stat.st_mode;
656 s_ififo = libc::S_IFIFO as _;
657 stat.st_mode & s_ififo == s_ififo
658 } else {
659 false
660 }
661 }
662 }
663
664 fn set_cloexec(fd: c_int, set: bool) -> io::Result<()> {
665 unsafe {
666 let previous = cvt(libc::fcntl(fd, libc::F_GETFD))?;
667 let new = if set {
668 previous | libc::FD_CLOEXEC
669 } else {
670 previous & !libc::FD_CLOEXEC
671 };
672 if new != previous {
673 cvt(libc::fcntl(fd, libc::F_SETFD, new))?;
674 }
675 Ok(())
676 }
677 }
678
/// Converts a C-style return value into an `io::Result`: `-1` becomes the
/// last OS error, any other value is passed through unchanged.
fn cvt(t: c_int) -> io::Result<c_int> {
    match t {
        -1 => Err(io::Error::last_os_error()),
        value => Ok(value),
    }
}
686
687 unsafe fn pipe2() -> Option<&'static fn(*mut c_int, c_int) -> c_int> {
688 static PIPE2: AtomicUsize = ATOMIC_USIZE_INIT;
689
690 if PIPE2.load(Ordering::SeqCst) == 0 {
691 let name = "pipe2\0";
692 let n = match libc::dlsym(libc::RTLD_DEFAULT, name.as_ptr() as *const _) as usize {
693 0 => 1,
694 n => n,
695 };
696 PIPE2.store(n, Ordering::SeqCst);
697 }
698 if PIPE2.load(Ordering::SeqCst) == 1 {
699 None
700 } else {
701 mem::transmute(&PIPE2)
702 }
703 }
704
705 extern fn sigusr1_handler(_signum: c_int,
706 _info: *mut libc::siginfo_t,
707 _ptr: *mut libc::c_void) {
708 // nothing to do
709 }
710}
711
712#[cfg(windows)]
713mod imp {
714 extern crate rand;
715
716 use std::ffi::CString;
717 use std::io;
718 use std::process::Command;
719 use std::ptr;
720 use std::sync::Arc;
721 use std::sync::mpsc::Receiver;
722 use std::thread::{Builder, JoinHandle};
723
724 #[derive(Debug)]
725 pub struct Client {
726 sem: Handle,
727 name: String,
728 }
729
/// Marker for an acquired semaphore slot; it carries no data.
#[derive(Debug)]
pub struct Acquired;
732
// Minimal hand-written bindings for the Windows API used by this module.
type BOOL = i32;
type DWORD = u32;
type HANDLE = *mut u8;
type LONG = i32;

const FALSE: BOOL = 0;
const TRUE: BOOL = 1;
const INFINITE: DWORD = 0xffffffff;
const WAIT_OBJECT_0: DWORD = 0;
const ERROR_ALREADY_EXISTS: DWORD = 183;
const SEMAPHORE_MODIFY_STATE: DWORD = 0x2;
const SYNCHRONIZE: DWORD = 0x00100000;

extern "system" {
    fn CloseHandle(handle: HANDLE) -> BOOL;
    fn SetEvent(hEvent: HANDLE) -> BOOL;
    fn WaitForSingleObject(hHandle: HANDLE,
                           dwMilliseconds: DWORD) -> DWORD;
    fn WaitForMultipleObjects(ncount: DWORD,
                              lpHandles: *const HANDLE,
                              bWaitAll: BOOL,
                              dwMilliseconds: DWORD) -> DWORD;
    fn CreateEventA(lpEventAttributes: *mut u8,
                    bManualReset: BOOL,
                    bInitialState: BOOL,
                    lpName: *const i8) -> HANDLE;
    fn CreateSemaphoreA(lpEventAttributes: *mut u8,
                        lInitialCount: LONG,
                        lMaximumCount: LONG,
                        lpName: *const i8) -> HANDLE;
    fn OpenSemaphoreA(dwDesiredAccess: DWORD,
                      bInheritHandle: BOOL,
                      lpName: *const i8) -> HANDLE;
    fn ReleaseSemaphore(hSemaphore: HANDLE,
                        lReleaseCount: LONG,
                        lpPreviousCount: *mut LONG) -> BOOL;
}
770
771 impl Client {
772 pub fn new(limit: usize) -> io::Result<Client> {
773 // Try a bunch of random semaphore names until we get a unique one,
774 // but don't try for too long.
775 //
776 // Note that `limit == 0` is a valid argument above but Windows
777 // won't let us create a semaphore with 0 slots available to it. Get
778 // `limit == 0` working by creating a semaphore instead with one
779 // slot and then immediately acquire it (without ever releaseing it
780 // back).
781 for _ in 0..100 {
782 let mut name = format!("__rust_jobserver_semaphore_{}\0",
783 rand::random::<u32>());
784 unsafe {
785 let create_limit = if limit == 0 {1} else {limit};
786 let r = CreateSemaphoreA(ptr::null_mut(),
787 create_limit as LONG,
788 create_limit as LONG,
789 name.as_ptr() as *const _);
790 if r.is_null() {
791 return Err(io::Error::last_os_error())
792 }
793 let handle = Handle(r);
794
795 let err = io::Error::last_os_error();
796 if err.raw_os_error() == Some(ERROR_ALREADY_EXISTS as i32) {
797 continue
798 }
799 name.pop(); // chop off the trailing nul
800 let client = Client {
801 sem: handle,
802 name: name,
803 };
804 if create_limit != limit {
805 client.acquire()?;
806 }
807 return Ok(client)
808 }
809 }
810
811 Err(io::Error::new(io::ErrorKind::Other,
812 "failed to find a unique name for a semaphore"))
813 }
814
815 pub unsafe fn open(s: &str) -> Option<Client> {
816 let name = match CString::new(s) {
817 Ok(s) => s,
818 Err(_) => return None,
819 };
820
821 let sem = OpenSemaphoreA(SYNCHRONIZE | SEMAPHORE_MODIFY_STATE,
822 FALSE,
823 name.as_ptr());
824 if sem.is_null() {
825 None
826 } else {
827 Some(Client {
828 sem: Handle(sem),
829 name: s.to_string(),
830 })
831 }
832 }
833
834 pub fn acquire(&self) -> io::Result<Acquired> {
835 unsafe {
836 let r = WaitForSingleObject(self.sem.0, INFINITE);
837 if r == WAIT_OBJECT_0 {
838 Ok(Acquired)
839 } else {
840 Err(io::Error::last_os_error())
841 }
842 }
843 }
844
845 pub fn release(&self, _data: &Acquired) -> io::Result<()> {
846 unsafe {
847 let r = ReleaseSemaphore(self.sem.0, 1, ptr::null_mut());
848 if r != 0 {
849 Ok(())
850 } else {
851 Err(io::Error::last_os_error())
852 }
853 }
854 }
855
856 pub fn string_arg(&self) -> String {
857 self.name.clone()
858 }
859
860 pub fn configure(&self, _cmd: &mut Command) {
861 // nothing to do here, we gave the name of our semaphore to the
862 // child above
863 }
864 }
865
866 #[derive(Debug)]
867 struct Handle(HANDLE);
868 // HANDLE is a raw ptr, but we're send/sync
869 unsafe impl Sync for Handle {}
870 unsafe impl Send for Handle {}
871
872 impl Drop for Handle {
873 fn drop(&mut self) {
874 unsafe {
875 CloseHandle(self.0);
876 }
877 }
878 }
879
880 #[derive(Debug)]
881 pub struct Helper {
882 event: Arc<Handle>,
883 thread: JoinHandle<()>,
884 }
885
886 pub fn spawn_helper(client: ::Client,
887 rx: Receiver<()>,
888 mut f: Box<FnMut(io::Result<::Acquired>) + Send>)
889 -> io::Result<Helper>
890 {
891 let event = unsafe {
892 let r = CreateEventA(ptr::null_mut(), TRUE, FALSE, ptr::null());
893 if r.is_null() {
894 return Err(io::Error::last_os_error())
895 } else {
896 Handle(r)
897 }
898 };
899 let event = Arc::new(event);
900 let event2 = event.clone();
901 let thread = Builder::new().spawn(move || {
902 let objects = [event2.0, client.inner.sem.0];
903 for () in rx {
904 let r = unsafe {
905 WaitForMultipleObjects(2, objects.as_ptr(), FALSE, INFINITE)
906 };
907 if r == WAIT_OBJECT_0 {
908 break
909 }
910 if r == WAIT_OBJECT_0 + 1 {
911 f(Ok(::Acquired {
912 client: client.inner.clone(),
913 data: Acquired,
914 }))
915 } else {
916 f(Err(io::Error::last_os_error()))
917 }
918 }
919 })?;
920 Ok(Helper {
921 thread: thread,
922 event: event,
923 })
924 }
925
926 impl Helper {
927 pub fn join(self) {
928 let r = unsafe { SetEvent(self.event.0) };
929 if r == 0 {
930 panic!("failed to set event: {}", io::Error::last_os_error());
931 }
932 drop(self.thread.join());
933 }
934 }
935}