]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // Copyright (C) 2004-2006 The Trustees of Indiana University. |
2 | ||
3 | // Use, modification and distribution is subject to the Boost Software | |
4 | // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at | |
5 | // http://www.boost.org/LICENSE_1_0.txt) | |
6 | ||
7 | // Authors: Douglas Gregor | |
8 | // Nick Edmonds | |
9 | // Andrew Lumsdaine | |
10 | #include <boost/assert.hpp> | |
11 | #include <boost/property_map/parallel/distributed_property_map.hpp> | |
12 | #include <boost/property_map/parallel/detail/untracked_pair.hpp> | |
13 | #include <boost/type_traits/is_base_and_derived.hpp> | |
14 | #include <boost/bind.hpp> | |
15 | #include <boost/property_map/parallel/simple_trigger.hpp> | |
16 | ||
17 | namespace boost { namespace parallel { | |
18 | ||
19 | template<typename ProcessGroup, typename GlobalMap, typename StorageMap> | |
20 | template<typename Reduce> | |
21 | PBGL_DISTRIB_PMAP | |
22 | ::distributed_property_map(const ProcessGroup& pg, const GlobalMap& global, | |
23 | const StorageMap& pm, const Reduce& reduce) | |
24 | : data(new data_t(pg, global, pm, reduce, Reduce::non_default_resolver)) | |
25 | { | |
26 | typedef handle_message<Reduce> Handler; | |
27 | ||
28 | data->ghost_cells.reset(new ghost_cells_type()); | |
29 | data->reset = &data_t::template do_reset<Reduce>; | |
30 | data->process_group.replace_handler(Handler(data, reduce)); | |
31 | data->process_group.template get_receiver<Handler>() | |
32 | ->setup_triggers(data->process_group); | |
33 | } | |
34 | ||
35 | template<typename ProcessGroup, typename GlobalMap, typename StorageMap> | |
36 | PBGL_DISTRIB_PMAP::~distributed_property_map() { } | |
37 | ||
38 | template<typename ProcessGroup, typename GlobalMap, typename StorageMap> | |
39 | template<typename Reduce> | |
40 | void | |
41 | PBGL_DISTRIB_PMAP::set_reduce(const Reduce& reduce) | |
42 | { | |
43 | typedef handle_message<Reduce> Handler; | |
44 | data->process_group.replace_handler(Handler(data, reduce)); | |
45 | Handler* handler = data->process_group.template get_receiver<Handler>(); | |
46 | BOOST_ASSERT(handler); | |
47 | handler->setup_triggers(data->process_group); | |
48 | data->get_default_value = reduce; | |
49 | data->has_default_resolver = Reduce::non_default_resolver; | |
50 | int model = data->model; | |
51 | data->reset = &data_t::template do_reset<Reduce>; | |
52 | set_consistency_model(model); | |
53 | } | |
54 | ||
55 | template<typename ProcessGroup, typename GlobalMap, typename StorageMap> | |
56 | void PBGL_DISTRIB_PMAP::prune_ghost_cells() const | |
57 | { | |
58 | if (data->max_ghost_cells == 0) | |
59 | return; | |
60 | ||
61 | while (data->ghost_cells->size() > data->max_ghost_cells) { | |
62 | // Evict the last ghost cell | |
63 | ||
64 | if (data->model & cm_flush) { | |
65 | // We need to flush values when we evict them. | |
66 | boost::parallel::detail::untracked_pair<key_type, value_type> const& victim | |
67 | = data->ghost_cells->back(); | |
68 | send(data->process_group, get(data->global, victim.first).first, | |
69 | property_map_put, victim); | |
70 | } | |
71 | ||
72 | // Actually remove the ghost cell | |
73 | data->ghost_cells->pop_back(); | |
74 | } | |
75 | } | |
76 | ||
77 | template<typename ProcessGroup, typename GlobalMap, typename StorageMap> | |
78 | typename PBGL_DISTRIB_PMAP::value_type& | |
79 | PBGL_DISTRIB_PMAP::cell(const key_type& key, bool request_if_missing) const | |
80 | { | |
81 | // Index by key | |
82 | ghost_cells_key_index_type const& key_index | |
83 | = data->ghost_cells->template get<1>(); | |
84 | ||
85 | // Search for the ghost cell by key, and project back to the sequence | |
86 | iterator ghost_cell | |
87 | = data->ghost_cells->template project<0>(key_index.find(key)); | |
88 | if (ghost_cell == data->ghost_cells->end()) { | |
89 | value_type value; | |
90 | if (data->has_default_resolver) | |
91 | // Since we have a default resolver, use it to create a default | |
92 | // value for this ghost cell. | |
93 | value = data->get_default_value(key); | |
94 | else if (request_if_missing) | |
95 | // Request the actual value of this key from its owner | |
96 | send_oob_with_reply(data->process_group, get(data->global, key).first, | |
97 | property_map_get, key, value); | |
98 | else | |
99 | value = value_type(); | |
100 | ||
101 | // Create a ghost cell containing the new value | |
102 | ghost_cell | |
103 | = data->ghost_cells->push_front(std::make_pair(key, value)).first; | |
104 | ||
105 | // If we need to, prune the ghost cells | |
106 | if (data->max_ghost_cells > 0) | |
107 | prune_ghost_cells(); | |
108 | } else if (data->max_ghost_cells > 0) | |
109 | // Put this cell at the beginning of the MRU list | |
110 | data->ghost_cells->relocate(data->ghost_cells->begin(), ghost_cell); | |
111 | ||
112 | return const_cast<value_type&>(ghost_cell->second); | |
113 | } | |
114 | ||
115 | template<typename ProcessGroup, typename GlobalMap, typename StorageMap> | |
116 | template<typename Reduce> | |
117 | void | |
118 | PBGL_DISTRIB_PMAP | |
119 | ::handle_message<Reduce>::operator()(process_id_type source, int tag) | |
120 | { | |
121 | BOOST_ASSERT(false); | |
122 | } | |
123 | ||
124 | template<typename ProcessGroup, typename GlobalMap, typename StorageMap> | |
125 | template<typename Reduce> | |
126 | void | |
127 | PBGL_DISTRIB_PMAP::handle_message<Reduce>:: | |
128 | handle_put(int /*source*/, int /*tag*/, | |
129 | const boost::parallel::detail::untracked_pair<key_type, value_type>& req, trigger_receive_context) | |
130 | { | |
131 | using boost::get; | |
132 | ||
133 | shared_ptr<data_t> data(data_ptr); | |
134 | ||
135 | owner_local_pair p = get(data->global, req.first); | |
136 | BOOST_ASSERT(p.first == process_id(data->process_group)); | |
137 | ||
138 | detail::maybe_put(data->storage, p.second, | |
139 | reduce(req.first, | |
140 | get(data->storage, p.second), | |
141 | req.second)); | |
142 | } | |
143 | ||
144 | template<typename ProcessGroup, typename GlobalMap, typename StorageMap> | |
145 | template<typename Reduce> | |
146 | typename PBGL_DISTRIB_PMAP::value_type | |
147 | PBGL_DISTRIB_PMAP::handle_message<Reduce>:: | |
148 | handle_get(int source, int /*tag*/, const key_type& key, | |
149 | trigger_receive_context) | |
150 | { | |
151 | using boost::get; | |
152 | ||
153 | shared_ptr<data_t> data(data_ptr); | |
154 | BOOST_ASSERT(data); | |
155 | ||
156 | owner_local_pair p = get(data->global, key); | |
157 | return get(data->storage, p.second); | |
158 | } | |
159 | ||
160 | template<typename ProcessGroup, typename GlobalMap, typename StorageMap> | |
161 | template<typename Reduce> | |
162 | void | |
163 | PBGL_DISTRIB_PMAP::handle_message<Reduce>:: | |
164 | handle_multiget(int source, int tag, const std::vector<key_type>& keys, | |
165 | trigger_receive_context) | |
166 | { | |
167 | shared_ptr<data_t> data(data_ptr); | |
168 | BOOST_ASSERT(data); | |
169 | ||
170 | typedef boost::parallel::detail::untracked_pair<key_type, value_type> key_value; | |
171 | std::vector<key_value> results; | |
172 | std::size_t n = keys.size(); | |
173 | results.reserve(n); | |
174 | ||
175 | using boost::get; | |
176 | ||
177 | for (std::size_t i = 0; i < n; ++i) { | |
178 | local_key_type local_key = get(data->global, keys[i]).second; | |
179 | results.push_back(key_value(keys[i], get(data->storage, local_key))); | |
180 | } | |
181 | send(data->process_group, source, property_map_multiget_reply, results); | |
182 | } | |
183 | ||
184 | template<typename ProcessGroup, typename GlobalMap, typename StorageMap> | |
185 | template<typename Reduce> | |
186 | void | |
187 | PBGL_DISTRIB_PMAP::handle_message<Reduce>:: | |
188 | handle_multiget_reply | |
189 | (int source, int tag, | |
190 | const std::vector<boost::parallel::detail::untracked_pair<key_type, value_type> >& msg, | |
191 | trigger_receive_context) | |
192 | { | |
193 | shared_ptr<data_t> data(data_ptr); | |
194 | BOOST_ASSERT(data); | |
195 | ||
196 | // Index by key | |
197 | ghost_cells_key_index_type const& key_index | |
198 | = data->ghost_cells->template get<1>(); | |
199 | ||
200 | std::size_t n = msg.size(); | |
201 | for (std::size_t i = 0; i < n; ++i) { | |
202 | // Search for the ghost cell by key, and project back to the sequence | |
203 | iterator position | |
204 | = data->ghost_cells->template project<0>(key_index.find(msg[i].first)); | |
205 | ||
206 | if (position != data->ghost_cells->end()) | |
207 | const_cast<value_type&>(position->second) = msg[i].second; | |
208 | } | |
209 | } | |
210 | ||
211 | template<typename ProcessGroup, typename GlobalMap, typename StorageMap> | |
212 | template<typename Reduce> | |
213 | void | |
214 | PBGL_DISTRIB_PMAP::handle_message<Reduce>:: | |
215 | handle_multiput | |
216 | (int source, int tag, | |
217 | const std::vector<unsafe_pair<local_key_type, value_type> >& values, | |
218 | trigger_receive_context) | |
219 | { | |
220 | using boost::get; | |
221 | ||
222 | shared_ptr<data_t> data(data_ptr); | |
223 | BOOST_ASSERT(data); | |
224 | ||
225 | std::size_t n = values.size(); | |
226 | for (std::size_t i = 0; i < n; ++i) { | |
227 | local_key_type local_key = values[i].first; | |
228 | value_type local_value = get(data->storage, local_key); | |
229 | detail::maybe_put(data->storage, values[i].first, | |
230 | reduce(values[i].first, | |
231 | local_value, | |
232 | values[i].second)); | |
233 | } | |
234 | } | |
235 | ||
236 | template<typename ProcessGroup, typename GlobalMap, typename StorageMap> | |
237 | template<typename Reduce> | |
238 | void | |
239 | PBGL_DISTRIB_PMAP::handle_message<Reduce>:: | |
240 | setup_triggers(process_group_type& pg) | |
241 | { | |
242 | using boost::parallel::simple_trigger; | |
243 | ||
244 | simple_trigger(pg, property_map_put, this, &handle_message::handle_put); | |
245 | simple_trigger(pg, property_map_get, this, &handle_message::handle_get); | |
246 | simple_trigger(pg, property_map_multiget, this, | |
247 | &handle_message::handle_multiget); | |
248 | simple_trigger(pg, property_map_multiget_reply, this, | |
249 | &handle_message::handle_multiget_reply); | |
250 | simple_trigger(pg, property_map_multiput, this, | |
251 | &handle_message::handle_multiput); | |
252 | } | |
253 | ||
254 | template<typename ProcessGroup, typename GlobalMap, typename StorageMap> | |
255 | void | |
256 | PBGL_DISTRIB_PMAP | |
257 | ::on_synchronize::operator()() | |
258 | { | |
259 | int stage=0; // we only get called at the start now | |
260 | shared_ptr<data_t> data(data_ptr); | |
261 | BOOST_ASSERT(data); | |
262 | ||
263 | // Determine in which stage backward consistency messages should be sent. | |
264 | int backward_stage = -1; | |
265 | if (data->model & cm_backward) { | |
266 | if (data->model & cm_flush) backward_stage = 1; | |
267 | else backward_stage = 0; | |
268 | } | |
269 | ||
270 | // Flush results in first stage | |
271 | if (stage == 0 && data->model & cm_flush) | |
272 | data->flush(); | |
273 | ||
274 | // Backward consistency | |
275 | if (stage == backward_stage && !(data->model & (cm_clear | cm_reset))) | |
276 | data->refresh_ghost_cells(); | |
277 | ||
278 | // Optionally clear results | |
279 | if (data->model & cm_clear) | |
280 | data->clear(); | |
281 | ||
282 | // Optionally reset results | |
283 | if (data->model & cm_reset) { | |
284 | if (data->reset) ((*data).*data->reset)(); | |
285 | } | |
286 | } | |
287 | ||
288 | ||
289 | template<typename ProcessGroup, typename GlobalMap, typename StorageMap> | |
290 | void | |
291 | PBGL_DISTRIB_PMAP::set_consistency_model(int model) | |
292 | { | |
293 | data->model = model; | |
294 | ||
295 | bool need_on_synchronize = (model != cm_forward); | |
296 | ||
297 | // Backward consistency is a two-stage process. | |
298 | if (model & cm_backward) { | |
299 | // For backward consistency to work, we absolutely cannot throw | |
300 | // away any ghost cells. | |
301 | data->max_ghost_cells = 0; | |
302 | } | |
303 | ||
304 | // attach the on_synchronize handler. | |
305 | if (need_on_synchronize) | |
306 | data->process_group.replace_on_synchronize_handler(on_synchronize(data)); | |
307 | } | |
308 | ||
309 | template<typename ProcessGroup, typename GlobalMap, typename StorageMap> | |
310 | void | |
311 | PBGL_DISTRIB_PMAP::set_max_ghost_cells(std::size_t max_ghost_cells) | |
312 | { | |
313 | if ((data->model & cm_backward) && max_ghost_cells > 0) | |
314 | boost::throw_exception(std::runtime_error("distributed_property_map::set_max_ghost_cells: " | |
315 | "cannot limit ghost-cell usage with a backward " | |
316 | "consistency model")); | |
317 | ||
318 | if (max_ghost_cells == 1) | |
319 | // It is not safe to have only 1 ghost cell; the cell() method | |
320 | // will fail. | |
321 | max_ghost_cells = 2; | |
322 | ||
323 | data->max_ghost_cells = max_ghost_cells; | |
324 | prune_ghost_cells(); | |
325 | } | |
326 | ||
327 | template<typename ProcessGroup, typename GlobalMap, typename StorageMap> | |
328 | void PBGL_DISTRIB_PMAP::clear() | |
329 | { | |
330 | data->clear(); | |
331 | } | |
332 | ||
333 | template<typename ProcessGroup, typename GlobalMap, typename StorageMap> | |
334 | void PBGL_DISTRIB_PMAP::data_t::clear() | |
335 | { | |
336 | ghost_cells->clear(); | |
337 | } | |
338 | ||
339 | template<typename ProcessGroup, typename GlobalMap, typename StorageMap> | |
340 | void PBGL_DISTRIB_PMAP::reset() | |
341 | { | |
342 | if (data->reset) ((*data).*data->reset)(); | |
343 | } | |
344 | ||
345 | template<typename ProcessGroup, typename GlobalMap, typename StorageMap> | |
346 | void PBGL_DISTRIB_PMAP::flush() | |
347 | { | |
348 | data->flush(); | |
349 | } | |
350 | ||
351 | template<typename ProcessGroup, typename GlobalMap, typename StorageMap> | |
352 | void PBGL_DISTRIB_PMAP::data_t::refresh_ghost_cells() | |
353 | { | |
354 | using boost::get; | |
355 | ||
356 | std::vector<std::vector<key_type> > keys; | |
357 | keys.resize(num_processes(process_group)); | |
358 | ||
359 | // Collect the set of keys for which we will request values | |
360 | for (iterator i = ghost_cells->begin(); i != ghost_cells->end(); ++i) | |
361 | keys[get(global, i->first).first].push_back(i->first); | |
362 | ||
363 | // Send multiget requests to each of the other processors | |
364 | typedef typename ProcessGroup::process_size_type process_size_type; | |
365 | process_size_type n = num_processes(process_group); | |
366 | process_id_type id = process_id(process_group); | |
367 | for (process_size_type p = (id + 1) % n ; p != id ; p = (p + 1) % n) { | |
368 | if (!keys[p].empty()) | |
369 | send(process_group, p, property_map_multiget, keys[p]); | |
370 | } | |
371 | } | |
372 | ||
373 | template<typename ProcessGroup, typename GlobalMap, typename StorageMap> | |
374 | void PBGL_DISTRIB_PMAP::data_t::flush() | |
375 | { | |
376 | using boost::get; | |
377 | ||
378 | int n = num_processes(process_group); | |
379 | std::vector<std::vector<unsafe_pair<local_key_type, value_type> > > values; | |
380 | values.resize(n); | |
381 | ||
382 | // Collect all of the flushed values | |
383 | for (iterator i = ghost_cells->begin(); i != ghost_cells->end(); ++i) { | |
384 | std::pair<int, local_key_type> g = get(global, i->first); | |
385 | values[g.first].push_back(std::make_pair(g.second, i->second)); | |
386 | } | |
387 | ||
388 | // Transmit flushed values | |
389 | for (int p = 0; p < n; ++p) { | |
390 | if (!values[p].empty()) | |
391 | send(process_group, p, property_map_multiput, values[p]); | |
392 | } | |
393 | } | |
394 | ||
395 | template<typename ProcessGroup, typename GlobalMap, typename StorageMap> | |
396 | void PBGL_DISTRIB_PMAP::do_synchronize() | |
397 | { | |
398 | if (data->model & cm_backward) { | |
399 | synchronize(data->process_group); | |
400 | return; | |
401 | } | |
402 | ||
403 | // Request refreshes of the values of our ghost cells | |
404 | data->refresh_ghost_cells(); | |
405 | ||
406 | // Allows all of the multigets to get to their destinations | |
407 | synchronize(data->process_group); | |
408 | ||
409 | // Allows all of the multiget responses to get to their destinations | |
410 | synchronize(data->process_group); | |
411 | } | |
412 | ||
413 | template<typename ProcessGroup, typename GlobalMap, typename StorageMap> | |
414 | template<typename Resolver> | |
415 | void PBGL_DISTRIB_PMAP::data_t::do_reset() | |
416 | { | |
417 | Resolver* resolver = get_default_value.template target<Resolver>(); | |
418 | BOOST_ASSERT(resolver); | |
419 | ||
420 | for (iterator i = ghost_cells->begin(); i != ghost_cells->end(); ++i) | |
421 | const_cast<value_type&>(i->second) = (*resolver)(i->first); | |
422 | } | |
423 | ||
424 | } } // end namespace boost::parallel |