1 //! An implementation of the GNU make jobserver.
3 //! This crate is an implementation, in Rust, of the GNU `make` jobserver for
4 //! CLI tools that are interoperating with make or otherwise require some form
5 //! of parallelism limiting across process boundaries. This was originally
6 //! written for usage in Cargo to both (a) work when `cargo` is invoked from
7 //! `make` (using `make`'s jobserver) and (b) work when `cargo` invokes build
8 //! scripts, exporting a jobserver implementation for `make` processes to
11 //! The jobserver implementation can be found in [detail online][docs] but
12 //! basically boils down to a cross-process semaphore. On Unix this is
13 //! implemented with the `pipe` syscall and read/write ends of a pipe and on
14 //! Windows this is implemented literally with IPC semaphores.
16 //! The jobserver protocol in `make` also dictates when tokens are acquired to
17 //! run child work, and clients using this crate should take care to implement
18 //! such details to ensure correct interoperation with `make` itself.
22 //! Connect to a jobserver that was set up by `make` or a different process:
25 //! use jobserver::Client;
27 //! // See API documentation for why this is `unsafe`
28 //! let client = match unsafe { Client::from_env() } {
29 //! Some(client) => client,
30 //! None => panic!("client not configured"),
34 //! Acquire and release token from a jobserver:
37 //! use jobserver::Client;
39 //! let client = unsafe { Client::from_env().unwrap() };
40 //! let token = client.acquire().unwrap(); // blocks until it is available
41 //! drop(token); // releases the token when the work is done
44 //! Create a new jobserver and configure a child process to have access:
47 //! use std::process::Command;
48 //! use jobserver::Client;
50 //! let client = Client::new(4).expect("failed to create jobserver");
51 //! let mut cmd = Command::new("make");
52 //! client.configure(&mut cmd);
57 //! This crate makes no attempt to release tokens back to a jobserver on
58 //! abnormal exit of a process. If a process which acquires a token is killed
59 //! with ctrl-c or some similar signal then tokens will not be released and the
60 //! jobserver may be in a corrupt state.
62 //! Note that this is typically ok as ctrl-c means that an entire build process
63 //! is being torn down, but it's worth being aware of at least!
65 //! ## Windows caveats
67 //! There appear to be two implementations of `make` on Windows. On MSYS2 one
68 //! typically comes as `mingw32-make` and the other as `make` itself. I'm not
69 //! personally too familiar with what's going on here, but for jobserver-related
70 //! information the `mingw32-make` implementation uses Windows semaphores
71 //! whereas the `make` program does not. The `make` program appears to use file
72 //! descriptors and I'm not really sure how it works, so this crate is not
73 //! compatible with `make` on Windows. It is, however, compatible with
76 //! [docs]: http://make.mad-scientist.net/papers/jobserver-implementation/
78 #![deny(missing_docs, missing_debug_implementations)]
79 #![doc(html_root_url = "https://docs.rs/jobserver/0.1")]
86 use std
::process
::Command
;
87 use std
::sync
::mpsc
::{self, Sender}
;
90 /// A client of a jobserver
92 /// This structure is the main type exposed by this library, and is where
93 /// interaction with a jobserver is configured. Clients are either created
94 /// from scratch, in which case the internal semaphore is initialized on the spot,
95 /// or a client is created from the environment to connect to a jobserver
98 /// Some usage examples can be found in the crate documentation for using a
101 /// Note that a `Client` implements the `Clone` trait, and all instances of a
102 /// `Client` refer to the same jobserver instance.
103 #[derive(Clone, Debug)]
105 inner
: Arc
<imp
::Client
>,
108 /// An acquired token from a jobserver.
110 /// This token will be released back to the jobserver when it is dropped and
111 /// otherwise represents the ability to spawn off another thread of work.
113 pub struct Acquired
{
114 client
: Arc
<imp
::Client
>,
119 /// Creates a new jobserver initialized with the given parallelism limit.
121 /// A client to the jobserver created will be returned. This client will
122 /// allow at most `limit` tokens to be acquired from it in parallel. More
123 /// calls to `acquire` will cause the calling thread to block.
125 /// Note that the created `Client` is not automatically inherited into
126 /// spawned child processes from this program. Manual usage of the
127 /// `configure` function is required for a child process to have access to a
133 /// use jobserver::Client;
135 /// let client = Client::new(4).expect("failed to create jobserver");
140 /// Returns an error if any I/O error happens when attempting to create the
141 /// jobserver client.
142 pub fn new(limit
: usize) -> io
::Result
<Client
> {
144 inner
: Arc
::new(imp
::Client
::new(limit
)?
),
148 /// Attempts to connect to the jobserver specified in this process's
151 /// When a `make` executable calls a child process it will configure the
152 /// environment of the child to ensure that it has handles to the jobserver
153 /// it's passing down. This function will attempt to look for these details
154 /// and connect to the jobserver.
156 /// Note that the created `Client` is not automatically inherited into
157 /// spawned child processes from this program. Manual usage of the
158 /// `configure` function is required for a child process to have access to a
163 /// If a jobserver was found in the environment and it looks correct then
164 /// `Some` of the connected client will be returned. If no jobserver was
165 /// found then `None` will be returned.
167 /// Note that on Unix the `Client` returned **takes ownership of the file
168 /// descriptors specified in the environment**. Jobservers on Unix are
169 /// implemented with `pipe` file descriptors, and they're inherited from
170 /// parent processes. This `Client` returned takes ownership of the file
171 /// descriptors for this process and will close the file descriptors after
172 /// this value is dropped.
174 /// Additionally on Unix this function will configure the file descriptors
175 /// with `CLOEXEC` so they're not automatically inherited by spawned
180 /// This function is `unsafe` to call on Unix specifically as it
181 /// transitively requires usage of the `from_raw_fd` function, which is
182 /// itself unsafe in some circumstances.
184 /// It's recommended to call this function very early in the lifetime of a
185 /// program before any other file descriptors are opened. That way you can
186 /// make sure to take ownership properly of the file descriptors passed
189 /// It's generally unsafe to call this function twice in a program if the
190 /// previous invocation returned `Some`.
192 /// Note, though, that on Windows it should be safe to call this function
193 /// any number of times.
194 pub unsafe fn from_env() -> Option
<Client
> {
195 let var
= match env
::var("CARGO_MAKEFLAGS")
196 .or(env
::var("MAKEFLAGS"))
197 .or(env
::var("MFLAGS"))
200 Err(_
) => return None
,
202 let mut arg
= "--jobserver-fds=";
203 let pos
= match var
.find(arg
) {
206 arg
= "--jobserver-auth=";
207 match var
.find(arg
) {
214 let s
= var
[pos
+ arg
.len()..].split(' '
).next().unwrap();
215 imp
::Client
::open(s
).map(|c
| Client { inner: Arc::new(c) }
)
218 /// Acquires a token from this jobserver client.
220 /// This function will block the calling thread until a new token can be
221 /// acquired from the jobserver.
225 /// On successful acquisition of a token an instance of `Acquired` is
226 /// returned. This structure, when dropped, will release the token back to
227 /// the jobserver. It's recommended to avoid leaking this value.
231 /// If an I/O error happens while acquiring a token then this function will
232 /// return immediately with the error. If an error is returned then a token
233 /// was not acquired.
234 pub fn acquire(&self) -> io
::Result
<Acquired
> {
235 let data
= try
!(self.inner
.acquire());
237 client
: self.inner
.clone(),
242 /// Configures a child process to have access to this client's jobserver as
245 /// This function is required to be called to ensure that a jobserver is
246 /// properly inherited to a child process. If this function is *not* called
247 /// then this `Client` will not be accessible in the child process. In other
248 /// words, if not called, then `Client::from_env` will return `None` in the
249 /// child process (or the equivalent of `Client::from_env` that `make` uses).
251 /// ## Platform-specific behavior
253 /// On Unix and Windows this will clobber the `CARGO_MAKEFLAGS` environment
254 /// variable for the child process, and on Unix this will also allow the
255 /// two file descriptors for this client to be inherited to the child.
257 /// On platforms other than Unix and Windows this panics.
258 pub fn configure(&self, cmd
: &mut Command
) {
259 let arg
= self.inner
.string_arg();
260 // Older implementations of make use `--jobserver-fds` and newer
261 // implementations use `--jobserver-auth`, pass both to try to catch
262 // both implementations.
263 let value
= format
!("--jobserver-fds={0} --jobserver-auth={0}", arg
);
264 cmd
.env("CARGO_MAKEFLAGS", &value
);
265 self.inner
.configure(cmd
);
268 /// Converts this `Client` into a helper thread to deal with a blocking
269 /// `acquire` function a little more easily.
271 /// The fact that the `acquire` function on `Client` blocks isn't always
272 /// the easiest to work with. Typically you're using a jobserver to
273 /// manage running other events in parallel! This means that you need to
274 /// either (a) wait for an existing job to finish or (b) wait for a
275 /// new token to become available.
277 /// Unfortunately the blocking in `acquire` happens at the implementation
278 /// layer of jobservers. On Unix this requires a blocking call to `read`
279 /// and on Windows this requires one of the `WaitFor*` functions. Both
280 /// of these situations aren't the easiest to deal with:
282 /// * On Unix there's basically only one way to wake up a `read` early, and
283 /// that's through a signal. This is what the `make` implementation
284 /// itself uses, relying on `SIGCHLD` to wake up a blocking acquisition
285 /// of a new job token. Unfortunately nonblocking I/O is not an option
286 /// here, so it means that "waiting for one of two events" means that
287 /// the latter event must generate a signal! This is not always the case
288 /// on unix for all jobservers.
290 /// * On Windows you'd have to basically use the `WaitForMultipleObjects`
291 /// which means that you've got to canonicalize all your event sources
292 /// into a `HANDLE` which also isn't the easiest thing to do
295 /// This function essentially attempts to ease these limitations by
296 /// converting this `Client` into a helper thread spawned into this
297 /// process. The application can then request that the helper thread
298 /// acquires tokens and the provided closure will be invoked for each token
301 /// The intention is that this function can be used to translate the event
302 /// of a token acquisition into an arbitrary user-defined event.
306 /// This function will consume the `Client` provided to be transferred to
307 /// the helper thread that is spawned. Additionally a closure `f` is
308 /// provided to be invoked whenever a token is acquired.
310 /// This closure is only invoked after calls to
311 /// `HelperThread::request_token` have been made and a token itself has
312 /// been acquired. If an error happens while acquiring the token then
313 /// an error will be yielded to the closure as well.
317 /// This function will return an instance of the `HelperThread` structure
318 /// which is used to manage the helper thread associated with this client.
319 /// Through the `HelperThread` you'll request that tokens are acquired.
320 /// When acquired, the closure provided here is invoked.
322 /// When the `HelperThread` structure is dropped it will be gracefully
323 /// torn down, and the calling thread will be blocked until the thread is
324 /// torn down (which should be prompt).
328 /// This function may fail due to creation of the helper thread or
329 /// auxiliary I/O objects to manage the helper thread. In any of these
330 /// situations the error is propagated upwards.
332 /// # Platform-specific behavior
334 /// On Windows this function behaves pretty normally as expected, but on
335 /// Unix the implementation is... a little heinous. As mentioned above
336 /// we're forced into blocking I/O for token acquisition, namely a blocking
337 /// call to `read`. We must be able to unblock this, however, to tear down
338 /// the helper thread gracefully!
340 /// Essentially what happens is that we'll send a signal to the helper
341 /// thread spawned and rely on `EINTR` being returned to wake up the helper
342 /// thread. This involves installing a global `SIGUSR1` handler that does
343 /// nothing along with sending signals to that thread. This may cause
344 /// odd behavior in some applications, so it's recommended to review and
345 /// test thoroughly before using this.
346 pub fn into_helper_thread
<F
>(self, f
: F
) -> io
::Result
<HelperThread
>
348 F
: FnMut(io
::Result
<Acquired
>) + Send
+ '
static,
350 let (tx
, rx
) = mpsc
::channel();
352 inner
: Some(imp
::spawn_helper(self, rx
, Box
::new(f
))?
),
357 /// Blocks the current thread until a token is acquired.
359 /// This is the same as `acquire`, except that it doesn't return an RAII
360 /// helper. If successful the process will need to guarantee that
361 /// `release_raw` is called in the future.
362 pub fn acquire_raw(&self) -> io
::Result
<()> {
363 self.inner
.acquire()?
;
367 /// Releases a jobserver token back to the original jobserver.
369 /// This is intended to be paired with `acquire_raw` if it was called, but
370 /// in some situations it could also be called to relinquish a process's
371 /// implicit token temporarily which is then re-acquired later.
372 pub fn release_raw(&self) -> io
::Result
<()> {
373 self.inner
.release(None
)?
;
// Hands the token back to the jobserver when an `Acquired` is dropped.
378 impl Drop
for Acquired
{
// The result of `release` is deliberately discarded with `drop(..)`: a
// destructor has no way to report the error to the caller.
380 drop(self.client
.release(Some(&self.data
)));
384 /// Structure returned from `Client::into_helper_thread` to manage the lifetime
385 /// of the helper thread returned, see those associated docs for more info.
387 pub struct HelperThread
{
388 inner
: Option
<imp
::Helper
>,
389 tx
: Option
<Sender
<()>>,
393 /// Request that the helper thread acquires a token, eventually calling the
394 /// original closure with a token when it's available.
///
396 /// For more information, see the docs on `Client::into_helper_thread`.
///
/// # Panics
///
/// Panics if the request channel is gone: either the sender has already been
/// taken out of `self.tx`, or the send fails because the receiving side has
/// been dropped.
397 pub fn request_token(&self) {
// Each `()` sent here is one token request for the helper thread.
398 self.tx
.as_ref().unwrap().send(()).unwrap();
// Shuts the helper thread down when the `HelperThread` handle is dropped.
402 impl Drop
for HelperThread
{
// Drop the sending half first so no further token requests can reach the
// helper thread.
404 drop(self.tx
.take());
// Then let the platform-specific `Helper::join` stop the thread and wait for
// it. `inner` is only ever taken here, so the `unwrap` is expected to hold.
405 self.inner
.take().unwrap().join();
414 use std
::io
::{self, Read, Write}
;
416 use std
::os
::unix
::prelude
::*;
417 use std
::process
::Command
;
419 use std
::sync
::atomic
::{AtomicBool, Ordering}
;
420 use std
::sync
::mpsc
::{self, Receiver, RecvTimeoutError}
;
421 use std
::sync
::{Arc, Once, ONCE_INIT}
;
422 use std
::thread
::{self, Builder, JoinHandle}
;
423 use std
::time
::Duration
;
425 use self::libc
::c_int
;
434 pub struct Acquired
{
439 pub fn new(limit
: usize) -> io
::Result
<Client
> {
440 let client
= unsafe { Client::mk()? }
;
441 // I don't think the character written here matters, but I could be
444 (&client
.write
).write(&[b'
|'
])?
;
446 info
!("created a jobserver: {:?}", client
);
450 unsafe fn mk() -> io
::Result
<Client
> {
451 let mut pipes
= [0; 2];
453 // Attempt atomically-create-with-cloexec if we can on Linux,
454 // detected by using the `syscall` function in `libc` to try to work
455 // with as many kernels/glibc implementations as possible.
456 #[cfg(target_os = "linux")]
458 static PIPE2_AVAILABLE
: AtomicBool
= AtomicBool
::new(true);
459 if PIPE2_AVAILABLE
.load(Ordering
::SeqCst
) {
460 match libc
::syscall(libc
::SYS_pipe2
, pipes
.as_mut_ptr(), libc
::O_CLOEXEC
) {
462 let err
= io
::Error
::last_os_error();
463 if err
.raw_os_error() == Some(libc
::ENOSYS
) {
464 PIPE2_AVAILABLE
.store(false, Ordering
::SeqCst
);
469 _
=> return Ok(Client
::from_fds(pipes
[0], pipes
[1])),
474 cvt(libc
::pipe(pipes
.as_mut_ptr()))?
;
475 drop(set_cloexec(pipes
[0], true));
476 drop(set_cloexec(pipes
[1], true));
477 Ok(Client
::from_fds(pipes
[0], pipes
[1]))
480 pub unsafe fn open(s
: &str) -> Option
<Client
> {
481 let mut parts
= s
.splitn(2, '
,'
);
482 let read
= parts
.next().unwrap();
483 let write
= match parts
.next() {
488 let read
= match read
.parse() {
490 Err(_
) => return None
,
492 let write
= match write
.parse() {
494 Err(_
) => return None
,
497 // Ok so we've got two integers that look like file descriptors, but
498 // for extra sanity checking let's see if they actually look like
499 // instances of a pipe before we return the client.
501 // If we're called from `make` *without* the leading + on our rule
502 // then we'll have `MAKEFLAGS` env vars but won't actually have
503 // access to the file descriptors.
504 if is_valid_fd(read
) && is_valid_fd(write
) {
505 info
!("using env fds {} and {}", read
, write
);
506 drop(set_cloexec(read
, true));
507 drop(set_cloexec(write
, true));
508 Some(Client
::from_fds(read
, write
))
510 info
!("one of {} or {} isn't a pipe", read
, write
);
515 unsafe fn from_fds(read
: c_int
, write
: c_int
) -> Client
{
517 read
: File
::from_raw_fd(read
),
518 write
: File
::from_raw_fd(write
),
522 pub fn acquire(&self) -> io
::Result
<Acquired
> {
523 // We don't actually know if the file descriptor here is set in
524 // blocking or nonblocking mode. AFAIK all released versions of
525 // `make` use blocking fds for the jobserver, but the unreleased
526 // version of `make` doesn't. In the unreleased version jobserver
527 // fds are set to nonblocking and combined with `pselect`
530 // Here we try to be compatible with both strategies. We
531 // unconditionally expect the file descriptor to be in nonblocking
532 // mode and if it happens to be in blocking mode then most of this
533 // won't end up actually being necessary!
535 // We use `poll` here to block this thread waiting for read
536 // readiness, and then afterwards we perform the `read` itself. If
537 // the `read` returns that it would block then we start over and try
540 // Also note that we explicitly don't handle EINTR here. That's used
541 // to shut us down, so we otherwise punt all errors upwards.
543 let mut fd
: libc
::pollfd
= mem
::zeroed();
544 fd
.fd
= self.read
.as_raw_fd();
545 fd
.events
= libc
::POLLIN
;
548 if libc
::poll(&mut fd
, 1, -1) == -1 {
549 return Err(io
::Error
::last_os_error());
555 match (&self.read
).read(&mut buf
) {
556 Ok(1) => return Ok(Acquired { byte: buf[0] }
),
558 return Err(io
::Error
::new(
559 io
::ErrorKind
::Other
,
560 "early EOF on jobserver pipe",
563 Err(ref e
) if e
.kind() == io
::ErrorKind
::WouldBlock
=> {}
564 Err(e
) => return Err(e
),
570 pub fn release(&self, data
: Option
<&Acquired
>) -> io
::Result
<()> {
571 // Note that the fd may be nonblocking but we're going to go ahead
572 // and assume that the writes here are always nonblocking (we can
573 // always quickly release a token). If that turns out to not be the
574 // case we'll get an error anyway!
575 let byte
= data
.map(|d
| d
.byte
).unwrap_or(b'
+'
);
576 match (&self.write
).write(&[byte
])?
{
578 _
=> Err(io
::Error
::new(
579 io
::ErrorKind
::Other
,
580 "failed to write token back to jobserver",
// Renders this client's pipe as the text handed to child processes, in the
// form "<read fd>,<write fd> -j". The top-level `Client::configure`
// interpolates this string into its `--jobserver-fds=`/`--jobserver-auth=`
// value.
// NOTE(review): the trailing " -j" presumably rides along as an extra make
// flag after the fd pair — confirm this is intended.
585 pub fn string_arg(&self) -> String
{
586 format
!("{},{} -j", self.read
.as_raw_fd(), self.write
.as_raw_fd())
589 pub fn configure(&self, cmd
: &mut Command
) {
590 // Here we basically just want to say that in the child process
591 // we'll configure the read/write file descriptors to *not* be
592 // cloexec, so they're inherited across the exec and specified as
593 // integers through `string_arg` above.
594 let read
= self.read
.as_raw_fd();
595 let write
= self.write
.as_raw_fd();
596 cmd
.before_exec(move || {
597 set_cloexec(read
, false)?
;
598 set_cloexec(write
, false)?
;
606 thread
: JoinHandle
<()>,
607 quitting
: Arc
<AtomicBool
>,
608 rx_done
: Receiver
<()>,
614 mut f
: Box
<FnMut(io
::Result
<::Acquired
>) + Send
>,
615 ) -> io
::Result
<Helper
> {
616 static USR1_INIT
: Once
= ONCE_INIT
;
618 USR1_INIT
.call_once(|| unsafe {
619 let mut new
: libc
::sigaction
= mem
::zeroed();
620 new
.sa_sigaction
= sigusr1_handler
as usize;
621 new
.sa_flags
= libc
::SA_SIGINFO
as _
;
622 if libc
::sigaction(libc
::SIGUSR1
, &new
, ptr
::null_mut()) != 0 {
623 err
= Some(io
::Error
::last_os_error());
627 if let Some(e
) = err
.take() {
631 let quitting
= Arc
::new(AtomicBool
::new(false));
632 let quitting2
= quitting
.clone();
633 let (tx_done
, rx_done
) = mpsc
::channel();
634 let thread
= Builder
::new().spawn(move || {
635 'outer
: for () in rx
{
637 let res
= client
.acquire();
638 if let Err(ref e
) = res
{
639 if e
.kind() == io
::ErrorKind
::Interrupted
{
640 if quitting2
.load(Ordering
::SeqCst
) {
651 tx_done
.send(()).unwrap();
663 self.quitting
.store(true, Ordering
::SeqCst
);
664 let dur
= Duration
::from_millis(10);
665 let mut done
= false;
668 // Ignore the return value here of `pthread_kill`,
669 // apparently on OSX if you kill a dead thread it will
670 // return an error, but on other platforms it may not. In
671 // that sense we don't actually know if this will succeed or
673 libc
::pthread_kill(self.thread
.as_pthread_t() as _
, libc
::SIGUSR1
);
674 match self.rx_done
.recv_timeout(dur
) {
675 Ok(()) | Err(RecvTimeoutError
::Disconnected
) => {
679 Err(RecvTimeoutError
::Timeout
) => {}
685 drop(self.thread
.join());
// Reports whether `fd` is usable in this process: true when
// `fcntl(fd, F_GETFD)` does not fail with -1.
// NOTE(review): this only shows the descriptor is open. It does not check
// that it is a pipe, although the caller's log message says "isn't a pipe".
690 fn is_valid_fd(fd
: c_int
) -> bool
{
692 return libc
::fcntl(fd
, libc
::F_GETFD
) != -1;
// Turns the `FD_CLOEXEC` flag of `fd` on or off with a read-modify-write of
// the descriptor flags: `F_GETFD`, adjust the bit, then `F_SETFD`.
// Callers pass `true` on freshly created or inherited pipe ends and `false`
// inside `before_exec` so a child can inherit them.
// Returns the OS error if either `fcntl` call fails.
696 fn set_cloexec(fd
: c_int
, set
: bool
) -> io
::Result
<()> {
// Current descriptor flags; every other bit is preserved below.
698 let previous
= cvt(libc
::fcntl(fd
, libc
::F_GETFD
))?
;
// Flag value with the close-on-exec bit set ...
700 previous
| libc
::FD_CLOEXEC
// ... or with the close-on-exec bit cleared.
702 previous
& !libc
::FD_CLOEXEC
// Write the updated flags back to the descriptor.
705 cvt(libc
::fcntl(fd
, libc
::F_SETFD
, new
))?
;
711 fn cvt(t
: c_int
) -> io
::Result
<c_int
> {
713 Err(io
::Error
::last_os_error())
719 extern "C" fn sigusr1_handler(
721 _info
: *mut libc
::siginfo_t
,
722 _ptr
: *mut libc
::c_void
,
732 use std
::ffi
::CString
;
734 use std
::process
::Command
;
736 use std
::sync
::mpsc
::Receiver
;
738 use std
::thread
::{Builder, JoinHandle}
;
// Win32 type alias and constants used by the FFI declarations and calls in
// this module.
// Raw handle type taken by `CloseHandle`, `SetEvent`, `WaitForSingleObject`
// and the other handle-based calls.
751 type HANDLE
= *mut u8;
// Compared against `raw_os_error()` after the `CreateSemaphoreA` call in
// `Client::new` to detect that the random semaphore name is already taken.
754 const ERROR_ALREADY_EXISTS
: DWORD
= 183;
// Win32 `BOOL` false.
755 const FALSE
: BOOL
= 0;
// Timeout passed as `dwMilliseconds` to `WaitForSingleObject` and
// `WaitForMultipleObjects`.
756 const INFINITE
: DWORD
= 0xffffffff;
// Access right requested, together with `SYNCHRONIZE`, when opening an
// existing semaphore with `OpenSemaphoreA`.
757 const SEMAPHORE_MODIFY_STATE
: DWORD
= 0x2;
// Access right requested, together with `SEMAPHORE_MODIFY_STATE`, in the
// same `OpenSemaphoreA` call.
758 const SYNCHRONIZE
: DWORD
= 0x00100000;
// Win32 `BOOL` true.
759 const TRUE
: BOOL
= 1;
// Return value the wait calls are compared against. With
// `WaitForMultipleObjects` the helper checks `WAIT_OBJECT_0` for the first
// handle in the array and `WAIT_OBJECT_0 + 1` for the second.
760 const WAIT_OBJECT_0
: DWORD
= 0;
763 fn CloseHandle(handle
: HANDLE
) -> BOOL
;
764 fn SetEvent(hEvent
: HANDLE
) -> BOOL
;
765 fn WaitForMultipleObjects(
767 lpHandles
: *const HANDLE
,
769 dwMilliseconds
: DWORD
,
772 lpEventAttributes
: *mut u8,
780 lpPreviousCount
: *mut LONG
,
783 lpEventAttributes
: *mut u8,
789 dwDesiredAccess
: DWORD
,
790 bInheritHandle
: BOOL
,
793 fn WaitForSingleObject(hHandle
: HANDLE
, dwMilliseconds
: DWORD
) -> DWORD
;
797 pub fn new(limit
: usize) -> io
::Result
<Client
> {
798 // Try a bunch of random semaphore names until we get a unique one,
799 // but don't try for too long.
801 // Note that `limit == 0` is a valid argument above but Windows
802 // won't let us create a semaphore with 0 slots available to it. Get
803 // `limit == 0` working by creating a semaphore instead with one
804 // slot and then immediately acquire it (without ever releasing it
807 let mut name
= format
!("__rust_jobserver_semaphore_{}\0", rand
::random
::<u32>());
809 let create_limit
= if limit
== 0 { 1 }
else { limit }
;
810 let r
= CreateSemaphoreA(
812 create_limit
as LONG
,
813 create_limit
as LONG
,
814 name
.as_ptr() as *const _
,
817 return Err(io
::Error
::last_os_error());
819 let handle
= Handle(r
);
821 let err
= io
::Error
::last_os_error();
822 if err
.raw_os_error() == Some(ERROR_ALREADY_EXISTS
as i32) {
825 name
.pop(); // chop off the trailing nul
826 let client
= Client
{
830 if create_limit
!= limit
{
833 info
!("created jobserver {:?}", client
);
839 io
::ErrorKind
::Other
,
840 "failed to find a unique name for a semaphore",
844 pub unsafe fn open(s
: &str) -> Option
<Client
> {
845 let name
= match CString
::new(s
) {
847 Err(_
) => return None
,
850 let sem
= OpenSemaphoreA(SYNCHRONIZE
| SEMAPHORE_MODIFY_STATE
, FALSE
, name
.as_ptr());
852 info
!("failed to open environment semaphore {}", s
);
855 info
!("opened environment semaphore {}", s
);
863 pub fn acquire(&self) -> io
::Result
<Acquired
> {
865 let r
= WaitForSingleObject(self.sem
.0, INFINITE
);
866 if r
== WAIT_OBJECT_0
{
869 Err(io
::Error
::last_os_error())
874 pub fn release(&self, _data
: Option
<&Acquired
>) -> io
::Result
<()> {
876 let r
= ReleaseSemaphore(self.sem
.0, 1, ptr
::null_mut());
880 Err(io
::Error
::last_os_error())
885 pub fn string_arg(&self) -> String
{
889 pub fn configure(&self, _cmd
: &mut Command
) {
890 // nothing to do here, we gave the name of our semaphore to the
// Newtype around a raw Win32 `HANDLE`, used for the semaphore and event
// handles in this module. A `Drop` impl for it follows below.
// NOTE(review): presumably that impl closes the handle — confirm.
896 struct Handle(HANDLE
);
897 // HANDLE is a raw ptr, but we're send/sync
// The raw pointer makes `Handle` neither `Send` nor `Sync` by default. These
// impls opt in so it can be placed in an `Arc` and moved into the helper
// thread.
898 unsafe impl Sync
for Handle {}
899 unsafe impl Send
for Handle {}
901 impl Drop
for Handle
{
912 thread
: JoinHandle
<()>,
918 mut f
: Box
<FnMut(io
::Result
<::Acquired
>) + Send
>,
919 ) -> io
::Result
<Helper
> {
921 let r
= CreateEventA(ptr
::null_mut(), TRUE
, FALSE
, ptr
::null());
923 return Err(io
::Error
::last_os_error());
928 let event
= Arc
::new(event
);
929 let event2
= event
.clone();
930 let thread
= Builder
::new().spawn(move || {
931 let objects
= [event2
.0
, client
.inner
.sem
.0];
933 let r
= unsafe { WaitForMultipleObjects(2, objects.as_ptr(), FALSE, INFINITE) }
;
934 if r
== WAIT_OBJECT_0
{
937 if r
== WAIT_OBJECT_0
+ 1 {
939 client
: client
.inner
.clone(),
943 f(Err(io
::Error
::last_os_error()))
955 let r
= unsafe { SetEvent(self.event.0) }
;
957 panic
!("failed to set event: {}", io
::Error
::last_os_error());
959 drop(self.thread
.join());
964 #[cfg(not(any(unix, windows)))]
967 use std
::process
::Command
;
968 use std
::sync
::mpsc
::{self, Receiver, SyncSender}
;
969 use std
::sync
::Mutex
;
970 use std
::thread
::{Builder, JoinHandle}
;
975 rx
: Mutex
<Receiver
<()>>,
979 pub struct Acquired(());
982 pub fn new(limit
: usize) -> io
::Result
<Client
> {
983 let (tx
, rx
) = mpsc
::sync_channel(limit
);
985 tx
.send(()).unwrap();
993 pub unsafe fn open(_s
: &str) -> Option
<Client
> {
997 pub fn acquire(&self) -> io
::Result
<Acquired
> {
998 self.rx
.lock().unwrap().recv().unwrap();
1002 pub fn release(&self, _data
: Option
<&Acquired
>) -> io
::Result
<()> {
1003 self.tx
.send(()).unwrap();
1007 pub fn string_arg(&self) -> String
{
1009 "On this platform there is no cross process jobserver support,
1010 so Client::configure is not supported."
1014 pub fn configure(&self, _cmd
: &mut Command
) {
1021 thread
: JoinHandle
<()>,
1024 pub fn spawn_helper(
1027 mut f
: Box
<FnMut(io
::Result
<::Acquired
>) + Send
>,
1028 ) -> io
::Result
<Helper
> {
1029 let thread
= Builder
::new().spawn(move || {
1031 let res
= client
.acquire();
1036 Ok(Helper { thread: thread }
)
1041 drop(self.thread
.join());
1047 fn no_helper_deadlock() {
1048 let x
= crate::Client
::new(32).unwrap();
1050 std
::mem
::drop(x
.into_helper_thread(|_
| {}
).unwrap());