]> git.proxmox.com Git - ceph.git/blame - ceph/src/boost/libs/graph_parallel/include/boost/graph/distributed/detail/remote_update_set.hpp
bump version to 12.2.2-pve1
[ceph.git] / ceph / src / boost / libs / graph_parallel / include / boost / graph / distributed / detail / remote_update_set.hpp
CommitLineData
7c673cae
FG
1// Copyright (C) 2005-2006 The Trustees of Indiana University.
2
3// Use, modification and distribution is subject to the Boost Software
4// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
5// http://www.boost.org/LICENSE_1_0.txt)
6
7// Authors: Douglas Gregor
8// Andrew Lumsdaine
9#ifndef BOOST_GRAPH_DETAIL_REMOTE_UPDATE_SET_HPP
10#define BOOST_GRAPH_DETAIL_REMOTE_UPDATE_SET_HPP
11
12#ifndef BOOST_GRAPH_USE_MPI
13#error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
14#endif
15
16#include <boost/graph/parallel/process_group.hpp>
17#include <boost/type_traits/is_convertible.hpp>
18#include <vector>
19#include <boost/assert.hpp>
20#include <boost/optional.hpp>
21#include <queue>
22
23namespace boost { namespace graph { namespace detail {
24
25template<typename ProcessGroup>
26void do_synchronize(ProcessGroup& pg)
27{
28 using boost::parallel::synchronize;
29 synchronize(pg);
30}
31
32struct remote_set_queued {};
33struct remote_set_immediate {};
34
35template<typename ProcessGroup>
36class remote_set_semantics
37{
38 BOOST_STATIC_CONSTANT
39 (bool,
40 queued = (is_convertible<
41 typename ProcessGroup::communication_category,
42 boost::parallel::bsp_process_group_tag>::value));
43
44 public:
45 typedef typename mpl::if_c<queued,
46 remote_set_queued,
47 remote_set_immediate>::type type;
48};
49
50
51template<typename Derived, typename ProcessGroup, typename Value,
52 typename OwnerMap,
53 typename Semantics = typename remote_set_semantics<ProcessGroup>::type>
54class remote_update_set;
55
56/**********************************************************************
57 * Remote updating set that queues messages until synchronization *
58 **********************************************************************/
59template<typename Derived, typename ProcessGroup, typename Value,
60 typename OwnerMap>
61class remote_update_set<Derived, ProcessGroup, Value, OwnerMap,
62 remote_set_queued>
63{
64 typedef typename property_traits<OwnerMap>::key_type Key;
65 typedef std::vector<std::pair<Key, Value> > Updates;
66 typedef typename Updates::size_type updates_size_type;
67 typedef typename Updates::value_type updates_pair_type;
68
69public:
70
71private:
72 typedef typename ProcessGroup::process_id_type process_id_type;
73
74 enum message_kind {
75 /** Message containing the number of updates that will be sent in
76 * a msg_updates message that will immediately follow. This
77 * message will contain a single value of type
78 * updates_size_type.
79 */
80 msg_num_updates,
81
82 /** Contains (key, value) pairs with all of the updates from a
83 * particular source. The number of updates is variable, but will
84 * be provided in a msg_num_updates message that immediately
85 * preceeds this message.
86 *
87 */
88 msg_updates
89 };
90
91 struct handle_messages
92 {
93 explicit
94 handle_messages(remote_update_set* self, const ProcessGroup& pg)
95 : self(self), update_sizes(num_processes(pg), 0) { }
96
97 void operator()(process_id_type source, int tag)
98 {
99 switch(tag) {
100 case msg_num_updates:
101 {
102 // Receive the # of updates
103 updates_size_type num_updates;
104 receive(self->process_group, source, tag, num_updates);
105
106 update_sizes[source] = num_updates;
107 }
108 break;
109
110 case msg_updates:
111 {
112 updates_size_type num_updates = update_sizes[source];
113 BOOST_ASSERT(num_updates);
114
115 // Receive the actual updates
116 std::vector<updates_pair_type> updates(num_updates);
117 receive(self->process_group, source, msg_updates, &updates[0],
118 num_updates);
119
120 // Send updates to derived "receive_update" member
121 Derived* derived = static_cast<Derived*>(self);
122 for (updates_size_type u = 0; u < num_updates; ++u)
123 derived->receive_update(source, updates[u].first, updates[u].second);
124
125 update_sizes[source] = 0;
126 }
127 break;
128 };
129 }
130
131 private:
132 remote_update_set* self;
133 std::vector<updates_size_type> update_sizes;
134 };
135 friend struct handle_messages;
136
137 protected:
138 remote_update_set(const ProcessGroup& pg, const OwnerMap& owner)
139 : process_group(pg, handle_messages(this, pg)),
140 updates(num_processes(pg)), owner(owner) {
141 }
142
143
144 void update(const Key& key, const Value& value)
145 {
146 if (get(owner, key) == process_id(process_group)) {
147 Derived* derived = static_cast<Derived*>(this);
148 derived->receive_update(get(owner, key), key, value);
149 }
150 else {
151 updates[get(owner, key)].push_back(std::make_pair(key, value));
152 }
153 }
154
155 void collect() { }
156
157 void synchronize()
158 {
159 // Emit all updates and then remove them
160 process_id_type num_processes = updates.size();
161 for (process_id_type p = 0; p < num_processes; ++p) {
162 if (!updates[p].empty()) {
163 send(process_group, p, msg_num_updates, updates[p].size());
164 send(process_group, p, msg_updates,
165 &updates[p].front(), updates[p].size());
166 updates[p].clear();
167 }
168 }
169
170 do_synchronize(process_group);
171 }
172
173 ProcessGroup process_group;
174
175 private:
176 std::vector<Updates> updates;
177 OwnerMap owner;
178};
179
180/**********************************************************************
181 * Remote updating set that sends messages immediately *
182 **********************************************************************/
183template<typename Derived, typename ProcessGroup, typename Value,
184 typename OwnerMap>
185class remote_update_set<Derived, ProcessGroup, Value, OwnerMap,
186 remote_set_immediate>
187{
188 typedef typename property_traits<OwnerMap>::key_type Key;
189 typedef std::pair<Key, Value> update_pair_type;
190 typedef typename std::vector<update_pair_type>::size_type updates_size_type;
191
192public:
193 typedef typename ProcessGroup::process_id_type process_id_type;
194
195private:
196 enum message_kind {
197 /** Contains a (key, value) pair that will be updated. */
198 msg_update
199 };
200
201 struct handle_messages
202 {
203 explicit handle_messages(remote_update_set* self, const ProcessGroup& pg)
204 : self(self)
205 { update_sizes.resize(num_processes(pg), 0); }
206
207 void operator()(process_id_type source, int tag)
208 {
209 // Receive the # of updates
210 BOOST_ASSERT(tag == msg_update);
211 update_pair_type update;
212 receive(self->process_group, source, tag, update);
213
214 // Send update to derived "receive_update" member
215 Derived* derived = static_cast<Derived*>(self);
216 derived->receive_update(source, update.first, update.second);
217 }
218
219 private:
220 std::vector<updates_size_type> update_sizes;
221 remote_update_set* self;
222 };
223 friend struct handle_messages;
224
225 protected:
226 remote_update_set(const ProcessGroup& pg, const OwnerMap& owner)
227 : process_group(pg, handle_messages(this, pg)), owner(owner) { }
228
229 void update(const Key& key, const Value& value)
230 {
231 if (get(owner, key) == process_id(process_group)) {
232 Derived* derived = static_cast<Derived*>(this);
233 derived->receive_update(get(owner, key), key, value);
234 }
235 else
236 send(process_group, get(owner, key), msg_update,
237 update_pair_type(key, value));
238 }
239
240 void collect()
241 {
242 typedef std::pair<process_id_type, int> probe_type;
243 handle_messages handler(this, process_group);
244 while (optional<probe_type> stp = probe(process_group))
245 if (stp->second == msg_update) handler(stp->first, stp->second);
246 }
247
248 void synchronize()
249 {
250 do_synchronize(process_group);
251 }
252
253 ProcessGroup process_group;
254 OwnerMap owner;
255};
256
257} } } // end namespace boost::graph::detail
258
259#endif // BOOST_GRAPH_DETAIL_REMOTE_UPDATE_SET_HPP