]>
Commit | Line | Data |
---|---|---|
f20569fa XL |
1 | extern crate rand; |
2 | extern crate timely; | |
3 | extern crate differential_dataflow; | |
4 | ||
5 | use rand::{Rng, SeedableRng, StdRng}; | |
6 | ||
7 | use timely::dataflow::operators::*; | |
8 | ||
9 | use differential_dataflow::AsCollection; | |
10 | use differential_dataflow::operators::*; | |
11 | use differential_dataflow::input::InputSession; | |
12 | ||
13 | // mod loglikelihoodratio; | |
14 | ||
15 | fn main() { | |
16 | ||
17 | // define a new timely dataflow computation. | |
18 | timely::execute_from_args(std::env::args().skip(6), move |worker| { | |
19 | ||
20 | // capture parameters of the experiment. | |
21 | let users: usize = std::env::args().nth(1).unwrap().parse().unwrap(); | |
22 | let items: usize = std::env::args().nth(2).unwrap().parse().unwrap(); | |
23 | let scale: usize = std::env::args().nth(3).unwrap().parse().unwrap(); | |
24 | let batch: usize = std::env::args().nth(4).unwrap().parse().unwrap(); | |
25 | let noisy: bool = std::env::args().nth(5).unwrap() == "noisy"; | |
26 | ||
27 | let index = worker.index(); | |
28 | let peers = worker.peers(); | |
29 | ||
30 | let (input, probe) = worker.dataflow(|scope| { | |
31 | ||
32 | // input of (user, item) collection. | |
33 | let (input, occurrences) = scope.new_input(); | |
34 | let occurrences = occurrences.as_collection(); | |
35 | ||
36 | //TODO adjust code to only work with upper triangular half of cooccurrence matrix | |
37 | ||
38 | /* Compute the cooccurrence matrix C = A'A from the binary interaction matrix A. */ | |
39 | let cooccurrences = | |
40 | occurrences | |
41 | .join_map(&occurrences, |_user, &item_a, &item_b| (item_a, item_b)) | |
42 | .filter(|&(item_a, item_b)| item_a != item_b) | |
43 | .count(); | |
44 | ||
45 | /* compute the rowsums of C indicating how often we encounter individual items. */ | |
46 | let row_sums = | |
47 | occurrences | |
48 | .map(|(_user, item)| item) | |
49 | .count(); | |
50 | ||
51 | // row_sums.inspect(|record| println!("[row_sums] {:?}", record)); | |
52 | ||
53 | /* Join the cooccurrence pairs with the corresponding row sums. */ | |
54 | let mut cooccurrences_with_row_sums = cooccurrences | |
55 | .map(|((item_a, item_b), num_cooccurrences)| (item_a, (item_b, num_cooccurrences))) | |
56 | .join_map(&row_sums, |&item_a, &(item_b, num_cooccurrences), &row_sum_a| { | |
57 | assert!(row_sum_a > 0); | |
58 | (item_b, (item_a, num_cooccurrences, row_sum_a)) | |
59 | }) | |
60 | .join_map(&row_sums, |&item_b, &(item_a, num_cooccurrences, row_sum_a), &row_sum_b| { | |
61 | assert!(row_sum_a > 0); | |
62 | assert!(row_sum_b > 0); | |
63 | (item_a, (item_b, num_cooccurrences, row_sum_a, row_sum_b)) | |
64 | }); | |
65 | ||
66 | // cooccurrences_with_row_sums | |
67 | // .inspect(|record| println!("[cooccurrences_with_row_sums] {:?}", record)); | |
68 | ||
69 | // //TODO compute top-k "similar items" per item | |
70 | // /* Compute LLR scores for each item pair. */ | |
71 | // let llr_scores = cooccurrences_with_row_sums.map( | |
72 | // |(item_a, (item_b, num_cooccurrences, row_sum_a, row_sum_b))| { | |
73 | ||
74 | // println!( | |
75 | // "[llr_scores] item_a={} item_b={}, num_cooccurrences={} row_sum_a={} row_sum_b={}", | |
76 | // item_a, item_b, num_cooccurrences, row_sum_a, row_sum_b); | |
77 | ||
78 | // let k11: isize = num_cooccurrences; | |
79 | // let k12: isize = row_sum_a as isize - k11; | |
80 | // let k21: isize = row_sum_b as isize - k11; | |
81 | // let k22: isize = 10000 - k12 - k21 + k11; | |
82 | ||
83 | // let llr_score = loglikelihoodratio::log_likelihood_ratio(k11, k12, k21, k22); | |
84 | ||
85 | // ((item_a, item_b), llr_score) | |
86 | // }); | |
87 | ||
88 | if noisy { | |
89 | cooccurrences_with_row_sums = | |
90 | cooccurrences_with_row_sums | |
91 | .inspect(|x| println!("change: {:?}", x)); | |
92 | } | |
93 | ||
94 | let probe = | |
95 | cooccurrences_with_row_sums | |
96 | .probe(); | |
97 | /* | |
98 | // produce the (item, item) collection | |
99 | let cooccurrences = occurrences | |
100 | .join_map(&occurrences, |_user, &item_a, &item_b| (item_a, item_b)); | |
101 | // count the occurrences of each item. | |
102 | let counts = cooccurrences | |
103 | .map(|(item_a,_)| item_a) | |
104 | .count(); | |
105 | // produce ((item1, item2), count1, count2, count12) tuples | |
106 | let cooccurrences_with_counts = cooccurrences | |
107 | .join_map(&counts, |&item_a, &item_b, &count_item_a| (item_b, (item_a, count_item_a))) | |
108 | .join_map(&counts, |&item_b, &(item_a, count_item_a), &count_item_b| { | |
109 | ((item_a, item_b), count_item_a, count_item_b) | |
110 | }); | |
111 | let probe = cooccurrences_with_counts | |
112 | .inspect(|x| println!("change: {:?}", x)) | |
113 | .probe(); | |
114 | */ | |
115 | (input, probe) | |
116 | }); | |
117 | ||
118 | let seed: &[_] = &[1, 2, 3, index]; | |
119 | let mut rng1: StdRng = SeedableRng::from_seed(seed); // rng for edge additions | |
120 | let mut rng2: StdRng = SeedableRng::from_seed(seed); // rng for edge deletions | |
121 | ||
122 | let mut input = InputSession::from(input); | |
123 | ||
124 | for count in 0 .. scale { | |
125 | if count % peers == index { | |
126 | let user = rng1.gen_range(0, users); | |
127 | let item = rng1.gen_range(0, items); | |
128 | // println!("[INITIAL INPUT] ({}, {})", user, item); | |
129 | input.insert((user, item)); | |
130 | } | |
131 | } | |
132 | ||
133 | // load the initial data up! | |
134 | while probe.less_than(input.time()) { worker.step(); } | |
135 | ||
136 | for round in 1 .. { | |
137 | ||
138 | for element in (round * batch) .. ((round + 1) * batch) { | |
139 | if element % peers == index { | |
140 | // advance the input timestamp. | |
141 | input.advance_to(round * batch); | |
142 | // insert a new item. | |
143 | let user = rng1.gen_range(0, users); | |
144 | let item = rng1.gen_range(0, items); | |
145 | if noisy { println!("[INPUT: insert] ({}, {})", user, item); } | |
146 | input.insert((user, item)); | |
147 | // remove an old item. | |
148 | let user = rng2.gen_range(0, users); | |
149 | let item = rng2.gen_range(0, items); | |
150 | if noisy { println!("[INPUT: remove] ({}, {})", user, item); } | |
151 | input.remove((user, item)); | |
152 | } | |
153 | } | |
154 | ||
155 | input.advance_to(round * batch); | |
156 | input.flush(); | |
157 | ||
158 | while probe.less_than(input.time()) { worker.step(); } | |
159 | } | |
160 | }).unwrap(); | |
161 | } |