]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // Copyright (C) 2004-2006 The Trustees of Indiana University. |
2 | ||
3 | // Use, modification and distribution is subject to the Boost Software | |
4 | // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at | |
5 | // http://www.boost.org/LICENSE_1_0.txt) | |
6 | ||
7 | // Authors: Douglas Gregor | |
8 | // Andrew Lumsdaine | |
9 | #include <boost/optional.hpp> | |
10 | #include <cassert> | |
11 | #include <boost/graph/parallel/algorithm.hpp> | |
12 | #include <boost/graph/parallel/process_group.hpp> | |
13 | #include <functional> | |
14 | #include <algorithm> | |
15 | #include <boost/graph/parallel/simple_trigger.hpp> | |
16 | ||
17 | #ifndef BOOST_GRAPH_USE_MPI | |
18 | #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included" | |
19 | #endif | |
20 | ||
21 | namespace boost { namespace graph { namespace distributed { | |
22 | ||
23 | template<BOOST_DISTRIBUTED_QUEUE_PARMS> | |
24 | BOOST_DISTRIBUTED_QUEUE_TYPE:: | |
25 | distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner, | |
26 | const Buffer& buffer, bool polling) | |
27 | : process_group(process_group, attach_distributed_object()), | |
28 | owner(owner), | |
29 | buffer(buffer), | |
30 | polling(polling) | |
31 | { | |
32 | if (!polling) | |
33 | outgoing_buffers.reset( | |
34 | new outgoing_buffers_t(num_processes(process_group))); | |
35 | ||
36 | setup_triggers(); | |
37 | } | |
38 | ||
39 | template<BOOST_DISTRIBUTED_QUEUE_PARMS> | |
40 | BOOST_DISTRIBUTED_QUEUE_TYPE:: | |
41 | distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner, | |
42 | const Buffer& buffer, const UnaryPredicate& pred, | |
43 | bool polling) | |
44 | : process_group(process_group, attach_distributed_object()), | |
45 | owner(owner), | |
46 | buffer(buffer), | |
47 | pred(pred), | |
48 | polling(polling) | |
49 | { | |
50 | if (!polling) | |
51 | outgoing_buffers.reset( | |
52 | new outgoing_buffers_t(num_processes(process_group))); | |
53 | ||
54 | setup_triggers(); | |
55 | } | |
56 | ||
57 | template<BOOST_DISTRIBUTED_QUEUE_PARMS> | |
58 | BOOST_DISTRIBUTED_QUEUE_TYPE:: | |
59 | distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner, | |
60 | const UnaryPredicate& pred, bool polling) | |
61 | : process_group(process_group, attach_distributed_object()), | |
62 | owner(owner), | |
63 | pred(pred), | |
64 | polling(polling) | |
65 | { | |
66 | if (!polling) | |
67 | outgoing_buffers.reset( | |
68 | new outgoing_buffers_t(num_processes(process_group))); | |
69 | ||
70 | setup_triggers(); | |
71 | } | |
72 | ||
73 | template<BOOST_DISTRIBUTED_QUEUE_PARMS> | |
74 | void | |
75 | BOOST_DISTRIBUTED_QUEUE_TYPE::push(const value_type& x) | |
76 | { | |
77 | typename ProcessGroup::process_id_type dest = get(owner, x); | |
78 | if (outgoing_buffers) | |
79 | outgoing_buffers->at(dest).push_back(x); | |
80 | else if (dest == process_id(process_group)) | |
81 | buffer.push(x); | |
82 | else | |
83 | send(process_group, get(owner, x), msg_push, x); | |
84 | } | |
85 | ||
86 | template<BOOST_DISTRIBUTED_QUEUE_PARMS> | |
87 | bool | |
88 | BOOST_DISTRIBUTED_QUEUE_TYPE::empty() const | |
89 | { | |
90 | /* Processes will stay here until the buffer is nonempty or | |
91 | synchronization with the other processes indicates that all local | |
92 | buffers are empty (and no messages are in transit). | |
93 | */ | |
94 | while (buffer.empty() && !do_synchronize()) ; | |
95 | ||
96 | return buffer.empty(); | |
97 | } | |
98 | ||
99 | template<BOOST_DISTRIBUTED_QUEUE_PARMS> | |
100 | typename BOOST_DISTRIBUTED_QUEUE_TYPE::size_type | |
101 | BOOST_DISTRIBUTED_QUEUE_TYPE::size() const | |
102 | { | |
103 | empty(); | |
104 | return buffer.size(); | |
105 | } | |
106 | ||
107 | template<BOOST_DISTRIBUTED_QUEUE_PARMS> | |
108 | void BOOST_DISTRIBUTED_QUEUE_TYPE::setup_triggers() | |
109 | { | |
110 | using boost::graph::parallel::simple_trigger; | |
111 | ||
112 | simple_trigger(process_group, msg_push, this, | |
113 | &distributed_queue::handle_push); | |
114 | simple_trigger(process_group, msg_multipush, this, | |
115 | &distributed_queue::handle_multipush); | |
116 | } | |
117 | ||
118 | template<BOOST_DISTRIBUTED_QUEUE_PARMS> | |
119 | void | |
120 | BOOST_DISTRIBUTED_QUEUE_TYPE:: | |
121 | handle_push(int /*source*/, int /*tag*/, const value_type& value, | |
122 | trigger_receive_context) | |
123 | { | |
124 | if (pred(value)) buffer.push(value); | |
125 | } | |
126 | ||
127 | template<BOOST_DISTRIBUTED_QUEUE_PARMS> | |
128 | void | |
129 | BOOST_DISTRIBUTED_QUEUE_TYPE:: | |
130 | handle_multipush(int /*source*/, int /*tag*/, | |
131 | const std::vector<value_type>& values, | |
132 | trigger_receive_context) | |
133 | { | |
134 | for (std::size_t i = 0; i < values.size(); ++i) | |
135 | if (pred(values[i])) buffer.push(values[i]); | |
136 | } | |
137 | ||
138 | template<BOOST_DISTRIBUTED_QUEUE_PARMS> | |
139 | bool | |
140 | BOOST_DISTRIBUTED_QUEUE_TYPE::do_synchronize() const | |
141 | { | |
142 | #ifdef PBGL_ACCOUNTING | |
143 | ++num_synchronizations; | |
144 | #endif | |
145 | ||
146 | using boost::parallel::all_reduce; | |
147 | using std::swap; | |
148 | ||
149 | typedef typename ProcessGroup::process_id_type process_id_type; | |
150 | ||
151 | if (outgoing_buffers) { | |
152 | // Transfer all of the push requests | |
153 | process_id_type id = process_id(process_group); | |
154 | process_id_type np = num_processes(process_group); | |
155 | for (process_id_type dest = 0; dest < np; ++dest) { | |
156 | outgoing_buffer_t& outgoing = outgoing_buffers->at(dest); | |
157 | std::size_t size = outgoing.size(); | |
158 | if (size != 0) { | |
159 | if (dest != id) { | |
160 | send(process_group, dest, msg_multipush, outgoing); | |
161 | } else { | |
162 | for (std::size_t i = 0; i < size; ++i) | |
163 | buffer.push(outgoing[i]); | |
164 | } | |
165 | outgoing.clear(); | |
166 | } | |
167 | } | |
168 | } | |
169 | synchronize(process_group); | |
170 | ||
171 | unsigned local_size = buffer.size(); | |
172 | unsigned global_size = | |
173 | all_reduce(process_group, local_size, std::plus<unsigned>()); | |
174 | return global_size == 0; | |
175 | } | |
176 | ||
177 | } } } // end namespace boost::graph::distributed |