]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // Copyright (C) 2004-2006 The Trustees of Indiana University. |
2 | ||
3 | // Use, modification and distribution is subject to the Boost Software | |
4 | // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at | |
5 | // http://www.boost.org/LICENSE_1_0.txt) | |
6 | ||
7 | // Authors: Douglas Gregor | |
8 | // Andrew Lumsdaine | |
9 | #ifndef BOOST_GRAPH_DISTRIBUTED_QUEUE_HPP | |
10 | #define BOOST_GRAPH_DISTRIBUTED_QUEUE_HPP | |
11 | ||
12 | #ifndef BOOST_GRAPH_USE_MPI | |
13 | #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included" | |
14 | #endif | |
15 | ||
16 | #include <boost/graph/parallel/process_group.hpp> | |
17 | #include <boost/optional.hpp> | |
18 | #include <boost/shared_ptr.hpp> | |
19 | #include <vector> | |
20 | ||
21 | namespace boost { namespace graph { namespace distributed { | |
22 | ||
23 | /// A unary predicate that always returns "true". | |
24 | struct always_push | |
25 | { | |
26 | template<typename T> bool operator()(const T&) const { return true; } | |
27 | }; | |
28 | ||
29 | ||
30 | ||
31 | /** A distributed queue adaptor. | |
32 | * | |
33 | * Class template @c distributed_queue implements a distributed queue | |
34 | * across a process group. The distributed queue is an adaptor over an | |
35 | * existing (local) queue, which must model the @ref Buffer | |
36 | * concept. Each process stores a distinct copy of the local queue, | |
37 | * from which it draws or removes elements via the @ref pop and @ref | |
38 | * top members. | |
39 | * | |
40 | * The value type of the local queue must be a model of the @ref | |
41 | * GlobalDescriptor concept. The @ref push operation of the | |
42 | * distributed queue passes (via a message) the value to its owning | |
43 | * processor. Thus, the elements within a particular local queue are | |
44 | * guaranteed to have the process owning that local queue as an owner. | |
45 | * | |
46 | * Synchronization of distributed queues occurs in the @ref empty and | |
47 | * @ref size functions, which will only return "empty" values (true or | |
48 | * 0, respectively) when the entire distributed queue is empty. If the | |
49 | * local queue is empty but the distributed queue is not, the | |
50 | * operation will block until either condition changes. When the @ref | |
51 | * size function of a nonempty queue returns, it returns the size of | |
52 | * the local queue. These semantics were selected so that sequential | |
53 | * code that processes elements in the queue via the following idiom | |
54 | * can be parallelized via introduction of a distributed queue: | |
55 | * | |
56 | * distributed_queue<...> Q; | |
57 | * Q.push(x); | |
58 | * while (!Q.empty()) { | |
59 | * // do something, that may push a value onto Q | |
60 | * } | |
61 | * | |
62 | * In the parallel version, the initial @ref push operation will place | |
63 | * the value @c x onto its owner's queue. All processes will | |
64 | * synchronize at the call to empty, and only the process owning @c x | |
65 | * will be allowed to execute the loop (@ref Q.empty() returns | |
66 | * false). This iteration may in turn push values onto other remote | |
67 | * queues, so when that process finishes execution of the loop body | |
68 | * and all processes synchronize again in @ref empty, more processes | |
69 | * may have nonempty local queues to execute. Once all local queues | |
70 | * are empty, @ref Q.empty() returns @c false for all processes. | |
71 | * | |
72 | * The distributed queue can receive messages at two different times: | |
73 | * during synchronization and when polling @ref empty. Messages are | |
74 | * always received during synchronization, to ensure that accurate | |
75 | * local queue sizes can be determines. However, whether @ref empty | |
76 | * should poll for messages is specified as an option to the | |
77 | * constructor. Polling may be desired when the order in which | |
78 | * elements in the queue are processed is not important, because it | |
79 | * permits fewer synchronization steps and less communication | |
80 | * overhead. However, when more strict ordering guarantees are | |
81 | * required, polling may be semantically incorrect. By disabling | |
82 | * polling, one ensures that parallel execution using the idiom above | |
83 | * will not process an element at a later "level" before an earlier | |
84 | * "level". | |
85 | * | |
86 | * The distributed queue nearly models the @ref Buffer | |
87 | * concept. However, the @ref push routine does not necessarily | |
88 | * increase the result of @c size() by one (although the size of the | |
89 | * global queue does increase by one). | |
90 | */ | |
91 | template<typename ProcessGroup, typename OwnerMap, typename Buffer, | |
92 | typename UnaryPredicate = always_push> | |
93 | class distributed_queue | |
94 | { | |
95 | typedef distributed_queue self_type; | |
96 | ||
97 | enum { | |
98 | /** Message indicating a remote push. The message contains a | |
99 | * single value x of type value_type that is to be pushed on the | |
100 | * receiver's queue. | |
101 | */ | |
102 | msg_push, | |
103 | /** Push many elements at once. */ | |
104 | msg_multipush | |
105 | }; | |
106 | ||
107 | public: | |
108 | typedef ProcessGroup process_group_type; | |
109 | typedef Buffer buffer_type; | |
110 | typedef typename buffer_type::value_type value_type; | |
111 | typedef typename buffer_type::size_type size_type; | |
112 | ||
113 | /** Construct a new distributed queue. | |
114 | * | |
115 | * Build a new distributed queue that communicates over the given @p | |
116 | * process_group, whose local queue is initialized via @p buffer and | |
117 | * which may or may not poll for messages. | |
118 | */ | |
119 | explicit | |
120 | distributed_queue(const ProcessGroup& process_group, | |
121 | const OwnerMap& owner, | |
122 | const Buffer& buffer, | |
123 | bool polling = false); | |
124 | ||
125 | /** Construct a new distributed queue. | |
126 | * | |
127 | * Build a new distributed queue that communicates over the given @p | |
128 | * process_group, whose local queue is initialized via @p buffer and | |
129 | * which may or may not poll for messages. | |
130 | */ | |
131 | explicit | |
132 | distributed_queue(const ProcessGroup& process_group = ProcessGroup(), | |
133 | const OwnerMap& owner = OwnerMap(), | |
134 | const Buffer& buffer = Buffer(), | |
135 | const UnaryPredicate& pred = UnaryPredicate(), | |
136 | bool polling = false); | |
137 | ||
138 | /** Construct a new distributed queue. | |
139 | * | |
140 | * Build a new distributed queue that communicates over the given @p | |
141 | * process_group, whose local queue is default-initalized and which | |
142 | * may or may not poll for messages. | |
143 | */ | |
144 | distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner, | |
145 | const UnaryPredicate& pred, bool polling = false); | |
146 | ||
147 | /** Virtual destructor required with virtual functions. | |
148 | * | |
149 | */ | |
150 | virtual ~distributed_queue() {} | |
151 | ||
152 | /** Push an element onto the distributed queue. | |
153 | * | |
154 | * The element will be sent to its owner process to be added to that | |
155 | * process's local queue. If polling is enabled for this queue and | |
156 | * the owner process is the current process, the value will be | |
157 | * immediately pushed onto the local queue. | |
158 | * | |
159 | * Complexity: O(1) messages of size O(sizeof(value_type)) will be | |
160 | * transmitted. | |
161 | */ | |
162 | void push(const value_type& x); | |
163 | ||
164 | /** Pop an element off the local queue. | |
165 | * | |
166 | * @p @c !empty() | |
167 | */ | |
168 | void pop() { buffer.pop(); } | |
169 | ||
170 | /** | |
171 | * Return the element at the top of the local queue. | |
172 | * | |
173 | * @p @c !empty() | |
174 | */ | |
175 | value_type& top() { return buffer.top(); } | |
176 | ||
177 | /** | |
178 | * \overload | |
179 | */ | |
180 | const value_type& top() const { return buffer.top(); } | |
181 | ||
182 | /** Determine if the queue is empty. | |
183 | * | |
184 | * When the local queue is nonempty, returns @c true. If the local | |
185 | * queue is empty, synchronizes with all other processes in the | |
186 | * process group until either (1) the local queue is nonempty | |
187 | * (returns @c true) (2) the entire distributed queue is empty | |
188 | * (returns @c false). | |
189 | */ | |
190 | bool empty() const; | |
191 | ||
192 | /** Determine the size of the local queue. | |
193 | * | |
194 | * The behavior of this routine is equivalent to the behavior of | |
195 | * @ref empty, except that when @ref empty returns true this | |
196 | * function returns the size of the local queue and when @ref empty | |
197 | * returns false this function returns zero. | |
198 | */ | |
199 | size_type size() const; | |
200 | ||
201 | // private: | |
202 | /** Synchronize the distributed queue and determine if all queues | |
203 | * are empty. | |
204 | * | |
205 | * \returns \c true when all local queues are empty, or false if at least | |
206 | * one of the local queues is nonempty. | |
207 | * Defined as virtual for derived classes like depth_limited_distributed_queue. | |
208 | */ | |
209 | virtual bool do_synchronize() const; | |
210 | ||
211 | private: | |
212 | // Setup triggers | |
213 | void setup_triggers(); | |
214 | ||
215 | // Message handlers | |
216 | void | |
217 | handle_push(int source, int tag, const value_type& value, | |
218 | trigger_receive_context); | |
219 | ||
220 | void | |
221 | handle_multipush(int source, int tag, const std::vector<value_type>& values, | |
222 | trigger_receive_context); | |
223 | ||
224 | mutable ProcessGroup process_group; | |
225 | OwnerMap owner; | |
226 | mutable Buffer buffer; | |
227 | UnaryPredicate pred; | |
228 | bool polling; | |
229 | ||
230 | typedef std::vector<value_type> outgoing_buffer_t; | |
231 | typedef std::vector<outgoing_buffer_t> outgoing_buffers_t; | |
232 | shared_ptr<outgoing_buffers_t> outgoing_buffers; | |
233 | }; | |
234 | ||
235 | /// Helper macro containing the normal names for the template | |
236 | /// parameters to distributed_queue. | |
237 | #define BOOST_DISTRIBUTED_QUEUE_PARMS \ | |
238 | typename ProcessGroup, typename OwnerMap, typename Buffer, \ | |
239 | typename UnaryPredicate | |
240 | ||
241 | /// Helper macro containing the normal template-id for | |
242 | /// distributed_queue. | |
243 | #define BOOST_DISTRIBUTED_QUEUE_TYPE \ | |
244 | distributed_queue<ProcessGroup, OwnerMap, Buffer, UnaryPredicate> | |
245 | ||
246 | /** Synchronize all processes involved with the given distributed queue. | |
247 | * | |
248 | * This function will synchronize all of the local queues for a given | |
249 | * distributed queue, by ensuring that no additional messages are in | |
250 | * transit. It is rarely required by the user, because most | |
251 | * synchronization of distributed queues occurs via the @c empty or @c | |
252 | * size methods. | |
253 | */ | |
254 | template<BOOST_DISTRIBUTED_QUEUE_PARMS> | |
255 | inline void | |
256 | synchronize(const BOOST_DISTRIBUTED_QUEUE_TYPE& Q) | |
257 | { Q.do_synchronize(); } | |
258 | ||
259 | /// Construct a new distributed queue. | |
260 | template<typename ProcessGroup, typename OwnerMap, typename Buffer> | |
261 | inline distributed_queue<ProcessGroup, OwnerMap, Buffer> | |
262 | make_distributed_queue(const ProcessGroup& process_group, | |
263 | const OwnerMap& owner, | |
264 | const Buffer& buffer, | |
265 | bool polling = false) | |
266 | { | |
267 | typedef distributed_queue<ProcessGroup, OwnerMap, Buffer> result_type; | |
268 | return result_type(process_group, owner, buffer, polling); | |
269 | } | |
270 | ||
271 | } } } // end namespace boost::graph::distributed | |
272 | ||
273 | #include <boost/graph/distributed/detail/queue.ipp> | |
274 | ||
275 | #undef BOOST_DISTRIBUTED_QUEUE_TYPE | |
276 | #undef BOOST_DISTRIBUTED_QUEUE_PARMS | |
277 | ||
278 | #endif // BOOST_GRAPH_DISTRIBUTED_QUEUE_HPP |