]> git.proxmox.com Git - cargo.git/commitdiff
Add a task-pool abstraction
authorAlex Crichton <alex@alexcrichton.com>
Sun, 29 Jun 2014 04:23:50 +0000 (21:23 -0700)
committerAlex Crichton <alex@alexcrichton.com>
Mon, 30 Jun 2014 23:00:33 +0000 (16:00 -0700)
The standard library also provides a task pool, but it's scheduling is not quite
the desired semantics. This task pool instead has all workers contend on a
shared queue to take work from rather than assigning new jobs to specific
workers for forever.

src/cargo/util/mod.rs
src/cargo/util/pool.rs [new file with mode: 0644]

index 5cc0ccb348c8cf6e0e19d49d6b04f260eeb68160..6b26d9bc8d427512f6779bbe25165329a6627ffd 100644 (file)
@@ -6,6 +6,7 @@ pub use self::errors::{CliError, FromError, ProcessError};
 pub use self::errors::{process_error, internal_error, internal, human};
 pub use self::paths::realpath;
 pub use self::hex::to_hex;
+pub use self::pool::TaskPool;
 
 pub mod graph;
 pub mod process_builder;
@@ -16,3 +17,4 @@ pub mod toml;
 pub mod paths;
 pub mod errors;
 pub mod hex;
+mod pool;
diff --git a/src/cargo/util/pool.rs b/src/cargo/util/pool.rs
new file mode 100644 (file)
index 0000000..2b3697a
--- /dev/null
@@ -0,0 +1,61 @@
+//! A load-balancing task pool.
+//!
+//! This differs in implementation from std::sync::TaskPool in that each job is
+//! up for grabs by any of the child tasks in the pool.
+//!
+//! This should be upstreamed at some point.
+
+use std::sync::{Arc, Mutex};
+
+pub struct TaskPool {
+    state: Arc<Mutex<State>>,
+}
+
+struct State { done: bool, jobs: Vec<proc():Send> }
+
+impl TaskPool {
+    pub fn new(tasks: uint) -> TaskPool {
+        assert!(tasks > 0);
+
+        let state = Arc::new(Mutex::new(State {
+            done: false,
+            jobs: Vec::new(),
+        }));
+
+        for _ in range(0, tasks) {
+            let myjobs = state.clone();
+            spawn(proc() worker(&*myjobs));
+        }
+
+        return TaskPool { state: state };
+
+        fn worker(mystate: &Mutex<State>) {
+            let mut state = mystate.lock();
+            while !state.done {
+                match state.jobs.pop() {
+                    Some(job) => {
+                        drop(state);
+                        job();
+                        state = mystate.lock();
+                    }
+                    None => state.cond.wait(),
+                }
+            }
+        }
+    }
+
+    pub fn execute(&self, job: proc():Send) {
+        let mut state = self.state.lock();
+        state.jobs.push(job);
+        state.cond.signal();
+    }
+}
+
+impl Drop for TaskPool {
+    fn drop(&mut self) {
+        let mut state = self.state.lock();
+        state.done = true;
+        state.cond.broadcast();
+        drop(state);
+    }
+}