1 // Copyright (C) 2004-2006 The Trustees of Indiana University.
3 // Use, modification and distribution is subject to the Boost Software
4 // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
7 // Authors: Douglas Gregor
10 #include <boost/assert.hpp>
11 #include <boost/property_map/parallel/distributed_property_map.hpp>
12 #include <boost/property_map/parallel/detail/untracked_pair.hpp>
13 #include <boost/type_traits/is_base_and_derived.hpp>
14 #include <boost/bind.hpp>
15 #include <boost/property_map/parallel/simple_trigger.hpp>
17 namespace boost { namespace parallel {
// Constructor: builds a distributed property map from a process group, a
// global map, a local storage map and a reduce operation.
//
//   pg      process group handed to data_t
//   global  global map handed to data_t
//   pm      storage map handed to data_t
//   reduce  reduction operation; also handed to the message handler
//
// The shared data_t is created with Reduce::non_default_resolver as its last
// argument. The body then creates an empty ghost-cell container, records the
// do_reset<Reduce> member pointer, installs a handle_message<Reduce> as the
// process group's handler and asks the installed receiver to register its
// triggers.
// NOTE(review): the line naming the enclosing class and the braces of this
// definition are not visible in this excerpt (the embedded numbering skips
// 21, 25 and 33).
19 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
20 template<typename Reduce>
22 ::distributed_property_map(const ProcessGroup& pg, const GlobalMap& global,
23 const StorageMap& pm, const Reduce& reduce)
24 : data(new data_t(pg, global, pm, reduce, Reduce::non_default_resolver))
26 typedef handle_message<Reduce> Handler;
// Start with an empty ghost-cell container.
28 data->ghost_cells.reset(new ghost_cells_type());
// Remember which do_reset instantiation matches this Reduce type.
29 data->reset = &data_t::template do_reset<Reduce>;
// Install the handler, then fetch the installed receiver and let it
// register its per-tag triggers on the process group.
30 data->process_group.replace_handler(Handler(data, reduce));
31 data->process_group.template get_receiver<Handler>()
32 ->setup_triggers(data->process_group);
// Destructor. The body is empty: no explicit cleanup is performed here.
35 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
36 PBGL_DISTRIB_PMAP::~distributed_property_map() { }
// Replaces the reduction operation used by this property map.
//
//   reduce  the new reduction operation; it is also stored as the
//           default-value resolver (data->get_default_value)
//
// Installs a new handle_message<Reduce> on the process group, registers its
// triggers, updates the default-resolver state and the reset member pointer,
// and finally re-applies the current consistency model.
// NOTE(review): the return-type line and the braces are not visible in this
// excerpt (the embedded numbering skips 40 and 42).
38 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
39 template<typename Reduce>
41 PBGL_DISTRIB_PMAP::set_reduce(const Reduce& reduce)
43 typedef handle_message<Reduce> Handler;
44 data->process_group.replace_handler(Handler(data, reduce));
// The receiver must exist after replace_handler; asserted before use.
45 Handler* handler = data->process_group.template get_receiver<Handler>();
46 BOOST_ASSERT(handler);
47 handler->setup_triggers(data->process_group);
48 data->get_default_value = reduce;
49 data->has_default_resolver = Reduce::non_default_resolver;
// Read the current model, switch the reset function to the new Reduce type,
// then re-apply the same model through set_consistency_model.
// NOTE(review): presumably this re-attaches the on_synchronize handler after
// the handler replacement above -- confirm.
50 int model = data->model;
51 data->reset = &data_t::template do_reset<Reduce>;
52 set_consistency_model(model);
// Evicts ghost cells from the back of the ghost-cell sequence until its size
// no longer exceeds data->max_ghost_cells. When the consistency model has
// cm_flush set, each evicted cell is first sent to its owner with the
// property_map_put tag.
55 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
56 void PBGL_DISTRIB_PMAP::prune_ghost_cells() const
// A limit of 0 is special-cased.
// NOTE(review): the statement guarded by this test is elided in this excerpt
// (embedded line 59); presumably an early return, i.e. 0 means "no limit" --
// confirm.
58 if (data->max_ghost_cells == 0)
61 while (data->ghost_cells->size() > data->max_ghost_cells) {
62 // Evict the last ghost cell
64 if (data->model & cm_flush) {
65 // We need to flush values when we evict them.
66 boost::parallel::detail::untracked_pair<key_type, value_type> const& victim
67 = data->ghost_cells->back();
// The destination is the first component of the global map's result for the
// victim's key.
68 send(data->process_group, get(data->global, victim.first).first,
69 property_map_put, victim);
72 // Actually remove the ghost cell
73 data->ghost_cells->pop_back();
// Returns a mutable reference to the ghost cell for `key`, creating the cell
// if it does not exist yet.
//
//   key                 key to look up in the ghost-cell container
//   request_if_missing  when there is no default resolver and the cell is
//                       missing, request the value from the key's owner
//
// Returns a reference to the cell's value; constness of the stored element is
// cast away.
// NOTE(review): the declarations of `ghost_cell` and `value`, the final
// `else` branch and the statement after the pruning test are elided in this
// excerpt (embedded lines 86, 89, 98-99, 102, 107).
77 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
78 typename PBGL_DISTRIB_PMAP::value_type&
79 PBGL_DISTRIB_PMAP::cell(const key_type& key, bool request_if_missing) const
// Index 1 of the ghost-cell container is used for lookup by key.
82 ghost_cells_key_index_type const& key_index
83 = data->ghost_cells->template get<1>();
85 // Search for the ghost cell by key, and project back to the sequence
87 = data->ghost_cells->template project<0>(key_index.find(key));
// Not found: a new cell has to be created.
88 if (ghost_cell == data->ghost_cells->end()) {
90 if (data->has_default_resolver)
91 // Since we have a default resolver, use it to create a default
92 // value for this ghost cell.
93 value = data->get_default_value(key);
94 else if (request_if_missing)
95 // Request the actual value of this key from its owner
96 send_oob_with_reply(data->process_group, get(data->global, key).first,
97 property_map_get, key, value);
101 // Create a ghost cell containing the new value
103 = data->ghost_cells->push_front(std::make_pair(key, value)).first;
105 // If we need to, prune the ghost cells
106 if (data->max_ghost_cells > 0)
// Found: when a limit is in force, move the cell to the front of the
// sequence; prune_ghost_cells() evicts from the back.
108 } else if (data->max_ghost_cells > 0)
109 // Put this cell at the beginning of the MRU list
110 data->ghost_cells->relocate(data->ghost_cells->begin(), ghost_cell);
112 return const_cast<value_type&>(ghost_cell->second);
// Call operator of handle_message<Reduce>, taking the source process and the
// message tag.
// NOTE(review): the return type, the class-name line and the whole body are
// elided in this excerpt (embedded lines 117-118 and 120-122), so its
// behaviour cannot be described from here.
115 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
116 template<typename Reduce>
119 ::handle_message<Reduce>::operator()(process_id_type source, int tag)
// Trigger handler for a single put request.
//
//   req  (key, value) pair received; source and tag are unused
//
// Maps req.first through the global map to an (owner, local key) pair,
// asserts that this process is the owner, and writes into local storage via
// detail::maybe_put.
// NOTE(review): parts of the maybe_put argument list are elided in this
// excerpt (embedded lines 139 and 141). The visible argument reads the
// current stored value; presumably it is combined with req.second through
// the reduce operation -- confirm.
124 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
125 template<typename Reduce>
127 PBGL_DISTRIB_PMAP::handle_message<Reduce>::
128 handle_put(int /*source*/, int /*tag*/,
129 const boost::parallel::detail::untracked_pair<key_type, value_type>& req, trigger_receive_context)
// Obtain a shared_ptr to the map's data from data_ptr.
133 shared_ptr<data_t> data(data_ptr);
135 owner_local_pair p = get(data->global, req.first);
// A put must only ever arrive at the process that owns the key.
136 BOOST_ASSERT(p.first == process_id(data->process_group));
138 detail::maybe_put(data->storage, p.second,
140 get(data->storage, p.second),
// Trigger handler for a single get request.
//
//   key  key whose value is requested; the tag is unused
//
// Returns the value read from local storage at the local key that the global
// map yields for `key`.
// NOTE(review): some lines of the body are elided in this excerpt (embedded
// lines 150-152 and 154-155).
144 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
145 template<typename Reduce>
146 typename PBGL_DISTRIB_PMAP::value_type
147 PBGL_DISTRIB_PMAP::handle_message<Reduce>::
148 handle_get(int source, int /*tag*/, const key_type& key,
149 trigger_receive_context)
// Obtain a shared_ptr to the map's data from data_ptr.
153 shared_ptr<data_t> data(data_ptr);
// p.second is the local key used to index the storage map.
156 owner_local_pair p = get(data->global, key);
157 return get(data->storage, p.second);
// Trigger handler for a batched get request.
//
//   source  process to which the reply is sent
//   keys    keys whose values are requested
//
// Looks up each key's value in local storage and sends all (key, value)
// pairs back to `source` in one message tagged property_map_multiget_reply.
// NOTE(review): the return-type line and a few body lines are elided in this
// excerpt (embedded lines 162, 166, 168-169, 173-176).
160 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
161 template<typename Reduce>
163 PBGL_DISTRIB_PMAP::handle_message<Reduce>::
164 handle_multiget(int source, int tag, const std::vector<key_type>& keys,
165 trigger_receive_context)
// Obtain a shared_ptr to the map's data from data_ptr.
167 shared_ptr<data_t> data(data_ptr);
170 typedef boost::parallel::detail::untracked_pair<key_type, value_type> key_value;
171 std::vector<key_value> results;
172 std::size_t n = keys.size();
// One result per requested key, in request order.
177 for (std::size_t i = 0; i < n; ++i) {
178 local_key_type local_key = get(data->global, keys[i]).second;
179 results.push_back(key_value(keys[i], get(data->storage, local_key)));
181 send(data->process_group, source, property_map_multiget_reply, results);
// Trigger handler for the reply to a batched get request.
//
//   msg  (key, value) pairs received
//
// For every pair whose key already has a ghost cell, the cell's value is
// overwritten with the received value. Pairs whose key has no ghost cell are
// skipped.
// NOTE(review): the return-type line and the declaration of `position` are
// elided in this excerpt (embedded lines 186 and 203).
184 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
185 template<typename Reduce>
187 PBGL_DISTRIB_PMAP::handle_message<Reduce>::
188 handle_multiget_reply
189 (int source, int tag,
190 const std::vector<boost::parallel::detail::untracked_pair<key_type, value_type> >& msg,
191 trigger_receive_context)
// Obtain a shared_ptr to the map's data from data_ptr.
193 shared_ptr<data_t> data(data_ptr);
// Index 1 of the ghost-cell container is used for lookup by key.
197 ghost_cells_key_index_type const& key_index
198 = data->ghost_cells->template get<1>();
200 std::size_t n = msg.size();
201 for (std::size_t i = 0; i < n; ++i) {
202 // Search for the ghost cell by key, and project back to the sequence
204 = data->ghost_cells->template project<0>(key_index.find(msg[i].first));
// Only existing ghost cells are updated; constness of the stored element is
// cast away to assign the new value.
206 if (position != data->ghost_cells->end())
207 const_cast<value_type&>(position->second) = msg[i].second;
// Trigger handler taking a batch of (local key, value) pairs.
// NOTE(review): the line carrying the function name is elided in this
// excerpt (embedded line 215). setup_triggers() registers handle_multiput
// for property_map_multiput, so this is presumably handle_multiput --
// confirm.
//
//   values  (local key, value) pairs received
//
// For each pair, the current value is read from local storage and
// detail::maybe_put is called on that local key with the result of the
// reduce operation.
// NOTE(review): the remaining reduce arguments are elided (embedded lines
// 231-232); presumably the stored value and the received value -- confirm.
211 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
212 template<typename Reduce>
214 PBGL_DISTRIB_PMAP::handle_message<Reduce>::
216 (int source, int tag,
217 const std::vector<unsafe_pair<local_key_type, value_type> >& values,
218 trigger_receive_context)
// Obtain a shared_ptr to the map's data from data_ptr.
222 shared_ptr<data_t> data(data_ptr);
225 std::size_t n = values.size();
226 for (std::size_t i = 0; i < n; ++i) {
227 local_key_type local_key = values[i].first;
// Value currently stored for this local key.
228 value_type local_value = get(data->storage, local_key);
229 detail::maybe_put(data->storage, values[i].first,
230 reduce(values[i].first,
// Registers this handler's member functions as triggers on `pg`, one per
// message tag:
//   property_map_put            -> handle_put
//   property_map_get            -> handle_get
//   property_map_multiget       -> handle_multiget
//   property_map_multiget_reply -> handle_multiget_reply
//   property_map_multiput       -> handle_multiput
//
//   pg  process group on which the triggers are registered
// NOTE(review): the return-type line and the braces are not visible in this
// excerpt (the embedded numbering skips 238, 241 and 252).
236 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
237 template<typename Reduce>
239 PBGL_DISTRIB_PMAP::handle_message<Reduce>::
240 setup_triggers(process_group_type& pg)
242 using boost::parallel::simple_trigger;
244 simple_trigger(pg, property_map_put, this, &handle_message::handle_put);
245 simple_trigger(pg, property_map_get, this, &handle_message::handle_get);
246 simple_trigger(pg, property_map_multiget, this,
247 &handle_message::handle_multiget);
248 simple_trigger(pg, property_map_multiget_reply, this,
249 &handle_message::handle_multiget_reply);
250 simple_trigger(pg, property_map_multiput, this,
251 &handle_message::handle_multiput);
// Call operator of the on_synchronize handler. It applies the consistency
// model's actions in a fixed order: flush, backward refresh, clear, reset.
// NOTE(review): the return-type and class-name lines, and the statements
// guarded by the cm_flush and cm_clear tests, are elided in this excerpt
// (embedded lines 255-256, 272, 280); presumably data->flush() and
// data->clear() -- confirm.
254 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
257 ::on_synchronize::operator()()
// `stage` is fixed at 0, so only stage-0 work is ever executed here.
259 int stage=0; // we only get called at the start now
// Obtain a shared_ptr to the map's data from data_ptr.
260 shared_ptr<data_t> data(data_ptr);
263 // Determine in which stage backward consistency messages should be sent.
// backward_stage stays -1 without cm_backward; with cm_backward it is 1 when
// cm_flush is also set and 0 otherwise.
264 int backward_stage = -1;
265 if (data->model & cm_backward) {
266 if (data->model & cm_flush) backward_stage = 1;
267 else backward_stage = 0;
270 // Flush results in first stage
271 if (stage == 0 && data->model & cm_flush)
274 // Backward consistency
// With stage fixed at 0, this refresh runs only when backward_stage is 0
// (cm_backward without cm_flush) and neither cm_clear nor cm_reset is set.
275 if (stage == backward_stage && !(data->model & (cm_clear | cm_reset)))
276 data->refresh_ghost_cells();
278 // Optionally clear results
279 if (data->model & cm_clear)
282 // Optionally reset results
283 if (data->model & cm_reset) {
// data->reset is a pointer to a data_t member function; it is invoked on
// *data only when non-null.
284 if (data->reset) ((*data).*data->reset)();
// Applies a consistency model to this property map.
//
//   model  bitmask of cm_* flags
//
// An on_synchronize handler is attached for every model other than exactly
// cm_forward. When cm_backward is set, the ghost-cell limit is set to 0.
// NOTE(review): the return-type line and the first statement(s) of the body
// are elided in this excerpt (embedded lines 290, 292-294); presumably the
// model is stored into data->model there -- confirm.
289 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
291 PBGL_DISTRIB_PMAP::set_consistency_model(int model)
// Compared for equality, not by bit test: any model that differs from
// cm_forward needs the handler.
295 bool need_on_synchronize = (model != cm_forward);
297 // Backward consistency is a two-stage process.
298 if (model & cm_backward) {
299 // For backward consistency to work, we absolutely cannot throw
300 // away any ghost cells.
301 data->max_ghost_cells = 0;
304 // attach the on_synchronize handler.
305 if (need_on_synchronize)
306 data->process_group.replace_on_synchronize_handler(on_synchronize(data));
// Sets the ghost-cell limit stored in data->max_ghost_cells.
//
//   max_ghost_cells  requested limit
//
// Throws std::runtime_error (via boost::throw_exception) when a non-zero
// limit is requested while the consistency model has cm_backward set.
// NOTE(review): the return-type line, the rest of the comment about a limit
// of 1 and the statement guarded by that test are elided in this excerpt
// (embedded lines 310, 312, 320-322); presumably the limit is adjusted
// upwards -- confirm.
309 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
311 PBGL_DISTRIB_PMAP::set_max_ghost_cells(std::size_t max_ghost_cells)
313 if ((data->model & cm_backward) && max_ghost_cells > 0)
314 boost::throw_exception(std::runtime_error("distributed_property_map::set_max_ghost_cells: "
315 "cannot limit ghost-cell usage with a backward "
316 "consistency model"));
318 if (max_ghost_cells == 1)
319 // It is not safe to have only 1 ghost cell; the cell() method
323 data->max_ghost_cells = max_ghost_cells;
// Public clear() member of the distributed property map.
// NOTE(review): the body is elided in this excerpt (embedded lines 329-331);
// presumably it forwards to data_t::clear() -- confirm.
327 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
328 void PBGL_DISTRIB_PMAP::clear()
// Removes every ghost cell by clearing the ghost-cell container.
333 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
334 void PBGL_DISTRIB_PMAP::data_t::clear()
336 ghost_cells->clear();
// Resets the ghost cells by invoking the member function that data->reset
// points to. Does nothing when data->reset is null.
339 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
340 void PBGL_DISTRIB_PMAP::reset()
// data->reset is a pointer to a data_t member function, called on *data.
342 if (data->reset) ((*data).*data->reset)();
// Public flush() member of the distributed property map.
// NOTE(review): the body is elided in this excerpt (embedded lines 347-349);
// presumably it forwards to data_t::flush() -- confirm.
345 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
346 void PBGL_DISTRIB_PMAP::flush()
// Requests fresh values for all ghost cells from the other processes.
//
// The keys of all ghost cells are grouped per process, using the first
// component of the global map's result as the index. One
// property_map_multiget message is then sent to every other process whose
// key list is non-empty. Keys grouped under this process's own id are never
// sent.
// NOTE(review): a few lines of the body are elided in this excerpt (embedded
// lines 353-355).
351 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
352 void PBGL_DISTRIB_PMAP::data_t::refresh_ghost_cells()
// One key list per process in the group.
356 std::vector<std::vector<key_type> > keys;
357 keys.resize(num_processes(process_group));
359 // Collect the set of keys for which we will request values
360 for (iterator i = ghost_cells->begin(); i != ghost_cells->end(); ++i)
361 keys[get(global, i->first).first].push_back(i->first);
363 // Send multiget requests to each of the other processors
364 typedef typename ProcessGroup::process_size_type process_size_type;
365 process_size_type n = num_processes(process_group);
366 process_id_type id = process_id(process_group);
// Start at the process after this one and wrap around modulo n; the loop
// stops when it comes back to this process's own id.
367 for (process_size_type p = (id + 1) % n ; p != id ; p = (p + 1) % n) {
368 if (!keys[p].empty())
369 send(process_group, p, property_map_multiget, keys[p]);
// Sends the values of all ghost cells to the processes they map to.
//
// Every ghost cell is turned into a (local key, value) pair and grouped by
// the process index that the global map yields for its key. One
// property_map_multiput message is then sent to each process whose group is
// non-empty.
// NOTE(review): the sizing of `values` is not visible in this excerpt
// (embedded lines 380-381 are elided). Confirm it is resized to n before it
// is indexed by process id below.
373 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
374 void PBGL_DISTRIB_PMAP::data_t::flush()
378 int n = num_processes(process_group);
379 std::vector<std::vector<unsafe_pair<local_key_type, value_type> > > values;
382 // Collect all of the flushed values
383 for (iterator i = ghost_cells->begin(); i != ghost_cells->end(); ++i) {
// g.first is the process index, g.second the local key on that process.
384 std::pair<int, local_key_type> g = get(global, i->first);
385 values[g.first].push_back(std::make_pair(g.second, i->second));
388 // Transmit flushed values
// The loop covers every process index from 0 to n-1; this process's own id
// is not skipped.
389 for (int p = 0; p < n; ++p) {
390 if (!values[p].empty())
391 send(process_group, p, property_map_multiput, values[p]);
// Synchronizes this property map with the other processes.
//
// With cm_backward set, the process group is synchronized once inside the
// guarded branch. The visible remainder requests a refresh of all ghost
// cells and then synchronizes twice: once for the multiget requests and once
// for their replies.
// NOTE(review): the end of the cm_backward branch is elided in this excerpt
// (embedded lines 400-402); presumably it returns early so the refresh below
// is skipped in that case -- confirm.
395 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
396 void PBGL_DISTRIB_PMAP::do_synchronize()
398 if (data->model & cm_backward) {
399 synchronize(data->process_group);
403 // Request refreshes of the values of our ghost cells
404 data->refresh_ghost_cells();
406 // Allows all of the multigets to get to their destinations
407 synchronize(data->process_group);
409 // Allows all of the multiget responses to get to their destinations
410 synchronize(data->process_group);
// Resets every ghost cell to the value the resolver produces for its key.
//
//   Resolver  concrete type expected to be stored in get_default_value
//
// The stored resolver is retrieved with target<Resolver>() and asserted to
// be non-null, i.e. get_default_value must currently hold a Resolver.
413 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
414 template<typename Resolver>
415 void PBGL_DISTRIB_PMAP::data_t::do_reset()
417 Resolver* resolver = get_default_value.template target<Resolver>();
418 BOOST_ASSERT(resolver);
// Constness of the stored element is cast away to overwrite the value in
// place.
420 for (iterator i = ghost_cells->begin(); i != ghost_cells->end(); ++i)
421 const_cast<value_type&>(i->second) = (*resolver)(i->first);
424 } } // end namespace boost::parallel