]>
Commit | Line | Data |
---|---|---|
041b39d2 XL |
1 | //! An implementation of the GNU make jobserver. |
2 | //! | |
3 | //! This crate is an implementation, in Rust, of the GNU `make` jobserver for | |
4 | //! CLI tools that are interoperating with make or otherwise require some form | |
5 | //! of parallelism limiting across process boundaries. This was originally | |
6 | //! written for usage in Cargo to both (a) work when `cargo` is invoked from | |
7 | //! `make` (using `make`'s jobserver) and (b) work when `cargo` invokes build | |
8 | //! scripts, exporting a jobserver implementation for `make` processes to | |
9 | //! transitively use. | |
10 | //! | |
11 | //! The jobserver implementation can be found in [detail online][docs] but | |
12 | //! basically boils down to a cross-process semaphore. On Unix this is | |
13 | //! implemented with the `pipe` syscall and read/write ends of a pipe and on | |
14 | //! Windows this is implemented literally with IPC semaphores. | |
15 | //! | |
16 | //! The jobserver protocol in `make` also dictates when tokens are acquired to | |
17 | //! run child work, and clients using this crate should take care to implement | |
18 | //! such details to ensure correct interoperation with `make` itself. | |
19 | //! | |
20 | //! ## Examples | |
21 | //! | |
22 | //! Connect to a jobserver that was set up by `make` or a different process: | |
23 | //! | |
24 | //! ```no_run | |
25 | //! use jobserver::Client; | |
26 | //! | |
27 | //! // See API documentation for why this is `unsafe` | |
28 | //! let client = match unsafe { Client::from_env() } { | |
29 | //! Some(client) => client, | |
30 | //! None => panic!("client not configured"), | |
31 | //! }; | |
32 | //! ``` | |
33 | //! | |
34 | //! Acquire and release token from a jobserver: | |
35 | //! | |
36 | //! ```no_run | |
37 | //! use jobserver::Client; | |
38 | //! | |
39 | //! let client = unsafe { Client::from_env().unwrap() }; | |
40 | //! let token = client.acquire().unwrap(); // blocks until it is available | |
41 | //! drop(token); // releases the token when the work is done | |
42 | //! ``` | |
43 | //! | |
44 | //! Create a new jobserver and configure a child process to have access: | |
45 | //! | |
46 | //! ``` | |
47 | //! use std::process::Command; | |
48 | //! use jobserver::Client; | |
49 | //! | |
50 | //! let client = Client::new(4).expect("failed to create jobserver"); | |
51 | //! let mut cmd = Command::new("make"); | |
52 | //! client.configure(&mut cmd); | |
53 | //! ``` | |
54 | //! | |
55 | //! ## Caveats | |
56 | //! | |
57 | //! This crate makes no attempt to release tokens back to a jobserver on | |
58 | //! abnormal exit of a process. If a process which acquires a token is killed | |
59 | //! with ctrl-c or some similar signal then tokens will not be released and the | |
60 | //! jobserver may be in a corrupt state. | |
61 | //! | |
62 | //! Note that this is typically ok as ctrl-c means that an entire build process | |
63 | //! is being torn down, but it's worth being aware of at least! | |
64 | //! | |
65 | //! ## Windows caveats | |
66 | //! | |
67 | //! There appear to be two implementations of `make` on Windows. On MSYS2 one | |
68 | //! typically comes as `mingw32-make` and the other as `make` itself. I'm not | |
69 | //! personally too familiar with what's going on here, but for jobserver-related | |
70 | //! information the `mingw32-make` implementation uses Windows semaphores | |
71 | //! whereas the `make` program does not. The `make` program appears to use file | |
72 | //! descriptors and I'm not really sure how it works, so this crate is not | |
73 | //! compatible with `make` on Windows. It is, however, compatible with | |
74 | //! `mingw32-make`. | |
75 | //! | |
76 | //! [docs]: http://make.mad-scientist.net/papers/jobserver-implementation/ | |
77 | ||
78 | #![deny(missing_docs, missing_debug_implementations)] | |
79 | #![doc(html_root_url = "https://docs.rs/jobserver/0.1")] | |
80 | ||
81 | use std::env; | |
82 | use std::io; | |
83 | use std::process::Command; | |
84 | use std::sync::Arc; | |
85 | use std::sync::mpsc::{self, Sender}; | |
86 | ||
87 | /// A client of a jobserver | |
88 | /// | |
89 | /// This structure is the main type exposed by this library, and is where | |
90 | /// interaction to a jobserver is configured through. Clients are either created | |
91 | /// from scratch in which case the internal semaphore is initialized on the spot, | |
92 | /// or a client is created from the environment to connect to a jobserver | |
93 | /// already created. | |
94 | /// | |
95 | /// Some usage examples can be found in the crate documentation for using a | |
96 | /// client. | |
97 | /// | |
98 | /// Note that a `Client` implements the `Clone` trait, and all instances of a | |
99 | /// `Client` refer to the same jobserver instance. | |
100 | #[derive(Clone, Debug)] | |
101 | pub struct Client { | |
102 | inner: Arc<imp::Client>, | |
103 | } | |
104 | ||
105 | /// An acquired token from a jobserver. | |
106 | /// | |
107 | /// This token will be released back to the jobserver when it is dropped and | |
108 | /// otherwise represents the ability to spawn off another thread of work. | |
109 | #[derive(Debug)] | |
110 | pub struct Acquired { | |
111 | client: Arc<imp::Client>, | |
112 | data: imp::Acquired, | |
113 | } | |
114 | ||
115 | impl Client { | |
116 | /// Creates a new jobserver initialized with the given parallelism limit. | |
117 | /// | |
118 | /// A client to the jobserver created will be returned. This client will | |
119 | /// allow at most `limit` tokens to be acquired from it in parallel. More | |
120 | /// calls to `acquire` will cause the calling thread to block. | |
121 | /// | |
122 | /// Note that the created `Client` is not automatically inherited into | |
123 | /// spawned child processes from this program. Manual usage of the | |
124 | /// `configure` function is required for a child process to have access to a | |
125 | /// job server. | |
126 | /// | |
127 | /// # Examples | |
128 | /// | |
129 | /// ``` | |
130 | /// use jobserver::Client; | |
131 | /// | |
132 | /// let client = Client::new(4).expect("failed to create jobserver"); | |
133 | /// ``` | |
134 | /// | |
135 | /// # Errors | |
136 | /// | |
137 | /// Returns an error if any I/O error happens when attempting to create the | |
138 | /// jobserver client. | |
139 | pub fn new(limit: usize) -> io::Result<Client> { | |
140 | Ok(Client { | |
141 | inner: Arc::new(imp::Client::new(limit)?), | |
142 | }) | |
143 | } | |
144 | ||
145 | /// Attempts to connect to the jobserver specified in this process's | |
146 | /// environment. | |
147 | /// | |
148 | /// When a `make` executable calls a child process it will configure the | |
149 | /// environment of the child to ensure that it has handles to the jobserver | |
150 | /// it's passing down. This function will attempt to look for these details | |
151 | /// and connect to the jobserver. | |
152 | /// | |
153 | /// Note that the created `Client` is not automatically inherited into | |
154 | /// spawned child processes from this program. Manual usage of the | |
155 | /// `configure` function is required for a child process to have access to a | |
156 | /// job server. | |
157 | /// | |
158 | /// # Return value | |
159 | /// | |
160 | /// If a jobserver was found in the environment and it looks correct then | |
161 | /// `Some` of the connected client will be returned. If no jobserver was | |
162 | /// found then `None` will be returned. | |
163 | /// | |
164 | /// Note that on Unix the `Client` returned **takes ownership of the file | |
165 | /// descriptors specified in the environment**. Jobservers on Unix are | |
166 | /// implemented with `pipe` file descriptors, and they're inherited from | |
167 | /// parent processes. This `Client` returned takes ownership of the file | |
168 | /// descriptors for this process and will close the file descriptors after | |
169 | /// this value is dropped. | |
170 | /// | |
171 | /// Additionally on Unix this function will configure the file descriptors | |
172 | /// with `CLOEXEC` so they're not automatically inherited by spawned | |
173 | /// children. | |
174 | /// | |
175 | /// # Unsafety | |
176 | /// | |
177 | /// This function is `unsafe` to call on Unix specifically as it | |
178 | /// transitively requires usage of the `from_raw_fd` function, which is | |
179 | /// itself unsafe in some circumstances. | |
180 | /// | |
181 | /// It's recommended to call this function very early in the lifetime of a | |
182 | /// program before any other file descriptors are opened. That way you can | |
183 | /// make sure to take ownership properly of the file descriptors passed | |
184 | /// down, if any. | |
185 | /// | |
186 | /// It's generally unsafe to call this function twice in a program if the | |
187 | /// previous invocation returned `Some`. | |
188 | /// | |
189 | /// Note, though, that on Windows it should be safe to call this function | |
190 | /// any number of times. | |
191 | pub unsafe fn from_env() -> Option<Client> { | |
192 | let var = match env::var("CARGO_MAKEFLAGS") | |
193 | .or(env::var("MAKEFLAGS")) | |
194 | .or(env::var("MFLAGS")) { | |
195 | Ok(s) => s, | |
196 | Err(_) => return None, | |
197 | }; | |
198 | let mut arg = "--jobserver-fds="; | |
199 | let pos = match var.find(arg) { | |
200 | Some(i) => i, | |
201 | None => { | |
202 | arg = "--jobserver-auth="; | |
203 | match var.find(arg) { | |
204 | Some(i) => i, | |
205 | None => return None, | |
206 | } | |
207 | } | |
208 | }; | |
209 | ||
210 | let s = var[pos + arg.len()..].split(' ').next().unwrap(); | |
211 | imp::Client::open(s).map(|c| { | |
212 | Client { inner: Arc::new(c) } | |
213 | }) | |
214 | } | |
215 | ||
216 | /// Acquires a token from this jobserver client. | |
217 | /// | |
218 | /// This function will block the calling thread until a new token can be | |
219 | /// acquired from the jobserver. | |
220 | /// | |
221 | /// # Return value | |
222 | /// | |
223 | /// On successful acquisition of a token an instance of `Acquired` is | |
224 | /// returned. This structure, when dropped, will release the token back to | |
225 | /// the jobserver. It's recommended to avoid leaking this value. | |
226 | /// | |
227 | /// # Errors | |
228 | /// | |
229 | /// If an I/O error happens while acquiring a token then this function will | |
230 | /// return immediately with the error. If an error is returned then a token | |
231 | /// was not acquired. | |
232 | pub fn acquire(&self) -> io::Result<Acquired> { | |
233 | let data = try!(self.inner.acquire()); | |
234 | Ok(Acquired { | |
235 | client: self.inner.clone(), | |
236 | data: data, | |
237 | }) | |
238 | } | |
239 | ||
240 | /// Configures a child process to have access to this client's jobserver as | |
241 | /// well. | |
242 | /// | |
243 | /// This function is required to be called to ensure that a jobserver is | |
244 | /// properly inherited to a child process. If this function is *not* called | |
245 | /// then this `Client` will not be accessible in the child process. In other | |
246 | /// words, if not called, then `Client::from_env` will return `None` in the | |
247 | /// child process (or the equivalent of `Client::from_env` that `make` uses). | |
248 | /// | |
249 | /// ## Platform-specific behavior | |
250 | /// | |
251 | /// On Unix and Windows this will clobber the `CARGO_MAKEFLAGS` environment | |
252 | /// variables for the child process, and on Unix this will also allow the | |
253 | /// two file descriptors for this client to be inherited to the child. | |
254 | pub fn configure(&self, cmd: &mut Command) { | |
255 | let arg = self.inner.string_arg(); | |
256 | // Older implementations of make use `--jobserver-fds` and newer | |
257 | // implementations use `--jobserver-auth`, pass both to try to catch | |
258 | // both implementations. | |
259 | let value = format!("--jobserver-fds={0} --jobserver-auth={0}", arg); | |
260 | cmd.env("CARGO_MAKEFLAGS", &value); | |
261 | self.inner.configure(cmd); | |
262 | } | |
263 | ||
264 | /// Converts this `Client` into a helper thread to deal with a blocking | |
265 | /// `acquire` function a little more easily. | |
266 | /// | |
267 | /// The fact that the `acquire` function on `Client` blocks isn't always | |
268 | /// the easiest to work with. Typically you're using a jobserver to | |
269 | /// manage running other events in parallel! This means that you need to | |
270 | /// either (a) wait for an existing job to finish or (b) wait for a | |
271 | /// new token to become available. | |
272 | /// | |
273 | /// Unfortunately the blocking in `acquire` happens at the implementation | |
274 | /// layer of jobservers. On Unix this requires a blocking call to `read` | |
275 | /// and on Windows this requires one of the `WaitFor*` functions. Both | |
276 | /// of these situations aren't the easiest to deal with: | |
277 | /// | |
278 | /// * On Unix there's basically only one way to wake up a `read` early, and | |
279 | /// that's through a signal. This is what the `make` implementation | |
280 | /// itself uses, relying on `SIGCHLD` to wake up a blocking acquisition | |
281 | /// of a new job token. Unfortunately nonblocking I/O is not an option | |
282 | /// here, so it means that "waiting for one of two events" means that | |
283 | /// the latter event must generate a signal! This is not always the case | |
284 | /// on unix for all jobservers. | |
285 | /// | |
286 | /// * On Windows you'd have to basically use the `WaitForMultipleObjects` | |
287 | /// which means that you've got to canonicalize all your event sources | |
288 | /// into a `HANDLE` which also isn't the easiest thing to do | |
289 | /// unfortunately. | |
290 | /// | |
291 | /// This function essentially attempts to ease these limitations by | |
292 | /// converting this `Client` into a helper thread spawned into this | |
293 | /// process. The application can then request that the helper thread | |
294 | /// acquires tokens and the provided closure will be invoked for each token | |
295 | /// acquired. | |
296 | /// | |
297 | /// The intention is that this function can be used to translate the event | |
298 | /// of a token acquisition into an arbitrary user-defined event. | |
299 | /// | |
300 | /// # Arguments | |
301 | /// | |
302 | /// This function will consume the `Client` provided to be transferred to | |
303 | /// the helper thread that is spawned. Additionally a closure `f` is | |
304 | /// provided to be invoked whenever a token is acquired. | |
305 | /// | |
306 | /// This closure is only invoked after calls to | |
307 | /// `HelperThread::request_token` have been made and a token itself has | |
308 | /// been acquired. If an error happens while acquiring the token then | |
309 | /// an error will be yielded to the closure as well. | |
310 | /// | |
311 | /// # Return Value | |
312 | /// | |
313 | /// This function will return an instance of the `HelperThread` structure | |
314 | /// which is used to manage the helper thread associated with this client. | |
315 | /// Through the `HelperThread` you'll request that tokens are acquired. | |
316 | /// When acquired, the closure provided here is invoked. | |
317 | /// | |
318 | /// When the `HelperThread` structure is dropped it will be gracefully | |
319 | /// torn down, and the calling thread will be blocked until the thread is | |
320 | /// torn down (which should be prompt). | |
321 | /// | |
322 | /// # Errors | |
323 | /// | |
324 | /// This function may fail due to creation of the helper thread or | |
325 | /// auxiliary I/O objects to manage the helper thread. In any of these | |
326 | /// situations the error is propagated upwards. | |
327 | /// | |
328 | /// # Platform-specific behavior | |
329 | /// | |
330 | /// On Windows this function behaves pretty normally as expected, but on | |
331 | /// Unix the implementation is... a little heinous. As mentioned above | |
332 | /// we're forced into blocking I/O for token acquisition, namely a blocking | |
333 | /// call to `read`. We must be able to unblock this, however, to tear down | |
334 | /// the helper thread gracefully! | |
335 | /// | |
336 | /// Essentially what happens is that we'll send a signal to the helper | |
337 | /// thread spawned and rely on `EINTR` being returned to wake up the helper | |
338 | /// thread. This involves installing a global `SIGUSR1` handler that does | |
339 | /// nothing along with sending signals to that thread. This may cause | |
340 | /// odd behavior in some applications, so it's recommended to review and | |
341 | /// test thoroughly before using this. | |
342 | pub fn into_helper_thread<F>(self, f: F) -> io::Result<HelperThread> | |
343 | where F: FnMut(io::Result<Acquired>) + Send + 'static, | |
344 | { | |
345 | let (tx, rx) = mpsc::channel(); | |
346 | Ok(HelperThread { | |
347 | inner: Some(imp::spawn_helper(self, rx, Box::new(f))?), | |
348 | tx: Some(tx), | |
349 | }) | |
350 | } | |
351 | } | |
352 | ||
353 | impl Drop for Acquired { | |
354 | fn drop(&mut self) { | |
355 | drop(self.client.release(&self.data)); | |
356 | } | |
357 | } | |
358 | ||
359 | /// Structure returned from `Client::into_helper_thread` to manage the lifetime | |
360 | /// of the helper thread returned, see those associated docs for more info. | |
361 | #[derive(Debug)] | |
362 | pub struct HelperThread { | |
363 | inner: Option<imp::Helper>, | |
364 | tx: Option<Sender<()>>, | |
365 | } | |
366 | ||
367 | impl HelperThread { | |
368 | /// Request that the helper thread acquires a token, eventually calling the | |
369 | /// original closure with a token when it's available. | |
370 | /// | |
371 | /// For more information, see the docs on that function. | |
372 | pub fn request_token(&self) { | |
373 | self.tx.as_ref().unwrap().send(()).unwrap(); | |
374 | } | |
375 | } | |
376 | ||
377 | impl Drop for HelperThread { | |
378 | fn drop(&mut self) { | |
379 | drop(self.tx.take()); | |
380 | self.inner.take().unwrap().join(); | |
381 | } | |
382 | } | |
383 | ||
384 | #[cfg(unix)] | |
385 | mod imp { | |
386 | extern crate libc; | |
387 | ||
388 | use std::fs::File; | |
389 | use std::io::{self, Read, Write}; | |
390 | use std::mem; | |
391 | use std::os::unix::prelude::*; | |
392 | use std::process::Command; | |
393 | use std::ptr; | |
394 | use std::sync::atomic::{AtomicBool, AtomicUsize, ATOMIC_USIZE_INIT, Ordering}; | |
395 | use std::sync::mpsc::{self, Receiver, RecvTimeoutError}; | |
396 | use std::sync::{Arc, Once, ONCE_INIT}; | |
397 | use std::thread::{JoinHandle, Builder}; | |
398 | use std::time::Duration; | |
399 | ||
400 | use self::libc::c_int; | |
401 | ||
402 | #[derive(Debug)] | |
403 | pub struct Client { | |
404 | read: File, | |
405 | write: File, | |
406 | } | |
407 | ||
408 | #[derive(Debug)] | |
409 | pub struct Acquired { | |
410 | byte: u8, | |
411 | } | |
412 | ||
413 | impl Client { | |
414 | pub fn new(limit: usize) -> io::Result<Client> { | |
415 | let client = unsafe { Client::mk()? }; | |
416 | // I don't think the character written here matters, but I could be | |
417 | // wrong! | |
418 | for _ in 0..limit { | |
419 | (&client.write).write(&[b'|'])?; | |
420 | } | |
421 | Ok(client) | |
422 | } | |
423 | ||
424 | unsafe fn mk() -> io::Result<Client> { | |
425 | let mut pipes = [0; 2]; | |
426 | ||
427 | // Attempt atomically-create-with-cloexec if we can | |
428 | if cfg!(target_os = "linux") { | |
429 | if let Some(pipe2) = pipe2() { | |
430 | cvt(pipe2(pipes.as_mut_ptr(), libc::O_CLOEXEC))?; | |
431 | return Ok(Client::from_fds(pipes[0], pipes[1])) | |
432 | } | |
433 | } | |
434 | ||
435 | cvt(libc::pipe(pipes.as_mut_ptr()))?; | |
436 | drop(set_cloexec(pipes[0], true)); | |
437 | drop(set_cloexec(pipes[1], true)); | |
438 | Ok(Client::from_fds(pipes[0], pipes[1])) | |
439 | } | |
440 | ||
441 | pub unsafe fn open(s: &str) -> Option<Client> { | |
442 | let mut parts = s.splitn(2, ','); | |
443 | let read = parts.next().unwrap(); | |
444 | let write = match parts.next() { | |
445 | Some(s) => s, | |
446 | None => return None, | |
447 | }; | |
448 | ||
449 | let read = match read.parse() { | |
450 | Ok(n) => n, | |
451 | Err(_) => return None, | |
452 | }; | |
453 | let write = match write.parse() { | |
454 | Ok(n) => n, | |
455 | Err(_) => return None, | |
456 | }; | |
457 | ||
458 | // Ok so we've got two integers that look like file descriptors, but | |
459 | // for extra sanity checking let's see if they actually look like | |
460 | // instances of a pipe before we return the client. | |
461 | // | |
462 | // If we're called from `make` *without* the leading + on our rule | |
463 | // then we'll have `MAKEFLAGS` env vars but won't actually have | |
464 | // access to the file descriptors. | |
465 | if is_pipe(read) && is_pipe(write) { | |
466 | drop(set_cloexec(read, true)); | |
467 | drop(set_cloexec(write, true)); | |
468 | Some(Client::from_fds(read, write)) | |
469 | } else { | |
470 | None | |
471 | } | |
472 | } | |
473 | ||
474 | unsafe fn from_fds(read: c_int, write: c_int) -> Client { | |
475 | Client { | |
476 | read: File::from_raw_fd(read), | |
477 | write: File::from_raw_fd(write), | |
478 | } | |
479 | } | |
480 | ||
481 | pub fn acquire(&self) -> io::Result<Acquired> { | |
abe05a73 XL |
482 | // We don't actually know if the file descriptor here is set in |
483 | // blocking or nonblocking mode. AFAIK all released versions of | |
484 | // `make` use blocking fds for the jobserver, but the unreleased | |
485 | // version of `make` doesn't. In the unreleased version jobserver | |
486 | // fds are set to nonblocking and combined with `pselect` | |
487 | // internally. | |
488 | // | |
489 | // Here we try to be compatible with both strategies. We | |
490 | // unconditionally expect the file descriptor to be in nonblocking | |
491 | // mode and if it happens to be in blocking mode then most of this | |
492 | // won't end up actually being necessary! | |
493 | // | |
494 | // We use `poll` here to block this thread waiting for read | |
495 | // readiness, and then afterwards we perform the `read` itself. If | |
496 | // the `read` returns that it would block then we start over and try | |
497 | // again. | |
498 | // | |
499 | // Also note that we explicitly don't handle EINTR here. That's used | |
500 | // to shut us down, so we otherwise punt all errors upwards. | |
501 | unsafe { | |
502 | let mut fd: libc::pollfd = mem::zeroed(); | |
503 | fd.fd = self.read.as_raw_fd(); | |
504 | fd.events = libc::POLLIN; | |
505 | loop { | |
506 | fd.revents = 0; | |
507 | if libc::poll(&mut fd, 1, -1) == -1 { | |
508 | return Err(io::Error::last_os_error()) | |
509 | } | |
510 | if fd.revents == 0 { | |
511 | continue | |
512 | } | |
513 | let mut buf = [0]; | |
514 | match (&self.read).read(&mut buf) { | |
515 | Ok(1) => return Ok(Acquired { byte: buf[0] }), | |
516 | Ok(_) => { | |
517 | return Err(io::Error::new(io::ErrorKind::Other, | |
518 | "early EOF on jobserver pipe")) | |
519 | } | |
520 | Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {} | |
521 | Err(e) => return Err(e), | |
522 | } | |
523 | } | |
041b39d2 XL |
524 | } |
525 | } | |
526 | ||
527 | pub fn release(&self, data: &Acquired) -> io::Result<()> { | |
abe05a73 XL |
528 | // Note that the fd may be nonblocking but we're going to go ahead |
529 | // and assume that the writes here are always nonblocking (we can | |
530 | // always quickly release a token). If that turns out to not be the | |
531 | // case we'll get an error anyway! | |
041b39d2 XL |
532 | match (&self.write).write(&[data.byte])? { |
533 | 1 => Ok(()), | |
534 | _ => Err(io::Error::new(io::ErrorKind::Other, | |
535 | "failed to write token back to jobserver")), | |
536 | } | |
537 | } | |
538 | ||
539 | pub fn string_arg(&self) -> String { | |
540 | format!("{},{} -j", self.read.as_raw_fd(), self.write.as_raw_fd()) | |
541 | } | |
542 | ||
543 | pub fn configure(&self, cmd: &mut Command) { | |
544 | // Here we basically just want to say that in the child process | |
545 | // we'll configure the read/write file descriptors to *not* be | |
546 | // cloexec, so they're inherited across the exec and specified as | |
547 | // integers through `string_arg` above. | |
548 | let read = self.read.as_raw_fd(); | |
549 | let write = self.write.as_raw_fd(); | |
550 | cmd.before_exec(move || { | |
551 | set_cloexec(read, false)?; | |
552 | set_cloexec(write, false)?; | |
553 | Ok(()) | |
554 | }); | |
555 | } | |
556 | } | |
557 | ||
558 | #[derive(Debug)] | |
559 | pub struct Helper { | |
560 | thread: JoinHandle<()>, | |
561 | quitting: Arc<AtomicBool>, | |
562 | rx_done: Receiver<()>, | |
563 | } | |
564 | ||
565 | pub fn spawn_helper(client: ::Client, | |
566 | rx: Receiver<()>, | |
567 | mut f: Box<FnMut(io::Result<::Acquired>) + Send>) | |
568 | -> io::Result<Helper> | |
569 | { | |
570 | static USR1_INIT: Once = ONCE_INIT; | |
571 | let mut err = None; | |
572 | USR1_INIT.call_once(|| unsafe { | |
573 | let mut new: libc::sigaction = mem::zeroed(); | |
574 | new.sa_sigaction = sigusr1_handler as usize; | |
575 | new.sa_flags = libc::SA_SIGINFO as _; | |
576 | if libc::sigaction(libc::SIGUSR1, &new, ptr::null_mut()) != 0 { | |
577 | err = Some(io::Error::last_os_error()); | |
578 | } | |
579 | }); | |
580 | ||
581 | if let Some(e) = err.take() { | |
582 | return Err(e) | |
583 | } | |
584 | ||
585 | let quitting = Arc::new(AtomicBool::new(false)); | |
586 | let quitting2 = quitting.clone(); | |
587 | let (tx_done, rx_done) = mpsc::channel(); | |
588 | let thread = Builder::new().spawn(move || { | |
589 | 'outer: | |
590 | for () in rx { | |
591 | loop { | |
592 | let res = client.acquire(); | |
593 | if let Err(ref e) = res { | |
594 | if e.kind() == io::ErrorKind::Interrupted { | |
595 | if quitting2.load(Ordering::SeqCst) { | |
596 | break 'outer | |
597 | } else { | |
598 | continue | |
599 | } | |
600 | } | |
601 | } | |
602 | f(res); | |
603 | break | |
604 | } | |
605 | } | |
606 | tx_done.send(()).unwrap(); | |
607 | })?; | |
608 | ||
609 | Ok(Helper { | |
610 | thread: thread, | |
611 | quitting: quitting, | |
612 | rx_done: rx_done, | |
613 | }) | |
614 | } | |
615 | ||
616 | impl Helper { | |
617 | pub fn join(self) { | |
618 | self.quitting.store(true, Ordering::SeqCst); | |
619 | let dur = Duration::from_millis(10); | |
620 | let mut done = false; | |
621 | for _ in 0..100 { | |
622 | unsafe { | |
623 | // Ignore the return value here of `pthread_kill`, | |
624 | // apparently on OSX if you kill a dead thread it will | |
625 | // return an error, but on other platforms it may not. In | |
626 | // that sense we don't actually know if this will succeed or | |
627 | // not! | |
628 | libc::pthread_kill(self.thread.as_pthread_t(), libc::SIGUSR1); | |
629 | match self.rx_done.recv_timeout(dur) { | |
630 | Ok(()) | | |
631 | Err(RecvTimeoutError::Disconnected) => { | |
632 | done = true; | |
633 | break | |
634 | } | |
635 | Err(RecvTimeoutError::Timeout) => {} | |
636 | } | |
637 | } | |
638 | } | |
639 | if !done { | |
640 | panic!("failed to shut down worker thread"); | |
641 | } | |
642 | drop(self.thread.join()); | |
643 | } | |
644 | } | |
645 | ||
646 | #[allow(unused_assignments)] | |
647 | fn is_pipe(fd: c_int) -> bool { | |
648 | unsafe { | |
649 | let mut stat = mem::zeroed(); | |
650 | if libc::fstat(fd, &mut stat) == 0 { | |
651 | // On android arm and i686 mode_t is u16 and st_mode is u32, | |
652 | // this generates a type mismatch when S_IFIFO (declared as mode_t) | |
653 | // is used in operations with st_mode, so we use this workaround | |
654 | // to get the value of S_IFIFO with the same type of st_mode. | |
655 | let mut s_ififo = stat.st_mode; | |
656 | s_ififo = libc::S_IFIFO as _; | |
657 | stat.st_mode & s_ififo == s_ififo | |
658 | } else { | |
659 | false | |
660 | } | |
661 | } | |
662 | } | |
663 | ||
664 | fn set_cloexec(fd: c_int, set: bool) -> io::Result<()> { | |
665 | unsafe { | |
666 | let previous = cvt(libc::fcntl(fd, libc::F_GETFD))?; | |
667 | let new = if set { | |
668 | previous | libc::FD_CLOEXEC | |
669 | } else { | |
670 | previous & !libc::FD_CLOEXEC | |
671 | }; | |
672 | if new != previous { | |
673 | cvt(libc::fcntl(fd, libc::F_SETFD, new))?; | |
674 | } | |
675 | Ok(()) | |
676 | } | |
677 | } | |
678 | ||
/// Converts a C-style return code into an `io::Result`: `-1` becomes the
/// last OS error, any other value is passed through unchanged.
fn cvt(t: c_int) -> io::Result<c_int> {
    match t {
        -1 => Err(io::Error::last_os_error()),
        value => Ok(value),
    }
}
686 | ||
687 | unsafe fn pipe2() -> Option<&'static fn(*mut c_int, c_int) -> c_int> { | |
688 | static PIPE2: AtomicUsize = ATOMIC_USIZE_INIT; | |
689 | ||
690 | if PIPE2.load(Ordering::SeqCst) == 0 { | |
691 | let name = "pipe2\0"; | |
692 | let n = match libc::dlsym(libc::RTLD_DEFAULT, name.as_ptr() as *const _) as usize { | |
693 | 0 => 1, | |
694 | n => n, | |
695 | }; | |
696 | PIPE2.store(n, Ordering::SeqCst); | |
697 | } | |
698 | if PIPE2.load(Ordering::SeqCst) == 1 { | |
699 | None | |
700 | } else { | |
701 | mem::transmute(&PIPE2) | |
702 | } | |
703 | } | |
704 | ||
705 | extern fn sigusr1_handler(_signum: c_int, | |
706 | _info: *mut libc::siginfo_t, | |
707 | _ptr: *mut libc::c_void) { | |
708 | // nothing to do | |
709 | } | |
710 | } | |
711 | ||
712 | #[cfg(windows)] | |
713 | mod imp { | |
714 | extern crate rand; | |
715 | ||
716 | use std::ffi::CString; | |
717 | use std::io; | |
718 | use std::process::Command; | |
719 | use std::ptr; | |
720 | use std::sync::Arc; | |
721 | use std::sync::mpsc::Receiver; | |
722 | use std::thread::{Builder, JoinHandle}; | |
723 | ||
724 | #[derive(Debug)] | |
725 | pub struct Client { | |
726 | sem: Handle, | |
727 | name: String, | |
728 | } | |
729 | ||
730 | #[derive(Debug)] | |
731 | pub struct Acquired; | |
732 | ||
733 | type BOOL = i32; | |
734 | type DWORD = u32; | |
735 | type HANDLE = *mut u8; | |
736 | type LONG = i32; | |
737 | ||
738 | const ERROR_ALREADY_EXISTS: DWORD = 183; | |
739 | const FALSE: BOOL = 0; | |
740 | const INFINITE: DWORD = 0xffffffff; | |
741 | const SEMAPHORE_MODIFY_STATE: DWORD = 0x2; | |
742 | const SYNCHRONIZE: DWORD = 0x00100000; | |
743 | const TRUE: BOOL = 1; | |
744 | const WAIT_OBJECT_0: DWORD = 0; | |
745 | ||
746 | extern "system" { | |
747 | fn CloseHandle(handle: HANDLE) -> BOOL; | |
748 | fn SetEvent(hEvent: HANDLE) -> BOOL; | |
749 | fn WaitForMultipleObjects(ncount: DWORD, | |
750 | lpHandles: *const HANDLE, | |
751 | bWaitAll: BOOL, | |
752 | dwMilliseconds: DWORD) -> DWORD; | |
753 | fn CreateEventA(lpEventAttributes: *mut u8, | |
754 | bManualReset: BOOL, | |
755 | bInitialState: BOOL, | |
756 | lpName: *const i8) -> HANDLE; | |
757 | fn ReleaseSemaphore(hSemaphore: HANDLE, | |
758 | lReleaseCount: LONG, | |
759 | lpPreviousCount: *mut LONG) -> BOOL; | |
760 | fn CreateSemaphoreA(lpEventAttributes: *mut u8, | |
761 | lInitialCount: LONG, | |
762 | lMaximumCount: LONG, | |
763 | lpName: *const i8) -> HANDLE; | |
764 | fn OpenSemaphoreA(dwDesiredAccess: DWORD, | |
765 | bInheritHandle: BOOL, | |
766 | lpName: *const i8) -> HANDLE; | |
767 | fn WaitForSingleObject(hHandle: HANDLE, | |
768 | dwMilliseconds: DWORD) -> DWORD; | |
769 | } | |
770 | ||
771 | impl Client { | |
772 | pub fn new(limit: usize) -> io::Result<Client> { | |
773 | // Try a bunch of random semaphore names until we get a unique one, | |
774 | // but don't try for too long. | |
775 | // | |
776 | // Note that `limit == 0` is a valid argument above but Windows | |
777 | // won't let us create a semaphore with 0 slots available to it. Get | |
778 | // `limit == 0` working by creating a semaphore instead with one | |
779 | // slot and then immediately acquire it (without ever releaseing it | |
780 | // back). | |
781 | for _ in 0..100 { | |
782 | let mut name = format!("__rust_jobserver_semaphore_{}\0", | |
783 | rand::random::<u32>()); | |
784 | unsafe { | |
785 | let create_limit = if limit == 0 {1} else {limit}; | |
786 | let r = CreateSemaphoreA(ptr::null_mut(), | |
787 | create_limit as LONG, | |
788 | create_limit as LONG, | |
789 | name.as_ptr() as *const _); | |
790 | if r.is_null() { | |
791 | return Err(io::Error::last_os_error()) | |
792 | } | |
793 | let handle = Handle(r); | |
794 | ||
795 | let err = io::Error::last_os_error(); | |
796 | if err.raw_os_error() == Some(ERROR_ALREADY_EXISTS as i32) { | |
797 | continue | |
798 | } | |
799 | name.pop(); // chop off the trailing nul | |
800 | let client = Client { | |
801 | sem: handle, | |
802 | name: name, | |
803 | }; | |
804 | if create_limit != limit { | |
805 | client.acquire()?; | |
806 | } | |
807 | return Ok(client) | |
808 | } | |
809 | } | |
810 | ||
811 | Err(io::Error::new(io::ErrorKind::Other, | |
812 | "failed to find a unique name for a semaphore")) | |
813 | } | |
814 | ||
815 | pub unsafe fn open(s: &str) -> Option<Client> { | |
816 | let name = match CString::new(s) { | |
817 | Ok(s) => s, | |
818 | Err(_) => return None, | |
819 | }; | |
820 | ||
821 | let sem = OpenSemaphoreA(SYNCHRONIZE | SEMAPHORE_MODIFY_STATE, | |
822 | FALSE, | |
823 | name.as_ptr()); | |
824 | if sem.is_null() { | |
825 | None | |
826 | } else { | |
827 | Some(Client { | |
828 | sem: Handle(sem), | |
829 | name: s.to_string(), | |
830 | }) | |
831 | } | |
832 | } | |
833 | ||
834 | pub fn acquire(&self) -> io::Result<Acquired> { | |
835 | unsafe { | |
836 | let r = WaitForSingleObject(self.sem.0, INFINITE); | |
837 | if r == WAIT_OBJECT_0 { | |
838 | Ok(Acquired) | |
839 | } else { | |
840 | Err(io::Error::last_os_error()) | |
841 | } | |
842 | } | |
843 | } | |
844 | ||
845 | pub fn release(&self, _data: &Acquired) -> io::Result<()> { | |
846 | unsafe { | |
847 | let r = ReleaseSemaphore(self.sem.0, 1, ptr::null_mut()); | |
848 | if r != 0 { | |
849 | Ok(()) | |
850 | } else { | |
851 | Err(io::Error::last_os_error()) | |
852 | } | |
853 | } | |
854 | } | |
855 | ||
856 | pub fn string_arg(&self) -> String { | |
857 | self.name.clone() | |
858 | } | |
859 | ||
860 | pub fn configure(&self, _cmd: &mut Command) { | |
861 | // nothing to do here, we gave the name of our semaphore to the | |
862 | // child above | |
863 | } | |
864 | } | |
865 | ||
866 | #[derive(Debug)] | |
867 | struct Handle(HANDLE); | |
868 | // HANDLE is a raw ptr, but we're send/sync | |
869 | unsafe impl Sync for Handle {} | |
870 | unsafe impl Send for Handle {} | |
871 | ||
872 | impl Drop for Handle { | |
873 | fn drop(&mut self) { | |
874 | unsafe { | |
875 | CloseHandle(self.0); | |
876 | } | |
877 | } | |
878 | } | |
879 | ||
880 | #[derive(Debug)] | |
881 | pub struct Helper { | |
882 | event: Arc<Handle>, | |
883 | thread: JoinHandle<()>, | |
884 | } | |
885 | ||
886 | pub fn spawn_helper(client: ::Client, | |
887 | rx: Receiver<()>, | |
888 | mut f: Box<FnMut(io::Result<::Acquired>) + Send>) | |
889 | -> io::Result<Helper> | |
890 | { | |
891 | let event = unsafe { | |
892 | let r = CreateEventA(ptr::null_mut(), TRUE, FALSE, ptr::null()); | |
893 | if r.is_null() { | |
894 | return Err(io::Error::last_os_error()) | |
895 | } else { | |
896 | Handle(r) | |
897 | } | |
898 | }; | |
899 | let event = Arc::new(event); | |
900 | let event2 = event.clone(); | |
901 | let thread = Builder::new().spawn(move || { | |
902 | let objects = [event2.0, client.inner.sem.0]; | |
903 | for () in rx { | |
904 | let r = unsafe { | |
905 | WaitForMultipleObjects(2, objects.as_ptr(), FALSE, INFINITE) | |
906 | }; | |
907 | if r == WAIT_OBJECT_0 { | |
908 | break | |
909 | } | |
910 | if r == WAIT_OBJECT_0 + 1 { | |
911 | f(Ok(::Acquired { | |
912 | client: client.inner.clone(), | |
913 | data: Acquired, | |
914 | })) | |
915 | } else { | |
916 | f(Err(io::Error::last_os_error())) | |
917 | } | |
918 | } | |
919 | })?; | |
920 | Ok(Helper { | |
921 | thread: thread, | |
922 | event: event, | |
923 | }) | |
924 | } | |
925 | ||
926 | impl Helper { | |
927 | pub fn join(self) { | |
928 | let r = unsafe { SetEvent(self.event.0) }; | |
929 | if r == 0 { | |
930 | panic!("failed to set event: {}", io::Error::last_os_error()); | |
931 | } | |
932 | drop(self.thread.join()); | |
933 | } | |
934 | } | |
935 | } |