]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- C++ -*- |
2 | ||
3 | // Copyright (C) 2004-2008 The Trustees of Indiana University. | |
4 | // Copyright (C) 2007 Douglas Gregor <doug.gregor@gmail.com> | |
5 | // Copyright (C) 2007 Matthias Troyer <troyer@boost-consulting.com> | |
6 | ||
7 | // Use, modification and distribution is subject to the Boost Software | |
8 | // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at | |
9 | // http://www.boost.org/LICENSE_1_0.txt) | |
10 | ||
11 | // Authors: Douglas Gregor | |
12 | // Andrew Lumsdaine | |
13 | // Matthias Troyer | |
14 | ||
15 | //#define PBGL_PROCESS_GROUP_DEBUG | |
16 | ||
17 | #ifndef BOOST_GRAPH_USE_MPI | |
18 | #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included" | |
19 | #endif | |
20 | ||
21 | #include <boost/assert.hpp> | |
22 | #include <algorithm> | |
23 | #include <boost/graph/parallel/detail/untracked_pair.hpp> | |
24 | #include <numeric> | |
25 | #include <iterator> | |
26 | #include <functional> | |
27 | #include <vector> | |
28 | #include <queue> | |
29 | #include <stack> | |
30 | #include <list> | |
31 | #include <boost/graph/distributed/detail/tag_allocator.hpp> | |
32 | #include <stdio.h> | |
33 | ||
34 | // #define PBGL_PROCESS_GROUP_DEBUG | |
35 | ||
36 | #ifdef PBGL_PROCESS_GROUP_DEBUG | |
37 | # include <iostream> | |
38 | #endif | |
39 | ||
40 | namespace boost { namespace graph { namespace distributed { | |
41 | ||
/**
 * Private implementation state (pimpl) for mpi_process_group: message
 * batching parameters, MPI communicators, per-process in/out message
 * buffers, synchronization bookkeeping, and trigger/tag state.
 */
struct mpi_process_group::impl
{

  typedef mpi_process_group::message_header message_header;
  typedef mpi_process_group::outgoing_messages outgoing_messages;

  /**
   * Stores the incoming messages from a particular processor.
   *
   * @todo Evaluate whether we should use a deque instance, which
   * could reduce the cost of "receiving" messages and
   * allow us to deallocate memory earlier, but increases the time
   * spent in the synchronization step.
   */
  struct incoming_messages {
    incoming_messages();
    ~incoming_messages() {}

    /// Headers of all buffered messages from this source, in arrival order.
    std::vector<message_header> headers;
    /// Packed payload bytes; each header's offset/bytes fields index here.
    buffer_type buffer;
    /// Per-block cursor: the next unreceived header for each attached block.
    std::vector<std::vector<message_header>::iterator> next_header;
  };

  /// A batch in flight: the posted MPI request together with the buffer
  /// that must stay alive until the send completes.
  struct batch_request {
    MPI_Request request;
    buffer_type buffer;
  };

  // send once we have a certain number of messages or bytes in the buffer
  // these numbers need to be tuned, we keep them small at first for testing
  std::size_t batch_header_number;   // max headers before a batch is sent
  std::size_t batch_buffer_size;     // max buffered bytes before a batch is sent
  std::size_t batch_message_size;    // per-message size threshold -- presumably
                                     // forces large messages out early; confirm

  /**
   * The actual MPI communicator used to transmit data.
   */
  boost::mpi::communicator comm;

  /**
   * The MPI communicator used to transmit out-of-band replies.
   */
  boost::mpi::communicator oob_reply_comm;

  /// Outgoing message information, indexed by destination processor.
  std::vector<outgoing_messages> outgoing;

  /// Incoming message information, indexed by source processor.
  std::vector<incoming_messages> incoming;

  /// The numbers of processors that have entered a synchronization stage
  std::vector<int> processors_synchronizing_stage;

  /// The synchronization stage of a processor
  std::vector<int> synchronizing_stage;

  /// Number of processors still sending messages
  std::vector<int> synchronizing_unfinished;

  /// Number of batches sent since last synchronization stage
  std::vector<int> number_sent_batches;

  /// Number of batches received minus number of expected batches
  std::vector<int> number_received_batches;


  /// The context of the currently-executing trigger, or @c trc_none
  /// if no trigger is executing.
  trigger_receive_context trigger_context;

  /// Non-zero indicates that we're processing batches.
  /// Increment this when processing batches,
  /// decrement it when you're done.
  int processing_batches;

  /**
   * Contains all of the active blocks corresponding to attached
   * distributed data structures.
   */
  blocks_type blocks;

  /// Whether we are currently synchronizing
  bool synchronizing;

  /// The MPI requests for posted sends of oob messages
  std::vector<MPI_Request> requests;

  /// The MPI buffers for posted irecvs of oob messages, keyed by tag
  std::map<int,buffer_type> buffers;

  /// Queue for message batches received while already processing messages
  std::queue<std::pair<int,outgoing_messages> > new_batches;
  /// Maximum encountered size of the new_batches queue
  std::size_t max_received;

  /// The MPI requests and buffers for batches being sent
  std::list<batch_request> sent_batches;
  /// Maximum encountered size of the sent_batches list
  std::size_t max_sent;

  /// Pre-allocated requests in a pool
  std::vector<batch_request> batch_pool;
  /// A stack controlling which batches are available
  std::stack<std::size_t> free_batches;

  /// Presumably reclaims completed entries of sent_batches; defined elsewhere.
  void free_sent_batches();

  // Tag allocator
  detail::tag_allocator allocated_tags;

  impl(std::size_t num_headers, std::size_t buffers_size,
       communicator_type parent_comm);
  ~impl();

private:
  void set_batch_size(std::size_t header_num, std::size_t buffer_sz);
};
159 | ||
/// Returns the context of the currently-executing trigger (@c trc_none
/// when no trigger is running); forwards to the pimpl state.
inline trigger_receive_context mpi_process_group::trigger_context() const
{
  return impl_->trigger_context;
}
164 | ||
165 | template<typename T> | |
166 | void | |
167 | mpi_process_group::send_impl(int dest, int tag, const T& value, | |
168 | mpl::true_ /*is_mpi_datatype*/) const | |
169 | { | |
170 | BOOST_ASSERT(tag < msg_reserved_first || tag > msg_reserved_last); | |
171 | ||
172 | impl::outgoing_messages& outgoing = impl_->outgoing[dest]; | |
173 | ||
174 | // Start constructing the message header | |
175 | impl::message_header header; | |
176 | header.source = process_id(*this); | |
177 | header.tag = tag; | |
178 | header.offset = outgoing.buffer.size(); | |
179 | ||
180 | boost::mpi::packed_oarchive oa(impl_->comm, outgoing.buffer); | |
181 | oa << value; | |
182 | ||
183 | #ifdef PBGL_PROCESS_GROUP_DEBUG | |
184 | std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = " | |
185 | << tag << ", bytes = " << packed_size << std::endl; | |
186 | #endif | |
187 | ||
188 | // Store the header | |
189 | header.bytes = outgoing.buffer.size() - header.offset; | |
190 | outgoing.headers.push_back(header); | |
191 | ||
192 | maybe_send_batch(dest); | |
193 | } | |
194 | ||
195 | ||
196 | template<typename T> | |
197 | void | |
198 | mpi_process_group::send_impl(int dest, int tag, const T& value, | |
199 | mpl::false_ /*is_mpi_datatype*/) const | |
200 | { | |
201 | BOOST_ASSERT(tag < msg_reserved_first || tag > msg_reserved_last); | |
202 | ||
203 | impl::outgoing_messages& outgoing = impl_->outgoing[dest]; | |
204 | ||
205 | // Start constructing the message header | |
206 | impl::message_header header; | |
207 | header.source = process_id(*this); | |
208 | header.tag = tag; | |
209 | header.offset = outgoing.buffer.size(); | |
210 | ||
211 | // Serialize into the buffer | |
212 | boost::mpi::packed_oarchive out(impl_->comm, outgoing.buffer); | |
213 | out << value; | |
214 | ||
215 | // Store the header | |
216 | header.bytes = outgoing.buffer.size() - header.offset; | |
217 | outgoing.headers.push_back(header); | |
218 | maybe_send_batch(dest); | |
219 | ||
220 | #ifdef PBGL_PROCESS_GROUP_DEBUG | |
221 | std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = " | |
222 | << tag << ", bytes = " << header.bytes << std::endl; | |
223 | #endif | |
224 | } | |
225 | ||
226 | template<typename T> | |
227 | inline void | |
228 | send(const mpi_process_group& pg, mpi_process_group::process_id_type dest, | |
229 | int tag, const T& value) | |
230 | { | |
231 | pg.send_impl(dest, pg.encode_tag(pg.my_block_number(), tag), value, | |
232 | boost::mpi::is_mpi_datatype<T>()); | |
233 | } | |
234 | ||
235 | template<typename T> | |
236 | typename enable_if<boost::mpi::is_mpi_datatype<T>, void>::type | |
237 | send(const mpi_process_group& pg, mpi_process_group::process_id_type dest, | |
238 | int tag, const T values[], std::size_t n) | |
239 | { | |
240 | pg.send_impl(dest, pg.encode_tag(pg.my_block_number(), tag), | |
241 | boost::serialization::make_array(values,n), | |
242 | boost::mpl::true_()); | |
243 | } | |
244 | ||
245 | template<typename T> | |
246 | typename disable_if<boost::mpi::is_mpi_datatype<T>, void>::type | |
247 | mpi_process_group:: | |
248 | array_send_impl(int dest, int tag, const T values[], std::size_t n) const | |
249 | { | |
250 | BOOST_ASSERT(tag < msg_reserved_first || tag > msg_reserved_last); | |
251 | ||
252 | impl::outgoing_messages& outgoing = impl_->outgoing[dest]; | |
253 | ||
254 | // Start constructing the message header | |
255 | impl::message_header header; | |
256 | header.source = process_id(*this); | |
257 | header.tag = tag; | |
258 | header.offset = outgoing.buffer.size(); | |
259 | ||
260 | // Serialize into the buffer | |
261 | boost::mpi::packed_oarchive out(impl_->comm, outgoing.buffer); | |
262 | out << n; | |
263 | ||
264 | for (std::size_t i = 0; i < n; ++i) | |
265 | out << values[i]; | |
266 | ||
267 | // Store the header | |
268 | header.bytes = outgoing.buffer.size() - header.offset; | |
269 | outgoing.headers.push_back(header); | |
270 | maybe_send_batch(dest); | |
271 | ||
272 | #ifdef PBGL_PROCESS_GROUP_DEBUG | |
273 | std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = " | |
274 | << tag << ", bytes = " << header.bytes << std::endl; | |
275 | #endif | |
276 | } | |
277 | ||
278 | template<typename T> | |
279 | typename disable_if<boost::mpi::is_mpi_datatype<T>, void>::type | |
280 | send(const mpi_process_group& pg, mpi_process_group::process_id_type dest, | |
281 | int tag, const T values[], std::size_t n) | |
282 | { | |
283 | pg.array_send_impl(dest, pg.encode_tag(pg.my_block_number(), tag), | |
284 | values, n); | |
285 | } | |
286 | ||
287 | template<typename InputIterator> | |
288 | void | |
289 | send(const mpi_process_group& pg, mpi_process_group::process_id_type dest, | |
290 | int tag, InputIterator first, InputIterator last) | |
291 | { | |
292 | typedef typename std::iterator_traits<InputIterator>::value_type value_type; | |
293 | std::vector<value_type> values(first, last); | |
294 | if (values.empty()) send(pg, dest, tag, static_cast<value_type*>(0), 0); | |
295 | else send(pg, dest, tag, &values[0], values.size()); | |
296 | } | |
297 | ||
/**
 * Attempts to receive one buffered message with a native-MPI-datatype
 * payload from `source` carrying the given (already-encoded) tag.
 *
 * @return false when no matching message is buffered; true after the
 *         value has been unpacked and the message consumed.
 */
template<typename T>
bool
mpi_process_group::receive_impl(int source, int tag, T& value,
                                mpl::true_ /*is_mpi_datatype*/) const
{
#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << "RECV: " << process_id(*this) << " <- " << source << ", tag = "
            << tag << std::endl;
#endif

  impl::incoming_messages& incoming = impl_->incoming[source];

  // Find the next header with the right tag, starting from this block's
  // "next unreceived" cursor (already-consumed headers carry tag -1).
  std::vector<impl::message_header>::iterator header =
    incoming.next_header[my_block_number()];
  while (header != incoming.headers.end() && header->tag != tag) ++header;

  // If no header is found, notify the caller
  if (header == incoming.headers.end()) return false;

  // Unpack the data (zero-byte messages carry no payload to deserialize)
  if (header->bytes > 0) {
    boost::mpi::packed_iarchive ia(impl_->comm, incoming.buffer,
                                   archive::no_header, header->offset);
    ia >> value;
  }

  // Mark this message as received
  header->tag = -1;

  // Move the "next header" indicator to the next unreceived message
  while (incoming.next_header[my_block_number()] != incoming.headers.end()
         && incoming.next_header[my_block_number()]->tag == -1)
    ++incoming.next_header[my_block_number()];

  // If this block has drained its cursor, check whether every block has;
  // only then can the shared header list and payload buffer be released.
  if (incoming.next_header[my_block_number()] == incoming.headers.end()) {
    bool finished = true;
    for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) {
      if (incoming.next_header[i] != incoming.headers.end()) finished = false;
    }

    if (finished) {
      // Swap with empty temporaries to actually free capacity, then
      // reset every block's cursor to the (new) end iterator.
      std::vector<impl::message_header> no_headers;
      incoming.headers.swap(no_headers);
      buffer_type empty_buffer;
      incoming.buffer.swap(empty_buffer);
      for (std::size_t i = 0; i < incoming.next_header.size(); ++i)
        incoming.next_header[i] = incoming.headers.end();
    }
  }

  return true;
}
351 | ||
/**
 * Attempts to receive one buffered message whose payload was packed with
 * Boost.Serialization from `source` carrying the given (encoded) tag.
 * Unlike the mpl::true_ overload, the payload is deserialized
 * unconditionally (serialized messages always carry archive bytes).
 *
 * @return false when no matching message is buffered; true otherwise.
 */
template<typename T>
bool
mpi_process_group::receive_impl(int source, int tag, T& value,
                                mpl::false_ /*is_mpi_datatype*/) const
{
  impl::incoming_messages& incoming = impl_->incoming[source];

  // Find the next header with the right tag, starting from this block's
  // "next unreceived" cursor (consumed headers carry tag -1).
  std::vector<impl::message_header>::iterator header =
    incoming.next_header[my_block_number()];
  while (header != incoming.headers.end() && header->tag != tag) ++header;

  // If no header is found, notify the caller
  if (header == incoming.headers.end()) return false;

  // Deserialize the data from the message's offset within the buffer
  boost::mpi::packed_iarchive in(impl_->comm, incoming.buffer,
                                 archive::no_header, header->offset);
  in >> value;

  // Mark this message as received
  header->tag = -1;

  // Move the "next header" indicator to the next unreceived message
  while (incoming.next_header[my_block_number()] != incoming.headers.end()
         && incoming.next_header[my_block_number()]->tag == -1)
    ++incoming.next_header[my_block_number()];

  // If every block has drained its cursor, release the header list and
  // payload buffer to reclaim memory.
  if (incoming.next_header[my_block_number()] == incoming.headers.end()) {
    bool finished = true;
    for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) {
      if (incoming.next_header[i] != incoming.headers.end()) finished = false;
    }

    if (finished) {
      // Swap with empty temporaries to actually free capacity, then
      // reset every block's cursor to the (new) end iterator.
      std::vector<impl::message_header> no_headers;
      incoming.headers.swap(no_headers);
      buffer_type empty_buffer;
      incoming.buffer.swap(empty_buffer);
      for (std::size_t i = 0; i < incoming.next_header.size(); ++i)
        incoming.next_header[i] = incoming.headers.end();
    }
  }

  return true;
}
398 | ||
399 | template<typename T> | |
400 | typename disable_if<boost::mpi::is_mpi_datatype<T>, bool>::type | |
401 | mpi_process_group:: | |
402 | array_receive_impl(int source, int tag, T* values, std::size_t& n) const | |
403 | { | |
404 | impl::incoming_messages& incoming = impl_->incoming[source]; | |
405 | ||
406 | // Find the next header with the right tag | |
407 | std::vector<impl::message_header>::iterator header = | |
408 | incoming.next_header[my_block_number()]; | |
409 | while (header != incoming.headers.end() && header->tag != tag) ++header; | |
410 | ||
411 | // If no header is found, notify the caller | |
412 | if (header == incoming.headers.end()) return false; | |
413 | ||
414 | // Deserialize the data | |
415 | boost::mpi::packed_iarchive in(impl_->comm, incoming.buffer, | |
416 | archive::no_header, header->offset); | |
417 | std::size_t num_sent; | |
418 | in >> num_sent; | |
419 | if (num_sent > n) | |
420 | std::cerr << "ERROR: Have " << num_sent << " items but only space for " | |
421 | << n << " items\n"; | |
422 | ||
423 | for (std::size_t i = 0; i < num_sent; ++i) | |
424 | in >> values[i]; | |
425 | n = num_sent; | |
426 | ||
427 | // Mark this message as received | |
428 | header->tag = -1; | |
429 | ||
430 | // Move the "next header" indicator to the next unreceived message | |
431 | while (incoming.next_header[my_block_number()] != incoming.headers.end() | |
432 | && incoming.next_header[my_block_number()]->tag == -1) | |
433 | ++incoming.next_header[my_block_number()]; | |
434 | ||
435 | if (incoming.next_header[my_block_number()] == incoming.headers.end()) { | |
436 | bool finished = true; | |
437 | for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) { | |
438 | if (incoming.next_header[i] != incoming.headers.end()) finished = false; | |
439 | } | |
440 | ||
441 | if (finished) { | |
442 | std::vector<impl::message_header> no_headers; | |
443 | incoming.headers.swap(no_headers); | |
444 | buffer_type empty_buffer; | |
445 | incoming.buffer.swap(empty_buffer); | |
446 | for (std::size_t i = 0; i < incoming.next_header.size(); ++i) | |
447 | incoming.next_header[i] = incoming.headers.end(); | |
448 | } | |
449 | } | |
450 | ||
451 | return true; | |
452 | } | |
453 | ||
454 | // Construct triggers | |
// Construct triggers
/// Installs a trigger on the calling block: incoming messages with `tag`
/// are decoded as Type and passed to `handler`.
template<typename Type, typename Handler>
void mpi_process_group::trigger(int tag, const Handler& handler)
{
  // block_num must be set -- presumably the group must already be
  // attached to a block before triggers can be installed; confirm.
  BOOST_ASSERT(block_num);
  install_trigger(tag,my_block_number(),shared_ptr<trigger_base>(
    new trigger_launcher<Type, Handler>(*this, tag, handler)));
}
462 | ||
/// Installs a reply-capable trigger on the calling block: the handler's
/// return value is sent back to the message's source out-of-band.
template<typename Type, typename Handler>
void mpi_process_group::trigger_with_reply(int tag, const Handler& handler)
{
  // block_num must be set -- presumably the group must already be
  // attached to a block before triggers can be installed; confirm.
  BOOST_ASSERT(block_num);
  install_trigger(tag,my_block_number(),shared_ptr<trigger_base>(
    new reply_trigger_launcher<Type, Handler>(*this, tag, handler)));
}
470 | ||
471 | template<typename Type, typename Handler> | |
472 | void mpi_process_group::global_trigger(int tag, const Handler& handler, | |
473 | std::size_t sz) | |
474 | { | |
475 | if (sz==0) // normal trigger | |
476 | install_trigger(tag,0,shared_ptr<trigger_base>( | |
477 | new global_trigger_launcher<Type, Handler>(*this, tag, handler))); | |
478 | else // trigger with irecv | |
479 | install_trigger(tag,0,shared_ptr<trigger_base>( | |
480 | new global_irecv_trigger_launcher<Type, Handler>(*this, tag, handler,sz))); | |
481 | ||
482 | } | |
483 | ||
484 | namespace detail { | |
485 | ||
/// Out-of-band receive of a value whose type maps directly onto an MPI
/// datatype: a single blocking MPI_Recv of one element suffices.
template<typename Type>
void do_oob_receive(mpi_process_group const& self,
    int source, int tag, Type& data, mpl::true_ /*is_mpi_datatype*/)
{
  using boost::mpi::get_mpi_datatype;

  //self.impl_->comm.recv(source,tag,data);
  MPI_Recv(&data, 1, get_mpi_datatype<Type>(data), source, tag, self.impl_->comm,
           MPI_STATUS_IGNORE);
}
496 | ||
/// Out-of-band receive of a serialized (non-MPI-datatype) value: probe
/// for the incoming packed size, receive the bytes, then deserialize.
template<typename Type>
void do_oob_receive(mpi_process_group const& self,
    int source, int tag, Type& data, mpl::false_ /*is_mpi_datatype*/)
{
  // self.impl_->comm.recv(source,tag,data);
  // Receive the size of the data packet
  boost::mpi::status status;
  status = self.impl_->comm.probe(source, tag);

#if BOOST_VERSION >= 103600
  int size = status.count<boost::mpi::packed>().get();
#else
  // Older Boost.MPI lacks status::count<>; query MPI directly.
  int size;
  MPI_Status& mpi_status = status;
  MPI_Get_count(&mpi_status, MPI_PACKED, &size);
#endif

  // Receive the packed data itself into a correctly-sized archive
  boost::mpi::packed_iarchive in(self.impl_->comm);
  in.resize(size);
  MPI_Recv(in.address(), size, MPI_PACKED, source, tag, self.impl_->comm,
           MPI_STATUS_IGNORE);

  // Deserialize the data
  in >> data;
}
523 | ||
524 | template<typename Type> | |
525 | void do_oob_receive(mpi_process_group const& self, int source, int tag, Type& data) | |
526 | { | |
527 | do_oob_receive(self, source, tag, data, | |
528 | boost::mpi::is_mpi_datatype<Type>()); | |
529 | } | |
530 | ||
531 | ||
532 | } // namespace detail | |
533 | ||
534 | ||
/// Invoked when a message for this trigger arrives.  Decodes the payload
/// as Type -- directly off the wire for out-of-band delivery, otherwise
/// from the local buffers -- and hands it to the stored handler.
/// Note: the mpi_process_group argument is intentionally unnamed; the
/// launcher uses its stored `self` member instead.
template<typename Type, typename Handler>
void
mpi_process_group::trigger_launcher<Type, Handler>::
receive(mpi_process_group const&, int source, int tag,
        trigger_receive_context context, int block) const
{
#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << (out_of_band? "OOB trigger" : "Trigger")
            << " receive from source " << source << " and tag " << tag
            << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl;
#endif

  Type data;

  if (context == trc_out_of_band) {
    // Receive the message directly off the wire; block -1 means
    // "use the launcher's own block" when encoding the real tag.
    int realtag = self.encode_tag(
      block == -1 ? self.my_block_number() : block, tag);
    detail::do_oob_receive(self,source,realtag,data);
  }
  else
    // Receive the message out of the local buffer
    boost::graph::distributed::receive(self, source, tag, data);

  // Pass the message off to the handler
  handler(source, tag, data, context);
}
562 | ||
/// Invoked when a reply-style trigger message arrives.  Such messages are
/// only ever delivered out-of-band; the payload is an (int, Type) pair
/// where data.first presumably carries the tag the sender is awaiting the
/// reply on -- confirm against send side.  The handler's return value is
/// sent back to the source via send_oob.
template<typename Type, typename Handler>
void
mpi_process_group::reply_trigger_launcher<Type, Handler>::
receive(mpi_process_group const&, int source, int tag,
        trigger_receive_context context, int block) const
{
#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << (out_of_band? "OOB reply trigger" : "Reply trigger")
            << " receive from source " << source << " and tag " << tag
            << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl;
#endif
  BOOST_ASSERT(context == trc_out_of_band);

  boost::parallel::detail::untracked_pair<int, Type> data;

  // Receive the message directly off the wire
  int realtag = self.encode_tag(block == -1 ? self.my_block_number() : block,
                                tag);
  detail::do_oob_receive(self, source, realtag, data);

  // Pass the message off to the handler and send the result back to
  // the source.  (-2 -- meaning not established by this file; confirm.)
  send_oob(self, source, data.first,
           handler(source, tag, data.second, context), -2);
}
588 | ||
/// Invoked when a message for a global trigger arrives.  Identical flow
/// to trigger_launcher::receive, except the handler also receives the
/// process group itself as its first argument.
template<typename Type, typename Handler>
void
mpi_process_group::global_trigger_launcher<Type, Handler>::
receive(mpi_process_group const& self, int source, int tag,
        trigger_receive_context context, int block) const
{
#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << (out_of_band? "OOB trigger" : "Trigger")
            << " receive from source " << source << " and tag " << tag
            << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl;
#endif

  Type data;

  if (context == trc_out_of_band) {
    // Receive the message directly off the wire
    int realtag = self.encode_tag(
      block == -1 ? self.my_block_number() : block, tag);
    detail::do_oob_receive(self,source,realtag,data);
  }
  else
    // Receive the message out of the local buffer
    boost::graph::distributed::receive(self, source, tag, data);

  // Pass the message off to the handler
  handler(self, source, tag, data, context);
}
616 | ||
617 | ||
/// Invoked when a message for an irecv-backed global trigger arrives.
/// The payload was already delivered into the pre-posted per-tag buffer
/// (impl_->buffers[tag]) by MPI_Irecv, so plain out-of-band delivery is
/// ignored; only trc_irecv_out_of_band is serviced.
template<typename Type, typename Handler>
void
mpi_process_group::global_irecv_trigger_launcher<Type, Handler>::
receive(mpi_process_group const& self, int source, int tag,
        trigger_receive_context context, int block) const
{
#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << (out_of_band? "OOB trigger" : "Trigger")
            << " receive from source " << source << " and tag " << tag
            << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl;
#endif

  Type data;

  if (context == trc_out_of_band) {
    return;   // handled exclusively through the posted irecv path
  }
  BOOST_ASSERT (context == trc_irecv_out_of_band);

  // Deserialize straight out of the already-filled irecv buffer.
  // (The buffer is already allocated; a new MPI_Irecv is forced below.)
  boost::mpi::packed_iarchive ia(self.impl_->comm,self.impl_->buffers[tag]);
  ia >> data;
  // Start a new receive so the next message for this tag can land
  prepare_receive(self,tag,true);
  // Pass the message off to the handler
  handler(self, source, tag, data, context);
}
645 | ||
646 | ||
/// Ensures a persistent MPI_Irecv is posted for this trigger's tag.
/// Allocates the per-tag buffer on first use (which forces a post);
/// otherwise a new irecv is posted only when `force` is set.
template<typename Type, typename Handler>
void
mpi_process_group::global_irecv_trigger_launcher<Type, Handler>::
prepare_receive(mpi_process_group const& self, int tag, bool force) const
{
#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << ("Posting Irecv for trigger")
            << " receive with tag " << tag << std::endl;
#endif
  if (self.impl_->buffers.find(tag) == self.impl_->buffers.end()) {
    // First use of this tag: allocate the landing buffer and force a post.
    self.impl_->buffers[tag].resize(buffer_size);
    force = true;
  }
  BOOST_ASSERT(static_cast<int>(self.impl_->buffers[tag].size()) >= buffer_size);

  //BOOST_MPL_ASSERT(mpl::not_<is_mpi_datatype<Type> >);
  if (force) {
    // NOTE(review): requests is a std::vector, so this push_back may
    // reallocate and invalidate MPI_Request pointers handed to earlier
    // MPI_Irecv calls that are still pending -- verify how requests is
    // consumed elsewhere before relying on address stability.
    self.impl_->requests.push_back(MPI_Request());
    MPI_Request* request = &self.impl_->requests.back();
    MPI_Irecv(&self.impl_->buffers[tag].front(),buffer_size,
              MPI_PACKED,MPI_ANY_SOURCE,tag,self.impl_->comm,request);
  }
}
670 | ||
671 | ||
672 | template<typename T> | |
673 | inline mpi_process_group::process_id_type | |
674 | receive(const mpi_process_group& pg, int tag, T& value) | |
675 | { | |
676 | for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) { | |
677 | if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag), | |
678 | value, boost::mpi::is_mpi_datatype<T>())) | |
679 | return source; | |
680 | } | |
681 | BOOST_ASSERT (false); | |
682 | } | |
683 | ||
684 | template<typename T> | |
685 | typename | |
686 | enable_if<boost::mpi::is_mpi_datatype<T>, | |
687 | std::pair<mpi_process_group::process_id_type, std::size_t> >::type | |
688 | receive(const mpi_process_group& pg, int tag, T values[], std::size_t n) | |
689 | { | |
690 | for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) { | |
691 | bool result = | |
692 | pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag), | |
693 | boost::serialization::make_array(values,n), | |
694 | boost::mpl::true_()); | |
695 | if (result) | |
696 | return std::make_pair(source, n); | |
697 | } | |
698 | BOOST_ASSERT(false); | |
699 | } | |
700 | ||
701 | template<typename T> | |
702 | typename | |
703 | disable_if<boost::mpi::is_mpi_datatype<T>, | |
704 | std::pair<mpi_process_group::process_id_type, std::size_t> >::type | |
705 | receive(const mpi_process_group& pg, int tag, T values[], std::size_t n) | |
706 | { | |
707 | for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) { | |
708 | if (pg.array_receive_impl(source, pg.encode_tag(pg.my_block_number(), tag), | |
709 | values, n)) | |
710 | return std::make_pair(source, n); | |
711 | } | |
712 | BOOST_ASSERT(false); | |
713 | } | |
714 | ||
715 | template<typename T> | |
716 | mpi_process_group::process_id_type | |
717 | receive(const mpi_process_group& pg, | |
718 | mpi_process_group::process_id_type source, int tag, T& value) | |
719 | { | |
720 | if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag), | |
721 | value, boost::mpi::is_mpi_datatype<T>())) | |
722 | return source; | |
723 | else { | |
724 | fprintf(stderr, | |
725 | "Process %d failed to receive a message from process %d with tag %d in block %d.\n", | |
726 | process_id(pg), source, tag, pg.my_block_number()); | |
727 | ||
728 | BOOST_ASSERT(false); | |
729 | abort(); | |
730 | } | |
731 | } | |
732 | ||
733 | template<typename T> | |
734 | typename | |
735 | enable_if<boost::mpi::is_mpi_datatype<T>, | |
736 | std::pair<mpi_process_group::process_id_type, std::size_t> >::type | |
737 | receive(const mpi_process_group& pg, int source, int tag, T values[], | |
738 | std::size_t n) | |
739 | { | |
740 | if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag), | |
741 | boost::serialization::make_array(values,n), | |
742 | boost::mpl::true_())) | |
743 | return std::make_pair(source,n); | |
744 | else { | |
745 | fprintf(stderr, | |
746 | "Process %d failed to receive a message from process %d with tag %d in block %d.\n", | |
747 | process_id(pg), source, tag, pg.my_block_number()); | |
748 | ||
749 | BOOST_ASSERT(false); | |
750 | abort(); | |
751 | } | |
752 | } | |
753 | ||
/// Receives a buffered array of serialized values with the given tag
/// from a specific source; returns (source, n) with n updated to the
/// sender's element count.
/// NOTE(review): unlike the sibling overloads, the boolean result of
/// array_receive_impl is ignored -- on a miss, values/n are untouched
/// and (source, n) is returned anyway.  Possibly deliberate best-effort
/// behavior; confirm with callers before tightening.
template<typename T>
typename
disable_if<boost::mpi::is_mpi_datatype<T>,
           std::pair<mpi_process_group::process_id_type, std::size_t> >::type
receive(const mpi_process_group& pg, int source, int tag, T values[],
        std::size_t n)
{
  pg.array_receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
                        values, n);

  return std::make_pair(source, n);
}
766 | ||
/// All-reduce of [first, last) across the process group with bin_op.
/// Supports in-place use (first == out): results are computed into a
/// temporary, copied back over [first, last), and `last` is returned.
/// Otherwise the results land in `out` and `out` is returned.
template<typename T, typename BinaryOperation>
T*
all_reduce(const mpi_process_group& pg, T* first, T* last, T* out,
           BinaryOperation bin_op)
{
  // Flush pending messages before the collective operation.
  synchronize(pg);

  bool inplace = first == out;

  // MPI disallows identical send/recv buffers; reduce into a temporary.
  if (inplace) out = new T [last-first];

  // Attach (not duplicate) the group's MPI communicator for the reduce.
  boost::mpi::all_reduce(boost::mpi::communicator(communicator(pg),
                                                  boost::mpi::comm_attach),
                         first, last-first, out, bin_op);

  if (inplace) {
    // Copy the reduced values back and release the temporary.
    std::copy(out, out + (last-first), first);
    delete [] out;
    return last;
  }

  return out;
}
790 | ||
791 | template<typename T> | |
792 | void | |
793 | broadcast(const mpi_process_group& pg, T& val, | |
794 | mpi_process_group::process_id_type root) | |
795 | { | |
796 | // broadcast the seed | |
797 | boost::mpi::communicator comm(communicator(pg),boost::mpi::comm_attach); | |
798 | boost::mpi::broadcast(comm,val,root); | |
799 | } | |
800 | ||
801 | ||
/// Prefix-scan of [first, last) across the process group with bin_op.
/// Mirrors all_reduce's in-place protocol: when first == out the result
/// is computed into a temporary, copied back, and `last` is returned;
/// otherwise `out` receives the result and is returned.
/// NOTE(review): unlike all_reduce, communicator(pg) is passed directly
/// rather than wrapped with comm_attach -- confirm this is intentional.
template<typename T, typename BinaryOperation>
T*
scan(const mpi_process_group& pg, T* first, T* last, T* out,
     BinaryOperation bin_op)
{
  // Flush pending messages before the collective operation.
  synchronize(pg);

  bool inplace = first == out;

  // MPI disallows identical send/recv buffers; scan into a temporary.
  if (inplace) out = new T [last-first];

  boost::mpi::scan(communicator(pg), first, last-first, out, bin_op);

  if (inplace) {
    // Copy the scanned values back and release the temporary.
    std::copy(out, out + (last-first), first);
    delete [] out;
    return last;
  }

  return out;
}
823 | ||
824 | ||
825 | template<typename InputIterator, typename T> | |
826 | void | |
827 | all_gather(const mpi_process_group& pg, InputIterator first, | |
828 | InputIterator last, std::vector<T>& out) | |
829 | { | |
830 | synchronize(pg); | |
831 | ||
832 | // Stick a copy of the local values into a vector, so we can broadcast it | |
833 | std::vector<T> local_values(first, last); | |
834 | ||
835 | // Collect the number of vertices stored in each process | |
836 | int size = local_values.size(); | |
837 | std::vector<int> sizes(num_processes(pg)); | |
838 | int result = MPI_Allgather(&size, 1, MPI_INT, | |
839 | &sizes[0], 1, MPI_INT, | |
840 | communicator(pg)); | |
841 | BOOST_ASSERT(result == MPI_SUCCESS); | |
842 | ||
843 | // Adjust sizes based on the number of bytes | |
844 | std::transform(sizes.begin(), sizes.end(), sizes.begin(), | |
845 | std::bind2nd(std::multiplies<int>(), sizeof(T))); | |
846 | ||
847 | // Compute displacements | |
848 | std::vector<int> displacements; | |
849 | displacements.reserve(sizes.size() + 1); | |
850 | displacements.push_back(0); | |
851 | std::partial_sum(sizes.begin(), sizes.end(), | |
852 | std::back_inserter(displacements)); | |
853 | ||
854 | // Gather all of the values | |
855 | out.resize(displacements.back() / sizeof(T)); | |
856 | if (!out.empty()) { | |
857 | result = MPI_Allgatherv(local_values.empty()? (void*)&local_values | |
858 | /* local results */: (void*)&local_values[0], | |
859 | local_values.size() * sizeof(T), | |
860 | MPI_BYTE, | |
861 | &out[0], &sizes[0], &displacements[0], MPI_BYTE, | |
862 | communicator(pg)); | |
863 | } | |
864 | BOOST_ASSERT(result == MPI_SUCCESS); | |
865 | } | |
866 | ||
867 | template<typename InputIterator> | |
868 | mpi_process_group | |
869 | process_subgroup(const mpi_process_group& pg, | |
870 | InputIterator first, InputIterator last) | |
871 | { | |
872 | /* | |
873 | boost::mpi::group current_group = communicator(pg).group(); | |
874 | boost::mpi::group new_group = current_group.include(first,last); | |
875 | boost::mpi::communicator new_comm(communicator(pg),new_group); | |
876 | return mpi_process_group(new_comm); | |
877 | */ | |
878 | std::vector<int> ranks(first, last); | |
879 | ||
880 | MPI_Group current_group; | |
881 | int result = MPI_Comm_group(communicator(pg), ¤t_group); | |
882 | BOOST_ASSERT(result == MPI_SUCCESS); | |
883 | ||
884 | MPI_Group new_group; | |
885 | result = MPI_Group_incl(current_group, ranks.size(), &ranks[0], &new_group); | |
886 | BOOST_ASSERT(result == MPI_SUCCESS); | |
887 | ||
888 | MPI_Comm new_comm; | |
889 | result = MPI_Comm_create(communicator(pg), new_group, &new_comm); | |
890 | BOOST_ASSERT(result == MPI_SUCCESS); | |
891 | ||
892 | result = MPI_Group_free(&new_group); | |
893 | BOOST_ASSERT(result == MPI_SUCCESS); | |
894 | result = MPI_Group_free(¤t_group); | |
895 | BOOST_ASSERT(result == MPI_SUCCESS); | |
896 | ||
897 | if (new_comm != MPI_COMM_NULL) { | |
898 | mpi_process_group result_pg(boost::mpi::communicator(new_comm,boost::mpi::comm_attach)); | |
899 | result = MPI_Comm_free(&new_comm); | |
900 | BOOST_ASSERT(result == 0); | |
901 | return result_pg; | |
902 | } else { | |
903 | return mpi_process_group(mpi_process_group::create_empty()); | |
904 | } | |
905 | ||
906 | } | |
907 | ||
908 | ||
template<typename Receiver>
Receiver* mpi_process_group::get_receiver()
{
  // Return the receiver callback registered for this group's current
  // block, downcast to its concrete type. Per function-object target<>()
  // semantics, this yields a null pointer when the stored callback is
  // not actually a Receiver.
  return impl_->blocks[my_block_number()]->on_receive
    .template target<Receiver>();
}
915 | ||
916 | template<typename T> | |
917 | typename enable_if<boost::mpi::is_mpi_datatype<T> >::type | |
918 | receive_oob(const mpi_process_group& pg, | |
919 | mpi_process_group::process_id_type source, int tag, T& value, int block) | |
920 | { | |
921 | using boost::mpi::get_mpi_datatype; | |
922 | ||
923 | // Determine the actual message we expect to receive, and which | |
924 | // communicator it will come by. | |
925 | std::pair<boost::mpi::communicator, int> actual | |
926 | = pg.actual_communicator_and_tag(tag, block); | |
927 | ||
928 | // Post a non-blocking receive that waits until we complete this request. | |
929 | MPI_Request request; | |
930 | MPI_Irecv(&value, 1, get_mpi_datatype<T>(value), | |
931 | source, actual.second, actual.first, &request); | |
932 | ||
933 | int done = 0; | |
934 | do { | |
935 | MPI_Test(&request, &done, MPI_STATUS_IGNORE); | |
936 | if (!done) | |
937 | pg.poll(/*wait=*/false, block); | |
938 | } while (!done); | |
939 | } | |
940 | ||
template<typename T>
typename disable_if<boost::mpi::is_mpi_datatype<T> >::type
receive_oob(const mpi_process_group& pg,
    mpi_process_group::process_id_type source, int tag, T& value, int block)
{
  // Out-of-band receive for serialized (non-MPI-datatype) payloads:
  // probe until the matching message arrives, receive it as a packed
  // archive, then deserialize into `value`.

  // Determine the actual message we expect to receive, and which
  // communicator it will come by.
  std::pair<boost::mpi::communicator, int> actual
    = pg.actual_communicator_and_tag(tag, block);

  // Spin on iprobe rather than blocking inside MPI so pg.poll() can
  // keep servicing the group's regular message traffic while we wait.
  boost::optional<boost::mpi::status> status;
  do {
    status = actual.first.iprobe(source, actual.second);
    if (!status)
      pg.poll();
  } while (!status);

  //actual.first.recv(status->source(), status->tag(),value);

  // Allocate the receive buffer
  boost::mpi::packed_iarchive in(actual.first);

#if BOOST_VERSION >= 103600
  // Boost >= 1.36 exposes the packed byte count directly on the status.
  in.resize(status->count<boost::mpi::packed>().get());
#else
  // Older Boost: query the raw MPI status to size the buffer.
  int size;
  MPI_Status mpi_status = *status;
  MPI_Get_count(&mpi_status, MPI_PACKED, &size);
  in.resize(size);
#endif

  // Receive the message data (source/tag taken from the probe so we
  // consume exactly the message we probed).
  MPI_Recv(in.address(), in.size(), MPI_PACKED,
           status->source(), status->tag(), actual.first, MPI_STATUS_IGNORE);

  // Unpack the message data
  in >> value;
}
979 | ||
980 | ||
981 | template<typename SendT, typename ReplyT> | |
982 | typename enable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type | |
983 | send_oob_with_reply(const mpi_process_group& pg, | |
984 | mpi_process_group::process_id_type dest, | |
985 | int tag, const SendT& send_value, ReplyT& reply_value, | |
986 | int block) | |
987 | { | |
988 | detail::tag_allocator::token reply_tag = pg.impl_->allocated_tags.get_tag(); | |
989 | send_oob(pg, dest, tag, boost::parallel::detail::make_untracked_pair( | |
990 | (int)reply_tag, send_value), block); | |
991 | receive_oob(pg, dest, reply_tag, reply_value); | |
992 | } | |
993 | ||
994 | template<typename SendT, typename ReplyT> | |
995 | typename disable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type | |
996 | send_oob_with_reply(const mpi_process_group& pg, | |
997 | mpi_process_group::process_id_type dest, | |
998 | int tag, const SendT& send_value, ReplyT& reply_value, | |
999 | int block) | |
1000 | { | |
1001 | detail::tag_allocator::token reply_tag = pg.impl_->allocated_tags.get_tag(); | |
1002 | send_oob(pg, dest, tag, | |
1003 | boost::parallel::detail::make_untracked_pair((int)reply_tag, | |
1004 | send_value), block); | |
1005 | receive_oob(pg, dest, reply_tag, reply_value); | |
1006 | } | |
1007 | ||
1008 | } } } // end namespace boost::graph::distributed |