]> git.proxmox.com Git - ceph.git/blob - ceph/src/boost/boost/property_map/parallel/impl/distributed_property_map.ipp
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / boost / boost / property_map / parallel / impl / distributed_property_map.ipp
1 // Copyright (C) 2004-2006 The Trustees of Indiana University.
2
3 // Use, modification and distribution is subject to the Boost Software
4 // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
6
7 // Authors: Douglas Gregor
8 // Nick Edmonds
9 // Andrew Lumsdaine
10 #include <boost/assert.hpp>
11 #include <boost/property_map/parallel/distributed_property_map.hpp>
12 #include <boost/property_map/parallel/detail/untracked_pair.hpp>
13 #include <boost/type_traits/is_base_and_derived.hpp>
14 #include <boost/property_map/parallel/simple_trigger.hpp>
15
16 namespace boost { namespace parallel {
17
18 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
19 template<typename Reduce>
20 PBGL_DISTRIB_PMAP
21 ::distributed_property_map(const ProcessGroup& pg, const GlobalMap& global,
22 const StorageMap& pm, const Reduce& reduce)
23 : data(new data_t(pg, global, pm, reduce, Reduce::non_default_resolver))
24 {
25 typedef handle_message<Reduce> Handler;
26
27 data->ghost_cells.reset(new ghost_cells_type());
28 data->reset = &data_t::template do_reset<Reduce>;
29 data->process_group.replace_handler(Handler(data, reduce));
30 data->process_group.template get_receiver<Handler>()
31 ->setup_triggers(data->process_group);
32 }
33
34 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
35 PBGL_DISTRIB_PMAP::~distributed_property_map() { }
36
37 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
38 template<typename Reduce>
39 void
40 PBGL_DISTRIB_PMAP::set_reduce(const Reduce& reduce)
41 {
42 typedef handle_message<Reduce> Handler;
43 data->process_group.replace_handler(Handler(data, reduce));
44 Handler* handler = data->process_group.template get_receiver<Handler>();
45 BOOST_ASSERT(handler);
46 handler->setup_triggers(data->process_group);
47 data->get_default_value = reduce;
48 data->has_default_resolver = Reduce::non_default_resolver;
49 int model = data->model;
50 data->reset = &data_t::template do_reset<Reduce>;
51 set_consistency_model(model);
52 }
53
54 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
55 void PBGL_DISTRIB_PMAP::prune_ghost_cells() const
56 {
57 if (data->max_ghost_cells == 0)
58 return;
59
60 while (data->ghost_cells->size() > data->max_ghost_cells) {
61 // Evict the last ghost cell
62
63 if (data->model & cm_flush) {
64 // We need to flush values when we evict them.
65 boost::parallel::detail::untracked_pair<key_type, value_type> const& victim
66 = data->ghost_cells->back();
67 send(data->process_group, get(data->global, victim.first).first,
68 property_map_put, victim);
69 }
70
71 // Actually remove the ghost cell
72 data->ghost_cells->pop_back();
73 }
74 }
75
76 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
77 typename PBGL_DISTRIB_PMAP::value_type&
78 PBGL_DISTRIB_PMAP::cell(const key_type& key, bool request_if_missing) const
79 {
80 // Index by key
81 ghost_cells_key_index_type const& key_index
82 = data->ghost_cells->template get<1>();
83
84 // Search for the ghost cell by key, and project back to the sequence
85 iterator ghost_cell
86 = data->ghost_cells->template project<0>(key_index.find(key));
87 if (ghost_cell == data->ghost_cells->end()) {
88 value_type value;
89 if (data->has_default_resolver)
90 // Since we have a default resolver, use it to create a default
91 // value for this ghost cell.
92 value = data->get_default_value(key);
93 else if (request_if_missing)
94 // Request the actual value of this key from its owner
95 send_oob_with_reply(data->process_group, get(data->global, key).first,
96 property_map_get, key, value);
97 else
98 value = value_type();
99
100 // Create a ghost cell containing the new value
101 ghost_cell
102 = data->ghost_cells->push_front(std::make_pair(key, value)).first;
103
104 // If we need to, prune the ghost cells
105 if (data->max_ghost_cells > 0)
106 prune_ghost_cells();
107 } else if (data->max_ghost_cells > 0)
108 // Put this cell at the beginning of the MRU list
109 data->ghost_cells->relocate(data->ghost_cells->begin(), ghost_cell);
110
111 return const_cast<value_type&>(ghost_cell->second);
112 }
113
114 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
115 template<typename Reduce>
116 void
117 PBGL_DISTRIB_PMAP
118 ::handle_message<Reduce>::operator()(process_id_type source, int tag)
119 {
120 BOOST_ASSERT(false);
121 }
122
123 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
124 template<typename Reduce>
125 void
126 PBGL_DISTRIB_PMAP::handle_message<Reduce>::
127 handle_put(int /*source*/, int /*tag*/,
128 const boost::parallel::detail::untracked_pair<key_type, value_type>& req, trigger_receive_context)
129 {
130 using boost::get;
131
132 shared_ptr<data_t> data(data_ptr);
133
134 owner_local_pair p = get(data->global, req.first);
135 BOOST_ASSERT(p.first == process_id(data->process_group));
136
137 detail::maybe_put(data->storage, p.second,
138 reduce(req.first,
139 get(data->storage, p.second),
140 req.second));
141 }
142
143 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
144 template<typename Reduce>
145 typename PBGL_DISTRIB_PMAP::value_type
146 PBGL_DISTRIB_PMAP::handle_message<Reduce>::
147 handle_get(int source, int /*tag*/, const key_type& key,
148 trigger_receive_context)
149 {
150 using boost::get;
151
152 shared_ptr<data_t> data(data_ptr);
153 BOOST_ASSERT(data);
154
155 owner_local_pair p = get(data->global, key);
156 return get(data->storage, p.second);
157 }
158
159 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
160 template<typename Reduce>
161 void
162 PBGL_DISTRIB_PMAP::handle_message<Reduce>::
163 handle_multiget(int source, int tag, const std::vector<key_type>& keys,
164 trigger_receive_context)
165 {
166 shared_ptr<data_t> data(data_ptr);
167 BOOST_ASSERT(data);
168
169 typedef boost::parallel::detail::untracked_pair<key_type, value_type> key_value;
170 std::vector<key_value> results;
171 std::size_t n = keys.size();
172 results.reserve(n);
173
174 using boost::get;
175
176 for (std::size_t i = 0; i < n; ++i) {
177 local_key_type local_key = get(data->global, keys[i]).second;
178 results.push_back(key_value(keys[i], get(data->storage, local_key)));
179 }
180 send(data->process_group, source, property_map_multiget_reply, results);
181 }
182
183 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
184 template<typename Reduce>
185 void
186 PBGL_DISTRIB_PMAP::handle_message<Reduce>::
187 handle_multiget_reply
188 (int source, int tag,
189 const std::vector<boost::parallel::detail::untracked_pair<key_type, value_type> >& msg,
190 trigger_receive_context)
191 {
192 shared_ptr<data_t> data(data_ptr);
193 BOOST_ASSERT(data);
194
195 // Index by key
196 ghost_cells_key_index_type const& key_index
197 = data->ghost_cells->template get<1>();
198
199 std::size_t n = msg.size();
200 for (std::size_t i = 0; i < n; ++i) {
201 // Search for the ghost cell by key, and project back to the sequence
202 iterator position
203 = data->ghost_cells->template project<0>(key_index.find(msg[i].first));
204
205 if (position != data->ghost_cells->end())
206 const_cast<value_type&>(position->second) = msg[i].second;
207 }
208 }
209
210 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
211 template<typename Reduce>
212 void
213 PBGL_DISTRIB_PMAP::handle_message<Reduce>::
214 handle_multiput
215 (int source, int tag,
216 const std::vector<unsafe_pair<local_key_type, value_type> >& values,
217 trigger_receive_context)
218 {
219 using boost::get;
220
221 shared_ptr<data_t> data(data_ptr);
222 BOOST_ASSERT(data);
223
224 std::size_t n = values.size();
225 for (std::size_t i = 0; i < n; ++i) {
226 local_key_type local_key = values[i].first;
227 value_type local_value = get(data->storage, local_key);
228 detail::maybe_put(data->storage, values[i].first,
229 reduce(values[i].first,
230 local_value,
231 values[i].second));
232 }
233 }
234
235 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
236 template<typename Reduce>
237 void
238 PBGL_DISTRIB_PMAP::handle_message<Reduce>::
239 setup_triggers(process_group_type& pg)
240 {
241 using boost::parallel::simple_trigger;
242
243 simple_trigger(pg, property_map_put, this, &handle_message::handle_put);
244 simple_trigger(pg, property_map_get, this, &handle_message::handle_get);
245 simple_trigger(pg, property_map_multiget, this,
246 &handle_message::handle_multiget);
247 simple_trigger(pg, property_map_multiget_reply, this,
248 &handle_message::handle_multiget_reply);
249 simple_trigger(pg, property_map_multiput, this,
250 &handle_message::handle_multiput);
251 }
252
253 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
254 void
255 PBGL_DISTRIB_PMAP
256 ::on_synchronize::operator()()
257 {
258 int stage=0; // we only get called at the start now
259 shared_ptr<data_t> data(data_ptr);
260 BOOST_ASSERT(data);
261
262 // Determine in which stage backward consistency messages should be sent.
263 int backward_stage = -1;
264 if (data->model & cm_backward) {
265 if (data->model & cm_flush) backward_stage = 1;
266 else backward_stage = 0;
267 }
268
269 // Flush results in first stage
270 if (stage == 0 && data->model & cm_flush)
271 data->flush();
272
273 // Backward consistency
274 if (stage == backward_stage && !(data->model & (cm_clear | cm_reset)))
275 data->refresh_ghost_cells();
276
277 // Optionally clear results
278 if (data->model & cm_clear)
279 data->clear();
280
281 // Optionally reset results
282 if (data->model & cm_reset) {
283 if (data->reset) ((*data).*data->reset)();
284 }
285 }
286
287
288 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
289 void
290 PBGL_DISTRIB_PMAP::set_consistency_model(int model)
291 {
292 data->model = model;
293
294 bool need_on_synchronize = (model != cm_forward);
295
296 // Backward consistency is a two-stage process.
297 if (model & cm_backward) {
298 // For backward consistency to work, we absolutely cannot throw
299 // away any ghost cells.
300 data->max_ghost_cells = 0;
301 }
302
303 // attach the on_synchronize handler.
304 if (need_on_synchronize)
305 data->process_group.replace_on_synchronize_handler(on_synchronize(data));
306 }
307
308 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
309 void
310 PBGL_DISTRIB_PMAP::set_max_ghost_cells(std::size_t max_ghost_cells)
311 {
312 if ((data->model & cm_backward) && max_ghost_cells > 0)
313 boost::throw_exception(std::runtime_error("distributed_property_map::set_max_ghost_cells: "
314 "cannot limit ghost-cell usage with a backward "
315 "consistency model"));
316
317 if (max_ghost_cells == 1)
318 // It is not safe to have only 1 ghost cell; the cell() method
319 // will fail.
320 max_ghost_cells = 2;
321
322 data->max_ghost_cells = max_ghost_cells;
323 prune_ghost_cells();
324 }
325
326 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
327 void PBGL_DISTRIB_PMAP::clear()
328 {
329 data->clear();
330 }
331
332 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
333 void PBGL_DISTRIB_PMAP::data_t::clear()
334 {
335 ghost_cells->clear();
336 }
337
338 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
339 void PBGL_DISTRIB_PMAP::reset()
340 {
341 if (data->reset) ((*data).*data->reset)();
342 }
343
344 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
345 void PBGL_DISTRIB_PMAP::flush()
346 {
347 data->flush();
348 }
349
350 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
351 void PBGL_DISTRIB_PMAP::data_t::refresh_ghost_cells()
352 {
353 using boost::get;
354
355 std::vector<std::vector<key_type> > keys;
356 keys.resize(num_processes(process_group));
357
358 // Collect the set of keys for which we will request values
359 for (iterator i = ghost_cells->begin(); i != ghost_cells->end(); ++i)
360 keys[get(global, i->first).first].push_back(i->first);
361
362 // Send multiget requests to each of the other processors
363 typedef typename ProcessGroup::process_size_type process_size_type;
364 process_size_type n = num_processes(process_group);
365 process_id_type id = process_id(process_group);
366 for (process_size_type p = (id + 1) % n ; p != id ; p = (p + 1) % n) {
367 if (!keys[p].empty())
368 send(process_group, p, property_map_multiget, keys[p]);
369 }
370 }
371
372 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
373 void PBGL_DISTRIB_PMAP::data_t::flush()
374 {
375 using boost::get;
376
377 int n = num_processes(process_group);
378 std::vector<std::vector<unsafe_pair<local_key_type, value_type> > > values;
379 values.resize(n);
380
381 // Collect all of the flushed values
382 for (iterator i = ghost_cells->begin(); i != ghost_cells->end(); ++i) {
383 std::pair<int, local_key_type> g = get(global, i->first);
384 values[g.first].push_back(std::make_pair(g.second, i->second));
385 }
386
387 // Transmit flushed values
388 for (int p = 0; p < n; ++p) {
389 if (!values[p].empty())
390 send(process_group, p, property_map_multiput, values[p]);
391 }
392 }
393
394 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
395 void PBGL_DISTRIB_PMAP::do_synchronize()
396 {
397 if (data->model & cm_backward) {
398 synchronize(data->process_group);
399 return;
400 }
401
402 // Request refreshes of the values of our ghost cells
403 data->refresh_ghost_cells();
404
405 // Allows all of the multigets to get to their destinations
406 synchronize(data->process_group);
407
408 // Allows all of the multiget responses to get to their destinations
409 synchronize(data->process_group);
410 }
411
412 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
413 template<typename Resolver>
414 void PBGL_DISTRIB_PMAP::data_t::do_reset()
415 {
416 Resolver* resolver = get_default_value.template target<Resolver>();
417 BOOST_ASSERT(resolver);
418
419 for (iterator i = ghost_cells->begin(); i != ghost_cells->end(); ++i)
420 const_cast<value_type&>(i->second) = (*resolver)(i->first);
421 }
422
423 } } // end namespace boost::parallel