// -*- C++ -*-

// Copyright (C) 2004-2008 The Trustees of Indiana University.
// Copyright (C) 2007  Douglas Gregor
// Copyright (C) 2007  Matthias Troyer
//
// Use, modification and distribution is subject to the Boost Software
// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

//  Authors: Douglas Gregor
//           Andrew Lumsdaine
//           Matthias Troyer

//#define PBGL_PROCESS_GROUP_DEBUG

#ifndef BOOST_GRAPH_USE_MPI
#error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
#endif

#include <boost/graph/parallel/detail/untracked_pair.hpp>
#include <numeric>
#include <iterator>
#include <limits>
#include <map>
#include <boost/graph/parallel/simple_trigger.hpp>
#include <boost/serialization/vector.hpp>
#include <boost/mpi/datatype.hpp>
#include <functional>
#include <algorithm>
#include <boost/serialization/array.hpp>
#include <boost/graph/distributed/detail/tag_allocator.hpp>
#include <stdio.h>

// #define PBGL_PROCESS_GROUP_DEBUG

#ifdef PBGL_PROCESS_GROUP_DEBUG
#  include <iostream>
#endif

namespace boost { namespace graph { namespace distributed {

struct mpi_process_group::impl
{
  typedef mpi_process_group::message_header message_header;
  typedef mpi_process_group::outgoing_messages outgoing_messages;

  /**
   * Stores the incoming messages from a particular processor.
   *
   * @todo Evaluate whether we should use a deque instance, which
   * could reduce the cost of "receiving" messages and allow us to
   * deallocate memory earlier, but increases the time spent in the
   * synchronization step.
   */
  struct incoming_messages {
    incoming_messages();
    ~incoming_messages() {}

    std::vector<message_header> headers;
    buffer_type                 buffer;
    std::vector<std::vector<message_header>::iterator> next_header;
  };

  struct batch_request {
    MPI_Request request;
    buffer_type buffer;
  };

  // send once we have a certain number of messages or bytes in the buffer
  // these numbers need to be tuned, we keep them small at first for testing
  std::size_t batch_header_number;
  std::size_t batch_buffer_size;
  std::size_t batch_message_size;

  /**
   * The actual MPI communicator used to transmit data.
   */
  boost::mpi::communicator comm;

  /**
   * The MPI communicator used to transmit out-of-band replies.
   */
  boost::mpi::communicator oob_reply_comm;

  /// Outgoing message information, indexed by destination processor.
  std::vector<outgoing_messages> outgoing;

  /// Incoming message information, indexed by source processor.
  std::vector<incoming_messages> incoming;

  /// The numbers of processors that have entered a synchronization stage
  std::vector<int> processors_synchronizing_stage;

  /// The synchronization stage of a processor
  std::vector<int> synchronizing_stage;

  /// Number of processors still sending messages
  std::vector<int> synchronizing_unfinished;

  /// Number of batches sent since last synchronization stage
  std::vector<int> number_sent_batches;

  /// Number of batches received minus number of expected batches
  std::vector<int> number_received_batches;

  /// The context of the currently-executing trigger, or @c trc_none
  /// if no trigger is executing.
  trigger_receive_context trigger_context;

  /// Non-zero indicates that we're processing batches.
  /// Increment this when processing batches,
  /// decrement it when you're done.
  int processing_batches;

  /**
   * Contains all of the active blocks corresponding to attached
   * distributed data structures.
   */
  blocks_type blocks;

  /// Whether we are currently synchronizing
  bool synchronizing;

  /// The MPI requests for posted sends of oob messages
  std::vector<MPI_Request> requests;

  /// The MPI buffers for posted irecvs of oob messages
  std::map<int, buffer_type> buffers;

  /// Queue for message batches received while already processing messages
  std::queue<std::pair<int, outgoing_messages> > new_batches;
  /// Maximum encountered size of the new_batches queue
  std::size_t max_received;

  /// The MPI requests and buffers for batches being sent
  std::list<batch_request> sent_batches;
  /// Maximum encountered size of the sent_batches list
  std::size_t max_sent;

  /// Pre-allocated requests in a pool
  std::vector<batch_request> batch_pool;
  /// A stack controlling which batches are available
  std::stack<std::size_t> free_batches;

  void free_sent_batches();

  // Tag allocator
  detail::tag_allocator allocated_tags;

  impl(std::size_t num_headers, std::size_t buffers_size,
       communicator_type parent_comm);
  ~impl();

private:
  void set_batch_size(std::size_t header_num, std::size_t buffer_sz);
};
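
// Note on batching: messages are not transmitted individually. Each send
// appends a message_header plus its packed payload to the per-destination
// outgoing_messages buffer, and maybe_send_batch() ships the accumulated
// batch once batch_header_number headers or batch_buffer_size bytes have
// been collected; whatever remains buffered is delivered during the
// synchronization step.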

inline trigger_receive_context mpi_process_group::trigger_context() const
{
  return impl_->trigger_context;
}

template<typename T>
void
mpi_process_group::send_impl(int dest, int tag, const T& value,
                             mpl::true_ /*is_mpi_datatype*/) const
{
  BOOST_ASSERT(tag < msg_reserved_first || tag > msg_reserved_last);

  impl::outgoing_messages& outgoing = impl_->outgoing[dest];

  // Start constructing the message header
  impl::message_header header;
  header.source = process_id(*this);
  header.tag = tag;
  header.offset = outgoing.buffer.size();

  boost::mpi::packed_oarchive oa(impl_->comm, outgoing.buffer);
  oa << value;

  // Store the header
  header.bytes = outgoing.buffer.size() - header.offset;
  outgoing.headers.push_back(header);

  maybe_send_batch(dest);

#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = "
            << tag << ", bytes = " << header.bytes << std::endl;
#endif
}

template<typename T>
void
mpi_process_group::send_impl(int dest, int tag, const T& value,
                             mpl::false_ /*is_mpi_datatype*/) const
{
  BOOST_ASSERT(tag < msg_reserved_first || tag > msg_reserved_last);

  impl::outgoing_messages& outgoing = impl_->outgoing[dest];

  // Start constructing the message header
  impl::message_header header;
  header.source = process_id(*this);
  header.tag = tag;
  header.offset = outgoing.buffer.size();

  // Serialize into the buffer
  boost::mpi::packed_oarchive out(impl_->comm, outgoing.buffer);
  out << value;

  // Store the header
  header.bytes = outgoing.buffer.size() - header.offset;
  outgoing.headers.push_back(header);

  maybe_send_batch(dest);

#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = "
            << tag << ", bytes = " << header.bytes << std::endl;
#endif
}

template<typename T>
inline void
send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
     int tag, const T& value)
{
  pg.send_impl(dest, pg.encode_tag(pg.my_block_number(), tag), value,
               boost::mpi::is_mpi_datatype<T>());
}

template<typename T>
typename enable_if<boost::mpi::is_mpi_datatype<T>, void>::type
send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
     int tag, const T values[], std::size_t n)
{
  pg.send_impl(dest, pg.encode_tag(pg.my_block_number(), tag),
               boost::serialization::make_array(values, n),
               boost::mpl::true_());
}

template<typename T>
typename disable_if<boost::mpi::is_mpi_datatype<T>, void>::type
mpi_process_group::
array_send_impl(int dest, int tag, const T values[], std::size_t n) const
{
  BOOST_ASSERT(tag < msg_reserved_first || tag > msg_reserved_last);

  impl::outgoing_messages& outgoing = impl_->outgoing[dest];

  // Start constructing the message header
  impl::message_header header;
  header.source = process_id(*this);
  header.tag = tag;
  header.offset = outgoing.buffer.size();

  // Serialize into the buffer
  boost::mpi::packed_oarchive out(impl_->comm, outgoing.buffer);
  out << n;
  for (std::size_t i = 0; i < n; ++i)
    out << values[i];

  // Store the header
  header.bytes = outgoing.buffer.size() - header.offset;
  outgoing.headers.push_back(header);

  maybe_send_batch(dest);

#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = "
            << tag << ", bytes = " << header.bytes << std::endl;
#endif
}

template<typename T>
typename disable_if<boost::mpi::is_mpi_datatype<T>, void>::type
send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
     int tag, const T values[], std::size_t n)
{
  pg.array_send_impl(dest, pg.encode_tag(pg.my_block_number(), tag),
                     values, n);
}

template<typename InputIterator>
void
send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
     int tag, InputIterator first, InputIterator last)
{
  typedef typename std::iterator_traits<InputIterator>::value_type value_type;
  std::vector<value_type> values(first, last);
  if (values.empty()) send(pg, dest, tag, static_cast<value_type*>(0), 0);
  else send(pg, dest, tag, &values[0], values.size());
}
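
// Usage sketch for the send() overloads above (illustrative only; `pg`,
// `dest`, and `my_msg_tag` are names assumed for the example):
//
//   double value = 3.14;
//   send(pg, dest, my_msg_tag, value);
//
// The value is packed into the outgoing batch for `dest`; the batch is
// transmitted once it grows large enough (maybe_send_batch) and, in any
// case, no later than the next synchronization step, after which the
// matching receive() on the remote process can find it.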

template<typename T>
bool
mpi_process_group::receive_impl(int source, int tag, T& value,
                                mpl::true_ /*is_mpi_datatype*/) const
{
#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << "RECV: " << process_id(*this) << " <- " << source << ", tag = "
            << tag << std::endl;
#endif

  impl::incoming_messages& incoming = impl_->incoming[source];

  // Find the next header with the right tag
  std::vector<impl::message_header>::iterator header =
    incoming.next_header[my_block_number()];
  while (header != incoming.headers.end() && header->tag != tag) ++header;

  // If no header is found, notify the caller
  if (header == incoming.headers.end()) return false;

  // Unpack the data
  if (header->bytes > 0) {
    boost::mpi::packed_iarchive ia(impl_->comm, incoming.buffer,
                                   archive::no_header, header->offset);
    ia >> value;
  }

  // Mark this message as received
  header->tag = -1;

  // Move the "next header" indicator to the next unreceived message
  while (incoming.next_header[my_block_number()] != incoming.headers.end()
         && incoming.next_header[my_block_number()]->tag == -1)
    ++incoming.next_header[my_block_number()];

  if (incoming.next_header[my_block_number()] == incoming.headers.end()) {
    bool finished = true;
    for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) {
      if (incoming.next_header[i] != incoming.headers.end()) finished = false;
    }

    if (finished) {
      std::vector<impl::message_header> no_headers;
      incoming.headers.swap(no_headers);
      buffer_type empty_buffer;
      incoming.buffer.swap(empty_buffer);
      for (std::size_t i = 0; i < incoming.next_header.size(); ++i)
        incoming.next_header[i] = incoming.headers.end();
    }
  }

  return true;
}

template<typename T>
bool
mpi_process_group::receive_impl(int source, int tag, T& value,
                                mpl::false_ /*is_mpi_datatype*/) const
{
  impl::incoming_messages& incoming = impl_->incoming[source];

  // Find the next header with the right tag
  std::vector<impl::message_header>::iterator header =
    incoming.next_header[my_block_number()];
  while (header != incoming.headers.end() && header->tag != tag) ++header;

  // If no header is found, notify the caller
  if (header == incoming.headers.end()) return false;

  // Deserialize the data
  boost::mpi::packed_iarchive in(impl_->comm, incoming.buffer,
                                 archive::no_header, header->offset);
  in >> value;

  // Mark this message as received
  header->tag = -1;

  // Move the "next header" indicator to the next unreceived message
  while (incoming.next_header[my_block_number()] != incoming.headers.end()
         && incoming.next_header[my_block_number()]->tag == -1)
    ++incoming.next_header[my_block_number()];

  if (incoming.next_header[my_block_number()] == incoming.headers.end()) {
    bool finished = true;
    for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) {
      if (incoming.next_header[i] != incoming.headers.end()) finished = false;
    }

    if (finished) {
      std::vector<impl::message_header> no_headers;
      incoming.headers.swap(no_headers);
      buffer_type empty_buffer;
      incoming.buffer.swap(empty_buffer);
      for (std::size_t i = 0; i < incoming.next_header.size(); ++i)
        incoming.next_header[i] = incoming.headers.end();
    }
  }

  return true;
}
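
// Both receive_impl overloads share the same bookkeeping: a matched header
// is marked as received by setting its tag to -1, the per-block "next
// header" cursor is advanced past consumed entries, and once every block's
// cursor has reached the end, the headers and buffer are swapped with empty
// containers to release their memory.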

template<typename T>
typename disable_if<boost::mpi::is_mpi_datatype<T>, bool>::type
mpi_process_group::
array_receive_impl(int source, int tag, T* values, std::size_t& n) const
{
  impl::incoming_messages& incoming = impl_->incoming[source];

  // Find the next header with the right tag
  std::vector<impl::message_header>::iterator header =
    incoming.next_header[my_block_number()];
  while (header != incoming.headers.end() && header->tag != tag) ++header;

  // If no header is found, notify the caller
  if (header == incoming.headers.end()) return false;

  // Deserialize the data
  boost::mpi::packed_iarchive in(impl_->comm, incoming.buffer,
                                 archive::no_header, header->offset);
  std::size_t num_sent;
  in >> num_sent;
  if (num_sent > n)
    std::cerr << "ERROR: Have " << num_sent << " items but only space for "
              << n << " items\n";

  for (std::size_t i = 0; i < num_sent; ++i)
    in >> values[i];
  n = num_sent;

  // Mark this message as received
  header->tag = -1;

  // Move the "next header" indicator to the next unreceived message
  while (incoming.next_header[my_block_number()] != incoming.headers.end()
         && incoming.next_header[my_block_number()]->tag == -1)
    ++incoming.next_header[my_block_number()];

  if (incoming.next_header[my_block_number()] == incoming.headers.end()) {
    bool finished = true;
    for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) {
      if (incoming.next_header[i] != incoming.headers.end()) finished = false;
    }

    if (finished) {
      std::vector<impl::message_header> no_headers;
      incoming.headers.swap(no_headers);
      buffer_type empty_buffer;
      incoming.buffer.swap(empty_buffer);
      for (std::size_t i = 0; i < incoming.next_header.size(); ++i)
        incoming.next_header[i] = incoming.headers.end();
    }
  }

  return true;
}

// Construct triggers
template<typename Type, typename Handler>
void mpi_process_group::trigger(int tag, const Handler& handler)
{
  BOOST_ASSERT(block_num);
  install_trigger(tag, my_block_number(), shared_ptr<trigger_base>(
    new trigger_launcher<Type, Handler>(*this, tag, handler)));
}

template<typename Type, typename Handler>
void mpi_process_group::trigger_with_reply(int tag, const Handler& handler)
{
  BOOST_ASSERT(block_num);
  install_trigger(tag, my_block_number(), shared_ptr<trigger_base>(
    new reply_trigger_launcher<Type, Handler>(*this, tag, handler)));
}

template<typename Type, typename Handler>
void mpi_process_group::global_trigger(int tag, const Handler& handler,
                                       std::size_t sz)
{
  if (sz == 0) // normal trigger
    install_trigger(tag, 0, shared_ptr<trigger_base>(
      new global_trigger_launcher<Type, Handler>(*this, tag, handler)));
  else // trigger with irecv
    install_trigger(tag, 0, shared_ptr<trigger_base>(
      new global_irecv_trigger_launcher<Type, Handler>(*this, tag, handler, sz)));
}

namespace detail {

template<typename Type>
void do_oob_receive(mpi_process_group const& self,
                    int source, int tag, Type& data,
                    mpl::true_ /*is_mpi_datatype*/)
{
  using boost::mpi::get_mpi_datatype;

  //self.impl_->comm.recv(source,tag,data);
  MPI_Recv(&data, 1, get_mpi_datatype(data), source, tag, self.impl_->comm,
           MPI_STATUS_IGNORE);
}

template<typename Type>
void do_oob_receive(mpi_process_group const& self,
                    int source, int tag, Type& data,
                    mpl::false_ /*is_mpi_datatype*/)
{
  //  self.impl_->comm.recv(source,tag,data);

  // Receive the size of the data packet
  boost::mpi::status status;
  status = self.impl_->comm.probe(source, tag);

#if BOOST_VERSION >= 103600
  int size = status.count<boost::mpi::packed>().get();
#else
  int size;
  MPI_Status& mpi_status = status;
  MPI_Get_count(&mpi_status, MPI_PACKED, &size);
#endif

  // Receive the packed data itself
  boost::mpi::packed_iarchive in(self.impl_->comm);
  in.resize(size);
  MPI_Recv(in.address(), size, MPI_PACKED, source, tag, self.impl_->comm,
           MPI_STATUS_IGNORE);

  // Deserialize the data
  in >> data;
}

template<typename Type>
void do_oob_receive(mpi_process_group const& self,
                    int source, int tag, Type& data)
{
  do_oob_receive(self, source, tag, data,
                 boost::mpi::is_mpi_datatype<Type>());
}

} // namespace detail
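
// Usage sketch for the trigger installers above (illustrative only;
// `msg_data`, `my_tag`, and `my_handler` are assumed names; the handler
// signature shown matches what trigger_launcher::receive invokes):
//
//   struct my_handler {
//     void operator()(int source, int tag, const msg_data& data,
//                     trigger_receive_context context) const;
//   };
//   pg.trigger<msg_data>(my_tag, my_handler());
//
// trigger_with_reply expects the handler to return the reply value, which
// reply_trigger_launcher sends back to the requester out-of-band.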

template<typename Type, typename Handler>
void
mpi_process_group::trigger_launcher<Type, Handler>::
receive(mpi_process_group const&, int source, int tag,
        trigger_receive_context context, int block) const
{
#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << (out_of_band? "OOB trigger" : "Trigger")
            << " receive from source " << source << " and tag " << tag
            << " in block "
            << (block == -1 ? self.my_block_number() : block) << std::endl;
#endif

  Type data;

  if (context == trc_out_of_band) {
    // Receive the message directly off the wire
    int realtag = self.encode_tag(
      block == -1 ? self.my_block_number() : block, tag);
    detail::do_oob_receive(self, source, realtag, data);
  }
  else
    // Receive the message out of the local buffer
    boost::graph::distributed::receive(self, source, tag, data);

  // Pass the message off to the handler
  handler(source, tag, data, context);
}

template<typename Type, typename Handler>
void
mpi_process_group::reply_trigger_launcher<Type, Handler>::
receive(mpi_process_group const&, int source, int tag,
        trigger_receive_context context, int block) const
{
#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << (out_of_band? "OOB reply trigger" : "Reply trigger")
            << " receive from source " << source << " and tag " << tag
            << " in block "
            << (block == -1 ? self.my_block_number() : block) << std::endl;
#endif
  BOOST_ASSERT(context == trc_out_of_band);

  boost::parallel::detail::untracked_pair<int, Type> data;

  // Receive the message directly off the wire
  int realtag = self.encode_tag(block == -1 ? self.my_block_number() : block,
                                tag);
  detail::do_oob_receive(self, source, realtag, data);

  // Pass the message off to the handler and send the result back to
  // the source.
  send_oob(self, source, data.first,
           handler(source, tag, data.second, context), -2);
}

template<typename Type, typename Handler>
void
mpi_process_group::global_trigger_launcher<Type, Handler>::
receive(mpi_process_group const& self, int source, int tag,
        trigger_receive_context context, int block) const
{
#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << (out_of_band? "OOB trigger" : "Trigger")
            << " receive from source " << source << " and tag " << tag
            << " in block "
            << (block == -1 ? self.my_block_number() : block) << std::endl;
#endif

  Type data;

  if (context == trc_out_of_band) {
    // Receive the message directly off the wire
    int realtag = self.encode_tag(
      block == -1 ? self.my_block_number() : block, tag);
    detail::do_oob_receive(self, source, realtag, data);
  }
  else
    // Receive the message out of the local buffer
    boost::graph::distributed::receive(self, source, tag, data);

  // Pass the message off to the handler
  handler(self, source, tag, data, context);
}

template<typename Type, typename Handler>
void
mpi_process_group::global_irecv_trigger_launcher<Type, Handler>::
receive(mpi_process_group const& self, int source, int tag,
        trigger_receive_context context, int block) const
{
#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << (out_of_band? "OOB trigger" : "Trigger")
            << " receive from source " << source << " and tag " << tag
            << " in block "
            << (block == -1 ? self.my_block_number() : block) << std::endl;
#endif

  Type data;

  if (context == trc_out_of_band) {
    return;
  }
  BOOST_ASSERT(context == trc_irecv_out_of_band);

  // force posting of new MPI_Irecv, even though buffer is already allocated
  boost::mpi::packed_iarchive ia(self.impl_->comm, self.impl_->buffers[tag]);
  ia >> data;
  // Start a new receive
  prepare_receive(self, tag, true);
  // Pass the message off to the handler
  handler(self, source, tag, data, context);
}
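
// Note on the launcher receive() members above: when the context is
// trc_out_of_band (or trc_irecv_out_of_band) the payload is pulled directly
// off the wire via detail::do_oob_receive or the posted MPI_Irecv buffer;
// otherwise it is unpacked from the locally buffered batch via receive().
// The context is forwarded to the handler so it can tell the two paths apart.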

template<typename Type, typename Handler>
void
mpi_process_group::global_irecv_trigger_launcher<Type, Handler>::
prepare_receive(mpi_process_group const& self, int tag, bool force) const
{
#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << ("Posting Irecv for trigger")
            << " receive with tag " << tag << std::endl;
#endif
  if (self.impl_->buffers.find(tag) == self.impl_->buffers.end()) {
    self.impl_->buffers[tag].resize(buffer_size);
    force = true;
  }
  BOOST_ASSERT(static_cast<int>(self.impl_->buffers[tag].size()) >= buffer_size);

  //BOOST_MPL_ASSERT(mpl::not_<is_mpi_datatype<Type> >);
  if (force) {
    self.impl_->requests.push_back(MPI_Request());
    MPI_Request* request = &self.impl_->requests.back();
    MPI_Irecv(&self.impl_->buffers[tag].front(), buffer_size,
              MPI_PACKED, MPI_ANY_SOURCE, tag, self.impl_->comm, request);
  }
}

template<typename T>
inline mpi_process_group::process_id_type
receive(const mpi_process_group& pg, int tag, T& value)
{
  for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) {
    if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
                        value, boost::mpi::is_mpi_datatype<T>()))
      return source;
  }
  BOOST_ASSERT(false);
}

template<typename T>
typename
  enable_if<boost::mpi::is_mpi_datatype<T>,
            std::pair<mpi_process_group::process_id_type, std::size_t> >::type
receive(const mpi_process_group& pg, int tag, T values[], std::size_t n)
{
  for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) {
    bool result =
      pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
                      boost::serialization::make_array(values, n),
                      boost::mpl::true_());
    if (result)
      return std::make_pair(source, n);
  }
  BOOST_ASSERT(false);
}

template<typename T>
typename
  disable_if<boost::mpi::is_mpi_datatype<T>,
             std::pair<mpi_process_group::process_id_type, std::size_t> >::type
receive(const mpi_process_group& pg, int tag, T values[], std::size_t n)
{
  for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) {
    if (pg.array_receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
                              values, n))
      return std::make_pair(source, n);
  }
  BOOST_ASSERT(false);
}

template<typename T>
mpi_process_group::process_id_type
receive(const mpi_process_group& pg,
        mpi_process_group::process_id_type source, int tag, T& value)
{
  if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
                      value, boost::mpi::is_mpi_datatype<T>()))
    return source;
  else {
    fprintf(stderr,
            "Process %d failed to receive a message from process %d with tag %d in block %d.\n",
            process_id(pg), source, tag, pg.my_block_number());

    BOOST_ASSERT(false);
    abort();
  }
}

template<typename T>
typename
  enable_if<boost::mpi::is_mpi_datatype<T>,
            std::pair<mpi_process_group::process_id_type, std::size_t> >::type
receive(const mpi_process_group& pg, int source, int tag, T values[],
        std::size_t n)
{
  if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
                      boost::serialization::make_array(values, n),
                      boost::mpl::true_()))
    return std::make_pair(source, n);
  else {
    fprintf(stderr,
            "Process %d failed to receive a message from process %d with tag %d in block %d.\n",
            process_id(pg), source, tag, pg.my_block_number());

    BOOST_ASSERT(false);
    abort();
  }
}

template<typename T>
typename
  disable_if<boost::mpi::is_mpi_datatype<T>,
             std::pair<mpi_process_group::process_id_type, std::size_t> >::type
receive(const mpi_process_group& pg, int source, int tag, T values[],
        std::size_t n)
{
  pg.array_receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
                        values, n);

  return std::make_pair(source, n);
}
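
// Usage sketch for the blocking receive() overloads above (illustrative
// only; `pg` and `my_msg_tag` are assumed names):
//
//   double value;
//   mpi_process_group::process_id_type src = receive(pg, my_msg_tag, value);
//
// The any-source overloads scan every process's buffered messages and
// assert if nothing with the requested tag has arrived, so they should only
// be called when a matching message is known to be buffered (typically
// after synchronize(pg)). The explicit-source overloads print a diagnostic
// and abort on failure instead.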

template<typename T, typename BinaryOperation>
T*
all_reduce(const mpi_process_group& pg, T* first, T* last, T* out,
           BinaryOperation bin_op)
{
  synchronize(pg);

  bool inplace = first == out;

  if (inplace) out = new T [last-first];

  boost::mpi::all_reduce(boost::mpi::communicator(communicator(pg),
                                                  boost::mpi::comm_attach),
                         first, last-first, out, bin_op);

  if (inplace) {
    std::copy(out, out + (last-first), first);
    delete [] out;
    return last;
  }

  return out;
}

template<typename T>
void
broadcast(const mpi_process_group& pg, T& val,
          mpi_process_group::process_id_type root)
{
  // broadcast the seed
  boost::mpi::communicator comm(communicator(pg), boost::mpi::comm_attach);
  boost::mpi::broadcast(comm, val, root);
}

template<typename T, typename BinaryOperation>
T*
scan(const mpi_process_group& pg, T* first, T* last, T* out,
     BinaryOperation bin_op)
{
  synchronize(pg);

  bool inplace = first == out;

  if (inplace) out = new T [last-first];

  boost::mpi::scan(communicator(pg), first, last-first, out, bin_op);

  if (inplace) {
    std::copy(out, out + (last-first), first);
    delete [] out;
    return last;
  }

  return out;
}

template<typename InputIterator, typename T>
void
all_gather(const mpi_process_group& pg, InputIterator first,
           InputIterator last, std::vector<T>& out)
{
  synchronize(pg);

  // Stick a copy of the local values into a vector, so we can broadcast it
  std::vector<T> local_values(first, last);

  // Collect the number of values stored in each process
  int size = local_values.size();
  std::vector<int> sizes(num_processes(pg));
  int result = MPI_Allgather(&size, 1, MPI_INT,
                             &sizes[0], 1, MPI_INT,
                             communicator(pg));
  BOOST_ASSERT(result == MPI_SUCCESS);

  // Adjust sizes based on the number of bytes
  std::transform(sizes.begin(), sizes.end(), sizes.begin(),
                 std::bind2nd(std::multiplies<int>(), sizeof(T)));

  // Compute displacements
  std::vector<int> displacements;
  displacements.reserve(sizes.size() + 1);
  displacements.push_back(0);
  std::partial_sum(sizes.begin(), sizes.end(),
                   std::back_inserter(displacements));

  // Gather all of the values
  out.resize(displacements.back() / sizeof(T));
  if (!out.empty()) {
    result = MPI_Allgatherv(local_values.empty()? (void*)&local_values
                            /* local results */: (void*)&local_values[0],
                            local_values.size() * sizeof(T),
                            MPI_BYTE,
                            &out[0],
                            &sizes[0],
                            &displacements[0],
                            MPI_BYTE,
                            communicator(pg));
  }
  BOOST_ASSERT(result == MPI_SUCCESS);
}
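
// Usage sketch for the collectives above (illustrative only; `pg` and the
// array bounds are assumed). all_reduce supports in-place operation: when
// out == first, a temporary buffer is allocated and the reduced values are
// copied back over the input.
//
//   std::vector<int> counts(num_processes(pg), 1);
//   all_reduce(pg, &counts[0], &counts[0] + counts.size(), &counts[0],
//              std::plus<int>());
//
// all_reduce, scan, and all_gather begin with synchronize(pg), so every
// process in the group must reach the call.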

template<typename InputIterator>
mpi_process_group
process_subgroup(const mpi_process_group& pg,
                 InputIterator first, InputIterator last)
{
/*
  boost::mpi::group current_group = communicator(pg).group();
  boost::mpi::group new_group = current_group.include(first,last);
  boost::mpi::communicator new_comm(communicator(pg),new_group);
  return mpi_process_group(new_comm);
*/
  std::vector<int> ranks(first, last);

  MPI_Group current_group;
  int result = MPI_Comm_group(communicator(pg), &current_group);
  BOOST_ASSERT(result == MPI_SUCCESS);

  MPI_Group new_group;
  result = MPI_Group_incl(current_group, ranks.size(), &ranks[0], &new_group);
  BOOST_ASSERT(result == MPI_SUCCESS);

  MPI_Comm new_comm;
  result = MPI_Comm_create(communicator(pg), new_group, &new_comm);
  BOOST_ASSERT(result == MPI_SUCCESS);

  result = MPI_Group_free(&new_group);
  BOOST_ASSERT(result == MPI_SUCCESS);
  result = MPI_Group_free(&current_group);
  BOOST_ASSERT(result == MPI_SUCCESS);

  if (new_comm != MPI_COMM_NULL) {
    mpi_process_group result_pg(boost::mpi::communicator(new_comm,
                                                         boost::mpi::comm_attach));
    result = MPI_Comm_free(&new_comm);
    BOOST_ASSERT(result == 0);
    return result_pg;
  } else {
    return mpi_process_group(mpi_process_group::create_empty());
  }
}

template<typename Receiver>
Receiver* mpi_process_group::get_receiver()
{
  return impl_->blocks[my_block_number()]->on_receive
           .template target<Receiver>();
}

template<typename T>
typename enable_if<boost::mpi::is_mpi_datatype<T> >::type
receive_oob(const mpi_process_group& pg,
            mpi_process_group::process_id_type source, int tag, T& value,
            int block)
{
  using boost::mpi::get_mpi_datatype;

  // Determine the actual message we expect to receive, and which
  // communicator it will come by.
  std::pair<boost::mpi::communicator, int> actual
    = pg.actual_communicator_and_tag(tag, block);

  // Post a non-blocking receive that waits until we complete this request.
  MPI_Request request;
  MPI_Irecv(&value, 1, get_mpi_datatype(value),
            source, actual.second, actual.first, &request);

  int done = 0;
  do {
    MPI_Test(&request, &done, MPI_STATUS_IGNORE);
    if (!done)
      pg.poll(/*wait=*/false, block);
  } while (!done);
}

template<typename T>
typename disable_if<boost::mpi::is_mpi_datatype<T> >::type
receive_oob(const mpi_process_group& pg,
            mpi_process_group::process_id_type source, int tag, T& value,
            int block)
{
  // Determine the actual message we expect to receive, and which
  // communicator it will come by.
  std::pair<boost::mpi::communicator, int> actual
    = pg.actual_communicator_and_tag(tag, block);

  boost::optional<boost::mpi::status> status;
  do {
    status = actual.first.iprobe(source, actual.second);
    if (!status)
      pg.poll();
  } while (!status);

  //actual.first.recv(status->source(), status->tag(),value);

  // Allocate the receive buffer
  boost::mpi::packed_iarchive in(actual.first);

#if BOOST_VERSION >= 103600
  in.resize(status->count<boost::mpi::packed>().get());
#else
  int size;
  MPI_Status mpi_status = *status;
  MPI_Get_count(&mpi_status, MPI_PACKED, &size);
  in.resize(size);
#endif

  // Receive the message data
  MPI_Recv(in.address(), in.size(), MPI_PACKED,
           status->source(), status->tag(), actual.first, MPI_STATUS_IGNORE);

  // Unpack the message data
  in >> value;
}
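
// Note: both receive_oob overloads above wait for the expected message but
// call pg.poll() between probes, so regular batched messages and triggers
// continue to be serviced while this process waits for its out-of-band data.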

template<typename SendT, typename ReplyT>
typename enable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type
send_oob_with_reply(const mpi_process_group& pg,
                    mpi_process_group::process_id_type dest,
                    int tag, const SendT& send_value, ReplyT& reply_value,
                    int block)
{
  detail::tag_allocator::token reply_tag = pg.impl_->allocated_tags.get_tag();
  send_oob(pg, dest, tag, boost::parallel::detail::make_untracked_pair(
                            (int)reply_tag, send_value), block);
  receive_oob(pg, dest, reply_tag, reply_value);
}

template<typename SendT, typename ReplyT>
typename disable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type
send_oob_with_reply(const mpi_process_group& pg,
                    mpi_process_group::process_id_type dest,
                    int tag, const SendT& send_value, ReplyT& reply_value,
                    int block)
{
  detail::tag_allocator::token reply_tag = pg.impl_->allocated_tags.get_tag();
  send_oob(pg, dest, tag,
           boost::parallel::detail::make_untracked_pair((int)reply_tag,
                                                         send_value), block);
  receive_oob(pg, dest, reply_tag, reply_value);
}

} } } // end namespace boost::graph::distributed