1 // Copyright (C) 2004-2006 The Trustees of Indiana University.
3 // Use, modification and distribution is subject to the Boost Software
4 // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
7 // Authors: Douglas Gregor
9 #include <boost/optional.hpp>
11 #include <boost/graph/parallel/algorithm.hpp>
12 #include <boost/graph/parallel/process_group.hpp>
15 #include <boost/graph/parallel/simple_trigger.hpp>
17 #ifndef BOOST_GRAPH_USE_MPI
18 #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
21 namespace boost { namespace graph { namespace distributed {
// Constructor overload taking a process group, an owner map, an initial
// buffer and a polling flag.
// The visible initializer builds the process_group member from the argument
// together with attach_distributed_object(); the visible body statement
// creates an outgoing_buffers_t constructed with
// num_processes(process_group) and hands it to outgoing_buffers.reset().
// NOTE(review): this excerpt skips several original lines here (the rest of
// the initializer list, the braces, and anything guarding the reset() call),
// so the complete behaviour must be confirmed against the full file.
23 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
24 BOOST_DISTRIBUTED_QUEUE_TYPE::
25 distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
26 const Buffer& buffer, bool polling)
27 : process_group(process_group, attach_distributed_object()),
33 outgoing_buffers.reset(
34 new outgoing_buffers_t(num_processes(process_group)));
// Constructor overload taking a process group, an owner map, an initial
// buffer and a unary predicate.
// As far as this excerpt shows, it initializes the process_group member from
// the argument plus attach_distributed_object(), and creates an
// outgoing_buffers_t constructed with num_processes(process_group) that is
// passed to outgoing_buffers.reset().
// NOTE(review): the parameter list is cut off after `pred,` and the remaining
// initializers, braces and any condition around reset() are not visible here;
// verify against the full file.
39 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
40 BOOST_DISTRIBUTED_QUEUE_TYPE::
41 distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
42 const Buffer& buffer, const UnaryPredicate& pred,
44 : process_group(process_group, attach_distributed_object()),
51 outgoing_buffers.reset(
52 new outgoing_buffers_t(num_processes(process_group)));
// Constructor overload taking a process group, an owner map, a unary
// predicate and a polling flag (no buffer argument).
// Visible behaviour: the process_group member is initialized from the
// argument plus attach_distributed_object(), and an outgoing_buffers_t
// constructed with num_processes(process_group) is passed to
// outgoing_buffers.reset().
// NOTE(review): the other member initializers, the braces and any guard
// around the reset() call are missing from this excerpt; confirm in the
// full file.
57 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
58 BOOST_DISTRIBUTED_QUEUE_TYPE::
59 distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
60 const UnaryPredicate& pred, bool polling)
61 : process_group(process_group, attach_distributed_object()),
67 outgoing_buffers.reset(
68 new outgoing_buffers_t(num_processes(process_group)));
// push(x): routes a value according to the process that get(owner, x)
// reports for it.
// Three outcomes are visible in this excerpt:
//   - x is appended to the outgoing buffer indexed by the destination;
//   - a branch taken when the destination equals process_id(process_group)
//     (its body is not visible here);
//   - x is sent to the owner as a msg_push message.
// NOTE(review): the return-type line, the first `if` condition, the body of
// the `else if` branch and the final `else` are missing from this excerpt;
// confirm the exact branch conditions in the full file.
73 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
75 BOOST_DISTRIBUTED_QUEUE_TYPE::push(const value_type& x)
// Destination process for x, as reported by the owner map.
77 typename ProcessGroup::process_id_type dest = get(owner, x);
// at() is the bounds-checked accessor; dest selects the per-destination buffer.
79 outgoing_buffers->at(dest).push_back(x);
80 else if (dest == process_id(process_group))
// Sends the single value to its owner; the owner is looked up a second time
// here rather than reusing dest.
83 send(process_group, get(owner, x), msg_push, x);
// empty(): reports whether the local buffer is empty, but only after looping
// for as long as the buffer is empty AND do_synchronize() returns false.
// In other words the loop ends as soon as either the local buffer holds a
// value or do_synchronize() returns true; the result is then buffer.empty().
// The loop body is intentionally an empty statement: all work happens in the
// condition.
// NOTE(review): the line that closes the block comment below is absent from
// this excerpt, as are the return-type line and braces; check the full file.
86 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
88 BOOST_DISTRIBUTED_QUEUE_TYPE::empty() const
90 /* Processes will stay here until the buffer is nonempty or
91 synchronization with the other processes indicates that all local
92 buffers are empty (and no messages are in transit).
94 while (buffer.empty() && !do_synchronize()) ;
96 return buffer.empty();
// size(): returns buffer.size(), i.e. the element count of the local buffer
// as the queue's size_type.
// NOTE(review): original lines between the signature and the return are not
// present in this excerpt, so any work done before the return cannot be
// described from here.
99 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
100 typename BOOST_DISTRIBUTED_QUEUE_TYPE::size_type
101 BOOST_DISTRIBUTED_QUEUE_TYPE::size() const
104 return buffer.size();
// setup_triggers(): registers this queue's message handlers with the process
// group through boost::graph::parallel::simple_trigger:
//   msg_push      -> distributed_queue::handle_push
//   msg_multipush -> distributed_queue::handle_multipush
// `this` is passed so the member-function handlers are invoked on this object.
107 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
108 void BOOST_DISTRIBUTED_QUEUE_TYPE::setup_triggers()
110 using boost::graph::parallel::simple_trigger;
// Handler for single-value messages.
112 simple_trigger(process_group, msg_push, this,
113 &distributed_queue::handle_push);
// Handler for batched messages.
114 simple_trigger(process_group, msg_multipush, this,
115 &distributed_queue::handle_multipush);
// handle_push(): handler registered for msg_push in setup_triggers().
// Pushes the received value into the local buffer only if pred(value) holds;
// otherwise the value is dropped. The source, tag and receive-context
// arguments are unnamed and unused.
118 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
120 BOOST_DISTRIBUTED_QUEUE_TYPE::
121 handle_push(int /*source*/, int /*tag*/, const value_type& value,
122 trigger_receive_context)
124 if (pred(value)) buffer.push(value);
// handle_multipush(): handler registered for msg_multipush in
// setup_triggers(). Walks the received vector in index order and pushes each
// element that satisfies pred into the local buffer; elements that fail the
// predicate are skipped. The source, tag and receive-context arguments are
// unnamed and unused.
127 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
129 BOOST_DISTRIBUTED_QUEUE_TYPE::
130 handle_multipush(int /*source*/, int /*tag*/,
131 const std::vector<value_type>& values,
132 trigger_receive_context)
134 for (std::size_t i = 0; i < values.size(); ++i)
135 if (pred(values[i])) buffer.push(values[i]);
// do_synchronize(): flushes any per-destination outgoing buffers, calls
// synchronize() on the process group, then sums buffer.size() across the
// group with all_reduce. Returns true exactly when that global sum is zero.
// NOTE(review): this excerpt omits several original lines inside the
// per-destination loop (the conditions that choose between sending and
// pushing locally, and anything done to `outgoing` afterwards), as well as
// the return-type line, braces and the #endif; confirm in the full file.
138 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
140 BOOST_DISTRIBUTED_QUEUE_TYPE::do_synchronize() const
// Optional bookkeeping: counts calls when PBGL_ACCOUNTING is defined.
142 #ifdef PBGL_ACCOUNTING
143 ++num_synchronizations;
146 using boost::parallel::all_reduce;
149 typedef typename ProcessGroup::process_id_type process_id_type;
// Only runs when outgoing_buffers is set (tested as a boolean).
151 if (outgoing_buffers) {
152 // Transfer all of the push requests
153 process_id_type id = process_id(process_group);
154 process_id_type np = num_processes(process_group);
// Visit the buffer of every process id in [0, np).
155 for (process_id_type dest = 0; dest < np; ++dest) {
156 outgoing_buffer_t& outgoing = outgoing_buffers->at(dest);
157 std::size_t size = outgoing.size();
// Ship the whole buffer to dest as one msg_multipush message.
160 send(process_group, dest, msg_multipush, outgoing);
// Alternative path: move the buffered values straight into the local buffer.
// Note that pred is not applied on this path in the visible code.
162 for (std::size_t i = 0; i < size; ++i)
163 buffer.push(outgoing[i]);
169 synchronize(process_group);
// NOTE(review): sizes are held and summed as `unsigned`; confirm this cannot
// truncate or overflow for very large queues.
171 unsigned local_size = buffer.size();
172 unsigned global_size =
173 all_reduce(process_group, local_size, std::plus<unsigned>());
// True only when every process reported an empty buffer.
174 return global_size == 0;
177 } } } // end namespace boost::graph::distributed