3 #![allow(non_camel_case_types)]
6 // ignore-emscripten No support for threads
9 A somewhat reduced test case to expose some Valgrind issues.
11 This originally came from the word-count benchmark.
14 pub fn map(filename
: String
, mut emit
: map_reduce
::putter
) {
15 emit(filename
, "1".to_string());
19 use std
::collections
::HashMap
;
20 use std
::sync
::mpsc
::{channel, Sender}
;
24 pub type putter
<'a
> = Box
<dyn FnMut(String
, String
) + 'a
>;
26 pub type mapper
= extern fn(String
, putter
);
28 enum ctrl_proto { find_reducer(Vec<u8>, Sender<isize>), mapper_done, }
30 fn start_mappers(ctrl
: Sender
<ctrl_proto
>, inputs
: Vec
<String
>) {
32 let ctrl
= ctrl
.clone();
34 thread
::spawn(move|| map_task(ctrl
.clone(), i
.clone()) );
38 fn map_task(ctrl
: Sender
<ctrl_proto
>, input
: String
) {
39 let mut intermediates
= HashMap
::new();
41 fn emit(im
: &mut HashMap
<String
, isize>,
42 ctrl
: Sender
<ctrl_proto
>, key
: String
,
44 if im
.contains_key(&key
) {
47 let (tx
, rx
) = channel();
48 println
!("sending find_reducer");
49 ctrl
.send(ctrl_proto
::find_reducer(key
.as_bytes().to_vec(), tx
)).unwrap();
50 println
!("receiving");
51 let c
= rx
.recv().unwrap();
56 let ctrl_clone
= ctrl
.clone();
57 ::map(input
, Box
::new(|a
,b
| emit(&mut intermediates
, ctrl
.clone(), a
, b
)));
58 ctrl_clone
.send(ctrl_proto
::mapper_done
).unwrap();
61 pub fn map_reduce(inputs
: Vec
<String
>) {
62 let (tx
, rx
) = channel();
64 // This thread becomes the master control thread. It spawns others
67 let mut reducers
: HashMap
<String
, isize>;
69 reducers
= HashMap
::new();
71 start_mappers(tx
, inputs
.clone());
73 let mut num_mappers
= inputs
.len() as isize;
75 while num_mappers
> 0 {
76 match rx
.recv().unwrap() {
77 ctrl_proto
::mapper_done
=> { num_mappers -= 1; }
78 ctrl_proto
::find_reducer(k
, cc
) => {
80 match reducers
.get(&str::from_utf8(&k
).unwrap().to_string()) {
81 Some(&_c
) => { c = _c; }
92 map_reduce
::map_reduce(
93 vec
!["../src/test/run-pass/hashmap-memory.rs".to_string()]);