// -*- C++ -*-

// Copyright (C) 2004-2008 The Trustees of Indiana University.
// Copyright (C) 2007  Douglas Gregor <doug.gregor@gmail.com>
// Copyright (C) 2007  Matthias Troyer <troyer@boost-consulting.com>

// Use, modification and distribution is subject to the Boost Software
// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

//  Authors: Douglas Gregor
//           Andrew Lumsdaine
//           Matthias Troyer

#ifndef BOOST_GRAPH_USE_MPI
#error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
#endif

#include <boost/assert.hpp>
#include <algorithm>
#include <boost/graph/parallel/detail/untracked_pair.hpp>
#include <numeric>
#include <iterator>
#include <functional>
#include <vector>
#include <queue>
#include <stack>
#include <list>
#include <boost/graph/distributed/detail/tag_allocator.hpp>
#include <stdio.h>

// #define PBGL_PROCESS_GROUP_DEBUG

#ifdef PBGL_PROCESS_GROUP_DEBUG
#  include <iostream>
#endif

namespace boost { namespace graph { namespace distributed {

struct mpi_process_group::impl
{

  typedef mpi_process_group::message_header message_header;
  typedef mpi_process_group::outgoing_messages outgoing_messages;

  /**
   * Stores the incoming messages from a particular processor.
   *
   * @todo Evaluate whether we should use a deque instead, which could
   * reduce the cost of "receiving" messages and allow us to deallocate
   * memory earlier, but increases the time spent in the synchronization
   * step.
   */
  struct incoming_messages {
    incoming_messages();
    ~incoming_messages() {}

    std::vector<message_header> headers;
    buffer_type buffer;
    std::vector<std::vector<message_header>::iterator> next_header;
  };

  struct batch_request {
    MPI_Request request;
    buffer_type buffer;
  };

  // Send once we have accumulated a certain number of messages or bytes in
  // the buffer. These numbers need to be tuned; we keep them small at first
  // for testing.
  std::size_t batch_header_number;
  std::size_t batch_buffer_size;
  std::size_t batch_message_size;

  /**
   * The actual MPI communicator used to transmit data.
   */
  boost::mpi::communicator comm;

  /**
   * The MPI communicator used to transmit out-of-band replies.
   */
  boost::mpi::communicator oob_reply_comm;

  /// Outgoing message information, indexed by destination processor.
  std::vector<outgoing_messages> outgoing;

  /// Incoming message information, indexed by source processor.
  std::vector<incoming_messages> incoming;

  /// The number of processors that have entered each synchronization stage.
  std::vector<int> processors_synchronizing_stage;

  /// The synchronization stage of a processor.
  std::vector<int> synchronizing_stage;

  /// Number of processors still sending messages.
  std::vector<int> synchronizing_unfinished;

  /// Number of batches sent since the last synchronization stage.
  std::vector<int> number_sent_batches;

  /// Number of batches received minus number of expected batches.
  std::vector<int> number_received_batches;


  /// The context of the currently-executing trigger, or @c trc_none
  /// if no trigger is executing.
  trigger_receive_context trigger_context;

  /// Non-zero indicates that we're processing batches.
  /// Increment this when processing batches,
  /// decrement it when you're done.
  int processing_batches;

  /**
   * Contains all of the active blocks corresponding to attached
   * distributed data structures.
   */
  blocks_type blocks;

  /// Whether we are currently synchronizing.
  bool synchronizing;

  /// The MPI requests for posted sends of oob messages.
  std::vector<MPI_Request> requests;

  /// The MPI buffers for posted irecvs of oob messages.
  std::map<int,buffer_type> buffers;

  /// Queue for message batches received while already processing messages.
  std::queue<std::pair<int,outgoing_messages> > new_batches;
  /// Maximum encountered size of the new_batches queue.
  std::size_t max_received;

  /// The MPI requests and buffers for batches being sent.
  std::list<batch_request> sent_batches;
  /// Maximum encountered size of the sent_batches list.
  std::size_t max_sent;

  /// Pre-allocated requests in a pool.
  std::vector<batch_request> batch_pool;
  /// A stack controlling which batches are available.
  std::stack<std::size_t> free_batches;

  void free_sent_batches();

  // Tag allocator
  detail::tag_allocator allocated_tags;

  impl(std::size_t num_headers, std::size_t buffers_size,
       communicator_type parent_comm);
  ~impl();

private:
  void set_batch_size(std::size_t header_num, std::size_t buffer_sz);
};
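
// The batching thresholds above drive mpi_process_group::maybe_send_batch(),
// whose definition lives in mpi_process_group.cpp. Conceptually (a sketch
// only, not the actual implementation, which also deals with self-sends and
// the per-message size threshold), a batch for `dest` is flushed once either
// threshold is crossed:
//
//   void mpi_process_group::maybe_send_batch(process_id_type dest) const
//   {
//     impl::outgoing_messages& outgoing = impl_->outgoing[dest];
//     if (outgoing.headers.size() >= impl_->batch_header_number
//         || outgoing.buffer.size() >= impl_->batch_buffer_size)
//       send_batch(dest);  // ship accumulated headers + buffer as one message
//   }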

inline trigger_receive_context mpi_process_group::trigger_context() const
{
  return impl_->trigger_context;
}

template<typename T>
void
mpi_process_group::send_impl(int dest, int tag, const T& value,
                             mpl::true_ /*is_mpi_datatype*/) const
{
  BOOST_ASSERT(tag < msg_reserved_first || tag > msg_reserved_last);

  impl::outgoing_messages& outgoing = impl_->outgoing[dest];

  // Start constructing the message header
  impl::message_header header;
  header.source = process_id(*this);
  header.tag = tag;
  header.offset = outgoing.buffer.size();

  boost::mpi::packed_oarchive oa(impl_->comm, outgoing.buffer);
  oa << value;

  // Store the header
  header.bytes = outgoing.buffer.size() - header.offset;
  outgoing.headers.push_back(header);

#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = "
            << tag << ", bytes = " << header.bytes << std::endl;
#endif

  maybe_send_batch(dest);
}


template<typename T>
void
mpi_process_group::send_impl(int dest, int tag, const T& value,
                             mpl::false_ /*is_mpi_datatype*/) const
{
  BOOST_ASSERT(tag < msg_reserved_first || tag > msg_reserved_last);

  impl::outgoing_messages& outgoing = impl_->outgoing[dest];

  // Start constructing the message header
  impl::message_header header;
  header.source = process_id(*this);
  header.tag = tag;
  header.offset = outgoing.buffer.size();

  // Serialize into the buffer
  boost::mpi::packed_oarchive out(impl_->comm, outgoing.buffer);
  out << value;

  // Store the header
  header.bytes = outgoing.buffer.size() - header.offset;
  outgoing.headers.push_back(header);
  maybe_send_batch(dest);

#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = "
            << tag << ", bytes = " << header.bytes << std::endl;
#endif
}

template<typename T>
inline void
send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
     int tag, const T& value)
{
  pg.send_impl(dest, pg.encode_tag(pg.my_block_number(), tag), value,
               boost::mpi::is_mpi_datatype<T>());
}
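
// Example (a sketch; assumes `pg` was constructed on MPI_COMM_WORLD and a
// matching receive() is eventually issued on rank 1). Sends are buffered
// into the outgoing batch for `dest` rather than hitting the wire directly:
//
//   mpi_process_group pg;
//   std::string payload("hello");
//   if (process_id(pg) == 0)
//     send(pg, /*dest=*/1, /*tag=*/17, payload);
//   synchronize(pg);  // flushes any partially filled batches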

template<typename T>
typename enable_if<boost::mpi::is_mpi_datatype<T>, void>::type
send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
     int tag, const T values[], std::size_t n)
{
  pg.send_impl(dest, pg.encode_tag(pg.my_block_number(), tag),
               boost::serialization::make_array(values,n),
               boost::mpl::true_());
}

template<typename T>
typename disable_if<boost::mpi::is_mpi_datatype<T>, void>::type
mpi_process_group::
array_send_impl(int dest, int tag, const T values[], std::size_t n) const
{
  BOOST_ASSERT(tag < msg_reserved_first || tag > msg_reserved_last);

  impl::outgoing_messages& outgoing = impl_->outgoing[dest];

  // Start constructing the message header
  impl::message_header header;
  header.source = process_id(*this);
  header.tag = tag;
  header.offset = outgoing.buffer.size();

  // Serialize into the buffer
  boost::mpi::packed_oarchive out(impl_->comm, outgoing.buffer);
  out << n;

  for (std::size_t i = 0; i < n; ++i)
    out << values[i];

  // Store the header
  header.bytes = outgoing.buffer.size() - header.offset;
  outgoing.headers.push_back(header);
  maybe_send_batch(dest);

#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = "
            << tag << ", bytes = " << header.bytes << std::endl;
#endif
}

template<typename T>
typename disable_if<boost::mpi::is_mpi_datatype<T>, void>::type
send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
     int tag, const T values[], std::size_t n)
{
  pg.array_send_impl(dest, pg.encode_tag(pg.my_block_number(), tag),
                     values, n);
}

template<typename InputIterator>
void
send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
     int tag, InputIterator first, InputIterator last)
{
  typedef typename std::iterator_traits<InputIterator>::value_type value_type;
  std::vector<value_type> values(first, last);
  if (values.empty()) send(pg, dest, tag, static_cast<value_type*>(0), 0);
  else send(pg, dest, tag, &values[0], values.size());
}
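
// Example (sketch): the iterator overload above copies [first, last) into a
// temporary vector and forwards to the array overload, so any input
// iterator range works:
//
//   std::list<double> weights(10, 1.0);
//   send(pg, /*dest=*/1, /*tag=*/3, weights.begin(), weights.end());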

template<typename T>
bool
mpi_process_group::receive_impl(int source, int tag, T& value,
                                mpl::true_ /*is_mpi_datatype*/) const
{
#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << "RECV: " << process_id(*this) << " <- " << source << ", tag = "
            << tag << std::endl;
#endif

  impl::incoming_messages& incoming = impl_->incoming[source];

  // Find the next header with the right tag
  std::vector<impl::message_header>::iterator header =
    incoming.next_header[my_block_number()];
  while (header != incoming.headers.end() && header->tag != tag) ++header;

  // If no header is found, notify the caller
  if (header == incoming.headers.end()) return false;

  // Unpack the data
  if (header->bytes > 0) {
    boost::mpi::packed_iarchive ia(impl_->comm, incoming.buffer,
                                   archive::no_header, header->offset);
    ia >> value;
  }

  // Mark this message as received
  header->tag = -1;

  // Move the "next header" indicator to the next unreceived message
  while (incoming.next_header[my_block_number()] != incoming.headers.end()
         && incoming.next_header[my_block_number()]->tag == -1)
    ++incoming.next_header[my_block_number()];

  if (incoming.next_header[my_block_number()] == incoming.headers.end()) {
    bool finished = true;
    for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) {
      if (incoming.next_header[i] != incoming.headers.end()) finished = false;
    }

    if (finished) {
      std::vector<impl::message_header> no_headers;
      incoming.headers.swap(no_headers);
      buffer_type empty_buffer;
      incoming.buffer.swap(empty_buffer);
      for (std::size_t i = 0; i < incoming.next_header.size(); ++i)
        incoming.next_header[i] = incoming.headers.end();
    }
  }

  return true;
}

template<typename T>
bool
mpi_process_group::receive_impl(int source, int tag, T& value,
                                mpl::false_ /*is_mpi_datatype*/) const
{
  impl::incoming_messages& incoming = impl_->incoming[source];

  // Find the next header with the right tag
  std::vector<impl::message_header>::iterator header =
    incoming.next_header[my_block_number()];
  while (header != incoming.headers.end() && header->tag != tag) ++header;

  // If no header is found, notify the caller
  if (header == incoming.headers.end()) return false;

  // Deserialize the data
  boost::mpi::packed_iarchive in(impl_->comm, incoming.buffer,
                                 archive::no_header, header->offset);
  in >> value;

  // Mark this message as received
  header->tag = -1;

  // Move the "next header" indicator to the next unreceived message
  while (incoming.next_header[my_block_number()] != incoming.headers.end()
         && incoming.next_header[my_block_number()]->tag == -1)
    ++incoming.next_header[my_block_number()];

  if (incoming.next_header[my_block_number()] == incoming.headers.end()) {
    bool finished = true;
    for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) {
      if (incoming.next_header[i] != incoming.headers.end()) finished = false;
    }

    if (finished) {
      std::vector<impl::message_header> no_headers;
      incoming.headers.swap(no_headers);
      buffer_type empty_buffer;
      incoming.buffer.swap(empty_buffer);
      for (std::size_t i = 0; i < incoming.next_header.size(); ++i)
        incoming.next_header[i] = incoming.headers.end();
    }
  }

  return true;
}

template<typename T>
typename disable_if<boost::mpi::is_mpi_datatype<T>, bool>::type
mpi_process_group::
array_receive_impl(int source, int tag, T* values, std::size_t& n) const
{
  impl::incoming_messages& incoming = impl_->incoming[source];

  // Find the next header with the right tag
  std::vector<impl::message_header>::iterator header =
    incoming.next_header[my_block_number()];
  while (header != incoming.headers.end() && header->tag != tag) ++header;

  // If no header is found, notify the caller
  if (header == incoming.headers.end()) return false;

  // Deserialize the data
  boost::mpi::packed_iarchive in(impl_->comm, incoming.buffer,
                                 archive::no_header, header->offset);
  std::size_t num_sent;
  in >> num_sent;
  if (num_sent > n)
    std::cerr << "ERROR: Have " << num_sent << " items but only space for "
              << n << " items\n";

  for (std::size_t i = 0; i < num_sent; ++i)
    in >> values[i];
  n = num_sent;

  // Mark this message as received
  header->tag = -1;

  // Move the "next header" indicator to the next unreceived message
  while (incoming.next_header[my_block_number()] != incoming.headers.end()
         && incoming.next_header[my_block_number()]->tag == -1)
    ++incoming.next_header[my_block_number()];

  if (incoming.next_header[my_block_number()] == incoming.headers.end()) {
    bool finished = true;
    for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) {
      if (incoming.next_header[i] != incoming.headers.end()) finished = false;
    }

    if (finished) {
      std::vector<impl::message_header> no_headers;
      incoming.headers.swap(no_headers);
      buffer_type empty_buffer;
      incoming.buffer.swap(empty_buffer);
      for (std::size_t i = 0; i < incoming.next_header.size(); ++i)
        incoming.next_header[i] = incoming.headers.end();
    }
  }

  return true;
}

// Construct triggers
template<typename Type, typename Handler>
void mpi_process_group::trigger(int tag, const Handler& handler)
{
  BOOST_ASSERT(block_num);
  install_trigger(tag,my_block_number(),shared_ptr<trigger_base>(
    new trigger_launcher<Type, Handler>(*this, tag, handler)));
}

template<typename Type, typename Handler>
void mpi_process_group::trigger_with_reply(int tag, const Handler& handler)
{
  BOOST_ASSERT(block_num);
  install_trigger(tag,my_block_number(),shared_ptr<trigger_base>(
    new reply_trigger_launcher<Type, Handler>(*this, tag, handler)));
}

template<typename Type, typename Handler>
void mpi_process_group::global_trigger(int tag, const Handler& handler,
                                       std::size_t sz)
{
  if (sz==0) // normal trigger
    install_trigger(tag,0,shared_ptr<trigger_base>(
      new global_trigger_launcher<Type, Handler>(*this, tag, handler)));
  else // trigger with irecv
    install_trigger(tag,0,shared_ptr<trigger_base>(
      new global_irecv_trigger_launcher<Type, Handler>(*this, tag, handler,sz)));
}
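
// Example (a sketch; `echo_handler` and the tag value are illustrative
// assumptions): the handler is invoked with the same argument order that
// trigger_launcher::receive() uses below.
//
//   struct echo_handler {
//     void operator()(int source, int tag, const int& data,
//                     trigger_receive_context context) const
//     { std::cerr << "got " << data << " from " << source << "\n"; }
//   };
//
//   pg.trigger<int>(/*tag=*/5, echo_handler());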

namespace detail {

template<typename Type>
void do_oob_receive(mpi_process_group const& self,
                    int source, int tag, Type& data, mpl::true_ /*is_mpi_datatype*/)
{
  using boost::mpi::get_mpi_datatype;

  //self.impl_->comm.recv(source,tag,data);
  MPI_Recv(&data, 1, get_mpi_datatype<Type>(data), source, tag, self.impl_->comm,
           MPI_STATUS_IGNORE);
}

template<typename Type>
void do_oob_receive(mpi_process_group const& self,
                    int source, int tag, Type& data, mpl::false_ /*is_mpi_datatype*/)
{
  // self.impl_->comm.recv(source,tag,data);
  // Receive the size of the data packet
  boost::mpi::status status;
  status = self.impl_->comm.probe(source, tag);

#if BOOST_VERSION >= 103600
  int size = status.count<boost::mpi::packed>().get();
#else
  int size;
  MPI_Status& mpi_status = status;
  MPI_Get_count(&mpi_status, MPI_PACKED, &size);
#endif

  // Receive the packed data itself
  boost::mpi::packed_iarchive in(self.impl_->comm);
  in.resize(size);
  MPI_Recv(in.address(), size, MPI_PACKED, source, tag, self.impl_->comm,
           MPI_STATUS_IGNORE);

  // Deserialize the data
  in >> data;
}

template<typename Type>
void do_oob_receive(mpi_process_group const& self, int source, int tag, Type& data)
{
  do_oob_receive(self, source, tag, data,
                 boost::mpi::is_mpi_datatype<Type>());
}


} // namespace detail


template<typename Type, typename Handler>
void
mpi_process_group::trigger_launcher<Type, Handler>::
receive(mpi_process_group const&, int source, int tag,
        trigger_receive_context context, int block) const
{
#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << (out_of_band? "OOB trigger" : "Trigger")
            << " receive from source " << source << " and tag " << tag
            << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl;
#endif

  Type data;

  if (context == trc_out_of_band) {
    // Receive the message directly off the wire
    int realtag = self.encode_tag(
      block == -1 ? self.my_block_number() : block, tag);
    detail::do_oob_receive(self,source,realtag,data);
  }
  else
    // Receive the message out of the local buffer
    boost::graph::distributed::receive(self, source, tag, data);

  // Pass the message off to the handler
  handler(source, tag, data, context);
}

template<typename Type, typename Handler>
void
mpi_process_group::reply_trigger_launcher<Type, Handler>::
receive(mpi_process_group const&, int source, int tag,
        trigger_receive_context context, int block) const
{
#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << (out_of_band? "OOB reply trigger" : "Reply trigger")
            << " receive from source " << source << " and tag " << tag
            << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl;
#endif
  BOOST_ASSERT(context == trc_out_of_band);

  boost::parallel::detail::untracked_pair<int, Type> data;

  // Receive the message directly off the wire
  int realtag = self.encode_tag(block == -1 ? self.my_block_number() : block,
                                tag);
  detail::do_oob_receive(self, source, realtag, data);

  // Pass the message off to the handler and send the result back to
  // the source.
  send_oob(self, source, data.first,
           handler(source, tag, data.second, context), -2);
}

template<typename Type, typename Handler>
void
mpi_process_group::global_trigger_launcher<Type, Handler>::
receive(mpi_process_group const& self, int source, int tag,
        trigger_receive_context context, int block) const
{
#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << (out_of_band? "OOB trigger" : "Trigger")
            << " receive from source " << source << " and tag " << tag
            << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl;
#endif

  Type data;

  if (context == trc_out_of_band) {
    // Receive the message directly off the wire
    int realtag = self.encode_tag(
      block == -1 ? self.my_block_number() : block, tag);
    detail::do_oob_receive(self,source,realtag,data);
  }
  else
    // Receive the message out of the local buffer
    boost::graph::distributed::receive(self, source, tag, data);

  // Pass the message off to the handler
  handler(self, source, tag, data, context);
}


template<typename Type, typename Handler>
void
mpi_process_group::global_irecv_trigger_launcher<Type, Handler>::
receive(mpi_process_group const& self, int source, int tag,
        trigger_receive_context context, int block) const
{
#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << (out_of_band? "OOB trigger" : "Trigger")
            << " receive from source " << source << " and tag " << tag
            << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl;
#endif

  Type data;

  if (context == trc_out_of_band) {
    return;
  }
  BOOST_ASSERT (context == trc_irecv_out_of_band);

  // force posting of new MPI_Irecv, even though buffer is already allocated
  boost::mpi::packed_iarchive ia(self.impl_->comm,self.impl_->buffers[tag]);
  ia >> data;
  // Start a new receive
  prepare_receive(self,tag,true);
  // Pass the message off to the handler
  handler(self, source, tag, data, context);
}


template<typename Type, typename Handler>
void
mpi_process_group::global_irecv_trigger_launcher<Type, Handler>::
prepare_receive(mpi_process_group const& self, int tag, bool force) const
{
#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << ("Posting Irecv for trigger")
            << " receive with tag " << tag << std::endl;
#endif
  if (self.impl_->buffers.find(tag) == self.impl_->buffers.end()) {
    self.impl_->buffers[tag].resize(buffer_size);
    force = true;
  }
  BOOST_ASSERT(static_cast<int>(self.impl_->buffers[tag].size()) >= buffer_size);

  //BOOST_MPL_ASSERT(mpl::not_<is_mpi_datatype<Type> >);
  if (force) {
    self.impl_->requests.push_back(MPI_Request());
    MPI_Request* request = &self.impl_->requests.back();
    MPI_Irecv(&self.impl_->buffers[tag].front(),buffer_size,
              MPI_PACKED,MPI_ANY_SOURCE,tag,self.impl_->comm,request);
  }
}


template<typename T>
inline mpi_process_group::process_id_type
receive(const mpi_process_group& pg, int tag, T& value)
{
  for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) {
    if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
                        value, boost::mpi::is_mpi_datatype<T>()))
      return source;
  }
  BOOST_ASSERT (false);
  abort(); // not reached when a matching message exists; avoids falling
           // off the end of a non-void function in NDEBUG builds
}
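
// Example (sketch): draining a buffered message after synchronization. This
// overload scans all sources and aborts if nothing matches, so callers
// should only use it when a message is known to be pending:
//
//   synchronize(pg);
//   int incoming;
//   mpi_process_group::process_id_type src = receive(pg, /*tag=*/17, incoming);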

template<typename T>
typename
  enable_if<boost::mpi::is_mpi_datatype<T>,
            std::pair<mpi_process_group::process_id_type, std::size_t> >::type
receive(const mpi_process_group& pg, int tag, T values[], std::size_t n)
{
  for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) {
    bool result =
      pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
                      boost::serialization::make_array(values,n),
                      boost::mpl::true_());
    if (result)
      return std::make_pair(source, n);
  }
  BOOST_ASSERT(false);
  abort(); // see above: keep NDEBUG builds well-defined
}

template<typename T>
typename
  disable_if<boost::mpi::is_mpi_datatype<T>,
             std::pair<mpi_process_group::process_id_type, std::size_t> >::type
receive(const mpi_process_group& pg, int tag, T values[], std::size_t n)
{
  for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) {
    if (pg.array_receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
                              values, n))
      return std::make_pair(source, n);
  }
  BOOST_ASSERT(false);
  abort(); // see above: keep NDEBUG builds well-defined
}

template<typename T>
mpi_process_group::process_id_type
receive(const mpi_process_group& pg,
        mpi_process_group::process_id_type source, int tag, T& value)
{
  if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
                      value, boost::mpi::is_mpi_datatype<T>()))
    return source;
  else {
    fprintf(stderr,
            "Process %d failed to receive a message from process %d with tag %d in block %d.\n",
            process_id(pg), source, tag, pg.my_block_number());

    BOOST_ASSERT(false);
    abort();
  }
}

template<typename T>
typename
  enable_if<boost::mpi::is_mpi_datatype<T>,
            std::pair<mpi_process_group::process_id_type, std::size_t> >::type
receive(const mpi_process_group& pg, int source, int tag, T values[],
        std::size_t n)
{
  if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
                      boost::serialization::make_array(values,n),
                      boost::mpl::true_()))
    return std::make_pair(source,n);
  else {
    fprintf(stderr,
            "Process %d failed to receive a message from process %d with tag %d in block %d.\n",
            process_id(pg), source, tag, pg.my_block_number());

    BOOST_ASSERT(false);
    abort();
  }
}

template<typename T>
typename
  disable_if<boost::mpi::is_mpi_datatype<T>,
             std::pair<mpi_process_group::process_id_type, std::size_t> >::type
receive(const mpi_process_group& pg, int source, int tag, T values[],
        std::size_t n)
{
  pg.array_receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
                        values, n);

  return std::make_pair(source, n);
}

template<typename T, typename BinaryOperation>
T*
all_reduce(const mpi_process_group& pg, T* first, T* last, T* out,
           BinaryOperation bin_op)
{
  synchronize(pg);

  bool inplace = first == out;

  if (inplace) out = new T [last-first];

  boost::mpi::all_reduce(boost::mpi::communicator(communicator(pg),
                                                  boost::mpi::comm_attach),
                         first, last-first, out, bin_op);

  if (inplace) {
    std::copy(out, out + (last-first), first);
    delete [] out;
    return last;
  }

  return out;
}
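
// Example (sketch): an in-place global sum. Passing the same pointer for
// `first` and `out` takes the temporary-buffer path above:
//
//   int counts[2] = { local_edges, local_vertices };  // hypothetical values
//   all_reduce(pg, counts, counts + 2, counts, std::plus<int>());
//   // counts[] now holds the global totals on every rank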

template<typename T>
void
broadcast(const mpi_process_group& pg, T& val,
          mpi_process_group::process_id_type root)
{
  // Broadcast the value from the root to all other processes
  boost::mpi::communicator comm(communicator(pg),boost::mpi::comm_attach);
  boost::mpi::broadcast(comm,val,root);
}
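
// Example (sketch): agreeing on a value chosen by rank 0:
//
//   unsigned seed = 0;
//   if (process_id(pg) == 0) seed = 12345u;
//   broadcast(pg, seed, /*root=*/0);  // every rank now holds 12345u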


template<typename T, typename BinaryOperation>
T*
scan(const mpi_process_group& pg, T* first, T* last, T* out,
     BinaryOperation bin_op)
{
  synchronize(pg);

  bool inplace = first == out;

  if (inplace) out = new T [last-first];

  boost::mpi::scan(communicator(pg), first, last-first, out, bin_op);

  if (inplace) {
    std::copy(out, out + (last-first), first);
    delete [] out;
    return last;
  }

  return out;
}
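
// Example (sketch): boost::mpi::scan computes an inclusive prefix, so this
// yields, on each rank, the sum of `mine` over ranks 0..process_id(pg):
//
//   int mine = 1, prefix = 0;
//   scan(pg, &mine, &mine + 1, &prefix, std::plus<int>());
//   // prefix == process_id(pg) + 1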


template<typename InputIterator, typename T>
void
all_gather(const mpi_process_group& pg, InputIterator first,
           InputIterator last, std::vector<T>& out)
{
  synchronize(pg);

  // Stick a copy of the local values into a vector, so we can broadcast it
  std::vector<T> local_values(first, last);

  // Collect the number of values stored in each process
  int size = local_values.size();
  std::vector<int> sizes(num_processes(pg));
  int result = MPI_Allgather(&size, 1, MPI_INT,
                             &sizes[0], 1, MPI_INT,
                             communicator(pg));
  BOOST_ASSERT(result == MPI_SUCCESS);
  (void)result;

  // Adjust sizes based on the number of bytes
  //
  // std::transform(sizes.begin(), sizes.end(), sizes.begin(),
  //   std::bind2nd(std::multiplies<int>(), sizeof(T)));
  //
  // std::bind2nd has been removed from C++17

  for( std::size_t i = 0, n = sizes.size(); i < n; ++i )
  {
    sizes[ i ] *= sizeof( T );
  }

  // Compute displacements
  std::vector<int> displacements;
  displacements.reserve(sizes.size() + 1);
  displacements.push_back(0);
  std::partial_sum(sizes.begin(), sizes.end(),
                   std::back_inserter(displacements));

  // Gather all of the values
  out.resize(displacements.back() / sizeof(T));
  if (!out.empty()) {
    result = MPI_Allgatherv(local_values.empty()? (void*)&local_values
                            /* local results */: (void*)&local_values[0],
                            local_values.size() * sizeof(T),
                            MPI_BYTE,
                            &out[0], &sizes[0], &displacements[0], MPI_BYTE,
                            communicator(pg));
  }
  BOOST_ASSERT(result == MPI_SUCCESS);
}
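
// Example (sketch): gathering one value per rank into a vector that is
// identical on all ranks afterwards:
//
//   std::vector<int> mine(1, process_id(pg));
//   std::vector<int> everyone;
//   all_gather(pg, mine.begin(), mine.end(), everyone);
//   // everyone == { 0, 1, ..., num_processes(pg) - 1 }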

template<typename InputIterator>
mpi_process_group
process_subgroup(const mpi_process_group& pg,
                 InputIterator first, InputIterator last)
{
/*
  boost::mpi::group current_group = communicator(pg).group();
  boost::mpi::group new_group = current_group.include(first,last);
  boost::mpi::communicator new_comm(communicator(pg),new_group);
  return mpi_process_group(new_comm);
*/
  std::vector<int> ranks(first, last);

  MPI_Group current_group;
  int result = MPI_Comm_group(communicator(pg), &current_group);
  BOOST_ASSERT(result == MPI_SUCCESS);
  (void)result;

  MPI_Group new_group;
  result = MPI_Group_incl(current_group, ranks.size(), &ranks[0], &new_group);
  BOOST_ASSERT(result == MPI_SUCCESS);

  MPI_Comm new_comm;
  result = MPI_Comm_create(communicator(pg), new_group, &new_comm);
  BOOST_ASSERT(result == MPI_SUCCESS);

  result = MPI_Group_free(&new_group);
  BOOST_ASSERT(result == MPI_SUCCESS);
  result = MPI_Group_free(&current_group);
  BOOST_ASSERT(result == MPI_SUCCESS);

  if (new_comm != MPI_COMM_NULL) {
    mpi_process_group result_pg(boost::mpi::communicator(new_comm,boost::mpi::comm_attach));
    result = MPI_Comm_free(&new_comm);
    BOOST_ASSERT(result == MPI_SUCCESS);
    return result_pg;
  } else {
    return mpi_process_group(mpi_process_group::create_empty());
  }
}
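
// Example (sketch): splitting off the even-numbered ranks. MPI_Comm_create
// is collective, so every rank of `pg` must make this call; ranks outside
// the subgroup receive the empty process group:
//
//   std::vector<int> evens;
//   for (int r = 0; r < num_processes(pg); r += 2) evens.push_back(r);
//   mpi_process_group sub = process_subgroup(pg, evens.begin(), evens.end());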


template<typename Receiver>
Receiver* mpi_process_group::get_receiver()
{
  return impl_->blocks[my_block_number()]->on_receive
           .template target<Receiver>();
}

template<typename T>
typename enable_if<boost::mpi::is_mpi_datatype<T> >::type
receive_oob(const mpi_process_group& pg,
            mpi_process_group::process_id_type source, int tag, T& value, int block)
{
  using boost::mpi::get_mpi_datatype;

  // Determine the actual message we expect to receive, and on which
  // communicator it will arrive.
  std::pair<boost::mpi::communicator, int> actual
    = pg.actual_communicator_and_tag(tag, block);

  // Post a non-blocking receive, then poll the process group until the
  // request completes.
  MPI_Request request;
  MPI_Irecv(&value, 1, get_mpi_datatype<T>(value),
            source, actual.second, actual.first, &request);

  int done = 0;
  do {
    MPI_Test(&request, &done, MPI_STATUS_IGNORE);
    if (!done)
      pg.poll(/*wait=*/false, block);
  } while (!done);
}

template<typename T>
typename disable_if<boost::mpi::is_mpi_datatype<T> >::type
receive_oob(const mpi_process_group& pg,
            mpi_process_group::process_id_type source, int tag, T& value, int block)
{
  // Determine the actual message we expect to receive, and on which
  // communicator it will arrive.
  std::pair<boost::mpi::communicator, int> actual
    = pg.actual_communicator_and_tag(tag, block);

  boost::optional<boost::mpi::status> status;
  do {
    status = actual.first.iprobe(source, actual.second);
    if (!status)
      pg.poll();
  } while (!status);

  //actual.first.recv(status->source(), status->tag(),value);

  // Allocate the receive buffer
  boost::mpi::packed_iarchive in(actual.first);

#if BOOST_VERSION >= 103600
  in.resize(status->count<boost::mpi::packed>().get());
#else
  int size;
  MPI_Status mpi_status = *status;
  MPI_Get_count(&mpi_status, MPI_PACKED, &size);
  in.resize(size);
#endif

  // Receive the message data
  MPI_Recv(in.address(), in.size(), MPI_PACKED,
           status->source(), status->tag(), actual.first, MPI_STATUS_IGNORE);

  // Unpack the message data
  in >> value;
}
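
// Example (a sketch; send_oob() is declared alongside these functions in
// mpi_process_group.hpp): a blocking out-of-band exchange between two
// ranks. The poll() calls in the wait loops above keep trigger and batch
// processing alive while waiting:
//
//   if (process_id(pg) == 0) {
//     int answer = 42;
//     send_oob(pg, /*dest=*/1, /*tag=*/9, answer);
//   } else if (process_id(pg) == 1) {
//     int answer;
//     receive_oob(pg, /*source=*/0, /*tag=*/9, answer);
//   }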


template<typename SendT, typename ReplyT>
typename enable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type
send_oob_with_reply(const mpi_process_group& pg,
                    mpi_process_group::process_id_type dest,
                    int tag, const SendT& send_value, ReplyT& reply_value,
                    int block)
{
  detail::tag_allocator::token reply_tag = pg.impl_->allocated_tags.get_tag();
  send_oob(pg, dest, tag, boost::parallel::detail::make_untracked_pair(
             (int)reply_tag, send_value), block);
  receive_oob(pg, dest, reply_tag, reply_value);
}

template<typename SendT, typename ReplyT>
typename disable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type
send_oob_with_reply(const mpi_process_group& pg,
                    mpi_process_group::process_id_type dest,
                    int tag, const SendT& send_value, ReplyT& reply_value,
                    int block)
{
  detail::tag_allocator::token reply_tag = pg.impl_->allocated_tags.get_tag();
  send_oob(pg, dest, tag,
           boost::parallel::detail::make_untracked_pair((int)reply_tag,
                                                        send_value), block);
  receive_oob(pg, dest, reply_tag, reply_value);
}
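
// Example (a sketch; assumes the target rank installed a matching
// trigger_with_reply() handler for this tag, whose return value becomes
// `reply` here via the freshly allocated reply tag):
//
//   int query = 7, reply = 0;
//   send_oob_with_reply(pg, /*dest=*/1, /*tag=*/11, query, reply);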

} } } // end namespace boost::graph::distributed