]>
Commit | Line | Data |
---|---|---|
0bf4aa26 XL |
1 | #![allow(non_camel_case_types)] |
2 | #![allow(dead_code)] | |
3 | #![allow(unused_mut)] | |
5bcae85e SL |
4 | // ignore-emscripten No support for threads |
5 | ||
223e47cc LB |
6 | /** |
7 | A somewhat reduced test case to expose some Valgrind issues. | |
8 | ||
9 | This originally came from the word-count benchmark. | |
10 | */ | |
11 | ||
1a4d82fc JJ |
12 | pub fn map(filename: String, mut emit: map_reduce::putter) { |
13 | emit(filename, "1".to_string()); | |
14 | } | |
223e47cc LB |
15 | |
16 | mod map_reduce { | |
1a4d82fc JJ |
17 | use std::collections::HashMap; |
18 | use std::sync::mpsc::{channel, Sender}; | |
970d7e83 | 19 | use std::str; |
c34b1796 | 20 | use std::thread; |
223e47cc | 21 | |
1a4d82fc | 22 | pub type putter<'a> = Box<FnMut(String, String) + 'a>; |
223e47cc | 23 | |
1a4d82fc | 24 | pub type mapper = extern fn(String, putter); |
223e47cc | 25 | |
c34b1796 | 26 | enum ctrl_proto { find_reducer(Vec<u8>, Sender<isize>), mapper_done, } |
223e47cc | 27 | |
1a4d82fc | 28 | fn start_mappers(ctrl: Sender<ctrl_proto>, inputs: Vec<String>) { |
85aaf69f | 29 | for i in &inputs { |
223e47cc LB |
30 | let ctrl = ctrl.clone(); |
31 | let i = i.clone(); | |
c34b1796 | 32 | thread::spawn(move|| map_task(ctrl.clone(), i.clone()) ); |
223e47cc LB |
33 | } |
34 | } | |
35 | ||
1a4d82fc JJ |
36 | fn map_task(ctrl: Sender<ctrl_proto>, input: String) { |
37 | let mut intermediates = HashMap::new(); | |
223e47cc | 38 | |
c34b1796 | 39 | fn emit(im: &mut HashMap<String, isize>, |
1a4d82fc JJ |
40 | ctrl: Sender<ctrl_proto>, key: String, |
41 | _val: String) { | |
223e47cc LB |
42 | if im.contains_key(&key) { |
43 | return; | |
44 | } | |
1a4d82fc JJ |
45 | let (tx, rx) = channel(); |
46 | println!("sending find_reducer"); | |
47 | ctrl.send(ctrl_proto::find_reducer(key.as_bytes().to_vec(), tx)).unwrap(); | |
48 | println!("receiving"); | |
49 | let c = rx.recv().unwrap(); | |
50 | println!("{}", c); | |
223e47cc LB |
51 | im.insert(key, c); |
52 | } | |
53 | ||
54 | let ctrl_clone = ctrl.clone(); | |
c34b1796 | 55 | ::map(input, Box::new(|a,b| emit(&mut intermediates, ctrl.clone(), a, b))); |
1a4d82fc | 56 | ctrl_clone.send(ctrl_proto::mapper_done).unwrap(); |
223e47cc LB |
57 | } |
58 | ||
1a4d82fc JJ |
59 | pub fn map_reduce(inputs: Vec<String>) { |
60 | let (tx, rx) = channel(); | |
223e47cc | 61 | |
bd371182 | 62 | // This thread becomes the master control thread. It spawns others |
223e47cc LB |
63 | // to do the rest. |
64 | ||
c34b1796 | 65 | let mut reducers: HashMap<String, isize>; |
223e47cc | 66 | |
970d7e83 | 67 | reducers = HashMap::new(); |
223e47cc | 68 | |
1a4d82fc | 69 | start_mappers(tx, inputs.clone()); |
223e47cc | 70 | |
c34b1796 | 71 | let mut num_mappers = inputs.len() as isize; |
223e47cc LB |
72 | |
73 | while num_mappers > 0 { | |
1a4d82fc JJ |
74 | match rx.recv().unwrap() { |
75 | ctrl_proto::mapper_done => { num_mappers -= 1; } | |
76 | ctrl_proto::find_reducer(k, cc) => { | |
223e47cc | 77 | let mut c; |
85aaf69f | 78 | match reducers.get(&str::from_utf8(&k).unwrap().to_string()) { |
223e47cc LB |
79 | Some(&_c) => { c = _c; } |
80 | None => { c = 0; } | |
81 | } | |
1a4d82fc | 82 | cc.send(c).unwrap(); |
223e47cc LB |
83 | } |
84 | } | |
85 | } | |
86 | } | |
87 | } | |
88 | ||
89 | pub fn main() { | |
1a4d82fc | 90 | map_reduce::map_reduce( |
c30ab7b3 | 91 | vec!["../src/test/run-pass/hashmap-memory.rs".to_string()]); |
223e47cc | 92 | } |