]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // Copyright (C) 2005-2006 The Trustees of Indiana University. |
2 | ||
3 | // Use, modification and distribution is subject to the Boost Software | |
4 | // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at | |
5 | // http://www.boost.org/LICENSE_1_0.txt) | |
6 | ||
7 | // Authors: Douglas Gregor | |
8 | // Andrew Lumsdaine | |
9 | #ifndef BOOST_GRAPH_DETAIL_REMOTE_UPDATE_SET_HPP | |
10 | #define BOOST_GRAPH_DETAIL_REMOTE_UPDATE_SET_HPP | |
11 | ||
12 | #ifndef BOOST_GRAPH_USE_MPI | |
13 | #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included" | |
14 | #endif | |
15 | ||
16 | #include <boost/graph/parallel/process_group.hpp> | |
17 | #include <boost/type_traits/is_convertible.hpp> | |
18 | #include <vector> | |
19 | #include <boost/assert.hpp> | |
20 | #include <boost/optional.hpp> | |
21 | #include <queue> | |
22 | ||
23 | namespace boost { namespace graph { namespace detail { | |
24 | ||
25 | template<typename ProcessGroup> | |
26 | void do_synchronize(ProcessGroup& pg) | |
27 | { | |
28 | using boost::parallel::synchronize; | |
29 | synchronize(pg); | |
30 | } | |
31 | ||
32 | struct remote_set_queued {}; | |
33 | struct remote_set_immediate {}; | |
34 | ||
35 | template<typename ProcessGroup> | |
36 | class remote_set_semantics | |
37 | { | |
38 | BOOST_STATIC_CONSTANT | |
39 | (bool, | |
40 | queued = (is_convertible< | |
41 | typename ProcessGroup::communication_category, | |
42 | boost::parallel::bsp_process_group_tag>::value)); | |
43 | ||
44 | public: | |
45 | typedef typename mpl::if_c<queued, | |
46 | remote_set_queued, | |
47 | remote_set_immediate>::type type; | |
48 | }; | |
49 | ||
50 | ||
51 | template<typename Derived, typename ProcessGroup, typename Value, | |
52 | typename OwnerMap, | |
53 | typename Semantics = typename remote_set_semantics<ProcessGroup>::type> | |
54 | class remote_update_set; | |
55 | ||
56 | /********************************************************************** | |
57 | * Remote updating set that queues messages until synchronization * | |
58 | **********************************************************************/ | |
59 | template<typename Derived, typename ProcessGroup, typename Value, | |
60 | typename OwnerMap> | |
61 | class remote_update_set<Derived, ProcessGroup, Value, OwnerMap, | |
62 | remote_set_queued> | |
63 | { | |
64 | typedef typename property_traits<OwnerMap>::key_type Key; | |
65 | typedef std::vector<std::pair<Key, Value> > Updates; | |
66 | typedef typename Updates::size_type updates_size_type; | |
67 | typedef typename Updates::value_type updates_pair_type; | |
68 | ||
69 | public: | |
70 | ||
71 | private: | |
72 | typedef typename ProcessGroup::process_id_type process_id_type; | |
73 | ||
74 | enum message_kind { | |
75 | /** Message containing the number of updates that will be sent in | |
76 | * a msg_updates message that will immediately follow. This | |
77 | * message will contain a single value of type | |
78 | * updates_size_type. | |
79 | */ | |
80 | msg_num_updates, | |
81 | ||
82 | /** Contains (key, value) pairs with all of the updates from a | |
83 | * particular source. The number of updates is variable, but will | |
84 | * be provided in a msg_num_updates message that immediately | |
85 | * preceeds this message. | |
86 | * | |
87 | */ | |
88 | msg_updates | |
89 | }; | |
90 | ||
91 | struct handle_messages | |
92 | { | |
93 | explicit | |
94 | handle_messages(remote_update_set* self, const ProcessGroup& pg) | |
95 | : self(self), update_sizes(num_processes(pg), 0) { } | |
96 | ||
97 | void operator()(process_id_type source, int tag) | |
98 | { | |
99 | switch(tag) { | |
100 | case msg_num_updates: | |
101 | { | |
102 | // Receive the # of updates | |
103 | updates_size_type num_updates; | |
104 | receive(self->process_group, source, tag, num_updates); | |
105 | ||
106 | update_sizes[source] = num_updates; | |
107 | } | |
108 | break; | |
109 | ||
110 | case msg_updates: | |
111 | { | |
112 | updates_size_type num_updates = update_sizes[source]; | |
113 | BOOST_ASSERT(num_updates); | |
114 | ||
115 | // Receive the actual updates | |
116 | std::vector<updates_pair_type> updates(num_updates); | |
117 | receive(self->process_group, source, msg_updates, &updates[0], | |
118 | num_updates); | |
119 | ||
120 | // Send updates to derived "receive_update" member | |
121 | Derived* derived = static_cast<Derived*>(self); | |
122 | for (updates_size_type u = 0; u < num_updates; ++u) | |
123 | derived->receive_update(source, updates[u].first, updates[u].second); | |
124 | ||
125 | update_sizes[source] = 0; | |
126 | } | |
127 | break; | |
128 | }; | |
129 | } | |
130 | ||
131 | private: | |
132 | remote_update_set* self; | |
133 | std::vector<updates_size_type> update_sizes; | |
134 | }; | |
135 | friend struct handle_messages; | |
136 | ||
137 | protected: | |
138 | remote_update_set(const ProcessGroup& pg, const OwnerMap& owner) | |
139 | : process_group(pg, handle_messages(this, pg)), | |
140 | updates(num_processes(pg)), owner(owner) { | |
141 | } | |
142 | ||
143 | ||
144 | void update(const Key& key, const Value& value) | |
145 | { | |
146 | if (get(owner, key) == process_id(process_group)) { | |
147 | Derived* derived = static_cast<Derived*>(this); | |
148 | derived->receive_update(get(owner, key), key, value); | |
149 | } | |
150 | else { | |
151 | updates[get(owner, key)].push_back(std::make_pair(key, value)); | |
152 | } | |
153 | } | |
154 | ||
155 | void collect() { } | |
156 | ||
157 | void synchronize() | |
158 | { | |
159 | // Emit all updates and then remove them | |
160 | process_id_type num_processes = updates.size(); | |
161 | for (process_id_type p = 0; p < num_processes; ++p) { | |
162 | if (!updates[p].empty()) { | |
163 | send(process_group, p, msg_num_updates, updates[p].size()); | |
164 | send(process_group, p, msg_updates, | |
165 | &updates[p].front(), updates[p].size()); | |
166 | updates[p].clear(); | |
167 | } | |
168 | } | |
169 | ||
170 | do_synchronize(process_group); | |
171 | } | |
172 | ||
173 | ProcessGroup process_group; | |
174 | ||
175 | private: | |
176 | std::vector<Updates> updates; | |
177 | OwnerMap owner; | |
178 | }; | |
179 | ||
180 | /********************************************************************** | |
181 | * Remote updating set that sends messages immediately * | |
182 | **********************************************************************/ | |
183 | template<typename Derived, typename ProcessGroup, typename Value, | |
184 | typename OwnerMap> | |
185 | class remote_update_set<Derived, ProcessGroup, Value, OwnerMap, | |
186 | remote_set_immediate> | |
187 | { | |
188 | typedef typename property_traits<OwnerMap>::key_type Key; | |
189 | typedef std::pair<Key, Value> update_pair_type; | |
190 | typedef typename std::vector<update_pair_type>::size_type updates_size_type; | |
191 | ||
192 | public: | |
193 | typedef typename ProcessGroup::process_id_type process_id_type; | |
194 | ||
195 | private: | |
196 | enum message_kind { | |
197 | /** Contains a (key, value) pair that will be updated. */ | |
198 | msg_update | |
199 | }; | |
200 | ||
201 | struct handle_messages | |
202 | { | |
203 | explicit handle_messages(remote_update_set* self, const ProcessGroup& pg) | |
204 | : self(self) | |
205 | { update_sizes.resize(num_processes(pg), 0); } | |
206 | ||
207 | void operator()(process_id_type source, int tag) | |
208 | { | |
209 | // Receive the # of updates | |
210 | BOOST_ASSERT(tag == msg_update); | |
211 | update_pair_type update; | |
212 | receive(self->process_group, source, tag, update); | |
213 | ||
214 | // Send update to derived "receive_update" member | |
215 | Derived* derived = static_cast<Derived*>(self); | |
216 | derived->receive_update(source, update.first, update.second); | |
217 | } | |
218 | ||
219 | private: | |
220 | std::vector<updates_size_type> update_sizes; | |
221 | remote_update_set* self; | |
222 | }; | |
223 | friend struct handle_messages; | |
224 | ||
225 | protected: | |
226 | remote_update_set(const ProcessGroup& pg, const OwnerMap& owner) | |
227 | : process_group(pg, handle_messages(this, pg)), owner(owner) { } | |
228 | ||
229 | void update(const Key& key, const Value& value) | |
230 | { | |
231 | if (get(owner, key) == process_id(process_group)) { | |
232 | Derived* derived = static_cast<Derived*>(this); | |
233 | derived->receive_update(get(owner, key), key, value); | |
234 | } | |
235 | else | |
236 | send(process_group, get(owner, key), msg_update, | |
237 | update_pair_type(key, value)); | |
238 | } | |
239 | ||
240 | void collect() | |
241 | { | |
242 | typedef std::pair<process_id_type, int> probe_type; | |
243 | handle_messages handler(this, process_group); | |
244 | while (optional<probe_type> stp = probe(process_group)) | |
245 | if (stp->second == msg_update) handler(stp->first, stp->second); | |
246 | } | |
247 | ||
248 | void synchronize() | |
249 | { | |
250 | do_synchronize(process_group); | |
251 | } | |
252 | ||
253 | ProcessGroup process_group; | |
254 | OwnerMap owner; | |
255 | }; | |
256 | ||
257 | } } } // end namespace boost::graph::detail | |
258 | ||
259 | #endif // BOOST_GRAPH_DETAIL_REMOTE_UPDATE_SET_HPP |