]>
Commit | Line | Data |
---|---|---|
416331ca XL |
1 | // run-pass |
2 | ||
0bf4aa26 XL |
3 | #![allow(non_camel_case_types)] |
4 | #![allow(dead_code)] | |
5 | #![allow(unused_mut)] | |
5bcae85e SL |
6 | // ignore-emscripten No support for threads |
7 | ||
223e47cc LB |
8 | /** |
9 | A somewhat reduced test case to expose some Valgrind issues. | |
10 | ||
11 | This originally came from the word-count benchmark. | |
12 | */ | |
13 | ||
1a4d82fc JJ |
14 | pub fn map(filename: String, mut emit: map_reduce::putter) { |
15 | emit(filename, "1".to_string()); | |
16 | } | |
223e47cc LB |
17 | |
18 | mod map_reduce { | |
1a4d82fc JJ |
19 | use std::collections::HashMap; |
20 | use std::sync::mpsc::{channel, Sender}; | |
970d7e83 | 21 | use std::str; |
c34b1796 | 22 | use std::thread; |
223e47cc | 23 | |
dc9dc135 | 24 | pub type putter<'a> = Box<dyn FnMut(String, String) + 'a>; |
223e47cc | 25 | |
5869c6ff | 26 | pub type mapper = extern "C" fn(String, putter); |
223e47cc | 27 | |
c34b1796 | 28 | enum ctrl_proto { find_reducer(Vec<u8>, Sender<isize>), mapper_done, } |
223e47cc | 29 | |
1a4d82fc | 30 | fn start_mappers(ctrl: Sender<ctrl_proto>, inputs: Vec<String>) { |
85aaf69f | 31 | for i in &inputs { |
223e47cc LB |
32 | let ctrl = ctrl.clone(); |
33 | let i = i.clone(); | |
c34b1796 | 34 | thread::spawn(move|| map_task(ctrl.clone(), i.clone()) ); |
223e47cc LB |
35 | } |
36 | } | |
37 | ||
1a4d82fc JJ |
38 | fn map_task(ctrl: Sender<ctrl_proto>, input: String) { |
39 | let mut intermediates = HashMap::new(); | |
223e47cc | 40 | |
c34b1796 | 41 | fn emit(im: &mut HashMap<String, isize>, |
1a4d82fc JJ |
42 | ctrl: Sender<ctrl_proto>, key: String, |
43 | _val: String) { | |
223e47cc LB |
44 | if im.contains_key(&key) { |
45 | return; | |
46 | } | |
1a4d82fc JJ |
47 | let (tx, rx) = channel(); |
48 | println!("sending find_reducer"); | |
49 | ctrl.send(ctrl_proto::find_reducer(key.as_bytes().to_vec(), tx)).unwrap(); | |
50 | println!("receiving"); | |
51 | let c = rx.recv().unwrap(); | |
52 | println!("{}", c); | |
223e47cc LB |
53 | im.insert(key, c); |
54 | } | |
55 | ||
56 | let ctrl_clone = ctrl.clone(); | |
c34b1796 | 57 | ::map(input, Box::new(|a,b| emit(&mut intermediates, ctrl.clone(), a, b))); |
1a4d82fc | 58 | ctrl_clone.send(ctrl_proto::mapper_done).unwrap(); |
223e47cc LB |
59 | } |
60 | ||
1a4d82fc JJ |
61 | pub fn map_reduce(inputs: Vec<String>) { |
62 | let (tx, rx) = channel(); | |
223e47cc | 63 | |
bd371182 | 64 | // This thread becomes the master control thread. It spawns others |
223e47cc LB |
65 | // to do the rest. |
66 | ||
c34b1796 | 67 | let mut reducers: HashMap<String, isize>; |
223e47cc | 68 | |
970d7e83 | 69 | reducers = HashMap::new(); |
223e47cc | 70 | |
1a4d82fc | 71 | start_mappers(tx, inputs.clone()); |
223e47cc | 72 | |
c34b1796 | 73 | let mut num_mappers = inputs.len() as isize; |
223e47cc LB |
74 | |
75 | while num_mappers > 0 { | |
1a4d82fc JJ |
76 | match rx.recv().unwrap() { |
77 | ctrl_proto::mapper_done => { num_mappers -= 1; } | |
78 | ctrl_proto::find_reducer(k, cc) => { | |
223e47cc | 79 | let mut c; |
85aaf69f | 80 | match reducers.get(&str::from_utf8(&k).unwrap().to_string()) { |
223e47cc LB |
81 | Some(&_c) => { c = _c; } |
82 | None => { c = 0; } | |
83 | } | |
1a4d82fc | 84 | cc.send(c).unwrap(); |
223e47cc LB |
85 | } |
86 | } | |
87 | } | |
88 | } | |
89 | } | |
90 | ||
91 | pub fn main() { | |
1a4d82fc | 92 | map_reduce::map_reduce( |
c30ab7b3 | 93 | vec!["../src/test/run-pass/hashmap-memory.rs".to_string()]); |
223e47cc | 94 | } |